"""Trace logger for the web researcher.

Produces JSONL audit logs keyed by trace_id. Each research() call gets
its own trace file at {trace_dir}/{trace_id}.jsonl. Every line is a
self-contained JSON object representing one step in the research loop.
"""

import json
import os
import time
import uuid
from pathlib import Path
from typing import Any, Optional

from obs import get_logger

# Actions that get promoted to INFO in the operational log. Everything
# else logs at DEBUG so the default INFO level shows ~6-8 milestones per
# research call instead of 20+ chatty per-step events. Set
# MARCHWARDEN_LOG_LEVEL=DEBUG to see all steps.
_INFO_ACTIONS = frozenset(
    {
        "start",
        "iteration_start",
        "synthesis_start",
        "synthesis_complete",
        "synthesis_error",
        "synthesis_build_error",
        "budget_exhausted",
        "complete",
    }
)

_log = get_logger("marchwarden.researcher.trace")

# Action pairings for duration tracking. When a starter action fires
# we record a monotonic start time keyed by the starter name. When the
# matching completer fires we compute the elapsed duration and attach
# it as a field on the completer's entry, then clear the start.
#
# Synthesis has two possible completers (success or error), both
# pointing back to synthesis_start.
_DURATION_PAIRS: dict[str, str] = {
    "web_search_complete": "web_search",
    "fetch_url_complete": "fetch_url",
    "synthesis_complete": "synthesis_start",
    "synthesis_error": "synthesis_start",
    "complete": "start",
}

# Every action that opens a duration window (the values above).
_STARTER_ACTIONS = frozenset(_DURATION_PAIRS.values())


class TraceLogger:
    """Logs research steps to a JSONL file.

    Usage:
        logger = TraceLogger()
        logger.log_step("search", query="Utah crops", decision="relevant query")
        logger.log_step("fetch_url", url="https://...", content_hash="sha256:abc...")
        logger.close()
    """

    def __init__(
        self,
        trace_id: Optional[str] = None,
        trace_dir: Optional[str] = None,
    ):
        """Initialize a trace logger.

        Args:
            trace_id: UUID for this trace. Generated if not provided.
            trace_dir: Directory for trace files.
                Defaults to ~/.marchwarden/traces/
        """
        self.trace_id = trace_id or str(uuid.uuid4())
        self.trace_dir = Path(
            trace_dir or os.path.expanduser("~/.marchwarden/traces")
        )
        self.trace_dir.mkdir(parents=True, exist_ok=True)
        self.file_path = self.trace_dir / f"{self.trace_id}.jsonl"
        self._step_counter = 0
        # Opened lazily by the _writer property on first write.
        self._file = None
        # action_name -> monotonic start time, populated by starter
        # actions and consumed by their matching completer (Issue #35).
        self._pending_starts: dict[str, float] = {}

    @property
    def _writer(self):
        """Lazy file handle — opens (append mode) on first write."""
        if self._file is None:
            self._file = open(self.file_path, "a", encoding="utf-8")
        return self._file

    def log_step(self, action: str, decision: str = "", **kwargs: Any) -> dict:
        """Log one step in the research loop.

        Args:
            action: What happened (e.g. "search", "fetch_url", "synthesize").
            decision: Why this action was taken or what was concluded.
            **kwargs: Additional fields (url, query, content_hash, etc.)

        Returns:
            The logged entry dict.
        """
        self._step_counter += 1
        entry = {
            "step": self._step_counter,
            "action": action,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "decision": decision,
        }
        # NOTE: caller kwargs intentionally win over the core fields
        # (e.g. a caller-supplied "timestamp" overrides ours).
        entry.update(kwargs)

        # Duration tracking (Issue #35). Record start times for starter
        # actions; when the matching completer fires, attach elapsed time
        # to both the trace entry and the operational log line.
        now = time.monotonic()
        if action in _STARTER_ACTIONS:
            self._pending_starts[action] = now
        duration_extras: dict[str, Any] = {}
        if action in _DURATION_PAIRS:
            starter = _DURATION_PAIRS[action]
            start = self._pending_starts.pop(starter, None)
            if start is not None:
                elapsed = now - start
                if action == "complete":
                    # Whole-run duration reads better in seconds.
                    duration_extras["total_duration_sec"] = round(elapsed, 3)
                else:
                    duration_extras["duration_ms"] = int(elapsed * 1000)
        entry.update(duration_extras)

        # default=str: deliberately stringify any non-JSON-native value
        # (Paths, datetimes, ...) rather than fail the audit write.
        self._writer.write(json.dumps(entry, default=str) + "\n")
        self._writer.flush()

        # Mirror the trace step into the operational logger so admins
        # can watch progress in real time. trace_id and researcher are
        # already bound in contextvars by WebResearcher.research, so
        # they automatically appear on every line.
        #
        # Merge all fields into one dict before splatting: passing
        # step=... plus **kwargs would raise "got multiple values for
        # keyword argument 'step'" if a caller supplies a kwarg named
        # "step" (or one colliding with a duration key). With a single
        # dict literal, later keys win — matching entry.update() above.
        log_method = _log.info if action in _INFO_ACTIONS else _log.debug
        log_fields = {
            "step": self._step_counter,
            "decision": decision,
            **kwargs,
            **duration_extras,
        }
        log_method(action, **log_fields)
        return entry

    def read_entries(self) -> list[dict]:
        """Read all entries from the trace file.

        Useful for replay and testing.
        """
        if not self.file_path.exists():
            return []
        entries = []
        with open(self.file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    entries.append(json.loads(line))
        return entries

    def close(self):
        """Close the trace file handle (idempotent; reopens on next write)."""
        if self._file is not None:
            self._file.close()
            self._file = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Do not suppress exceptions raised inside the with-block.
        return False