M2.5.3: marchwarden costs CLI command #29

Merged
archeious merged 1 commit from feat/costs-command into main 2026-04-08 21:59:08 +00:00
3 changed files with 352 additions and 1 deletion

View file

@ -13,6 +13,7 @@ RUN pip install --upgrade pip
# Copy the project and install editable with dev extras.
COPY cli ./cli
COPY obs ./obs
COPY researchers ./researchers
COPY orchestrator ./orchestrator
COPY tests ./tests

View file

@ -7,7 +7,10 @@ ResearchResult contracts to the terminal.
import asyncio
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
@ -20,6 +23,7 @@ from rich.table import Table
from rich.text import Text
from obs import configure_logging, get_logger
from obs.costs import DEFAULT_LEDGER_PATH
from researchers.web.models import ResearchResult
@ -298,5 +302,215 @@ def replay(trace_id: str, trace_dir: Optional[str]) -> None:
render_trace(entries, trace_id, console)
# ---------------------------------------------------------------------------
# costs command
# ---------------------------------------------------------------------------
# Relative shorthand: a positive integer followed by a single unit letter.
_RELATIVE_RE = re.compile(r"^(\d+)([dwhm])$")


def _parse_when(value: str) -> datetime:
    """Parse an ISO date/datetime or a relative shorthand like '7d', '24h'.

    Relative values are subtracted from the current UTC time. Supported
    units are ``h`` (hours), ``d`` (days), ``w`` (weeks) and ``m``
    (months, approximated as 30 days -- not minutes).

    Anything else is parsed with ``datetime.fromisoformat``. A trailing
    ``Z`` is accepted as UTC on every Python version, and a value without
    an offset is assumed to be UTC.

    Args:
        value: The user-supplied time expression; surrounding whitespace
            is ignored.

    Returns:
        A timezone-aware ``datetime``.

    Raises:
        ValueError: If the value is neither a valid relative shorthand nor
            a valid ISO date/datetime, or if the relative offset is too
            large to represent.
    """
    text = value.strip()
    m = _RELATIVE_RE.match(text)
    if m:
        n = int(m.group(1))
        # Express every unit in hours so only the requested unit is computed.
        hours_per_unit = {"h": 1, "d": 24, "w": 24 * 7, "m": 24 * 30}
        try:
            delta = timedelta(hours=n * hours_per_unit[m.group(2)])
            return datetime.now(timezone.utc) - delta
        except OverflowError as exc:
            # Keep a single exception type for every kind of bad input.
            raise ValueError(
                f"relative time offset out of range: {value!r}"
            ) from exc

    # Otherwise treat as ISO date / datetime. fromisoformat() only accepts
    # the 'Z' suffix from Python 3.11 on, so normalise it here.
    iso_text = text
    if iso_text.endswith(("Z", "z")):
        iso_text = iso_text[:-1] + "+00:00"
    try:
        dt = datetime.fromisoformat(iso_text)
    except ValueError as exc:
        raise ValueError(
            f"expected an ISO date/datetime or a relative value such as "
            f"'7d', '24h', '2w'; got {value!r}"
        ) from exc
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt
def _load_ledger(path: Path) -> list[dict]:
    """Read a JSON-lines ledger file into a list of entry dicts.

    Blank lines, lines that are not valid JSON, and lines whose JSON value
    is not an object are skipped, so one bad line never aborts the report.
    Undecodable bytes are replaced rather than raising, for the same reason.

    Args:
        path: Location of the ledger file.

    Returns:
        The parsed entries in file order; an empty list when the file does
        not exist.
    """
    if not path.exists():
        return []
    entries: list[dict] = []
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # Skip a corrupt line rather than blow up the whole report
                continue
            # Downstream code calls .get() on every entry, so a bare number,
            # string or array is treated like any other corrupt line.
            if isinstance(record, dict):
                entries.append(record)
    return entries
def _as_utc(dt: Optional[datetime]) -> Optional[datetime]:
    """Return *dt* unchanged if aware (or None); mark a naive value as UTC."""
    if dt is not None and dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt


