marchwarden/cli/main.py
Jeff Smith c0d4f391b6 Display budget as spend status, not exhaustion alarm
Replace 'Budget exhausted: True/False' with 'Budget status: spent /
under cap' in the Confidence panel. The previous wording read as a
failure indicator when in practice 'exhausted' just means the agent
spent its tool-use cap before voluntarily stopping — the normal,
expected outcome on real questions with the default 20k budget.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 16:12:39 -06:00

517 lines
17 KiB
Python

"""Marchwarden CLI shim.
Talks to the web researcher MCP server over stdio and pretty-prints
ResearchResult contracts to the terminal.
"""
import asyncio
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import click
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from obs import configure_logging, get_logger
from obs.costs import DEFAULT_LEDGER_PATH
from researchers.web.models import ResearchResult
# Directory searched for per-run JSONL trace files when `replay` is not given
# --trace-dir; the leading "~" is expanded when the path is resolved.
DEFAULT_TRACE_DIR = "~/.marchwarden/traces"
# Logger used by the CLI commands in this module.
log = get_logger("marchwarden.cli")
# ---------------------------------------------------------------------------
# MCP client
# ---------------------------------------------------------------------------
async def call_research_tool(
    question: str,
    depth: str,
    max_iterations: int,
    token_budget: int,
) -> ResearchResult:
    """Spawn the web researcher MCP server and call its `research` tool.

    The server is launched as a child process (the current interpreter with
    ``-m researchers.web.server``) using a copy of this process's
    environment, and is driven over stdio.

    Args:
        question: Research question, forwarded to the tool unchanged.
        depth: Depth setting forwarded to the tool.
        max_iterations: Iteration cap forwarded to the tool.
        token_budget: Token budget forwarded to the tool.

    Returns:
        The tool's JSON text payload parsed into a ``ResearchResult``.

    Raises:
        RuntimeError: If the tool reports an error, returns no content, or
            returns a first content block without text.
        Exception: Whatever ``ResearchResult.model_validate_json`` raises for
            a malformed payload propagates unchanged.
    """
    params = StdioServerParameters(
        command=sys.executable,
        args=["-m", "researchers.web.server"],
        env=os.environ.copy(),
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "research",
                arguments={
                    "question": question,
                    "depth": depth,
                    "max_iterations": max_iterations,
                    "token_budget": token_budget,
                },
            )

    # Payload checks run after the session is closed; nothing below needs
    # the live connection.
    blocks = result.content or []
    # FastMCP returns the tool's string return as a TextContent block.
    payload = getattr(blocks[0], "text", None) if blocks else None

    # A failed tool call carries the error message where the JSON payload
    # would normally be; surface it as-is instead of trying to parse it.
    if getattr(result, "isError", False):
        raise RuntimeError(f"research tool failed: {payload or 'unknown error'}")
    if not blocks:
        raise RuntimeError("research tool returned no content")
    if payload is None:
        raise RuntimeError("research tool returned a non-text content block")

    return ResearchResult.model_validate_json(payload)
