"""Trace logger for the web researcher. Produces JSONL audit logs keyed by trace_id. Each research() call gets its own trace file at {trace_dir}/{trace_id}.jsonl. Every line is a self-contained JSON object representing one step in the research loop. """ import json import os import time import uuid from pathlib import Path from typing import Any, Optional from obs import get_logger # Actions that get promoted to INFO in the operational log. Everything # else logs at DEBUG so the default INFO level shows ~6-8 milestones per # research call instead of 20+ chatty per-step events. Set # MARCHWARDEN_LOG_LEVEL=DEBUG to see all steps. _INFO_ACTIONS = frozenset( { "start", "iteration_start", "synthesis_start", "synthesis_complete", "synthesis_error", "synthesis_build_error", "budget_exhausted", "complete", } ) _log = get_logger("marchwarden.researcher.trace") class TraceLogger: """Logs research steps to a JSONL file. Usage: logger = TraceLogger() logger.log_step("search", query="Utah crops", decision="relevant query") logger.log_step("fetch_url", url="https://...", content_hash="sha256:abc...") logger.close() """ def __init__( self, trace_id: Optional[str] = None, trace_dir: Optional[str] = None, ): """Initialize a trace logger. Args: trace_id: UUID for this trace. Generated if not provided. trace_dir: Directory for trace files. Defaults to ~/.marchwarden/traces/ """ self.trace_id = trace_id or str(uuid.uuid4()) self.trace_dir = Path( trace_dir or os.path.expanduser("~/.marchwarden/traces") ) self.trace_dir.mkdir(parents=True, exist_ok=True) self.file_path = self.trace_dir / f"{self.trace_id}.jsonl" self._step_counter = 0 self._file = None @property def _writer(self): """Lazy file handle — opens on first write.""" if self._file is None: self._file = open(self.file_path, "a", encoding="utf-8") return self._file def log_step(self, action: str, decision: str = "", **kwargs: Any) -> dict: """Log one step in the research loop. Args: action: What happened (e.g. "search", "fetch_url", "synthesize"). decision: Why this action was taken or what was concluded. **kwargs: Additional fields (url, query, content_hash, etc.) Returns: The logged entry dict. """ self._step_counter += 1 entry = { "step": self._step_counter, "action": action, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "decision": decision, } entry.update(kwargs) self._writer.write(json.dumps(entry, default=str) + "\n") self._writer.flush() # Mirror the trace step into the operational logger so admins # can watch progress in real time. trace_id and researcher are # already bound in contextvars by WebResearcher.research, so # they automatically appear on every line. log_method = _log.info if action in _INFO_ACTIONS else _log.debug log_method(action, step=self._step_counter, decision=decision, **kwargs) return entry def read_entries(self) -> list[dict]: """Read all entries from the trace file. Useful for replay and testing. """ if not self.file_path.exists(): return [] entries = [] with open(self.file_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: entries.append(json.loads(line)) return entries def close(self): """Close the trace file handle.""" if self._file is not None: self._file.close() self._file = None def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False