def _filter_entries(
    entries: list[dict],
    since: Optional[datetime],
    until: Optional[datetime],
    model: Optional[str],
) -> list[dict]:
    """Select ledger entries by time window and model.

    Entries whose ``timestamp`` is missing, not a string, or not parseable
    as ISO-8601 are dropped, even when no time bound is given. Timestamps
    and bounds without an offset are treated as UTC so that naive and
    aware values are never compared with each other.

    Args:
        entries: Ledger entries as dicts.
        since: Inclusive lower bound, or None for no lower bound.
        until: Inclusive upper bound, or None for no upper bound.
        model: Exact ``model_id`` to keep, or None/empty for all models.

    Returns:
        The matching entries, in their original order.
    """
    since = _as_utc(since)
    until = _as_utc(until)
    out = []
    for e in entries:
        ts_str = e.get("timestamp")
        if not isinstance(ts_str, str):
            # A JSON null or number here cannot be placed on the timeline.
            continue
        try:
            ts = _as_utc(datetime.fromisoformat(ts_str.replace("Z", "+00:00")))
        except ValueError:
            continue
        if since and ts < since:
            continue
        if until and ts > until:
            continue
        if model and e.get("model_id") != model:
            continue
        out.append(e)
    return out
def _cost_rollup(entries: list[dict], key_of) -> dict:
    """Group *entries* by ``key_of(entry)`` and total calls, tokens and spend."""
    rollup: dict = defaultdict(lambda: {"calls": 0, "tokens": 0, "spend": 0.0})
    for e in entries:
        bucket = rollup[key_of(e)]
        bucket["calls"] += 1
        # ``or 0`` covers both a missing key and an explicit JSON null.
        bucket["tokens"] += e.get("tokens_used") or 0
        bucket["spend"] += e.get("estimated_cost_usd") or 0.0
    return rollup


def _rollup_table(title: str, label: str, rollup: dict, label_style=None):
    """Build a Calls/Tokens/Spend table with one row per rollup key, sorted by key."""
    table = Table(title=title, show_lines=False, expand=True)
    table.add_column(label, style=label_style)
    table.add_column("Calls", justify="right")
    table.add_column("Tokens", justify="right")
    table.add_column("Spend (USD)", justify="right", style="green")
    for key in sorted(rollup):
        d = rollup[key]
        table.add_row(
            key, str(d["calls"]), f"{d['tokens']:,}", f"${d['spend']:.4f}"
        )
    return table


def render_costs(entries: list[dict], console: Console) -> None:
    """Render a cost summary from filtered ledger entries.

    Prints, in order: a summary panel with overall totals, a per-day table,
    a per-model table, and a panel describing the single most expensive
    call (only when at least one entry has a known cost). Numeric and text
    fields that are missing or null are treated as zero / empty so one
    incomplete entry cannot abort the report.

    Args:
        entries: Ledger entries to summarise.
        console: Console that receives the output.
    """
    if not entries:
        console.print("[dim]No cost data yet.[/dim]")
        return

    total_calls = len(entries)
    total_tokens = sum(e.get("tokens_used") or 0 for e in entries)
    total_input = sum(e.get("tokens_input") or 0 for e in entries)
    total_output = sum(e.get("tokens_output") or 0 for e in entries)
    total_tavily = sum(e.get("tavily_searches") or 0 for e in entries)
    total_spend = sum(e.get("estimated_cost_usd") or 0.0 for e in entries)
    # A null cost means the entry was recorded without a price estimate.
    unknown_cost_calls = sum(
        1 for e in entries if e.get("estimated_cost_usd") is None
    )

    # Summary panel
    summary = Text()
    summary.append(f"Calls: {total_calls}\n", style="bold")
    summary.append(f"Total spend: ${total_spend:.4f}\n", style="bold green")
    summary.append(f"Total tokens: {total_tokens:,} ")
    summary.append(f"(in {total_input:,} / out {total_output:,})\n", style="dim")
    summary.append(f"Tavily searches: {total_tavily}\n")
    if unknown_cost_calls:
        summary.append(
            f"Calls with unknown model price: {unknown_cost_calls}\n",
            style="yellow",
        )
    console.print(Panel(summary, title="Cost Summary", border_style="green"))

    # Per-day breakdown: the first 10 characters of the timestamp string
    # are used as the day key.
    per_day = _cost_rollup(
        entries, lambda e: str(e.get("timestamp") or "")[:10]
    )
    console.print(_rollup_table("Per Day", "Date", per_day, label_style="dim"))

    # Per-model breakdown; string keys keep the sort well-defined.
    per_model = _cost_rollup(
        entries, lambda e: str(e.get("model_id") or "(unknown)")
    )
    console.print(_rollup_table("Per Model", "Model", per_model))

    # Highest-cost call
    costed = [e for e in entries if e.get("estimated_cost_usd") is not None]
    if costed:
        top = max(costed, key=lambda e: e["estimated_cost_usd"])
        question = str(top.get("question") or "")[:120]
        tokens = top.get("tokens_used") or 0
        top_text = Text()
        top_text.append(f"trace_id: {top.get('trace_id', '?')}\n")
        top_text.append(f"question: {question}\n")
        top_text.append(f"model: {top.get('model_id') or '?'}\n")
        top_text.append(f"tokens: {tokens:,}\n")
        top_text.append(
            f"spend: ${top['estimated_cost_usd']:.4f}\n",
            style="bold green",
        )
        console.print(
            Panel(top_text, title="Highest-Cost Call", border_style="yellow")
        )