# ---------------------------------------------------------------------------
# Pretty printing
# ---------------------------------------------------------------------------
def render_result(result: ResearchResult, console: Console) -> None:
    """Render a ResearchResult to the console using rich.

    Prints, in order: the answer panel, the citations table (or a
    "No citations." line), then the gaps, discovery-event and open-question
    tables when those lists are non-empty, the confidence panel, the cost
    panel and finally the trace id.

    Free-text fields are escaped before being handed to rich, so square
    brackets in them are shown literally instead of being parsed as console
    markup.

    Args:
        result: The research result to display.
        console: Rich console that receives all output.
    """
    # Local import keeps this function self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    def _safe(value):
        # Only strings are parsed as markup by rich; leave None and any
        # other renderable untouched.
        return escape(value) if isinstance(value, str) else value

    # Answer
    console.print(
        Panel(
            _safe(result.answer),
            title="[bold cyan]Answer[/bold cyan]",
            border_style="cyan",
        )
    )

    # Citations
    if result.citations:
        table = Table(title="Citations", show_lines=True, expand=True)
        table.add_column("#", style="dim", width=3)
        table.add_column("Title / Locator", overflow="fold")
        table.add_column("Excerpt", overflow="fold")
        table.add_column("Conf", justify="right", width=5)
        for i, c in enumerate(result.citations, 1):
            # Fall back to the locator when a citation has no title.
            heading = escape(str(c.title or c.locator))
            locator = escape(str(c.locator))
            header = f"[bold]{heading}[/bold]\n[dim]{locator}[/dim]"
            table.add_row(
                str(i), header, _safe(c.raw_excerpt), f"{c.confidence:.2f}"
            )
        console.print(table)
    else:
        console.print("[dim]No citations.[/dim]")

    # Gaps grouped by category
    if result.gaps:
        gap_table = Table(title="Gaps", show_lines=True, expand=True)
        gap_table.add_column("Category", style="yellow")
        gap_table.add_column("Topic")
        gap_table.add_column("Detail", overflow="fold")
        for g in result.gaps:
            gap_table.add_row(g.category.value, _safe(g.topic), _safe(g.detail))
        console.print(gap_table)

    # Discovery events
    if result.discovery_events:
        de_table = Table(title="Discovery Events", show_lines=True, expand=True)
        de_table.add_column("Type", style="magenta")
        de_table.add_column("Suggested Researcher")
        de_table.add_column("Query", overflow="fold")
        de_table.add_column("Reason", overflow="fold")
        for d in result.discovery_events:
            de_table.add_row(
                d.type,
                _safe(d.suggested_researcher or "-"),
                _safe(d.query),
                _safe(d.reason),
            )
        console.print(de_table)

    # Open questions
    if result.open_questions:
        oq_table = Table(title="Open Questions", show_lines=True, expand=True)
        oq_table.add_column("Priority", style="green")
        oq_table.add_column("Question", overflow="fold")
        oq_table.add_column("Context", overflow="fold")
        for q in result.open_questions:
            oq_table.add_row(q.priority, _safe(q.question), _safe(q.context))
        console.print(oq_table)

    # Confidence + factors (Text.append takes plain text, so no escaping)
    cf = result.confidence_factors
    conf_text = Text()
    conf_text.append(f"Overall: {result.confidence:.2f}\n", style="bold")
    conf_text.append(f"Corroborating sources: {cf.num_corroborating_sources}\n")
    conf_text.append(f"Source authority: {cf.source_authority}\n")
    conf_text.append(f"Contradiction detected: {cf.contradiction_detected}\n")
    conf_text.append(f"Query specificity match: {cf.query_specificity_match:.2f}\n")
    budget_status = "spent" if cf.budget_exhausted else "under cap"
    conf_text.append(f"Budget status: {budget_status}\n")
    conf_text.append(f"Recency: {cf.recency or 'unknown'}")
    console.print(Panel(conf_text, title="Confidence", border_style="green"))

    # Cost
    cm = result.cost_metadata
    cost_text = Text()
    cost_text.append(f"Tokens: {cm.tokens_used}\n")
    cost_text.append(f"Iterations: {cm.iterations_run}\n")
    cost_text.append(f"Wall time: {cm.wall_time_sec:.2f}s\n")
    cost_text.append(f"Model: {cm.model_id}")
    console.print(Panel(cost_text, title="Cost", border_style="blue"))

    # Trace footer
    console.print(f"\n[dim]trace_id: {result.trace_id}[/dim]")
# ---------------------------------------------------------------------------
# Click app
# ---------------------------------------------------------------------------
@click.group()
def cli() -> None:
    """Marchwarden — agentic research CLI."""
    # Group callback: click invokes it ahead of the selected subcommand, so
    # configure_logging() has run before any command body executes.
    configure_logging()
