Adds an append-only JSONL ledger of every research() call at ~/.marchwarden/costs.jsonl, supplementing (not replacing) the per-call cost_metadata field returned to callers. The ledger is the operator-facing source of truth for spend tracking, queryable via the upcoming `marchwarden costs` command (M2.5.3).

Fields per entry: timestamp, trace_id, question (truncated 200ch), model_id, tokens_used, tokens_input, tokens_output, iterations_run, wall_time_sec, tavily_searches, estimated_cost_usd, budget_exhausted, confidence.

Cost estimation reads ~/.marchwarden/prices.toml, which is auto-created with seed values for current Anthropic + Tavily rates on first run. Operators are expected to update prices.toml manually when upstream rates change — there is no automatic fetching. Existing files are never overwritten. Unknown models log a WARN and record estimated_cost_usd: null instead of crashing.

Each ledger write also emits a structured `cost_recorded` log line via the M2.5.1 logger, so cost data ships to OpenSearch alongside the ledger file with no extra plumbing.

Tracking changes in agent.py:
- Track tokens_input / tokens_output split (not just total)
- Count tavily_searches across iterations
- _synthesize now returns (result, synth_in, synth_out) so the caller can attribute synthesis tokens to the running counters
- Ledger.record() called after research_completed log; failures are caught and warn-logged so a ledger write can never poison a successful research call

Tests cover: price table seeding, no-overwrite of existing files, cost estimation for known/unknown models, tavily-only cost, ledger appends, question truncation, env var override.

End-to-end verified with a real Anthropic+Tavily call: 9107 input + 1140 output tokens, 1 tavily search, $0.049 estimated.

104/104 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
179 lines
5.6 KiB
Python
179 lines
5.6 KiB
Python
"""Cost tracking — price table loader and JSONL ledger writer.
|
|
|
|
Supplements (does not replace) the per-call ``cost_metadata`` field
|
|
on ``ResearchResult``. Operators consume this ledger via the
|
|
``marchwarden costs`` command (M2.5.3) for spend tracking.
|
|
|
|
Estimated costs are computed from a TOML price table at
|
|
``~/.marchwarden/prices.toml``, auto-created with seed values on
|
|
first run. Operators are expected to update prices manually when
|
|
upstream rates change — there is no automatic fetching.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
try:
|
|
import tomllib # Python 3.11+
|
|
except ModuleNotFoundError: # pragma: no cover
|
|
import tomli as tomllib # type: ignore[no-redef]
|
|
|
|
from obs import get_logger
|
|
|
|
log = get_logger("marchwarden.costs")
|
|
|
|
|
|
# Ledger location used when CostLedger receives neither an explicit path nor
# the MARCHWARDEN_COST_LEDGER environment variable; "~" is expanded on use.
DEFAULT_LEDGER_PATH = "~/.marchwarden/costs.jsonl"
|
|
# Price-table location used when PriceTable is constructed without a path;
# "~" is expanded on use.
DEFAULT_PRICES_PATH = "~/.marchwarden/prices.toml"
|
|
|
|
# Seed values current as of 2026-04. Operators should update
# ~/.marchwarden/prices.toml when upstream rates change.
#
# This text is written verbatim to the price-table path when no file exists
# there yet. The backslash after the opening quotes keeps the file from
# starting with an empty line.
SEED_PRICES_TOML = """\
# Marchwarden price table — used for cost ledger estimation only.
# Update these values when upstream pricing changes. Marchwarden does
# not fetch prices automatically.
#
# input_per_mtok_usd = USD per 1,000,000 input tokens
# output_per_mtok_usd = USD per 1,000,000 output tokens

[models."claude-sonnet-4-6"]
input_per_mtok_usd = 3.00
output_per_mtok_usd = 15.00

[models."claude-opus-4-6"]
input_per_mtok_usd = 15.00
output_per_mtok_usd = 75.00

[models."claude-haiku-4-5-20251001"]
input_per_mtok_usd = 1.00
output_per_mtok_usd = 5.00

