diff --git a/Dockerfile b/Dockerfile
index 19c00f6..8c99528 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -13,6 +13,7 @@ RUN pip install --upgrade pip
 
 # Copy the project and install editable with dev extras.
 COPY cli ./cli
+COPY obs ./obs
 COPY researchers ./researchers
 COPY orchestrator ./orchestrator
 COPY tests ./tests
diff --git a/cli/main.py b/cli/main.py
index aa135c6..0a0443e 100644
--- a/cli/main.py
+++ b/cli/main.py
@@ -7,7 +7,10 @@ ResearchResult contracts to the terminal.
 import asyncio
 import json
 import os
+import re
 import sys
+from collections import defaultdict
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Optional
 
@@ -20,6 +23,7 @@ from rich.table import Table
 from rich.text import Text
 
 from obs import configure_logging, get_logger
+from obs.costs import DEFAULT_LEDGER_PATH
 from researchers.web.models import ResearchResult
 
 
@@ -298,5 +302,284 @@ def replay(trace_id: str, trace_dir: Optional[str]) -> None:
     render_trace(entries, trace_id, console)
 
 
+# ---------------------------------------------------------------------------
+# costs command
+# ---------------------------------------------------------------------------
+
+
+_RELATIVE_RE = re.compile(r"^(\d+)([dwhm])$")
+
+
+def _parse_iso_utc(text: str) -> datetime:
+    """Parse an ISO-8601 string into a timezone-aware datetime.
+
+    A trailing ``Z`` is rewritten to ``+00:00`` because ``fromisoformat``
+    rejects it before Python 3.11, and a naive result is assumed to be UTC
+    so that every parsed value can be compared with every other one.
+
+    Raises:
+        ValueError: if ``text`` is not a valid ISO date or datetime.
+    """
+    if text.endswith("Z"):
+        text = text[:-1] + "+00:00"
+    dt = datetime.fromisoformat(text)
+    if dt.tzinfo is None:
+        dt = dt.replace(tzinfo=timezone.utc)
+    return dt
+
+
+def _parse_when(value: str) -> datetime:
+    """Parse an ISO date or a relative shorthand like '7d', '24h'.
+
+    Relative values are ``<n><unit>`` with unit ``h`` (hours), ``d`` (days),
+    ``w`` (weeks) or ``m`` (months, approximated as 30 days) and are
+    subtracted from the current UTC time. Anything else is parsed as an
+    ISO date/datetime, with naive values assumed to be UTC.
+
+    Raises:
+        ValueError: if ``value`` is neither a shorthand nor ISO.
+        OverflowError: if a relative amount is too large to represent.
+    """
+    m = _RELATIVE_RE.match(value)
+    if m:
+        n = int(m.group(1))
+        unit = m.group(2)
+        delta = {
+            "h": timedelta(hours=n),
+            "d": timedelta(days=n),
+            "w": timedelta(weeks=n),
+            "m": timedelta(days=30 * n),
+        }[unit]
+        return datetime.now(timezone.utc) - delta
+    # Otherwise treat as ISO date / datetime
+    return _parse_iso_utc(value)
+
+
+def _load_ledger(path: Path) -> list[dict]:
+    """Read a JSONL ledger and return one dict per well-formed line.
+
+    A missing file yields an empty list. Blank lines, lines that are not
+    valid JSON, and lines whose JSON value is not an object are skipped.
+    """
+    if not path.exists():
+        return []
+    entries: list[dict] = []
+    with open(path, "r", encoding="utf-8") as f:
+        for line in f:
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                record = json.loads(line)
+            except json.JSONDecodeError:
+                # Skip a corrupt line rather than blow up the whole report
+                continue
+            # Every consumer calls .get() on an entry, so a bare number,
+            # string or list is as unusable as a corrupt line.
+            if isinstance(record, dict):
+                entries.append(record)
+    return entries
+
+
+def _filter_entries(
+    entries: list[dict],
+    since: Optional[datetime],
+    until: Optional[datetime],
+    model: Optional[str],
+) -> list[dict]:
+    """Return entries within ``[since, until]`` whose model_id is ``model``.
+
+    A filter set to ``None`` is not applied. Entries whose ``timestamp`` is
+    missing, not a string, or not ISO are always dropped.
+    """
+    out = []
+    for e in entries:
+        ts_str = e.get("timestamp")
+        if not isinstance(ts_str, str):
+            continue
+        try:
+            # Normalised to aware UTC: comparing a naive timestamp with the
+            # aware bounds from _parse_when would raise TypeError.
+            ts = _parse_iso_utc(ts_str)
+        except ValueError:
+            continue
+        if since and ts < since:
+            continue
+        if until and ts > until:
+            continue
+        if model and e.get("model_id") != model:
+            continue
+        out.append(e)
+    return out
+
+
+def render_costs(entries: list[dict], console: Console) -> None:
+    """Render a cost summary from filtered ledger entries.
+
+    Prints a summary panel, per-day and per-model tables, and the single
+    highest-cost call. Numeric fields that are missing or ``null`` count as
+    zero; entries without a model_id are grouped under ``(unknown)``.
+    """
+    if not entries:
+        console.print("[dim]No cost data yet.[/dim]")
+        return
+
+    total_calls = len(entries)
+    # `or 0` rather than a .get() default: the key may be present but null.
+    total_tokens = sum(e.get("tokens_used") or 0 for e in entries)
+    total_input = sum(e.get("tokens_input") or 0 for e in entries)
+    total_output = sum(e.get("tokens_output") or 0 for e in entries)
+    total_tavily = sum(e.get("tavily_searches") or 0 for e in entries)
+    total_spend = sum(
+        e.get("estimated_cost_usd") or 0.0 for e in entries
+    )
+    unknown_cost_calls = sum(
+        1 for e in entries if e.get("estimated_cost_usd") is None
+    )
+
+    # Summary panel
+    summary = Text()
+    summary.append(f"Calls: {total_calls}\n", style="bold")
+    summary.append(f"Total spend: ${total_spend:.4f}\n", style="bold green")
+    summary.append(f"Total tokens: {total_tokens:,} ")
+    summary.append(f"(in {total_input:,} / out {total_output:,})\n", style="dim")
+    summary.append(f"Tavily searches: {total_tavily}\n")
+    if unknown_cost_calls:
+        summary.append(
+            f"Calls with unknown model price: {unknown_cost_calls}\n",
+            style="yellow",
+        )
+    console.print(Panel(summary, title="Cost Summary", border_style="green"))
+
+    # Per-day breakdown
+    per_day: dict[str, dict] = defaultdict(
+        lambda: {"calls": 0, "tokens": 0, "spend": 0.0}
+    )
+    for e in entries:
+        # For an ISO timestamp the first ten characters are the date.
+        day = str(e.get("timestamp") or "")[:10]
+        per_day[day]["calls"] += 1
+        per_day[day]["tokens"] += e.get("tokens_used") or 0
+        per_day[day]["spend"] += e.get("estimated_cost_usd") or 0.0
+    day_table = Table(title="Per Day", show_lines=False, expand=True)
+    day_table.add_column("Date", style="dim")
+    day_table.add_column("Calls", justify="right")
+    day_table.add_column("Tokens", justify="right")
+    day_table.add_column("Spend (USD)", justify="right", style="green")
+    for day in sorted(per_day):
+        d = per_day[day]
+        day_table.add_row(
+            day, str(d["calls"]), f"{d['tokens']:,}", f"${d['spend']:.4f}"
+        )
+    console.print(day_table)
+
+    # Per-model breakdown
+    per_model: dict[str, dict] = defaultdict(
+        lambda: {"calls": 0, "tokens": 0, "spend": 0.0}
+    )
+    for e in entries:
+        # A null model_id must not become a None key: sorting None with
+        # str keys below would raise TypeError.
+        m = str(e.get("model_id") or "(unknown)")
+        per_model[m]["calls"] += 1
+        per_model[m]["tokens"] += e.get("tokens_used") or 0
+        per_model[m]["spend"] += e.get("estimated_cost_usd") or 0.0
+    model_table = Table(title="Per Model", show_lines=False, expand=True)
+    model_table.add_column("Model")
+    model_table.add_column("Calls", justify="right")
+    model_table.add_column("Tokens", justify="right")
+    model_table.add_column("Spend (USD)", justify="right", style="green")
+    for m in sorted(per_model):
+        d = per_model[m]
+        model_table.add_row(
+            m, str(d["calls"]), f"{d['tokens']:,}", f"${d['spend']:.4f}"
+        )
+    console.print(model_table)
+
+    # Highest-cost call
+    costed = [e for e in entries if e.get("estimated_cost_usd") is not None]
+    if costed:
+        top = max(costed, key=lambda e: e["estimated_cost_usd"])
+        question = str(top.get("question") or "")[:120]
+        tokens = top.get("tokens_used") or 0
+        top_text = Text()
+        top_text.append(f"trace_id: {top.get('trace_id', '?')}\n")
+        top_text.append(f"question: {question}\n")
+        top_text.append(f"model: {top.get('model_id', '?')}\n")
+        top_text.append(f"tokens: {tokens:,}\n")
+        top_text.append(
+            f"spend: ${top.get('estimated_cost_usd', 0):.4f}\n",
+            style="bold green",
+        )
+        console.print(
+            Panel(top_text, title="Highest-Cost Call", border_style="yellow")
+        )
+
+
+@cli.command()
+@click.option(
+    "--since",
+    default=None,
+    help="Filter by start time. ISO date or relative (e.g. 7d, 24h, 2w).",
+)
+@click.option(
+    "--until",
+    default=None,
+    help="Filter by end time. ISO date or relative.",
+)
+@click.option(
+    "--model",
+    default=None,
+    help="Filter to a specific model_id.",
+)
+@click.option(
+    "--json",
+    "as_json",
+    is_flag=True,
+    default=False,
+    help="Emit raw filtered ledger entries as JSON instead of the table.",
+)
+@click.option(
+    "--ledger",
+    default=None,
+    help=f"Override ledger path (default: {DEFAULT_LEDGER_PATH}).",
+)
+def costs(
+    since: Optional[str],
+    until: Optional[str],
+    model: Optional[str],
+    as_json: bool,
+    ledger: Optional[str],
+) -> None:
+    """Show cost summary from the research ledger."""
+    # Validate the time bounds first so a typo is reported as a usage error
+    # instead of a traceback.
+    bounds: dict[str, Optional[datetime]] = {"since": None, "until": None}
+    for name, raw in (("since", since), ("until", until)):
+        if not raw:
+            continue
+        try:
+            bounds[name] = _parse_when(raw)
+        except (ValueError, OverflowError) as exc:
+            raise click.BadParameter(
+                f"{raw!r} is not an ISO date or a relative value like 7d",
+                param_hint=f"--{name}",
+            ) from exc
+
+    console = Console()
+    path = Path(os.path.expanduser(ledger or DEFAULT_LEDGER_PATH))
+    entries = _load_ledger(path)
+    filtered = _filter_entries(
+        entries, bounds["since"], bounds["until"], model
+    )
+
+    if as_json:
+        for e in filtered:
+            click.echo(json.dumps(e))
+        return
+
+    render_costs(filtered, console)
+
+
 if __name__ == "__main__":
     cli()
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 41c5b79..6ba2c6b 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -4,7 +4,7 @@ from unittest.mock import patch
 
 from click.testing import CliRunner
 
