Compare commits

..

2 commits

Author SHA1 Message Date
21c8191b81 Merge pull request 'M1.2: Trace logger' (#4) from feat/trace-logger into main 2026-04-08 20:25:58 +00:00
Jeff Smith
cef08c8984 M1.2: Trace logger with tests
TraceLogger produces JSONL audit logs per research() call:
- One file per trace_id at ~/.marchwarden/traces/{trace_id}.jsonl
- Each line is a self-contained JSON object (step, action, timestamp, decision)
- Supports arbitrary kwargs (url, content_hash, query, etc.)
- Lazy file handle, flush after each write, context manager support
- read_entries() for replay and testing

15 tests: file creation, step counting, JSONL validity, kwargs,
timestamps, flush behavior, multiple independent traces.

Refs: archeious/marchwarden#1

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-04-08 14:21:10 -06:00
2 changed files with 250 additions and 0 deletions

105
researchers/web/trace.py Normal file
View file

@ -0,0 +1,105 @@
"""Trace logger for the web researcher.
Produces JSONL audit logs keyed by trace_id. Each research() call
gets its own trace file at {trace_dir}/{trace_id}.jsonl.
Every line is a self-contained JSON object representing one step
in the research loop.
"""
import json
import os
import time
import uuid
from pathlib import Path
from typing import Any, Optional
class TraceLogger:
"""Logs research steps to a JSONL file.
Usage:
logger = TraceLogger()
logger.log_step("search", query="Utah crops", decision="relevant query")
logger.log_step("fetch_url", url="https://...", content_hash="sha256:abc...")
logger.close()
"""
def __init__(
self,
trace_id: Optional[str] = None,
trace_dir: Optional[str] = None,
):
"""Initialize a trace logger.
Args:
trace_id: UUID for this trace. Generated if not provided.
trace_dir: Directory for trace files. Defaults to ~/.marchwarden/traces/
"""
self.trace_id = trace_id or str(uuid.uuid4())
self.trace_dir = Path(
trace_dir or os.path.expanduser("~/.marchwarden/traces")
)
self.trace_dir.mkdir(parents=True, exist_ok=True)
self.file_path = self.trace_dir / f"{self.trace_id}.jsonl"
self._step_counter = 0
self._file = None
@property
def _writer(self):
"""Lazy file handle — opens on first write."""
if self._file is None:
self._file = open(self.file_path, "a", encoding="utf-8")
return self._file
def log_step(self, action: str, decision: str = "", **kwargs: Any) -> dict:
"""Log one step in the research loop.
Args:
action: What happened (e.g. "search", "fetch_url", "synthesize").
decision: Why this action was taken or what was concluded.
**kwargs: Additional fields (url, query, content_hash, etc.)
Returns:
The logged entry dict.
"""
self._step_counter += 1
entry = {
"step": self._step_counter,
"action": action,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"decision": decision,
}
entry.update(kwargs)
self._writer.write(json.dumps(entry, default=str) + "\n")
self._writer.flush()
return entry
def read_entries(self) -> list[dict]:
"""Read all entries from the trace file.
Useful for replay and testing.
"""
if not self.file_path.exists():
return []
entries = []
with open(self.file_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
entries.append(json.loads(line))
return entries
def close(self):
"""Close the trace file handle."""
if self._file is not None:
self._file.close()
self._file = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False

145
tests/test_trace.py Normal file
View file

@ -0,0 +1,145 @@
"""Tests for the trace logger."""
import json
import os
import tempfile
import uuid
from researchers.web.trace import TraceLogger
class TestTraceLogger:
def _make_logger(self, tmp_dir, **kwargs):
return TraceLogger(trace_dir=tmp_dir, **kwargs)
def test_generates_trace_id(self):
with tempfile.TemporaryDirectory() as tmp:
logger = self._make_logger(tmp)
# Should be a valid UUID
uuid.UUID(logger.trace_id)
def test_uses_provided_trace_id(self):
with tempfile.TemporaryDirectory() as tmp:
logger = self._make_logger(tmp, trace_id="my-custom-id")
assert logger.trace_id == "my-custom-id"
def test_creates_trace_dir(self):
with tempfile.TemporaryDirectory() as tmp:
nested = os.path.join(tmp, "deep", "nested", "dir")
logger = TraceLogger(trace_dir=nested)
assert os.path.isdir(nested)
logger.close()
def test_file_path(self):
with tempfile.TemporaryDirectory() as tmp:
logger = self._make_logger(tmp, trace_id="test-123")
assert str(logger.file_path).endswith("test-123.jsonl")
def test_log_step_creates_file(self):
with tempfile.TemporaryDirectory() as tmp:
with self._make_logger(tmp) as logger:
logger.log_step("search", query="test")
assert logger.file_path.exists()
def test_log_step_increments_counter(self):
with tempfile.TemporaryDirectory() as tmp:
with self._make_logger(tmp) as logger:
e1 = logger.log_step("search")
e2 = logger.log_step("fetch")
e3 = logger.log_step("synthesize")
assert e1["step"] == 1
assert e2["step"] == 2
assert e3["step"] == 3
def test_log_step_required_fields(self):
with tempfile.TemporaryDirectory() as tmp:
with self._make_logger(tmp) as logger:
entry = logger.log_step("search", decision="relevant query")
assert entry["action"] == "search"
assert entry["decision"] == "relevant query"
assert "timestamp" in entry
assert "step" in entry
def test_log_step_extra_kwargs(self):
with tempfile.TemporaryDirectory() as tmp:
with self._make_logger(tmp) as logger:
entry = logger.log_step(
"fetch_url",
url="https://example.com",
content_hash="sha256:abc123",
content_length=5000,
)
assert entry["url"] == "https://example.com"
assert entry["content_hash"] == "sha256:abc123"
assert entry["content_length"] == 5000
def test_entries_are_valid_jsonl(self):
with tempfile.TemporaryDirectory() as tmp:
with self._make_logger(tmp) as logger:
logger.log_step("search", query="test")
logger.log_step("fetch_url", url="https://example.com")
logger.log_step("synthesize", decision="done")
with open(logger.file_path) as f:
lines = [l.strip() for l in f if l.strip()]
assert len(lines) == 3
for line in lines:
parsed = json.loads(line)
assert "step" in parsed
assert "action" in parsed
assert "timestamp" in parsed
def test_read_entries(self):
with tempfile.TemporaryDirectory() as tmp:
with self._make_logger(tmp) as logger:
logger.log_step("search", query="q1")
logger.log_step("fetch_url", url="https://example.com")
entries = logger.read_entries()
assert len(entries) == 2
assert entries[0]["action"] == "search"
assert entries[0]["query"] == "q1"
assert entries[1]["action"] == "fetch_url"
def test_read_entries_empty_file(self):
with tempfile.TemporaryDirectory() as tmp:
logger = self._make_logger(tmp)
entries = logger.read_entries()
assert entries == []
def test_context_manager_closes_file(self):
with tempfile.TemporaryDirectory() as tmp:
with self._make_logger(tmp) as logger:
logger.log_step("search")
assert logger._file is None
def test_timestamp_format(self):
with tempfile.TemporaryDirectory() as tmp:
with self._make_logger(tmp) as logger:
entry = logger.log_step("search")
# ISO 8601 UTC format
ts = entry["timestamp"]
assert ts.endswith("Z")
assert "T" in ts
def test_flush_after_each_write(self):
"""Entries should be readable immediately, not buffered."""
with tempfile.TemporaryDirectory() as tmp:
logger = self._make_logger(tmp)
logger.log_step("search", query="test")
# Read without closing
entries = logger.read_entries()
assert len(entries) == 1
logger.close()
def test_multiple_loggers_different_files(self):
with tempfile.TemporaryDirectory() as tmp:
with self._make_logger(tmp, trace_id="trace-a") as a:
a.log_step("search", query="a")
with self._make_logger(tmp, trace_id="trace-b") as b:
b.log_step("search", query="b")
assert a.read_entries()[0]["query"] == "a"
assert b.read_entries()[0]["query"] == "b"
assert a.file_path != b.file_path