marchwarden/researchers/web/trace.py

198 lines
6.8 KiB
Python
Raw Permalink Normal View History

"""Trace logger for the web researcher.
Produces JSONL audit logs keyed by trace_id. Each research() call
gets its own trace file at {trace_dir}/{trace_id}.jsonl.
Every line is a self-contained JSON object representing one step
in the research loop.
"""
import json
import os
import time
import uuid
from pathlib import Path
from typing import Any, Optional
from obs import get_logger
# Actions that get promoted to INFO in the operational log. Everything
# else logs at DEBUG so the default INFO level shows ~6-8 milestones per
# research call instead of 20+ chatty per-step events. Set
# MARCHWARDEN_LOG_LEVEL=DEBUG to see all steps.
_INFO_ACTIONS = frozenset(
{
"start",
"iteration_start",
"synthesis_start",
"synthesis_complete",
"synthesis_error",
"synthesis_build_error",
"budget_exhausted",
"complete",
}
)
_log = get_logger("marchwarden.researcher.trace")
# Action pairings for duration tracking. When a starter action fires
# we record a monotonic start time keyed by the starter name. When the
# matching completer fires we compute the elapsed duration and attach
# it as a field on the completer's entry, then clear the start.
#
# Synthesis has two possible completers (success or error), both
# pointing back to synthesis_start.
_DURATION_PAIRS: dict[str, str] = {
"web_search_complete": "web_search",
"fetch_url_complete": "fetch_url",
"synthesis_complete": "synthesis_start",
"synthesis_error": "synthesis_start",
"complete": "start",
}
_STARTER_ACTIONS = frozenset(_DURATION_PAIRS.values())
class TraceLogger:
"""Logs research steps to a JSONL file.
Usage:
logger = TraceLogger()
logger.log_step("search", query="Utah crops", decision="relevant query")
logger.log_step("fetch_url", url="https://...", content_hash="sha256:abc...")
logger.close()
"""
def __init__(
self,
trace_id: Optional[str] = None,
trace_dir: Optional[str] = None,
):
"""Initialize a trace logger.
Args:
trace_id: UUID for this trace. Generated if not provided.
trace_dir: Directory for trace files. Defaults to ~/.marchwarden/traces/
"""
self.trace_id = trace_id or str(uuid.uuid4())
self.trace_dir = Path(
trace_dir or os.path.expanduser("~/.marchwarden/traces")
)
self.trace_dir.mkdir(parents=True, exist_ok=True)
self.file_path = self.trace_dir / f"{self.trace_id}.jsonl"
self.result_path = self.trace_dir / f"{self.trace_id}.result.json"
self._step_counter = 0
self._file = None
# action_name -> monotonic start time, populated by starter
# actions and consumed by their matching completer (Issue #35).
self._pending_starts: dict[str, float] = {}
@property
def _writer(self):
"""Lazy file handle — opens on first write."""
if self._file is None:
self._file = open(self.file_path, "a", encoding="utf-8")
return self._file
def log_step(self, action: str, decision: str = "", **kwargs: Any) -> dict:
"""Log one step in the research loop.
Args:
action: What happened (e.g. "search", "fetch_url", "synthesize").
decision: Why this action was taken or what was concluded.
**kwargs: Additional fields (url, query, content_hash, etc.)
Returns:
The logged entry dict.
"""
self._step_counter += 1
entry = {
"step": self._step_counter,
"action": action,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"decision": decision,
}
entry.update(kwargs)
# Duration tracking (Issue #35). Record start times for starter
# actions; when the matching completer fires, attach elapsed time
# to both the trace entry and the operational log line.
now = time.monotonic()
if action in _STARTER_ACTIONS:
self._pending_starts[action] = now
duration_extras: dict[str, Any] = {}
if action in _DURATION_PAIRS:
starter = _DURATION_PAIRS[action]
start = self._pending_starts.pop(starter, None)
if start is not None:
elapsed = now - start
if action == "complete":
duration_extras["total_duration_sec"] = round(elapsed, 3)
else:
duration_extras["duration_ms"] = int(elapsed * 1000)
entry.update(duration_extras)
self._writer.write(json.dumps(entry, default=str) + "\n")
self._writer.flush()
# Mirror the trace step into the operational logger so admins
# can watch progress in real time. trace_id and researcher are
# already bound in contextvars by WebResearcher.research, so
# they automatically appear on every line.
log_method = _log.info if action in _INFO_ACTIONS else _log.debug
log_method(
action,
step=self._step_counter,
decision=decision,
**kwargs,
**duration_extras,
)
return entry
def write_result(self, result: Any) -> None:
"""Persist the final ResearchResult JSON next to the trace.
Issue #54: the JSONL trace records step events and final counts
only. Without the structured result on disk, replay can't show
which gaps fired or which sources were cited, and downstream
analysis (M3.2/M3.3) is impossible. We dump the pydantic model
to ``<trace_id>.result.json`` so the full contract survives the
process.
"""
# Accept either a pydantic model or a plain dict to keep the
# logger decoupled from the models module (avoids a circular
# import path).
if hasattr(result, "model_dump_json"):
payload = result.model_dump_json(indent=2)
else:
payload = json.dumps(result, indent=2, default=str)
self.result_path.write_text(payload, encoding="utf-8")
def read_entries(self) -> list[dict]:
"""Read all entries from the trace file.
Useful for replay and testing.
"""
if not self.file_path.exists():
return []
entries = []
with open(self.file_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
entries.append(json.loads(line))
return entries
def close(self):
"""Close the trace file handle."""
if self._file is not None:
self._file.close()
self._file = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False