@cli.command()
@click.argument("question")
@click.option(
    "--depth",
    type=click.Choice(["shallow", "balanced", "deep"]),
    default="balanced",
    show_default=True,
)
@click.option(
    "--budget",
    "token_budget",
    type=int,
    default=20_000,
    show_default=True,
    help="Token budget for the research loop.",
)
@click.option(
    "--max-iterations",
    type=int,
    default=5,
    show_default=True,
)
def ask(
    question: str,
    depth: str,
    token_budget: int,
    max_iterations: int,
) -> None:
    """Ask the web researcher a QUESTION."""
    # The docstring above doubles as the command's --help text, so the
    # implementation notes live in comments.
    #
    # Flow: log the request, run the research tool to completion, log a
    # summary and render the result. Any failure is logged with its
    # traceback, printed as a one-line error, and exits with status 1.
    from rich.markup import escape

    console = Console()
    # The question is user input: escape it so brackets are printed
    # literally rather than parsed as rich markup.
    console.print(f"[dim]Researching:[/dim] {escape(question)}\n")
    log.info(
        "ask_started",
        question=question,
        depth=depth,
        max_iterations=max_iterations,
        token_budget=token_budget,
    )
    try:
        result = asyncio.run(
            call_research_tool(
                question=question,
                depth=depth,
                max_iterations=max_iterations,
                token_budget=token_budget,
            )
        )
    except Exception as e:
        log.error("ask_failed", question=question, error=str(e), exc_info=True)
        # Exception text often contains bracketed details; escaping keeps
        # them visible instead of being consumed as markup tags.
        console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
        sys.exit(1)
    log.info(
        "ask_completed",
        trace_id=result.trace_id,
        confidence=result.confidence,
        citations=len(result.citations),
        tokens_used=result.cost_metadata.tokens_used,
        wall_time_sec=result.cost_metadata.wall_time_sec,
    )
    render_result(result, console)
def _resolve_trace_path(trace_id: str, trace_dir: Optional[str]) -> Path:
    """Return the JSONL file path for *trace_id*.

    *trace_dir* is used when it is given and non-empty; otherwise
    ``DEFAULT_TRACE_DIR`` is used. A leading ``~`` in the directory is
    expanded. The returned path is not checked for existence.
    """
    directory = trace_dir if trace_dir else DEFAULT_TRACE_DIR
    expanded = os.path.expanduser(directory)
    return Path(expanded).joinpath(f"{trace_id}.jsonl")
def render_trace(entries: list[dict], trace_id: str, console: Console) -> None:
    """Pretty-print a list of trace entries.

    Prints a header panel with the trace id and step count, then one table
    row per entry. The ``step``, ``action``, ``decision`` and
    ``content_hash`` keys get their own columns; every other key except
    ``timestamp`` is listed as ``key: value`` lines in the Details column.

    All entry values are escaped before rendering so that square brackets
    in them are displayed literally rather than parsed as rich markup.

    Args:
        entries: Decoded trace records, one dict per step.
        trace_id: Identifier shown in the header panel.
        console: Rich console that receives the output.
    """
    # Local import keeps this function self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    console.print(
        Panel(
            f"[bold]trace_id:[/bold] {escape(trace_id)}\n"
            f"[bold]steps:[/bold] {len(entries)}",
            title="[cyan]Replay[/cyan]",
            border_style="cyan",
        )
    )
    if not entries:
        console.print("[dim]Trace file is empty.[/dim]")
        return

    table = Table(show_lines=True, expand=True)
    table.add_column("#", style="dim", width=4)
    table.add_column("Action", style="magenta")
    table.add_column("Decision", overflow="fold")
    table.add_column("Details", overflow="fold")
    table.add_column("Hash", style="dim", overflow="fold")

    # Keys that have a dedicated column (or are deliberately not shown) and
    # therefore must not be repeated under Details.
    reserved = {"step", "action", "decision", "timestamp", "content_hash"}
    for e in entries:
        step = str(e.get("step", "?"))
        action = str(e.get("action", ""))
        decision = str(e.get("decision", ""))
        # `or ""` turns an explicit null hash into an empty cell.
        content_hash = str(e.get("content_hash", "") or "")
        details = "\n".join(
            f"{k}: {v}" for k, v in e.items() if k not in reserved
        )
        table.add_row(
            escape(step),
            escape(action),
            escape(decision),
            escape(details),
            escape(content_hash),
        )
    console.print(table)
