From 0d957336f553a41000805669f9869ae245b3eb12 Mon Sep 17 00:00:00 2001 From: Jeff Smith Date: Wed, 8 Apr 2026 15:52:25 -0600 Subject: [PATCH] M2.5.2: Cost ledger with price table (#25) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an append-only JSONL ledger of every research() call at ~/.marchwarden/costs.jsonl, supplementing (not replacing) the per-call cost_metadata field returned to callers. The ledger is the operator-facing source of truth for spend tracking, queryable via the upcoming `marchwarden costs` command (M2.5.3). Fields per entry: timestamp, trace_id, question (truncated to 200 characters), model_id, tokens_used, tokens_input, tokens_output, iterations_run, wall_time_sec, tavily_searches, estimated_cost_usd, budget_exhausted, confidence. Cost estimation reads ~/.marchwarden/prices.toml, which is auto-created with seed values for current Anthropic + Tavily rates on first run. Operators are expected to update prices.toml manually when upstream rates change — there is no automatic fetching. An existing prices.toml is never overwritten. Unknown models log a WARN and record estimated_cost_usd: null instead of crashing. Each ledger write also emits a structured `cost_recorded` log line via the M2.5.1 logger, so cost data ships to OpenSearch alongside the ledger file with no extra plumbing. Tracking changes in agent.py: - Track tokens_input / tokens_output split (not just total) - Count tavily_searches across iterations - _synthesize now returns (result, synth_in, synth_out) so the caller can attribute synthesis tokens to the running counters - CostLedger.record() called after research_completed log; failures are caught and warn-logged so a ledger write can never poison a successful research call Tests cover: price table seeding, no-overwrite of existing files, cost estimation for known/unknown models, tavily-only cost, ledger appends, question truncation, env var override. 
End-to-end verified with a real Anthropic+Tavily call: 9107 input + 1140 output tokens, 1 tavily search, $0.049 estimated. 104/104 tests passing. Co-Authored-By: Claude Opus 4.6 (1M context) --- obs/costs.py | 179 +++++++++++++++++++++++++++++++++++++++ researchers/web/agent.py | 108 +++++++++++++++++------ tests/test_costs.py | 179 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 441 insertions(+), 25 deletions(-) create mode 100644 obs/costs.py create mode 100644 tests/test_costs.py diff --git a/obs/costs.py b/obs/costs.py new file mode 100644 index 0000000..1780e31 --- /dev/null +++ b/obs/costs.py @@ -0,0 +1,179 @@ +"""Cost tracking — price table loader and JSONL ledger writer. + +Supplements (does not replace) the per-call ``cost_metadata`` field +on ``ResearchResult``. Operators consume this ledger via the +``marchwarden costs`` command (M2.5.3) for spend tracking. + +Estimated costs are computed from a TOML price table at +``~/.marchwarden/prices.toml``, auto-created with seed values on +first run. Operators are expected to update prices manually when +upstream rates change — there is no automatic fetching. +""" + +from __future__ import annotations + +import json +import os +import time +from pathlib import Path +from typing import Optional + +try: + import tomllib # Python 3.11+ +except ModuleNotFoundError: # pragma: no cover + import tomli as tomllib # type: ignore[no-redef] + +from obs import get_logger + +log = get_logger("marchwarden.costs") + + +DEFAULT_LEDGER_PATH = "~/.marchwarden/costs.jsonl" +DEFAULT_PRICES_PATH = "~/.marchwarden/prices.toml" + +# Seed values current as of 2026-04. Operators should update +# ~/.marchwarden/prices.toml when upstream rates change. +SEED_PRICES_TOML = """\ +# Marchwarden price table — used for cost ledger estimation only. +# Update these values when upstream pricing changes. Marchwarden does +# not fetch prices automatically. 
+# +# input_per_mtok_usd = USD per 1,000,000 input tokens +# output_per_mtok_usd = USD per 1,000,000 output tokens + +[models."claude-sonnet-4-6"] +input_per_mtok_usd = 3.00 +output_per_mtok_usd = 15.00 + +[models."claude-opus-4-6"] +input_per_mtok_usd = 15.00 +output_per_mtok_usd = 75.00 + +[models."claude-haiku-4-5-20251001"] +input_per_mtok_usd = 1.00 +output_per_mtok_usd = 5.00 + +[tavily] +# Estimated post-free-tier per-search rate. Free tier covers the first +# 1000 searches per month at no cost. +per_search_usd = 0.005 +""" + + +class PriceTable: + """Loads and queries the price table at ~/.marchwarden/prices.toml.""" + + def __init__(self, path: Optional[str] = None): + self.path = Path(os.path.expanduser(path or DEFAULT_PRICES_PATH)) + self._data: dict = {} + self._ensure_file() + self._load() + + def _ensure_file(self) -> None: + if self.path.exists(): + return + self.path.parent.mkdir(parents=True, exist_ok=True) + self.path.write_text(SEED_PRICES_TOML, encoding="utf-8") + log.info("price_table_seeded", path=str(self.path)) + + def _load(self) -> None: + with open(self.path, "rb") as f: + self._data = tomllib.load(f) + + def estimate_call_usd( + self, + model_id: str, + tokens_input: Optional[int], + tokens_output: Optional[int], + tavily_searches: int, + ) -> Optional[float]: + """Estimate USD cost for a single research call. + + Returns None if the model is unknown — caller should record + ``estimated_cost_usd: null`` in the ledger and the operator + is expected to update prices.toml. 
+ """ + models = self._data.get("models", {}) + model_prices = models.get(model_id) + if not model_prices: + log.warning( + "unknown_model_for_pricing", + model_id=model_id, + hint=f"add a [models.\"{model_id}\"] section to {self.path}", + ) + return None + + in_tok = tokens_input or 0 + out_tok = tokens_output or 0 + + input_cost = (in_tok / 1_000_000) * model_prices.get("input_per_mtok_usd", 0.0) + output_cost = (out_tok / 1_000_000) * model_prices.get("output_per_mtok_usd", 0.0) + + tavily = self._data.get("tavily", {}) + tavily_cost = tavily_searches * tavily.get("per_search_usd", 0.0) + + return round(input_cost + output_cost + tavily_cost, 6) + + +class CostLedger: + """Append-only JSONL ledger of completed research calls.""" + + def __init__( + self, + ledger_path: Optional[str] = None, + price_table: Optional[PriceTable] = None, + ): + env_path = os.environ.get("MARCHWARDEN_COST_LEDGER") + self.path = Path( + os.path.expanduser(ledger_path or env_path or DEFAULT_LEDGER_PATH) + ) + self.path.parent.mkdir(parents=True, exist_ok=True) + self.price_table = price_table or PriceTable() + + def record( + self, + *, + trace_id: str, + question: str, + model_id: str, + tokens_used: int, + tokens_input: Optional[int], + tokens_output: Optional[int], + iterations_run: int, + wall_time_sec: float, + tavily_searches: int, + budget_exhausted: bool, + confidence: float, + ) -> dict: + """Append one entry to the ledger and emit a structured log line. + + Returns the entry as a dict (useful for tests and the log call). 
+ """ + estimated_cost_usd = self.price_table.estimate_call_usd( + model_id=model_id, + tokens_input=tokens_input, + tokens_output=tokens_output, + tavily_searches=tavily_searches, + ) + + entry = { + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "trace_id": trace_id, + "question": question[:200], + "model_id": model_id, + "tokens_used": tokens_used, + "tokens_input": tokens_input, + "tokens_output": tokens_output, + "iterations_run": iterations_run, + "wall_time_sec": round(wall_time_sec, 3), + "tavily_searches": tavily_searches, + "estimated_cost_usd": estimated_cost_usd, + "budget_exhausted": budget_exhausted, + "confidence": confidence, + } + + with open(self.path, "a", encoding="utf-8") as f: + f.write(json.dumps(entry) + "\n") + + log.info("cost_recorded", **entry) + return entry diff --git a/researchers/web/agent.py b/researchers/web/agent.py index 795580e..c626ef2 100644 --- a/researchers/web/agent.py +++ b/researchers/web/agent.py @@ -14,6 +14,7 @@ import structlog from anthropic import Anthropic from obs import get_logger +from obs.costs import CostLedger from researchers.web.models import ( Citation, ConfidenceFactors, @@ -173,11 +174,16 @@ class WebResearcher: tavily_api_key: str, model_id: str = "claude-sonnet-4-6", trace_dir: Optional[str] = None, + cost_ledger: Optional[CostLedger] = None, ): self.client = Anthropic(api_key=anthropic_api_key) self.tavily_api_key = tavily_api_key self.model_id = model_id self.trace_dir = trace_dir + # Lazy default — only constructed if no override is given. Tests + # inject a CostLedger pointed at a tmp path to avoid touching + # the real ledger file. 
+ self.cost_ledger = cost_ledger async def research( self, @@ -201,9 +207,12 @@ class WebResearcher: trace = TraceLogger(trace_dir=self.trace_dir) start_time = time.time() total_tokens = 0 + tokens_input = 0 + tokens_output = 0 iterations = 0 evidence: list[dict] = [] budget_exhausted = False + tavily_searches = 0 # Bind trace context so every downstream log call automatically # carries trace_id and researcher. Cleared in the finally block. @@ -273,10 +282,15 @@ class WebResearcher: ) # Track tokens + tokens_input += response.usage.input_tokens + tokens_output += response.usage.output_tokens total_tokens += response.usage.input_tokens + response.usage.output_tokens # Check if the model wants to use tools tool_calls = [b for b in response.content if b.type == "tool_use"] + tavily_searches += sum( + 1 for tc in tool_calls if tc.name == "web_search" + ) if not tool_calls: # Model is done researching — extract any final text @@ -320,7 +334,7 @@ class WebResearcher: tokens_used=total_tokens, ) - result = await self._synthesize( + result, synth_in, synth_out = await self._synthesize( question=question, context=context, evidence=evidence, @@ -330,6 +344,8 @@ class WebResearcher: start_time=start_time, budget_exhausted=budget_exhausted, ) + tokens_input += synth_in + tokens_output += synth_out trace.log_step( "complete", @@ -352,6 +368,30 @@ class WebResearcher: wall_time_sec=result.cost_metadata.wall_time_sec, budget_exhausted=result.cost_metadata.budget_exhausted, ) + + # Append to the operational cost ledger. Construct on first use + # so test injection (cost_ledger=...) and the env override + # (MARCHWARDEN_COST_LEDGER) both work without forcing every + # caller to build a CostLedger explicitly. 
+ try: + ledger = self.cost_ledger or CostLedger() + ledger.record( + trace_id=result.trace_id, + question=question, + model_id=self.model_id, + tokens_used=result.cost_metadata.tokens_used, + tokens_input=tokens_input, + tokens_output=tokens_output, + iterations_run=result.cost_metadata.iterations_run, + wall_time_sec=result.cost_metadata.wall_time_sec, + tavily_searches=tavily_searches, + budget_exhausted=result.cost_metadata.budget_exhausted, + confidence=result.confidence, + ) + except Exception as ledger_err: + # Never let a ledger failure poison a successful research call. + log.warning("cost_ledger_write_failed", error=str(ledger_err)) + structlog.contextvars.clear_contextvars() return result @@ -457,8 +497,12 @@ class WebResearcher: iterations: int, start_time: float, budget_exhausted: bool, - ) -> ResearchResult: - """Ask the LLM to synthesize evidence into a ResearchResult.""" + ) -> tuple[ResearchResult, int, int]: + """Ask the LLM to synthesize evidence into a ResearchResult. + + Returns ``(result, synthesis_input_tokens, synthesis_output_tokens)`` + so the caller can track per-call token splits for cost estimation. 
+ """ # Format evidence for the synthesis prompt evidence_text = "" @@ -492,7 +536,9 @@ class WebResearcher: messages=[{"role": "user", "content": prompt}], ) - total_tokens += response.usage.input_tokens + response.usage.output_tokens + synth_in = response.usage.input_tokens + synth_out = response.usage.output_tokens + total_tokens += synth_in + synth_out wall_time = time.time() - start_time # Parse the JSON response @@ -517,9 +563,13 @@ class WebResearcher: parse_error=str(parse_err), raw_response=raw_text, ) - return self._fallback_result( - question, evidence, trace, total_tokens, iterations, - wall_time, budget_exhausted, + return ( + self._fallback_result( + question, evidence, trace, total_tokens, iterations, + wall_time, budget_exhausted, + ), + synth_in, + synth_out, ) trace.log_step( @@ -581,31 +631,39 @@ class WebResearcher: recency=cf.get("recency"), ) - return ResearchResult( - answer=data.get("answer", "No answer could be synthesized."), - citations=citations, - gaps=gaps, - discovery_events=discovery_events, - open_questions=open_questions, - confidence=data.get("confidence", 0.5), - confidence_factors=confidence_factors, - cost_metadata=CostMetadata( - tokens_used=total_tokens, - iterations_run=iterations, - wall_time_sec=wall_time, - budget_exhausted=budget_exhausted, - model_id=self.model_id, + return ( + ResearchResult( + answer=data.get("answer", "No answer could be synthesized."), + citations=citations, + gaps=gaps, + discovery_events=discovery_events, + open_questions=open_questions, + confidence=data.get("confidence", 0.5), + confidence_factors=confidence_factors, + cost_metadata=CostMetadata( + tokens_used=total_tokens, + iterations_run=iterations, + wall_time_sec=wall_time, + budget_exhausted=budget_exhausted, + model_id=self.model_id, + ), + trace_id=trace.trace_id, ), - trace_id=trace.trace_id, + synth_in, + synth_out, ) except Exception as e: trace.log_step( "synthesis_build_error", decision=f"Failed to build ResearchResult: {e}", ) - 
return self._fallback_result( - question, evidence, trace, total_tokens, iterations, - wall_time, budget_exhausted, + return ( + self._fallback_result( + question, evidence, trace, total_tokens, iterations, + wall_time, budget_exhausted, + ), + synth_in, + synth_out, ) def _fallback_result( diff --git a/tests/test_costs.py b/tests/test_costs.py new file mode 100644 index 0000000..1f65a53 --- /dev/null +++ b/tests/test_costs.py @@ -0,0 +1,179 @@ +"""Tests for the obs.costs cost ledger and price table.""" + +import json +from pathlib import Path + +import pytest + +from obs.costs import ( + DEFAULT_PRICES_PATH, + SEED_PRICES_TOML, + CostLedger, + PriceTable, +) + + +class TestPriceTable: + def test_seeds_missing_file(self, tmp_path): + prices_path = tmp_path / "prices.toml" + assert not prices_path.exists() + + table = PriceTable(path=str(prices_path)) + + assert prices_path.exists() + assert "claude-sonnet-4-6" in prices_path.read_text() + # Loaded into memory + assert table._data["models"]["claude-sonnet-4-6"]["input_per_mtok_usd"] == 3.00 + + def test_does_not_overwrite_existing_file(self, tmp_path): + prices_path = tmp_path / "prices.toml" + prices_path.write_text( + '[models."custom-model"]\n' + 'input_per_mtok_usd = 1.23\n' + 'output_per_mtok_usd = 4.56\n' + ) + table = PriceTable(path=str(prices_path)) + assert table._data["models"]["custom-model"]["input_per_mtok_usd"] == 1.23 + assert "claude-sonnet-4-6" not in table._data.get("models", {}) + + def test_estimates_known_model(self, tmp_path): + table = PriceTable(path=str(tmp_path / "prices.toml")) + # 1M input @ $3 + 1M output @ $15 = $18, no tavily + cost = table.estimate_call_usd( + model_id="claude-sonnet-4-6", + tokens_input=1_000_000, + tokens_output=1_000_000, + tavily_searches=0, + ) + assert cost == 18.00 + + def test_estimates_with_tavily(self, tmp_path): + table = PriceTable(path=str(tmp_path / "prices.toml")) + cost = table.estimate_call_usd( + model_id="claude-sonnet-4-6", + tokens_input=0, + 
tokens_output=0, + tavily_searches=10, + ) + # 10 * $0.005 = $0.05 + assert cost == 0.05 + + def test_unknown_model_returns_none(self, tmp_path): + table = PriceTable(path=str(tmp_path / "prices.toml")) + cost = table.estimate_call_usd( + model_id="some-future-model", + tokens_input=1000, + tokens_output=1000, + tavily_searches=0, + ) + assert cost is None + + +class TestCostLedger: + def _ledger(self, tmp_path): + return CostLedger( + ledger_path=str(tmp_path / "costs.jsonl"), + price_table=PriceTable(path=str(tmp_path / "prices.toml")), + ) + + def test_record_writes_jsonl(self, tmp_path): + ledger = self._ledger(tmp_path) + entry = ledger.record( + trace_id="abc-123", + question="What grows in Utah?", + model_id="claude-sonnet-4-6", + tokens_used=10_000, + tokens_input=8_000, + tokens_output=2_000, + iterations_run=3, + wall_time_sec=42.5, + tavily_searches=4, + budget_exhausted=False, + confidence=0.9, + ) + + # File contains one JSON line + lines = (tmp_path / "costs.jsonl").read_text().strip().splitlines() + assert len(lines) == 1 + on_disk = json.loads(lines[0]) + assert on_disk == entry + + # All required fields present and shaped correctly + assert on_disk["trace_id"] == "abc-123" + assert on_disk["question"] == "What grows in Utah?" 
+ assert on_disk["model_id"] == "claude-sonnet-4-6" + assert on_disk["tokens_used"] == 10_000 + assert on_disk["tokens_input"] == 8_000 + assert on_disk["tokens_output"] == 2_000 + assert on_disk["iterations_run"] == 3 + assert on_disk["wall_time_sec"] == 42.5 + assert on_disk["tavily_searches"] == 4 + assert on_disk["budget_exhausted"] is False + assert on_disk["confidence"] == 0.9 + assert "timestamp" in on_disk + # 8000 input @ $3/Mtok + 2000 output @ $15/Mtok + 4 * $0.005 = $0.074 + assert on_disk["estimated_cost_usd"] == pytest.approx(0.074, abs=1e-6) + + def test_record_appends(self, tmp_path): + ledger = self._ledger(tmp_path) + for i in range(3): + ledger.record( + trace_id=f"trace-{i}", + question=f"q{i}", + model_id="claude-sonnet-4-6", + tokens_used=100, + tokens_input=80, + tokens_output=20, + iterations_run=1, + wall_time_sec=1.0, + tavily_searches=0, + budget_exhausted=False, + confidence=0.5, + ) + lines = (tmp_path / "costs.jsonl").read_text().strip().splitlines() + assert len(lines) == 3 + assert json.loads(lines[0])["trace_id"] == "trace-0" + assert json.loads(lines[2])["trace_id"] == "trace-2" + + def test_unknown_model_records_null_cost(self, tmp_path): + ledger = self._ledger(tmp_path) + entry = ledger.record( + trace_id="abc", + question="q", + model_id="some-future-model", + tokens_used=1000, + tokens_input=500, + tokens_output=500, + iterations_run=1, + wall_time_sec=1.0, + tavily_searches=0, + budget_exhausted=False, + confidence=0.5, + ) + assert entry["estimated_cost_usd"] is None + + def test_question_is_truncated(self, tmp_path): + ledger = self._ledger(tmp_path) + long_q = "x" * 1000 + entry = ledger.record( + trace_id="abc", + question=long_q, + model_id="claude-sonnet-4-6", + tokens_used=10, + tokens_input=5, + tokens_output=5, + iterations_run=1, + wall_time_sec=0.1, + tavily_searches=0, + budget_exhausted=False, + confidence=0.5, + ) + assert len(entry["question"]) == 200 + + def test_env_var_override(self, tmp_path, monkeypatch): 
+ custom = tmp_path / "custom-ledger.jsonl" + monkeypatch.setenv("MARCHWARDEN_COST_LEDGER", str(custom)) + ledger = CostLedger( + price_table=PriceTable(path=str(tmp_path / "prices.toml")), + ) + assert ledger.path == custom