# `costs` subcommand: loads the ledger, applies the optional time/model
# filters, then prints either one JSON object per line (--json) or the
# rendered summary. The function docstring is used by click as the help
# text, so longer notes live in comments instead.
@cli.command()
@click.option(
    "--since",
    default=None,
    help="Filter by start time. ISO date or relative (e.g. 7d, 24h, 2w).",
)
@click.option(
    "--until",
    default=None,
    help="Filter by end time. ISO date or relative.",
)
@click.option(
    "--model",
    default=None,
    help="Filter to a specific model_id.",
)
@click.option(
    "--json",
    "as_json",
    is_flag=True,
    default=False,
    help="Emit raw filtered ledger entries as JSON instead of the table.",
)
@click.option(
    "--ledger",
    default=None,
    help=f"Override ledger path (default: {DEFAULT_LEDGER_PATH}).",
)
def costs(
    since: Optional[str],
    until: Optional[str],
    model: Optional[str],
    as_json: bool,
    ledger: Optional[str],
) -> None:
    """Show cost summary from the research ledger."""
    # Validate the time bounds before touching the ledger so that a typo is
    # reported as a normal usage error instead of a traceback.
    bounds = {}
    for flag, raw in (("--since", since), ("--until", until)):
        if not raw:
            bounds[flag] = None
            continue
        try:
            bounds[flag] = _parse_when(raw)
        except (ValueError, OverflowError) as exc:
            raise click.BadParameter(str(exc), param_hint=flag) from exc

    console = Console()
    path = Path(os.path.expanduser(ledger or DEFAULT_LEDGER_PATH))
    entries = _load_ledger(path)
    filtered = _filter_entries(
        entries, bounds["--since"], bounds["--until"], model
    )

    if as_json:
        # One JSON document per line, mirroring the ledger's own format.
        for e in filtered:
            click.echo(json.dumps(e))
        return

    render_costs(filtered, console)
# Run the `cli` entry point when this module is executed as a script.
if __name__ == "__main__":
    cli()

View file