@cli.command()
@click.argument("trace_id")
@click.option(
    "--trace-dir",
    default=None,
    help=f"Trace directory (default: {DEFAULT_TRACE_DIR}).",
)
def replay(trace_id: str, trace_dir: Optional[str]) -> None:
    """Replay a prior research run by TRACE_ID."""
    # The docstring above doubles as the command's --help text, so the
    # implementation notes live in comments.
    #
    # Reads <trace-dir>/<trace_id>.jsonl line by line (blank lines are
    # ignored) and renders the decoded steps. A missing file, a line that
    # is not valid JSON, or a line that is not a JSON object prints an
    # error and exits with status 1.
    from rich.markup import escape

    console = Console()
    path = _resolve_trace_path(trace_id, trace_dir)
    if not path.exists():
        console.print(
            f"[bold red]Error:[/bold red] no trace file found for "
            f"trace_id [bold]{escape(trace_id)}[/bold] at {escape(str(path))}"
        )
        sys.exit(1)

    entries: list[dict] = []
    with open(path, "r", encoding="utf-8") as f:
        for lineno, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                console.print(
                    f"[bold red]Error:[/bold red] invalid JSON on line {lineno}: "
                    f"{escape(str(e))}"
                )
                sys.exit(1)
            # render_trace reads each step with dict methods; reject other
            # JSON values here with a clear message instead of crashing later.
            if not isinstance(entry, dict):
                console.print(
                    f"[bold red]Error:[/bold red] line {lineno} is not a JSON object"
                )
                sys.exit(1)
            entries.append(entry)
    render_trace(entries, trace_id, console)
# ---------------------------------------------------------------------------
# costs command
# ---------------------------------------------------------------------------
# "<count><unit>" where unit is h(ours), d(ays), w(eeks) or m(onths).
_RELATIVE_RE = re.compile(r"^(\d+)([dwhm])$")

# Length of one unit of each relative suffix; a month is counted as 30 days.
_RELATIVE_UNITS = {
    "h": timedelta(hours=1),
    "d": timedelta(days=1),
    "w": timedelta(weeks=1),
    "m": timedelta(days=30),
}


def _parse_when(value: str) -> datetime:
    """Parse an ISO date or a relative shorthand like '7d', '24h'.

    Relative values are ``<count><unit>`` with unit ``h``, ``d``, ``w`` or
    ``m`` (30 days) and mean "that long before now". Anything else is parsed
    with ``datetime.fromisoformat``; a trailing ``Z`` is accepted as UTC, and
    a value without an offset is assumed to be UTC. Surrounding whitespace
    is ignored.

    Args:
        value: The raw option value.

    Returns:
        A timezone-aware datetime.

    Raises:
        ValueError: If *value* is neither a valid relative shorthand nor a
            valid ISO date/datetime, or the relative span is out of range.
    """
    value = value.strip()
    m = _RELATIVE_RE.match(value)
    if m:
        count = int(m.group(1))
        try:
            return datetime.now(timezone.utc) - count * _RELATIVE_UNITS[m.group(2)]
        except OverflowError as exc:
            # Keep a single failure type for callers: absurdly large counts
            # are simply invalid input.
            raise ValueError(f"relative time out of range: {value!r}") from exc

    # Otherwise treat as ISO date / datetime. fromisoformat() only accepts
    # the "Z" suffix from Python 3.11, so spell it as an explicit offset.
    if value.endswith(("Z", "z")):
        value = value[:-1] + "+00:00"
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt
def _load_ledger(path: Path) -> list[dict]:
    """Read the JSONL cost ledger at *path* into a list of dicts.

    Reading is best-effort: a missing file yields an empty list, blank lines
    are ignored, and lines that are not valid JSON or do not decode to a
    JSON object are skipped, so one bad line cannot break the whole report.

    Args:
        path: Location of the ledger file.

    Returns:
        The decoded entries in file order.
    """
    if not path.exists():
        return []
    entries: list[dict] = []
    with open(path, "r", encoding="utf-8") as f:
        for raw in f:
            text = raw.strip()
            if not text:
                continue
            try:
                record = json.loads(text)
            except json.JSONDecodeError:
                # Skip a corrupt line rather than blow up the whole report
                continue
            # Consumers read entries with dict.get(); a bare number, string
            # or list is as unusable as a corrupt line.
            if isinstance(record, dict):
                entries.append(record)
    return entries
