Compare commits

..

No commits in common. "5a0ca73e2afb892ceef8f05fe595e6db57a6c9bf" and "d25c8865eafa3786b3dd8401cfbd85470dd8f0e7" have entirely different histories.

3 changed files with 25 additions and 441 deletions

View file

@ -1,179 +0,0 @@
"""Cost tracking — price table loader and JSONL ledger writer.

Supplements (does not replace) the per-call ``cost_metadata`` field
on ``ResearchResult``. Operators consume this ledger via the
``marchwarden costs`` command (M2.5.3) for spend tracking.

Estimated costs are computed from a TOML price table at
``~/.marchwarden/prices.toml``, auto-created with seed values on
first run. Operators are expected to update prices manually when
upstream rates change — there is no automatic fetching.
"""
from __future__ import annotations

import json
import os
import time
from pathlib import Path
from typing import Optional

try:
    import tomllib  # Python 3.11+
except ModuleNotFoundError:  # pragma: no cover
    import tomli as tomllib  # type: ignore[no-redef]

from obs import get_logger

log = get_logger("marchwarden.costs")

# Default file locations (``~`` expanded at use). CostLedger also honors
# the MARCHWARDEN_COST_LEDGER environment variable as an override.
DEFAULT_LEDGER_PATH = "~/.marchwarden/costs.jsonl"
DEFAULT_PRICES_PATH = "~/.marchwarden/prices.toml"

# Seed values current as of 2026-04. Operators should update
# ~/.marchwarden/prices.toml when upstream rates change.
SEED_PRICES_TOML = """\
# Marchwarden price table — used for cost ledger estimation only.
# Update these values when upstream pricing changes. Marchwarden does
# not fetch prices automatically.
#
# input_per_mtok_usd = USD per 1,000,000 input tokens
# output_per_mtok_usd = USD per 1,000,000 output tokens

[models."claude-sonnet-4-6"]
input_per_mtok_usd = 3.00
output_per_mtok_usd = 15.00

[models."claude-opus-4-6"]
input_per_mtok_usd = 15.00
output_per_mtok_usd = 75.00

[models."claude-haiku-4-5-20251001"]
input_per_mtok_usd = 1.00
output_per_mtok_usd = 5.00

[tavily]
# Estimated post-free-tier per-search rate. Free tier covers the first
# 1000 searches per month at no cost.
per_search_usd = 0.005
"""
class PriceTable:
    """Loads and queries the price table at ~/.marchwarden/prices.toml.

    The table maps model IDs to per-million-token USD rates, plus a flat
    per-search Tavily rate. A missing file is seeded from
    ``SEED_PRICES_TOML`` on construction; an existing file is never
    overwritten.
    """

    def __init__(self, path: Optional[str] = None):
        """Load the price table, seeding the file if it does not exist.

        Args:
            path: Override for the TOML file location; defaults to
                ``DEFAULT_PRICES_PATH``. ``~`` is expanded.
        """
        self.path = Path(os.path.expanduser(path or DEFAULT_PRICES_PATH))
        self._data: dict = {}
        self._ensure_file()
        self._load()

    def _ensure_file(self) -> None:
        """Seed the price table file with default values if it is missing."""
        if self.path.exists():
            return
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Exclusive-create ("x") so a concurrent process that seeded the
        # file between the exists() check and here is not clobbered.
        try:
            with open(self.path, "x", encoding="utf-8") as f:
                f.write(SEED_PRICES_TOML)
        except FileExistsError:
            return  # lost the race — another process already seeded it
        log.info("price_table_seeded", path=str(self.path))

    def _load(self) -> None:
        """Parse the TOML file into ``self._data``."""
        with open(self.path, "rb") as f:
            self._data = tomllib.load(f)

    def estimate_call_usd(
        self,
        model_id: str,
        tokens_input: Optional[int],
        tokens_output: Optional[int],
        tavily_searches: int,
    ) -> Optional[float]:
        """Estimate USD cost for a single research call.

        Returns None if the model is unknown — the caller should record
        ``estimated_cost_usd: null`` in the ledger and the operator
        is expected to update prices.toml.
        """
        models = self._data.get("models", {})
        model_prices = models.get(model_id)
        if not model_prices:
            log.warning(
                "unknown_model_for_pricing",
                model_id=model_id,
                hint=f"add a [models.\"{model_id}\"] section to {self.path}",
            )
            return None
        # Treat missing token counts as zero rather than failing the call.
        in_tok = tokens_input or 0
        out_tok = tokens_output or 0
        input_cost = (in_tok / 1_000_000) * model_prices.get("input_per_mtok_usd", 0.0)
        output_cost = (out_tok / 1_000_000) * model_prices.get("output_per_mtok_usd", 0.0)
        tavily = self._data.get("tavily", {})
        tavily_cost = tavily_searches * tavily.get("per_search_usd", 0.0)
        # Micro-dollar precision keeps ledger values stable and readable.
        return round(input_cost + output_cost + tavily_cost, 6)