@ -4,7 +4,7 @@ from unittest.mock import patch
from click.testing import CliRunner
from cli.main import cli, render_result, render_trace
from cli.main import cli, render_costs, render_result, render_trace
from researchers.web.models import (
Citation,
ConfidenceFactors,
@ -186,3 +186,139 @@ class TestReplayCommand:
out = console.export_text()
assert "empty-trace" in out
assert "empty" in out.lower()
# ---------------------------------------------------------------------------
# costs command
# ---------------------------------------------------------------------------
import json as _json
def _write_ledger(path, entries):
    """Write *entries* to *path* as JSON lines (one object per line).

    The file is written as UTF-8 explicitly, so the fixture does not depend
    on the platform's default encoding. An empty *entries* produces an
    empty file rather than a single blank line.
    """
    path.write_text(
        "".join(_json.dumps(e) + "\n" for e in entries),
        encoding="utf-8",
    )
def _ledger_fixture(tmp_path):
    """Create ``costs.jsonl`` under *tmp_path* with three sample calls.

    The calls fall on three consecutive days and use three different
    models; the last one has no cost estimate. Returns the ledger path.
    """
    cheap_call = dict(
        timestamp="2026-04-06T10:00:00Z",
        trace_id="t1",
        question="What is X?",
        model_id="claude-sonnet-4-6",
        tokens_used=1000,
        tokens_input=800,
        tokens_output=200,
        iterations_run=1,
        wall_time_sec=5.0,
        tavily_searches=1,
        estimated_cost_usd=0.005,
        budget_exhausted=False,
        confidence=0.9,
    )
    pricey_call = dict(
        timestamp="2026-04-07T11:00:00Z",
        trace_id="t2",
        question="Bigger query",
        model_id="claude-opus-4-6",
        tokens_used=50000,
        tokens_input=40000,
        tokens_output=10000,
        iterations_run=5,
        wall_time_sec=120.0,
        tavily_searches=8,
        estimated_cost_usd=1.25,
        budget_exhausted=True,
        confidence=0.7,
    )
    # No price is recorded for this model, so the cost estimate is null.
    unpriced_call = dict(
        timestamp="2026-04-08T12:00:00Z",
        trace_id="t3",
        question="Unknown model run",
        model_id="future-model-7",
        tokens_used=500,
        tokens_input=400,
        tokens_output=100,
        iterations_run=1,
        wall_time_sec=2.0,
        tavily_searches=0,
        estimated_cost_usd=None,
        budget_exhausted=False,
        confidence=0.5,
    )
    ledger_path = tmp_path / "costs.jsonl"
    _write_ledger(ledger_path, [cheap_call, pricey_call, unpriced_call])
    return ledger_path
class TestCostsCommand:
    """Drive the ``costs`` subcommand through the CLI runner, plus one direct
    ``render_costs`` check."""

    @staticmethod
    def _invoke_costs(ledger_path, *extra):
        """Run ``costs --ledger <ledger_path> [extra...]`` and return the result."""
        argv = ["costs", "--ledger", str(ledger_path)]
        argv.extend(extra)
        return CliRunner().invoke(cli, argv)

    def test_renders_summary(self, tmp_path):
        outcome = self._invoke_costs(_ledger_fixture(tmp_path))
        assert outcome.exit_code == 0, outcome.output
        expected_fragments = (
            # Summary
            "Calls: 3",
            "$1.2550",
            # Per-day rows
            "2026-04-06",
            "2026-04-07",
            "2026-04-08",
            # Per-model rows
            "claude-sonnet-4-6",
            "claude-opus-4-6",
            # Highest-cost panel
            "t2",
            # Unknown model warning
            "unknown model price",
        )
        for fragment in expected_fragments:
            assert fragment in outcome.output

    def test_filter_by_model(self, tmp_path):
        outcome = self._invoke_costs(
            _ledger_fixture(tmp_path), "--model", "claude-opus-4-6"
        )
        assert outcome.exit_code == 0
        assert "Calls: 1" in outcome.output
        assert "claude-sonnet-4-6" not in outcome.output

    def test_filter_by_since_iso(self, tmp_path):
        outcome = self._invoke_costs(
            _ledger_fixture(tmp_path), "--since", "2026-04-08"
        )
        assert outcome.exit_code == 0
        assert "Calls: 1" in outcome.output
        assert "future-model-7" in outcome.output
        assert "claude-sonnet-4-6" not in outcome.output

    def test_json_output(self, tmp_path):
        outcome = self._invoke_costs(_ledger_fixture(tmp_path), "--json")
        assert outcome.exit_code == 0
        records = [row for row in outcome.output.strip().splitlines() if row]
        assert len(records) == 3
        assert _json.loads(records[0])["trace_id"] == "t1"

    def test_empty_ledger(self, tmp_path):
        # The file is never created, so the command sees a missing ledger.
        outcome = self._invoke_costs(tmp_path / "missing.jsonl")
        assert outcome.exit_code == 0
        assert "No cost data yet" in outcome.output

    def test_render_costs_handles_empty(self):
        recording_console = Console(record=True, width=120)
        render_costs([], recording_console)
        assert "No cost data yet" in recording_console.export_text()