def _filter_entries(
    entries: list[dict],
    since: Optional[datetime],
    until: Optional[datetime],
    model: Optional[str],
) -> list[dict]:
    """Return the ledger entries that match the given time window and model.

    An entry is dropped when its ``timestamp`` is missing, not a string or
    not ISO-8601 parseable. A trailing ``Z`` is read as UTC, and a timestamp
    without an offset is assumed to be UTC so it can be compared with
    timezone-aware bounds.

    Args:
        entries: Decoded ledger entries.
        since: Inclusive lower bound, or None for no lower bound. Expected to
            be timezone-aware.
        until: Inclusive upper bound, or None for no upper bound. Expected to
            be timezone-aware.
        model: Keep only entries whose ``model_id`` equals this value; None
            or empty disables the filter.

    Returns:
        The matching entries, in their original order.
    """
    out = []
    for e in entries:
        ts_str = e.get("timestamp")
        if not isinstance(ts_str, str):
            # Missing or null timestamp: cannot be placed in time.
            continue
        try:
            ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
        except ValueError:
            continue
        if ts.tzinfo is None:
            # Comparing naive with aware datetimes raises TypeError.
            ts = ts.replace(tzinfo=timezone.utc)
        if since is not None and ts < since:
            continue
        if until is not None and ts > until:
            continue
        if model and e.get("model_id") != model:
            continue
        out.append(e)
    return out
def _group_costs(entries: list[dict], key_of) -> dict[str, dict]:
    """Sum calls, tokens and spend of *entries*, bucketed by ``key_of(entry)``."""
    groups: dict[str, dict] = defaultdict(
        lambda: {"calls": 0, "tokens": 0, "spend": 0.0}
    )
    for e in entries:
        bucket = groups[key_of(e)]
        bucket["calls"] += 1
        # `or` also covers keys that are present but null.
        bucket["tokens"] += e.get("tokens_used") or 0
        bucket["spend"] += e.get("estimated_cost_usd") or 0.0
    return groups


def _breakdown_table(
    title: str,
    key_header: str,
    key_style: Optional[str],
    groups: dict[str, dict],
) -> Table:
    """Build a calls/tokens/spend table with one row per group, sorted by key."""
    table = Table(title=title, show_lines=False, expand=True)
    table.add_column(key_header, style=key_style)
    table.add_column("Calls", justify="right")
    table.add_column("Tokens", justify="right")
    table.add_column("Spend (USD)", justify="right", style="green")
    for key in sorted(groups):
        g = groups[key]
        table.add_row(
            key, str(g["calls"]), f"{g['tokens']:,}", f"${g['spend']:.4f}"
        )
    return table


