Compare commits
No commits in common. "5a0ca73e2afb892ceef8f05fe595e6db57a6c9bf" and "d25c8865eafa3786b3dd8401cfbd85470dd8f0e7" have entirely different histories.
5a0ca73e2a
...
d25c8865ea
3 changed files with 25 additions and 441 deletions
179
obs/costs.py
179
obs/costs.py
|
|
@ -1,179 +0,0 @@
|
|||
"""Cost tracking — price table loader and JSONL ledger writer.
|
||||
|
||||
Supplements (does not replace) the per-call ``cost_metadata`` field
|
||||
on ``ResearchResult``. Operators consume this ledger via the
|
||||
``marchwarden costs`` command (M2.5.3) for spend tracking.
|
||||
|
||||
Estimated costs are computed from a TOML price table at
|
||||
``~/.marchwarden/prices.toml``, auto-created with seed values on
|
||||
first run. Operators are expected to update prices manually when
|
||||
upstream rates change — there is no automatic fetching.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
try:
|
||||
import tomllib # Python 3.11+
|
||||
except ModuleNotFoundError: # pragma: no cover
|
||||
import tomli as tomllib # type: ignore[no-redef]
|
||||
|
||||
from obs import get_logger
|
||||
|
||||
log = get_logger("marchwarden.costs")
|
||||
|
||||
|
||||
DEFAULT_LEDGER_PATH = "~/.marchwarden/costs.jsonl"
|
||||
DEFAULT_PRICES_PATH = "~/.marchwarden/prices.toml"
|
||||
|
||||
# Seed values current as of 2026-04. Operators should update
|
||||
# ~/.marchwarden/prices.toml when upstream rates change.
|
||||
SEED_PRICES_TOML = """\
|
||||
# Marchwarden price table — used for cost ledger estimation only.
|
||||
# Update these values when upstream pricing changes. Marchwarden does
|
||||
# not fetch prices automatically.
|
||||
#
|
||||
# input_per_mtok_usd = USD per 1,000,000 input tokens
|
||||
# output_per_mtok_usd = USD per 1,000,000 output tokens
|
||||
|
||||
[models."claude-sonnet-4-6"]
|
||||
input_per_mtok_usd = 3.00
|
||||
output_per_mtok_usd = 15.00
|
||||
|
||||
[models."claude-opus-4-6"]
|
||||
input_per_mtok_usd = 15.00
|
||||
output_per_mtok_usd = 75.00
|
||||
|
||||
[models."claude-haiku-4-5-20251001"]
|
||||
input_per_mtok_usd = 1.00
|
||||
output_per_mtok_usd = 5.00
|
||||
|
||||
[tavily]
|
||||
# Estimated post-free-tier per-search rate. Free tier covers the first
|
||||
# 1000 searches per month at no cost.
|
||||
per_search_usd = 0.005
|
||||
"""
|
||||
|
||||
|
||||
class PriceTable:
|
||||
"""Loads and queries the price table at ~/.marchwarden/prices.toml."""
|
||||
|
||||
def __init__(self, path: Optional[str] = None):
|
||||
self.path = Path(os.path.expanduser(path or DEFAULT_PRICES_PATH))
|
||||
self._data: dict = {}
|
||||
self._ensure_file()
|
||||
self._load()
|
||||
|
||||
def _ensure_file(self) -> None:
|
||||
if self.path.exists():
|
||||
return
|
||||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self.path.write_text(SEED_PRICES_TOML, encoding="utf-8")
|
||||
log.info("price_table_seeded", path=str(self.path))
|
||||
|
||||
def _load(self) -> None:
|
||||
with open(self.path, "rb") as f:
|
||||
self._data = tomllib.load(f)
|
||||
|
||||
def estimate_call_usd(
|
||||
self,
|
||||
model_id: str,
|
||||
tokens_input: Optional[int],
|
||||
tokens_output: Optional[int],
|
||||
tavily_searches: int,
|
||||
) -> Optional[float]:
|
||||
"""Estimate USD cost for a single research call.
|
||||
|
||||
Returns None if the model is unknown — caller should record
|
||||
``estimated_cost_usd: null`` in the ledger and the operator
|
||||
is expected to update prices.toml.
|
||||
"""
|
||||
models = self._data.get("models", {})
|
||||
model_prices = models.get(model_id)
|
||||
if not model_prices:
|
||||
log.warning(
|
||||
"unknown_model_for_pricing",
|
||||
model_id=model_id,
|
||||
hint=f"add a [models.\"{model_id}\"] section to {self.path}",
|
||||
)
|
||||
return None
|
||||
|
||||
in_tok = tokens_input or 0
|
||||
out_tok = tokens_output or 0
|
||||
|
||||
input_cost = (in_tok / 1_000_000) * model_prices.get("input_per_mtok_usd", 0.0)
|
||||
output_cost = (out_tok / 1_000_000) * model_prices.get("output_per_mtok_usd", 0.0)
|
||||
|
||||
tavily = self._data.get("tavily", {})
|
||||
tavily_cost = tavily_searches * tavily.get("per_search_usd", 0.0)
|
||||
|
||||
return round(input_cost + output_cost + tavily_cost, 6)
|
||||
|
||||
|
||||
class CostLedger:
|
||||
"""Append-only JSONL ledger of completed research calls."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ledger_path: Optional[str] = None,
|
||||
price_table: Optional[PriceTable] = None,
|
||||
):
|
||||
env_path = os.environ.get("MARCHWARDEN_COST_LEDGER")
|
||||
self.path = Path(
|
||||
os.path.expanduser(ledger_path or env_path or DEFAULT_LEDGER_PATH)
|
||||
)
|
||||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self.price_table = price_table or PriceTable()
|
||||
|
||||
def record(
|
||||
self,
|
||||
*,
|
||||
trace_id: str,
|
||||
question: str,
|
||||
model_id: str,
|
||||
tokens_used: int,
|
||||
tokens_input: Optional[int],
|
||||
tokens_output: Optional[int],
|
||||
iterations_run: int,
|
||||
wall_time_sec: float,
|
||||
tavily_searches: int,
|
||||
budget_exhausted: bool,
|
||||
confidence: float,
|
||||
) -> dict:
|
||||
"""Append one entry to the ledger and emit a structured log line.
|
||||
|
||||
Returns the entry as a dict (useful for tests and the log call).
|
||||
"""
|
||||
estimated_cost_usd = self.price_table.estimate_call_usd(
|
||||
model_id=model_id,
|
||||
tokens_input=tokens_input,
|
||||
tokens_output=tokens_output,
|
||||
tavily_searches=tavily_searches,
|
||||
)
|
||||
|
||||
entry = {
|
||||
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||||
"trace_id": trace_id,
|
||||
"question": question[:200],
|
||||
"model_id": model_id,
|
||||
"tokens_used": tokens_used,
|
||||
"tokens_input": tokens_input,
|
||||
"tokens_output": tokens_output,
|
||||
"iterations_run": iterations_run,
|
||||
"wall_time_sec": round(wall_time_sec, 3),
|
||||
"tavily_searches": tavily_searches,
|
||||
"estimated_cost_usd": estimated_cost_usd,
|
||||
"budget_exhausted": budget_exhausted,
|
||||
"confidence": confidence,
|
||||
}
|
||||
|
||||
with open(self.path, "a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(entry) + "\n")
|
||||
|
||||
log.info("cost_recorded", **entry)
|
||||
return entry
|
||||
|
|
@ -14,7 +14,6 @@ import structlog
|
|||
from anthropic import Anthropic
|
||||
|
||||
from obs import get_logger
|
||||
from obs.costs import CostLedger
|
||||
from researchers.web.models import (
|
||||
Citation,
|
||||
ConfidenceFactors,
|
||||
|
|
@ -174,16 +173,11 @@ class WebResearcher:
|
|||
tavily_api_key: str,
|
||||
model_id: str = "claude-sonnet-4-6",
|
||||
trace_dir: Optional[str] = None,
|
||||
cost_ledger: Optional[CostLedger] = None,
|
||||
):
|
||||
self.client = Anthropic(api_key=anthropic_api_key)
|
||||
self.tavily_api_key = tavily_api_key
|
||||
self.model_id = model_id
|
||||
self.trace_dir = trace_dir
|
||||
# Lazy default — only constructed if no override is given. Tests
|
||||
# inject a CostLedger pointed at a tmp path to avoid touching
|
||||
# the real ledger file.
|
||||
self.cost_ledger = cost_ledger
|
||||
|
||||
async def research(
|
||||
self,
|
||||
|
|
@ -207,12 +201,9 @@ class WebResearcher:
|
|||
trace = TraceLogger(trace_dir=self.trace_dir)
|
||||
start_time = time.time()
|
||||
total_tokens = 0
|
||||
tokens_input = 0
|
||||
tokens_output = 0
|
||||
iterations = 0
|
||||
evidence: list[dict] = []
|
||||
budget_exhausted = False
|
||||
tavily_searches = 0
|
||||
|
||||
# Bind trace context so every downstream log call automatically
|
||||
# carries trace_id and researcher. Cleared in the finally block.
|
||||
|
|
@ -282,15 +273,10 @@ class WebResearcher:
|
|||
)
|
||||
|
||||
# Track tokens
|
||||
tokens_input += response.usage.input_tokens
|
||||
tokens_output += response.usage.output_tokens
|
||||
total_tokens += response.usage.input_tokens + response.usage.output_tokens
|
||||
|
||||
# Check if the model wants to use tools
|
||||
tool_calls = [b for b in response.content if b.type == "tool_use"]
|
||||
tavily_searches += sum(
|
||||
1 for tc in tool_calls if tc.name == "web_search"
|
||||
)
|
||||
|
||||
if not tool_calls:
|
||||
# Model is done researching — extract any final text
|
||||
|
|
@ -334,7 +320,7 @@ class WebResearcher:
|
|||
tokens_used=total_tokens,
|
||||
)
|
||||
|
||||
result, synth_in, synth_out = await self._synthesize(
|
||||
result = await self._synthesize(
|
||||
question=question,
|
||||
context=context,
|
||||
evidence=evidence,
|
||||
|
|
@ -344,8 +330,6 @@ class WebResearcher:
|
|||
start_time=start_time,
|
||||
budget_exhausted=budget_exhausted,
|
||||
)
|
||||
tokens_input += synth_in
|
||||
tokens_output += synth_out
|
||||
|
||||
trace.log_step(
|
||||
"complete",
|
||||
|
|
@ -368,30 +352,6 @@ class WebResearcher:
|
|||
wall_time_sec=result.cost_metadata.wall_time_sec,
|
||||
budget_exhausted=result.cost_metadata.budget_exhausted,
|
||||
)
|
||||
|
||||
# Append to the operational cost ledger. Construct on first use
|
||||
# so test injection (cost_ledger=...) and the env override
|
||||
# (MARCHWARDEN_COST_LEDGER) both work without forcing every
|
||||
# caller to build a CostLedger explicitly.
|
||||
try:
|
||||
ledger = self.cost_ledger or CostLedger()
|
||||
ledger.record(
|
||||
trace_id=result.trace_id,
|
||||
question=question,
|
||||
model_id=self.model_id,
|
||||
tokens_used=result.cost_metadata.tokens_used,
|
||||
tokens_input=tokens_input,
|
||||
tokens_output=tokens_output,
|
||||
iterations_run=result.cost_metadata.iterations_run,
|
||||
wall_time_sec=result.cost_metadata.wall_time_sec,
|
||||
tavily_searches=tavily_searches,
|
||||
budget_exhausted=result.cost_metadata.budget_exhausted,
|
||||
confidence=result.confidence,
|
||||
)
|
||||
except Exception as ledger_err:
|
||||
# Never let a ledger failure poison a successful research call.
|
||||
log.warning("cost_ledger_write_failed", error=str(ledger_err))
|
||||
|
||||
structlog.contextvars.clear_contextvars()
|
||||
|
||||
return result
|
||||
|
|
@ -497,12 +457,8 @@ class WebResearcher:
|
|||
iterations: int,
|
||||
start_time: float,
|
||||
budget_exhausted: bool,
|
||||
) -> tuple[ResearchResult, int, int]:
|
||||
"""Ask the LLM to synthesize evidence into a ResearchResult.
|
||||
|
||||
Returns ``(result, synthesis_input_tokens, synthesis_output_tokens)``
|
||||
so the caller can track per-call token splits for cost estimation.
|
||||
"""
|
||||
) -> ResearchResult:
|
||||
"""Ask the LLM to synthesize evidence into a ResearchResult."""
|
||||
|
||||
# Format evidence for the synthesis prompt
|
||||
evidence_text = ""
|
||||
|
|
@ -536,9 +492,7 @@ class WebResearcher:
|
|||
messages=[{"role": "user", "content": prompt}],
|
||||
)
|
||||
|
||||
synth_in = response.usage.input_tokens
|
||||
synth_out = response.usage.output_tokens
|
||||
total_tokens += synth_in + synth_out
|
||||
total_tokens += response.usage.input_tokens + response.usage.output_tokens
|
||||
wall_time = time.time() - start_time
|
||||
|
||||
# Parse the JSON response
|
||||
|
|
@ -563,13 +517,9 @@ class WebResearcher:
|
|||
parse_error=str(parse_err),
|
||||
raw_response=raw_text,
|
||||
)
|
||||
return (
|
||||
self._fallback_result(
|
||||
return self._fallback_result(
|
||||
question, evidence, trace, total_tokens, iterations,
|
||||
wall_time, budget_exhausted,
|
||||
),
|
||||
synth_in,
|
||||
synth_out,
|
||||
)
|
||||
|
||||
trace.log_step(
|
||||
|
|
@ -631,8 +581,7 @@ class WebResearcher:
|
|||
recency=cf.get("recency"),
|
||||
)
|
||||
|
||||
return (
|
||||
ResearchResult(
|
||||
return ResearchResult(
|
||||
answer=data.get("answer", "No answer could be synthesized."),
|
||||
citations=citations,
|
||||
gaps=gaps,
|
||||
|
|
@ -648,22 +597,15 @@ class WebResearcher:
|
|||
model_id=self.model_id,
|
||||
),
|
||||
trace_id=trace.trace_id,
|
||||
),
|
||||
synth_in,
|
||||
synth_out,
|
||||
)
|
||||
except Exception as e:
|
||||
trace.log_step(
|
||||
"synthesis_build_error",
|
||||
decision=f"Failed to build ResearchResult: {e}",
|
||||
)
|
||||
return (
|
||||
self._fallback_result(
|
||||
return self._fallback_result(
|
||||
question, evidence, trace, total_tokens, iterations,
|
||||
wall_time, budget_exhausted,
|
||||
),
|
||||
synth_in,
|
||||
synth_out,
|
||||
)
|
||||
|
||||
def _fallback_result(
|
||||
|
|
|
|||
|
|
@ -1,179 +0,0 @@
|
|||
"""Tests for the obs.costs cost ledger and price table."""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from obs.costs import (
|
||||
DEFAULT_PRICES_PATH,
|
||||
SEED_PRICES_TOML,
|
||||
CostLedger,
|
||||
PriceTable,
|
||||
)
|
||||
|
||||
|
||||
class TestPriceTable:
|
||||
def test_seeds_missing_file(self, tmp_path):
|
||||
prices_path = tmp_path / "prices.toml"
|
||||
assert not prices_path.exists()
|
||||
|
||||
table = PriceTable(path=str(prices_path))
|
||||
|
||||
assert prices_path.exists()
|
||||
assert "claude-sonnet-4-6" in prices_path.read_text()
|
||||
# Loaded into memory
|
||||
assert table._data["models"]["claude-sonnet-4-6"]["input_per_mtok_usd"] == 3.00
|
||||
|
||||
def test_does_not_overwrite_existing_file(self, tmp_path):
|
||||
prices_path = tmp_path / "prices.toml"
|
||||
prices_path.write_text(
|
||||
'[models."custom-model"]\n'
|
||||
'input_per_mtok_usd = 1.23\n'
|
||||
'output_per_mtok_usd = 4.56\n'
|
||||
)
|
||||
table = PriceTable(path=str(prices_path))
|
||||
assert table._data["models"]["custom-model"]["input_per_mtok_usd"] == 1.23
|
||||
assert "claude-sonnet-4-6" not in table._data.get("models", {})
|
||||
|
||||
def test_estimates_known_model(self, tmp_path):
|
||||
table = PriceTable(path=str(tmp_path / "prices.toml"))
|
||||
# 1M input @ $3 + 1M output @ $15 = $18, no tavily
|
||||
cost = table.estimate_call_usd(
|
||||
model_id="claude-sonnet-4-6",
|
||||
tokens_input=1_000_000,
|
||||
tokens_output=1_000_000,
|
||||
tavily_searches=0,
|
||||
)
|
||||
assert cost == 18.00
|
||||
|
||||
def test_estimates_with_tavily(self, tmp_path):
|
||||
table = PriceTable(path=str(tmp_path / "prices.toml"))
|
||||
cost = table.estimate_call_usd(
|
||||
model_id="claude-sonnet-4-6",
|
||||
tokens_input=0,
|
||||
tokens_output=0,
|
||||
tavily_searches=10,
|
||||
)
|
||||
# 10 * $0.005 = $0.05
|
||||
assert cost == 0.05
|
||||
|
||||
def test_unknown_model_returns_none(self, tmp_path):
|
||||
table = PriceTable(path=str(tmp_path / "prices.toml"))
|
||||
cost = table.estimate_call_usd(
|
||||
model_id="some-future-model",
|
||||
tokens_input=1000,
|
||||
tokens_output=1000,
|
||||
tavily_searches=0,
|
||||
)
|
||||
assert cost is None
|
||||
|
||||
|
||||
class TestCostLedger:
|
||||
def _ledger(self, tmp_path):
|
||||
return CostLedger(
|
||||
ledger_path=str(tmp_path / "costs.jsonl"),
|
||||
price_table=PriceTable(path=str(tmp_path / "prices.toml")),
|
||||
)
|
||||
|
||||
def test_record_writes_jsonl(self, tmp_path):
|
||||
ledger = self._ledger(tmp_path)
|
||||
entry = ledger.record(
|
||||
trace_id="abc-123",
|
||||
question="What grows in Utah?",
|
||||
model_id="claude-sonnet-4-6",
|
||||
tokens_used=10_000,
|
||||
tokens_input=8_000,
|
||||
tokens_output=2_000,
|
||||
iterations_run=3,
|
||||
wall_time_sec=42.5,
|
||||
tavily_searches=4,
|
||||
budget_exhausted=False,
|
||||
confidence=0.9,
|
||||
)
|
||||
|
||||
# File contains one JSON line
|
||||
lines = (tmp_path / "costs.jsonl").read_text().strip().splitlines()
|
||||
assert len(lines) == 1
|
||||
on_disk = json.loads(lines[0])
|
||||
assert on_disk == entry
|
||||
|
||||
# All required fields present and shaped correctly
|
||||
assert on_disk["trace_id"] == "abc-123"
|
||||
assert on_disk["question"] == "What grows in Utah?"
|
||||
assert on_disk["model_id"] == "claude-sonnet-4-6"
|
||||
assert on_disk["tokens_used"] == 10_000
|
||||
assert on_disk["tokens_input"] == 8_000
|
||||
assert on_disk["tokens_output"] == 2_000
|
||||
assert on_disk["iterations_run"] == 3
|
||||
assert on_disk["wall_time_sec"] == 42.5
|
||||
assert on_disk["tavily_searches"] == 4
|
||||
assert on_disk["budget_exhausted"] is False
|
||||
assert on_disk["confidence"] == 0.9
|
||||
assert "timestamp" in on_disk
|
||||
# 8000 input @ $3/Mtok + 2000 output @ $15/Mtok + 4 * $0.005 = $0.074
|
||||
assert on_disk["estimated_cost_usd"] == pytest.approx(0.074, abs=1e-6)
|
||||
|
||||
def test_record_appends(self, tmp_path):
|
||||
ledger = self._ledger(tmp_path)
|
||||
for i in range(3):
|
||||
ledger.record(
|
||||
trace_id=f"trace-{i}",
|
||||
question=f"q{i}",
|
||||
model_id="claude-sonnet-4-6",
|
||||
tokens_used=100,
|
||||
tokens_input=80,
|
||||
tokens_output=20,
|
||||
iterations_run=1,
|
||||
wall_time_sec=1.0,
|
||||
tavily_searches=0,
|
||||
budget_exhausted=False,
|
||||
confidence=0.5,
|
||||
)
|
||||
lines = (tmp_path / "costs.jsonl").read_text().strip().splitlines()
|
||||
assert len(lines) == 3
|
||||
assert json.loads(lines[0])["trace_id"] == "trace-0"
|
||||
assert json.loads(lines[2])["trace_id"] == "trace-2"
|
||||
|
||||
def test_unknown_model_records_null_cost(self, tmp_path):
|
||||
ledger = self._ledger(tmp_path)
|
||||
entry = ledger.record(
|
||||
trace_id="abc",
|
||||
question="q",
|
||||
model_id="some-future-model",
|
||||
tokens_used=1000,
|
||||
tokens_input=500,
|
||||
tokens_output=500,
|
||||
iterations_run=1,
|
||||
wall_time_sec=1.0,
|
||||
tavily_searches=0,
|
||||
budget_exhausted=False,
|
||||
confidence=0.5,
|
||||
)
|
||||
assert entry["estimated_cost_usd"] is None
|
||||
|
||||
def test_question_is_truncated(self, tmp_path):
|
||||
ledger = self._ledger(tmp_path)
|
||||
long_q = "x" * 1000
|
||||
entry = ledger.record(
|
||||
trace_id="abc",
|
||||
question=long_q,
|
||||
model_id="claude-sonnet-4-6",
|
||||
tokens_used=10,
|
||||
tokens_input=5,
|
||||
tokens_output=5,
|
||||
iterations_run=1,
|
||||
wall_time_sec=0.1,
|
||||
tavily_searches=0,
|
||||
budget_exhausted=False,
|
||||
confidence=0.5,
|
||||
)
|
||||
assert len(entry["question"]) == 200
|
||||
|
||||
def test_env_var_override(self, tmp_path, monkeypatch):
|
||||
custom = tmp_path / "custom-ledger.jsonl"
|
||||
monkeypatch.setenv("MARCHWARDEN_COST_LEDGER", str(custom))
|
||||
ledger = CostLedger(
|
||||
price_table=PriceTable(path=str(tmp_path / "prices.toml")),
|
||||
)
|
||||
assert ledger.path == custom
|
||||
Loading…
Reference in a new issue