[tavily]
# Estimated post-free-tier per-search rate. Free tier covers the first
# 1000 searches per month at no cost.
per_search_usd = 0.005
"""
|
|
|
|
|
|
class PriceTable:
|
|
"""Loads and queries the price table at ~/.marchwarden/prices.toml."""
|
|
|
|
def __init__(self, path: Optional[str] = None):
|
|
self.path = Path(os.path.expanduser(path or DEFAULT_PRICES_PATH))
|
|
self._data: dict = {}
|
|
self._ensure_file()
|
|
self._load()
|
|
|
|
def _ensure_file(self) -> None:
|
|
if self.path.exists():
|
|
return
|
|
self.path.parent.mkdir(parents=True, exist_ok=True)
|
|
self.path.write_text(SEED_PRICES_TOML, encoding="utf-8")
|
|
log.info("price_table_seeded", path=str(self.path))
|
|
|
|
def _load(self) -> None:
|
|
with open(self.path, "rb") as f:
|
|
self._data = tomllib.load(f)
|
|
|
|
def estimate_call_usd(
|
|
self,
|
|
model_id: str,
|
|
tokens_input: Optional[int],
|
|
tokens_output: Optional[int],
|
|
tavily_searches: int,
|
|
) -> Optional[float]:
|
|
"""Estimate USD cost for a single research call.
|
|
|
|
Returns None if the model is unknown — caller should record
|
|
``estimated_cost_usd: null`` in the ledger and the operator
|
|
is expected to update prices.toml.
|
|
"""
|
|
models = self._data.get("models", {})
|
|
model_prices = models.get(model_id)
|
|
if not model_prices:
|
|
log.warning(
|
|
"unknown_model_for_pricing",
|
|
model_id=model_id,
|
|
hint=f"add a [models.\"{model_id}\"] section to {self.path}",
|
|
)
|
|
return None
|
|
|
|
in_tok = tokens_input or 0
|
|
out_tok = tokens_output or 0
|
|
|
|
input_cost = (in_tok / 1_000_000) * model_prices.get("input_per_mtok_usd", 0.0)
|
|
output_cost = (out_tok / 1_000_000) * model_prices.get("output_per_mtok_usd", 0.0)
|
|
|
|
tavily = self._data.get("tavily", {})
|
|
tavily_cost = tavily_searches * tavily.get("per_search_usd", 0.0)
|
|
|
|
return round(input_cost + output_cost + tavily_cost, 6)
|
|
|
|
|
|
class CostLedger:
    """JSONL ledger that gains one line per completed research call."""

    def __init__(
        self,
        ledger_path: Optional[str] = None,
        price_table: Optional[PriceTable] = None,
    ):
        """Resolve the ledger location and make sure its directory exists.

        Args:
            ledger_path: Explicit ledger file path. When empty, the
                ``MARCHWARDEN_COST_LEDGER`` environment variable is used,
                then ``DEFAULT_LEDGER_PATH``. ``~`` is expanded.
            price_table: Table used for cost estimates; a default
                ``PriceTable()`` is built when none is supplied.
        """
        from_env = os.environ.get("MARCHWARDEN_COST_LEDGER")
        chosen = ledger_path or from_env or DEFAULT_LEDGER_PATH
        self.path = Path(os.path.expanduser(chosen))
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.price_table = price_table if price_table else PriceTable()

    def record(
        self,
        *,
        trace_id: str,
        question: str,
        model_id: str,
        tokens_used: int,
        tokens_input: Optional[int],
        tokens_output: Optional[int],
        iterations_run: int,
        wall_time_sec: float,
        tavily_searches: int,
        budget_exhausted: bool,
        confidence: float,
    ) -> dict:
        """Write one research call to the ledger and mirror it to the log.

        The cost estimate comes from the price table and is stored under
        ``estimated_cost_usd``. The question is cut to its first 200
        characters and the wall time is rounded to three decimal places;
        every other argument is stored unchanged.

        Returns:
            The entry dict that was serialized to the ledger.
        """
        cost_usd = self.price_table.estimate_call_usd(
            model_id=model_id,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            tavily_searches=tavily_searches,
        )
        # UTC wall-clock time at second resolution.
        stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

        entry = {
            "timestamp": stamp,
            "trace_id": trace_id,
            "question": question[:200],
            "model_id": model_id,
            "tokens_used": tokens_used,
            "tokens_input": tokens_input,
            "tokens_output": tokens_output,
            "iterations_run": iterations_run,
            "wall_time_sec": round(wall_time_sec, 3),
            "tavily_searches": tavily_searches,
            "estimated_cost_usd": cost_usd,
            "budget_exhausted": budget_exhausted,
            "confidence": confidence,
        }

        # One JSON object per line, appended so earlier entries are kept.
        with self.path.open("a", encoding="utf-8") as ledger:
            ledger.write(f"{json.dumps(entry)}\n")

        log.info("cost_recorded", **entry)
        return entry
|