def render_costs(entries: list[dict], console: Console) -> None:
    """Render a cost summary from filtered ledger entries.

    Prints a summary panel, a per-day table (keyed by the first ten
    characters of each entry's ``timestamp``), a per-model table and, when at
    least one entry has a known cost, a panel for the most expensive call.
    Missing or null numeric fields count as zero; entries whose
    ``estimated_cost_usd`` is missing or null are additionally reported as
    "unknown model price".

    Args:
        entries: Ledger entries to summarise.
        console: Rich console that receives the output.
    """
    if not entries:
        console.print("[dim]No cost data yet.[/dim]")
        return

    total_calls = len(entries)
    total_tokens = sum(e.get("tokens_used") or 0 for e in entries)
    total_input = sum(e.get("tokens_input") or 0 for e in entries)
    total_output = sum(e.get("tokens_output") or 0 for e in entries)
    total_tavily = sum(e.get("tavily_searches") or 0 for e in entries)
    total_spend = sum(e.get("estimated_cost_usd") or 0.0 for e in entries)
    unknown_cost_calls = sum(
        1 for e in entries if e.get("estimated_cost_usd") is None
    )

    # Summary panel
    summary = Text()
    summary.append(f"Calls: {total_calls}\n", style="bold")
    summary.append(f"Total spend: ${total_spend:.4f}\n", style="bold green")
    summary.append(f"Total tokens: {total_tokens:,} ")
    summary.append(f"(in {total_input:,} / out {total_output:,})\n", style="dim")
    summary.append(f"Tavily searches: {total_tavily}\n")
    if unknown_cost_calls:
        summary.append(
            f"Calls with unknown model price: {unknown_cost_calls}\n",
            style="yellow",
        )
    console.print(Panel(summary, title="Cost Summary", border_style="green"))

    # Per-day breakdown
    per_day = _group_costs(entries, lambda e: str(e.get("timestamp") or "")[:10])
    console.print(_breakdown_table("Per Day", "Date", "dim", per_day))

    # Per-model breakdown; a missing or null model_id is grouped under a
    # placeholder so the keys stay sortable strings.
    per_model = _group_costs(entries, lambda e: str(e.get("model_id") or "(unknown)"))
    console.print(_breakdown_table("Per Model", "Model", None, per_model))

    # Highest-cost call (only among entries whose cost is known)
    costed = [e for e in entries if e.get("estimated_cost_usd") is not None]
    if costed:
        top = max(costed, key=lambda e: e["estimated_cost_usd"])
        top_text = Text()
        top_text.append(f"trace_id: {top.get('trace_id', '?')}\n")
        top_text.append(f"question: {str(top.get('question') or '')[:120]}\n")
        top_text.append(f"model: {top.get('model_id', '?')}\n")
        top_text.append(f"tokens: {top.get('tokens_used') or 0:,}\n")
        top_text.append(
            f"spend: ${top['estimated_cost_usd']:.4f}\n",
            style="bold green",
        )
        console.print(
            Panel(top_text, title="Highest-Cost Call", border_style="yellow")
        )
@cli.command()
@click.option(
    "--since",
    default=None,
    help="Filter by start time. ISO date or relative (e.g. 7d, 24h, 2w).",
)
@click.option(
    "--until",
    default=None,
    help="Filter by end time. ISO date or relative.",
)
@click.option(
    "--model",
    default=None,
    help="Filter to a specific model_id.",
)
@click.option(
    "--json",
    "as_json",
    is_flag=True,
    default=False,
    help="Emit raw filtered ledger entries as JSON instead of the table.",
)
@click.option(
    "--ledger",
    default=None,
    help=f"Override ledger path (default: {DEFAULT_LEDGER_PATH}).",
)
def costs(
    since: Optional[str],
    until: Optional[str],
    model: Optional[str],
    as_json: bool,
    ledger: Optional[str],
) -> None:
    """Show cost summary from the research ledger."""
    # The docstring above doubles as the command's --help text, so the
    # implementation notes live in comments.
    #
    # Loads the ledger (--ledger, or DEFAULT_LEDGER_PATH with "~" expanded),
    # applies the --since/--until/--model filters, then either prints one
    # JSON object per line (--json) or the rendered summary.

    def _bound(flag: str, raw: Optional[str]) -> Optional[datetime]:
        # Report an unparseable time as a usage error naming the offending
        # option instead of letting the exception escape as a traceback.
        if not raw:
            return None
        try:
            return _parse_when(raw)
        except (ValueError, OverflowError) as exc:
            raise click.BadParameter(
                f"{raw!r} is not an ISO date or a relative time like 7d, 24h, 2w",
                param_hint=flag,
            ) from exc

    console = Console()
    # Validate the options before touching the ledger so bad input fails fast.
    since_dt = _bound("--since", since)
    until_dt = _bound("--until", until)

    path = Path(os.path.expanduser(ledger or DEFAULT_LEDGER_PATH))
    entries = _load_ledger(path)
    filtered = _filter_entries(entries, since_dt, until_dt, model)
    if as_json:
        for e in filtered:
            click.echo(json.dumps(e))
        return
    render_costs(filtered, console)
# Script entry point: run the click group when this file is executed directly.
if __name__ == "__main__":
    cli()