class CostLedger:
    """Append-only JSONL ledger of completed research calls."""

    def __init__(
        self,
        ledger_path: Optional[str] = None,
        price_table: Optional[PriceTable] = None,
    ):
        """Open (or create the directory for) the ledger file.

        Path resolution order: explicit ``ledger_path`` argument, then the
        MARCHWARDEN_COST_LEDGER environment variable, then the default.
        """
        configured = ledger_path or os.environ.get("MARCHWARDEN_COST_LEDGER")
        self.path = Path(os.path.expanduser(configured or DEFAULT_LEDGER_PATH))
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.price_table = price_table or PriceTable()

    def record(
        self,
        *,
        trace_id: str,
        question: str,
        model_id: str,
        tokens_used: int,
        tokens_input: Optional[int],
        tokens_output: Optional[int],
        iterations_run: int,
        wall_time_sec: float,
        tavily_searches: int,
        budget_exhausted: bool,
        confidence: float,
    ) -> dict:
        """Append one entry to the ledger and emit a structured log line.

        Returns the entry as a dict (useful for tests and the log call).
        """
        est = self.price_table.estimate_call_usd(
            model_id=model_id,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            tavily_searches=tavily_searches,
        )
        stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        entry = dict(
            timestamp=stamp,
            trace_id=trace_id,
            question=question[:200],  # cap stored question length
            model_id=model_id,
            tokens_used=tokens_used,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            iterations_run=iterations_run,
            wall_time_sec=round(wall_time_sec, 3),
            tavily_searches=tavily_searches,
            estimated_cost_usd=est,
            budget_exhausted=budget_exhausted,
            confidence=confidence,
        )
        with self.path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(entry) + "\n")
        log.info("cost_recorded", **entry)
        return entry

View file

