marchwarden/researchers/web/trace.py
Jeff Smith b510902af3 Mirror trace steps to operational logger
The trace JSONL captures every step of a research call (search,
fetch, iteration boundaries, synthesis), but the structured
operational log only fired at research_started / research_completed,
giving administrators no real-time visibility into agent progress.

Have TraceLogger.log_step also emit a structlog event using the
same action name, fields, and step counter. trace_id and researcher
are already bound in contextvars by WebResearcher.research, so
every line carries them automatically — no plumbing needed.

Volume control: a curated set of milestone actions logs at INFO
(start, iteration_start, synthesis_start/complete/error/build_error,
budget_exhausted, complete). Chatty per-tool actions (web_search,
fetch_url and their *_complete pairs) log at DEBUG. The default
MARCHWARDEN_LOG_LEVEL=INFO shows ~9 lines per call;
MARCHWARDEN_LOG_LEVEL=DEBUG shows everything.

This keeps dev stderr readable while putting full step visibility
one env var away, and OpenSearch can still ingest everything at DEBUG.

Verified end-to-end: Utah peak query at INFO produces 9 milestone
log lines, at DEBUG produces 13.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 16:22:13 -06:00

134 lines
4 KiB
Python

"""Trace logger for the web researcher.
Produces JSONL audit logs keyed by trace_id. Each research() call
gets its own trace file at {trace_dir}/{trace_id}.jsonl.
Every line is a self-contained JSON object representing one step
in the research loop.
"""
import json
import os
import time
import uuid
from pathlib import Path
from typing import Any, Optional
from obs import get_logger
# Trace actions that are mirrored to the operational log at INFO level.
# They are the coarse milestones of a research call: start, iteration
# boundaries, the synthesis outcome, budget exhaustion and completion.
# Any action NOT listed here (e.g. the per-tool web_search / fetch_url
# steps and their *_complete pairs) is mirrored at DEBUG instead, so the
# default INFO level stays readable. Set MARCHWARDEN_LOG_LEVEL=DEBUG to
# see every step.
_INFO_ACTIONS = frozenset(
    (
        "start",
        "iteration_start",
        "synthesis_start",
        "synthesis_complete",
        "synthesis_error",
        "synthesis_build_error",
        "budget_exhausted",
        "complete",
    )
)
# Operational logger that TraceLogger.log_step mirrors every trace step
# into: actions listed in _INFO_ACTIONS at INFO, everything else at DEBUG.
_log = get_logger("marchwarden.researcher.trace")
class TraceLogger:
    """Append-only JSONL audit log for one research call.

    Each instance writes to ``{trace_dir}/{trace_id}.jsonl``. Every call to
    :meth:`log_step` appends one self-contained JSON object and mirrors the
    same step to the operational logger.

    Usage:
        logger = TraceLogger()
        logger.log_step("search", query="Utah crops", decision="relevant query")
        logger.log_step("fetch_url", url="https://...", content_hash="sha256:abc...")
        logger.close()

    The class is also a context manager; leaving the ``with`` block closes
    the file handle.
    """

    # Keyword names log_step refuses as extra fields:
    # - "step" and "timestamp" are owned by the logger; a caller value would
    #   silently replace them in the JSONL entry and break the audit trail.
    # - "step" is also passed explicitly to the operational log call, and
    #   "event" is the name structlog gives the positional message parameter
    #   of its log methods. Either one used to raise TypeError only *after*
    #   the trace line had been written and the counter advanced.
    _RESERVED_FIELDS = frozenset({"step", "timestamp", "event"})

    def __init__(
        self,
        trace_id: Optional[str] = None,
        trace_dir: Optional[str] = None,
    ):
        """Initialize a trace logger.

        The trace directory is created eagerly. The trace file itself is
        only opened on the first :meth:`log_step` call.

        Args:
            trace_id: Identifier for this trace, used as the file stem. A
                random UUID4 string is generated if omitted or empty.
            trace_dir: Directory for trace files. Defaults to
                ~/.marchwarden/traces/
        """
        self.trace_id = trace_id or str(uuid.uuid4())
        self.trace_dir = Path(
            trace_dir or os.path.expanduser("~/.marchwarden/traces")
        )
        self.trace_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): trace_id is used verbatim as a filename; if it can
        # ever come from an untrusted caller it should be validated (e.g.
        # no path separators). Confirm against callers.
        self.file_path = self.trace_dir / f"{self.trace_id}.jsonl"
        # The file is opened in append mode, so reusing a trace_id appends
        # to the existing file with the step counter restarting at 1.
        self._step_counter = 0
        self._file = None

    @property
    def _writer(self):
        """Lazy file handle: opens (or reopens after close) on first write."""
        if self._file is None:
            self._file = open(self.file_path, "a", encoding="utf-8")
        return self._file

    def log_step(self, action: str, decision: str = "", **kwargs: Any) -> dict:
        """Log one step in the research loop.

        Appends the step to the JSONL trace and mirrors it to the
        operational logger (INFO for milestone actions, DEBUG otherwise).

        Args:
            action: What happened (e.g. "search", "fetch_url", "synthesize").
            decision: Why this action was taken or what was concluded.
            **kwargs: Additional fields (url, query, content_hash, etc.).
                Values json cannot serialize are written via ``str()``.
                The names in ``_RESERVED_FIELDS`` are not accepted.

        Returns:
            The logged entry dict.

        Raises:
            TypeError: If ``kwargs`` uses a reserved field name. In that case
                nothing is written and the step counter is not advanced.
        """
        clashes = self._RESERVED_FIELDS.intersection(kwargs)
        if clashes:
            raise TypeError(
                "log_step() got reserved field name(s): "
                + ", ".join(sorted(clashes))
            )

        self._step_counter += 1
        entry = {
            "step": self._step_counter,
            "action": action,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "decision": decision,
            **kwargs,
        }
        writer = self._writer
        writer.write(json.dumps(entry, default=str) + "\n")
        # Flush every step so each line reaches the OS immediately: it is
        # visible to readers (e.g. read_entries) while the research call is
        # still running, and is not lost if the process dies.
        writer.flush()

        # Mirror the trace step into the operational logger so admins can
        # watch progress in real time. trace_id and researcher are bound in
        # contextvars by the caller (WebResearcher.research), so they do not
        # need to be passed here.
        log_method = _log.info if action in _INFO_ACTIONS else _log.debug
        log_method(action, step=self._step_counter, decision=decision, **kwargs)
        return entry

    def read_entries(self) -> list[dict]:
        """Read all entries from the trace file, in file order.

        Blank lines are skipped. Returns an empty list if nothing has been
        written yet. Useful for replay and testing.
        """
        if not self.file_path.exists():
            return []
        with open(self.file_path, "r", encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    def close(self):
        """Close the trace file handle. Safe to call more than once."""
        if self._file is not None:
            self._file.close()
            self._file = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Never suppress exceptions raised inside the with-block.
        return False