-from cli.main import cli, render_result, render_trace
+from cli.main import cli, render_costs, render_result, render_trace
 from researchers.web.models import (
     Citation,
     ConfidenceFactors,
@@ -186,3 +186,172 @@ class TestReplayCommand:
         out = console.export_text()
         assert "empty-trace" in out
         assert "empty" in out.lower()
+
+
+# ---------------------------------------------------------------------------
+# costs command
+# ---------------------------------------------------------------------------
+
+
+import json as _json
+
+
+def _write_ledger(path, entries):
+    path.write_text("\n".join(_json.dumps(e) for e in entries) + "\n", encoding="utf-8")
+
+
+def _ledger_fixture(tmp_path):
+    path = tmp_path / "costs.jsonl"
+    entries = [
+        {
+            "timestamp": "2026-04-06T10:00:00Z",
+            "trace_id": "t1",
+            "question": "What is X?",
+            "model_id": "claude-sonnet-4-6",
+            "tokens_used": 1000,
+            "tokens_input": 800,
+            "tokens_output": 200,
+            "iterations_run": 1,
+            "wall_time_sec": 5.0,
+            "tavily_searches": 1,
+            "estimated_cost_usd": 0.005,
+            "budget_exhausted": False,
+            "confidence": 0.9,
+        },
+        {
+            "timestamp": "2026-04-07T11:00:00Z",
+            "trace_id": "t2",
+            "question": "Bigger query",
+            "model_id": "claude-opus-4-6",
+            "tokens_used": 50000,
+            "tokens_input": 40000,
+            "tokens_output": 10000,
+            "iterations_run": 5,
+            "wall_time_sec": 120.0,
+            "tavily_searches": 8,
+            "estimated_cost_usd": 1.25,
+            "budget_exhausted": True,
+            "confidence": 0.7,
+        },
+        {
+            "timestamp": "2026-04-08T12:00:00Z",
+            "trace_id": "t3",
+            "question": "Unknown model run",
+            "model_id": "future-model-7",
+            "tokens_used": 500,
+            "tokens_input": 400,
+            "tokens_output": 100,
+            "iterations_run": 1,
+            "wall_time_sec": 2.0,
+            "tavily_searches": 0,
+            "estimated_cost_usd": None,
+            "budget_exhausted": False,
+            "confidence": 0.5,
+        },
+    ]
+    _write_ledger(path, entries)
+    return path
+
+
+class TestCostsCommand:
+    def test_renders_summary(self, tmp_path):
+        path = _ledger_fixture(tmp_path)
+        runner = CliRunner()
+        result = runner.invoke(cli, ["costs", "--ledger", str(path)])
+        assert result.exit_code == 0, result.output
+        # Summary
+        assert "Calls: 3" in result.output
+        assert "$1.2550" in result.output
+        # Per-day rows
+        assert "2026-04-06" in result.output
+        assert "2026-04-07" in result.output
+        assert "2026-04-08" in result.output
+        # Per-model rows
+        assert "claude-sonnet-4-6" in result.output
+        assert "claude-opus-4-6" in result.output
+        # Highest-cost panel
+        assert "t2" in result.output
+        # Unknown model warning
+        assert "unknown model price" in result.output
+
+    def test_filter_by_model(self, tmp_path):
+        path = _ledger_fixture(tmp_path)
+        runner = CliRunner()
+        result = runner.invoke(
+            cli,
+            ["costs", "--ledger", str(path), "--model", "claude-opus-4-6"],
+        )
+        assert result.exit_code == 0
+        assert "Calls: 1" in result.output
+        assert "claude-sonnet-4-6" not in result.output
+
+    def test_filter_by_since_iso(self, tmp_path):
+        path = _ledger_fixture(tmp_path)
+        runner = CliRunner()
+        result = runner.invoke(
+            cli,
+            ["costs", "--ledger", str(path), "--since", "2026-04-08"],
+        )
+        assert result.exit_code == 0
+        assert "Calls: 1" in result.output
+        assert "future-model-7" in result.output
+        assert "claude-sonnet-4-6" not in result.output
+
+    def test_json_output(self, tmp_path):
+        path = _ledger_fixture(tmp_path)
+        runner = CliRunner()
+        result = runner.invoke(
+            cli,
+            ["costs", "--ledger", str(path), "--json"],
+        )
+        assert result.exit_code == 0
+        lines = [ln for ln in result.output.strip().splitlines() if ln]
+        assert len(lines) == 3
+        first = _json.loads(lines[0])
+        assert first["trace_id"] == "t1"
+
+    def test_empty_ledger(self, tmp_path):
+        path = tmp_path / "missing.jsonl"
+        runner = CliRunner()
+        result = runner.invoke(cli, ["costs", "--ledger", str(path)])
+        assert result.exit_code == 0
+        assert "No cost data yet" in result.output
+
+    def test_render_costs_handles_empty(self):
+        console = Console(record=True, width=120)
+        render_costs([], console)
+        out = console.export_text()
+        assert "No cost data yet" in out
+
+    def test_invalid_since_is_usage_error(self, tmp_path):
+        path = _ledger_fixture(tmp_path)
+        runner = CliRunner()
+        result = runner.invoke(
+            cli,
+            ["costs", "--ledger", str(path), "--since", "not-a-date"],
+        )
+        assert result.exit_code == 2
+        assert "--since" in result.output
+
+    def test_tolerates_naive_timestamp_and_null_fields(self, tmp_path):
+        path = tmp_path / "costs.jsonl"
+        _write_ledger(
+            path,
+            [
+                {
+                    "timestamp": "2026-04-08T12:00:00",
+                    "model_id": None,
+                    "tokens_used": None,
+                    "tavily_searches": None,
+                    "estimated_cost_usd": None,
+                },
+            ],
+        )
+        runner = CliRunner()
+        result = runner.invoke(
+            cli,
+            ["costs", "--ledger", str(path), "--since", "2026-04-08"],
+        )
+        assert result.exit_code == 0, result.output
+        assert "Calls: 1" in result.output
+        assert "(unknown)" in result.output