@ -14,7 +14,6 @@ import structlog
from anthropic import Anthropic
from obs import get_logger
from obs.costs import CostLedger
from researchers.web.models import (
Citation,
ConfidenceFactors,
@ -174,16 +173,11 @@ class WebResearcher:
tavily_api_key: str,
model_id: str = "claude-sonnet-4-6",
trace_dir: Optional[str] = None,
cost_ledger: Optional[CostLedger] = None,
):
self.client = Anthropic(api_key=anthropic_api_key)
self.tavily_api_key = tavily_api_key
self.model_id = model_id
self.trace_dir = trace_dir
# Lazy default — only constructed if no override is given. Tests
# inject a CostLedger pointed at a tmp path to avoid touching
# the real ledger file.
self.cost_ledger = cost_ledger
async def research(
self,
@ -207,12 +201,9 @@ class WebResearcher:
trace = TraceLogger(trace_dir=self.trace_dir)
start_time = time.time()
total_tokens = 0
tokens_input = 0
tokens_output = 0
iterations = 0
evidence: list[dict] = []
budget_exhausted = False
tavily_searches = 0
# Bind trace context so every downstream log call automatically
# carries trace_id and researcher. Cleared in the finally block.
@ -282,15 +273,10 @@ class WebResearcher:
)
# Track tokens
tokens_input += response.usage.input_tokens
tokens_output += response.usage.output_tokens
total_tokens += response.usage.input_tokens + response.usage.output_tokens
# Check if the model wants to use tools
tool_calls = [b for b in response.content if b.type == "tool_use"]
tavily_searches += sum(
1 for tc in tool_calls if tc.name == "web_search"
)
if not tool_calls:
# Model is done researching — extract any final text
@ -334,7 +320,7 @@ class WebResearcher:
tokens_used=total_tokens,
)
result, synth_in, synth_out = await self._synthesize(
result = await self._synthesize(
question=question,
context=context,
evidence=evidence,
@ -344,8 +330,6 @@ class WebResearcher:
start_time=start_time,
budget_exhausted=budget_exhausted,
)
tokens_input += synth_in
tokens_output += synth_out
trace.log_step(
"complete",
@ -368,30 +352,6 @@ class WebResearcher:
wall_time_sec=result.cost_metadata.wall_time_sec,
budget_exhausted=result.cost_metadata.budget_exhausted,
)
# Append to the operational cost ledger. Construct on first use
# so test injection (cost_ledger=...) and the env override
# (MARCHWARDEN_COST_LEDGER) both work without forcing every
# caller to build a CostLedger explicitly.
try:
ledger = self.cost_ledger or CostLedger()
ledger.record(
trace_id=result.trace_id,
question=question,
model_id=self.model_id,
tokens_used=result.cost_metadata.tokens_used,
tokens_input=tokens_input,
tokens_output=tokens_output,
iterations_run=result.cost_metadata.iterations_run,
wall_time_sec=result.cost_metadata.wall_time_sec,
tavily_searches=tavily_searches,
budget_exhausted=result.cost_metadata.budget_exhausted,
confidence=result.confidence,
)
except Exception as ledger_err:
# Never let a ledger failure poison a successful research call.
log.warning("cost_ledger_write_failed", error=str(ledger_err))
structlog.contextvars.clear_contextvars()
return result
@ -497,12 +457,8 @@ class WebResearcher:
iterations: int,
start_time: float,
budget_exhausted: bool,
) -> tuple[ResearchResult, int, int]:
"""Ask the LLM to synthesize evidence into a ResearchResult.
Returns ``(result, synthesis_input_tokens, synthesis_output_tokens)``
so the caller can track per-call token splits for cost estimation.
"""
) -> ResearchResult:
"""Ask the LLM to synthesize evidence into a ResearchResult."""
# Format evidence for the synthesis prompt
evidence_text = ""
@ -536,9 +492,7 @@ class WebResearcher:
messages=[{"role": "user", "content": prompt}],
)
synth_in = response.usage.input_tokens
synth_out = response.usage.output_tokens
total_tokens += synth_in + synth_out
total_tokens += response.usage.input_tokens + response.usage.output_tokens
wall_time = time.time() - start_time
# Parse the JSON response
@ -563,13 +517,9 @@ class WebResearcher:
parse_error=str(parse_err),
raw_response=raw_text,
)
return (
self._fallback_result(
question, evidence, trace, total_tokens, iterations,
wall_time, budget_exhausted,
),
synth_in,
synth_out,
return self._fallback_result(
question, evidence, trace, total_tokens, iterations,
wall_time, budget_exhausted,
)
trace.log_step(
@ -631,39 +581,31 @@ class WebResearcher:
recency=cf.get("recency"),
)
return (
ResearchResult(
answer=data.get("answer", "No answer could be synthesized."),
citations=citations,
gaps=gaps,
discovery_events=discovery_events,
open_questions=open_questions,
confidence=data.get("confidence", 0.5),
confidence_factors=confidence_factors,
cost_metadata=CostMetadata(
tokens_used=total_tokens,
iterations_run=iterations,
wall_time_sec=wall_time,
budget_exhausted=budget_exhausted,
model_id=self.model_id,
),
trace_id=trace.trace_id,
return ResearchResult(
answer=data.get("answer", "No answer could be synthesized."),
citations=citations,
gaps=gaps,
discovery_events=discovery_events,
open_questions=open_questions,
confidence=data.get("confidence", 0.5),
confidence_factors=confidence_factors,
cost_metadata=CostMetadata(
tokens_used=total_tokens,
iterations_run=iterations,
wall_time_sec=wall_time,
budget_exhausted=budget_exhausted,
model_id=self.model_id,
),
synth_in,
synth_out,
trace_id=trace.trace_id,
)
except Exception as e:
trace.log_step(
"synthesis_build_error",
decision=f"Failed to build ResearchResult: {e}",
)
return (
self._fallback_result(
question, evidence, trace, total_tokens, iterations,
wall_time, budget_exhausted,
),
synth_in,
synth_out,
return self._fallback_result(
question, evidence, trace, total_tokens, iterations,
wall_time, budget_exhausted,
)
def _fallback_result(

View file

@ -1,179 +0,0 @@
"""Tests for the obs.costs cost ledger and price table."""
import json
from pathlib import Path
import pytest
from obs.costs import (
DEFAULT_PRICES_PATH,
SEED_PRICES_TOML,
CostLedger,
PriceTable,
)
class TestPriceTable:
    """Unit tests for PriceTable seeding and cost estimation."""

    def test_seeds_missing_file(self, tmp_path):
        # Constructing against a nonexistent path writes the seed file.
        target = tmp_path / "prices.toml"
        assert not target.exists()
        table = PriceTable(path=str(target))
        assert target.exists()
        assert "claude-sonnet-4-6" in target.read_text()
        # Loaded into memory
        sonnet = table._data["models"]["claude-sonnet-4-6"]
        assert sonnet["input_per_mtok_usd"] == 3.00

    def test_does_not_overwrite_existing_file(self, tmp_path):
        target = tmp_path / "prices.toml"
        target.write_text(
            '[models."custom-model"]\n'
            'input_per_mtok_usd = 1.23\n'
            'output_per_mtok_usd = 4.56\n'
        )
        table = PriceTable(path=str(target))
        custom = table._data["models"]["custom-model"]
        assert custom["input_per_mtok_usd"] == 1.23
        assert "claude-sonnet-4-6" not in table._data.get("models", {})

    def test_estimates_known_model(self, tmp_path):
        table = PriceTable(path=str(tmp_path / "prices.toml"))
        # 1M input @ $3 + 1M output @ $15 = $18, no tavily
        estimate = table.estimate_call_usd(
            model_id="claude-sonnet-4-6",
            tokens_input=1_000_000,
            tokens_output=1_000_000,
            tavily_searches=0,
        )
        assert estimate == 18.00

    def test_estimates_with_tavily(self, tmp_path):
        table = PriceTable(path=str(tmp_path / "prices.toml"))
        # 10 * $0.005 = $0.05
        estimate = table.estimate_call_usd(
            model_id="claude-sonnet-4-6",
            tokens_input=0,
            tokens_output=0,
            tavily_searches=10,
        )
        assert estimate == 0.05

    def test_unknown_model_returns_none(self, tmp_path):
        table = PriceTable(path=str(tmp_path / "prices.toml"))
        estimate = table.estimate_call_usd(
            model_id="some-future-model",
            tokens_input=1000,
            tokens_output=1000,
            tavily_searches=0,
        )
        assert estimate is None
class TestCostLedger:
    def _ledger(self, tmp_path):
        # Build a ledger isolated to the test's tmp dir so no real
        # ~/.marchwarden files are ever touched by the suite.
        return CostLedger(
            ledger_path=str(tmp_path / "costs.jsonl"),
            price_table=PriceTable(path=str(tmp_path / "prices.toml")),
        )
def test_record_writes_jsonl(self, tmp_path):
ledger = self._ledger(tmp_path)
entry = ledger.record(
trace_id="abc-123",
question="What grows in Utah?",
model_id="claude-sonnet-4-6",
tokens_used=10_000,
tokens_input=8_000,
tokens_output=2_000,
iterations_run=3,
wall_time_sec=42.5,
tavily_searches=4,
budget_exhausted=False,
confidence=0.9,
)
# File contains one JSON line
lines = (tmp_path / "costs.jsonl").read_text().strip().splitlines()
assert len(lines) == 1
on_disk = json.loads(lines[0])
assert on_disk == entry
# All required fields present and shaped correctly
assert on_disk["trace_id"] == "abc-123"
assert on_disk["question"] == "What grows in Utah?"
assert on_disk["model_id"] == "claude-sonnet-4-6"
assert on_disk["tokens_used"] == 10_000
assert on_disk["tokens_input"] == 8_000
assert on_disk["tokens_output"] == 2_000
assert on_disk["iterations_run"] == 3
assert on_disk["wall_time_sec"] == 42.5
assert on_disk["tavily_searches"] == 4
assert on_disk["budget_exhausted"] is False
assert on_disk["confidence"] == 0.9
assert "timestamp" in on_disk
# 8000 input @ $3/Mtok + 2000 output @ $15/Mtok + 4 * $0.005 = $0.074
assert on_disk["estimated_cost_usd"] == pytest.approx(0.074, abs=1e-6)
def test_record_appends(self, tmp_path):
ledger = self._ledger(tmp_path)
for i in range(3):
ledger.record(
trace_id=f"trace-{i}",
question=f"q{i}",
model_id="claude-sonnet-4-6",
tokens_used=100,
tokens_input=80,
tokens_output=20,
iterations_run=1,
wall_time_sec=1.0,
tavily_searches=0,
budget_exhausted=False,
confidence=0.5,
)
lines = (tmp_path / "costs.jsonl").read_text().strip().splitlines()
assert len(lines) == 3
assert json.loads(lines[0])["trace_id"] == "trace-0"
assert json.loads(lines[2])["trace_id"] == "trace-2"
def test_unknown_model_records_null_cost(self, tmp_path):
ledger = self._ledger(tmp_path)
entry = ledger.record(
trace_id="abc",
question="q",
model_id="some-future-model",
tokens_used=1000,
tokens_input=500,
tokens_output=500,
iterations_run=1,
wall_time_sec=1.0,
tavily_searches=0,
budget_exhausted=False,
confidence=0.5,
)
assert entry["estimated_cost_usd"] is None
def test_question_is_truncated(self, tmp_path):
ledger = self._ledger(tmp_path)
long_q = "x" * 1000
entry = ledger.record(
trace_id="abc",
question=long_q,
model_id="claude-sonnet-4-6",
tokens_used=10,
tokens_input=5,
tokens_output=5,
iterations_run=1,
wall_time_sec=0.1,
tavily_searches=0,
budget_exhausted=False,
confidence=0.5,
)
assert len(entry["question"]) == 200
    def test_env_var_override(self, tmp_path, monkeypatch):
        # MARCHWARDEN_COST_LEDGER redirects the ledger file when no
        # explicit ledger_path argument is given.
        custom = tmp_path / "custom-ledger.jsonl"
        monkeypatch.setenv("MARCHWARDEN_COST_LEDGER", str(custom))
        ledger = CostLedger(
            price_table=PriceTable(path=str(tmp_path / "prices.toml")),
        )
        assert ledger.path == custom