Previously the depth parameter (shallow/balanced/deep) was passed only as a text hint inside the agent's user message, with no mechanical effect on iterations, token budget, or source count. The flag was effectively cosmetic — the LLM was expected to "interpret" it. Add DEPTH_PRESETS table and constraints_for_depth() helper in researchers.web.models: shallow: 2 iters, 5,000 tokens, 5 sources balanced: 5 iters, 20,000 tokens, 10 sources (= historical defaults) deep: 8 iters, 60,000 tokens, 20 sources Wired through the stack: - WebResearcher.research(): when constraints is None, builds from the depth preset instead of bare ResearchConstraints() - MCP server `research` tool: max_iterations and token_budget now default to None; constraints are built via constraints_for_depth with explicit values overriding the preset - CLI `ask` command: --max-iterations and --budget default to None; the CLI only forwards them to the MCP tool when set, so unset flags fall through to the depth preset balanced is unchanged from the historical defaults so existing callers see no behavior difference. Explicit --max-iterations / --budget always win over the preset. Tests cover each preset's values, balanced backward-compat, unknown depth fallback, full override, and partial override. 116/116 tests passing. Live-verified: --depth shallow on a simple question now caps at 2 iterations and stays under budget. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
517 lines
17 KiB
Python
517 lines
17 KiB
Python
"""Marchwarden CLI shim.
|
|
|
|
Talks to the web researcher MCP server over stdio and pretty-prints
|
|
ResearchResult contracts to the terminal.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import click
|
|
from mcp import ClientSession, StdioServerParameters
|
|
from mcp.client.stdio import stdio_client
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.table import Table
|
|
from rich.text import Text
|
|
|
|
from obs import configure_logging, get_logger
|
|
from obs.costs import DEFAULT_LEDGER_PATH
|
|
from researchers.web.models import ResearchResult
|
|
|
|
|
|
DEFAULT_TRACE_DIR = "~/.marchwarden/traces"
|
|
|
|
log = get_logger("marchwarden.cli")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MCP client
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def call_research_tool(
|
|
question: str,
|
|
depth: str,
|
|
max_iterations: Optional[int],
|
|
token_budget: Optional[int],
|
|
) -> ResearchResult:
|
|
"""Spawn the web researcher MCP server and call its `research` tool.
|
|
|
|
``max_iterations`` and ``token_budget`` are optional — when None,
|
|
the MCP server uses the depth preset (Issue #30).
|
|
"""
|
|
params = StdioServerParameters(
|
|
command=sys.executable,
|
|
args=["-m", "researchers.web.server"],
|
|
env=os.environ.copy(),
|
|
)
|
|
arguments: dict = {"question": question, "depth": depth}
|
|
if max_iterations is not None:
|
|
arguments["max_iterations"] = max_iterations
|
|
if token_budget is not None:
|
|
arguments["token_budget"] = token_budget
|
|
async with stdio_client(params) as (read, write):
|
|
async with ClientSession(read, write) as session:
|
|
await session.initialize()
|
|
result = await session.call_tool("research", arguments=arguments)
|
|
# FastMCP returns the tool's string return as a TextContent block.
|
|
payload = result.content[0].text
|
|
return ResearchResult.model_validate_json(payload)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pretty printing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def render_result(result: ResearchResult, console: Console) -> None:
|
|
"""Render a ResearchResult to the console using rich."""
|
|
# Answer
|
|
console.print(
|
|
Panel(
|
|
result.answer,
|
|
title="[bold cyan]Answer[/bold cyan]",
|
|
border_style="cyan",
|
|
)
|
|
)
|
|
|
|
# Citations
|
|
if result.citations:
|
|
table = Table(title="Citations", show_lines=True, expand=True)
|
|
table.add_column("#", style="dim", width=3)
|
|
table.add_column("Title / Locator", overflow="fold")
|
|
table.add_column("Excerpt", overflow="fold")
|
|
table.add_column("Conf", justify="right", width=5)
|
|
for i, c in enumerate(result.citations, 1):
|
|
header = f"[bold]{c.title or c.locator}[/bold]\n[dim]{c.locator}[/dim]"
|
|
table.add_row(str(i), header, c.raw_excerpt, f"{c.confidence:.2f}")
|
|
console.print(table)
|
|
else:
|
|
console.print("[dim]No citations.[/dim]")
|
|
|
|
# Gaps grouped by category
|
|
if result.gaps:
|
|
gap_table = Table(title="Gaps", show_lines=True, expand=True)
|
|
gap_table.add_column("Category", style="yellow")
|
|
gap_table.add_column("Topic")
|
|
gap_table.add_column("Detail", overflow="fold")
|
|
for g in result.gaps:
|
|
gap_table.add_row(g.category.value, g.topic, g.detail)
|
|
console.print(gap_table)
|
|
|
|
# Discovery events
|
|
if result.discovery_events:
|
|
de_table = Table(title="Discovery Events", show_lines=True, expand=True)
|
|
de_table.add_column("Type", style="magenta")
|
|
de_table.add_column("Suggested Researcher")
|
|
de_table.add_column("Query", overflow="fold")
|
|
de_table.add_column("Reason", overflow="fold")
|
|
for d in result.discovery_events:
|
|
de_table.add_row(
|
|
d.type, d.suggested_researcher or "-", d.query, d.reason
|
|
)
|
|
console.print(de_table)
|
|
|
|
# Open questions
|
|
if result.open_questions:
|
|
oq_table = Table(title="Open Questions", show_lines=True, expand=True)
|
|
oq_table.add_column("Priority", style="green")
|
|
oq_table.add_column("Question", overflow="fold")
|
|
oq_table.add_column("Context", overflow="fold")
|
|
for q in result.open_questions:
|
|
oq_table.add_row(q.priority, q.question, q.context)
|
|
console.print(oq_table)
|
|
|
|
# Confidence + factors
|
|
cf = result.confidence_factors
|
|
conf_text = Text()
|
|
conf_text.append(f"Overall: {result.confidence:.2f}\n", style="bold")
|
|
conf_text.append(f"Corroborating sources: {cf.num_corroborating_sources}\n")
|
|
conf_text.append(f"Source authority: {cf.source_authority}\n")
|
|
conf_text.append(f"Contradiction detected: {cf.contradiction_detected}\n")
|
|
conf_text.append(f"Query specificity match: {cf.query_specificity_match:.2f}\n")
|
|
budget_status = "spent" if cf.budget_exhausted else "under cap"
|
|
conf_text.append(f"Budget status: {budget_status}\n")
|
|
conf_text.append(f"Recency: {cf.recency or 'unknown'}")
|
|
console.print(Panel(conf_text, title="Confidence", border_style="green"))
|
|
|
|
# Cost
|
|
cm = result.cost_metadata
|
|
cost_text = Text()
|
|
cost_text.append(f"Tokens: {cm.tokens_used}\n")
|
|
cost_text.append(f"Iterations: {cm.iterations_run}\n")
|
|
cost_text.append(f"Wall time: {cm.wall_time_sec:.2f}s\n")
|
|
cost_text.append(f"Model: {cm.model_id}")
|
|
console.print(Panel(cost_text, title="Cost", border_style="blue"))
|
|
|
|
# Trace footer
|
|
console.print(f"\n[dim]trace_id: {result.trace_id}[/dim]")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Click app
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@click.group()
|
|
def cli() -> None:
|
|
"""Marchwarden — agentic research CLI."""
|
|
configure_logging()
|
|
|
|
|
|
@cli.command()
|
|
@click.argument("question")
|
|
@click.option(
|
|
"--depth",
|
|
type=click.Choice(["shallow", "balanced", "deep"]),
|
|
default="balanced",
|
|
show_default=True,
|
|
)
|
|
@click.option(
|
|
"--budget",
|
|
"token_budget",
|
|
type=int,
|
|
default=None,
|
|
help="Token budget for the research loop. Overrides the depth preset.",
|
|
)
|
|
@click.option(
|
|
"--max-iterations",
|
|
type=int,
|
|
default=None,
|
|
help="Max research loop iterations. Overrides the depth preset.",
|
|
)
|
|
def ask(
|
|
question: str,
|
|
depth: str,
|
|
token_budget: Optional[int],
|
|
max_iterations: Optional[int],
|
|
) -> None:
|
|
"""Ask the web researcher a QUESTION."""
|
|
console = Console()
|
|
console.print(f"[dim]Researching:[/dim] {question}\n")
|
|
log.info(
|
|
"ask_started",
|
|
question=question,
|
|
depth=depth,
|
|
max_iterations=max_iterations,
|
|
token_budget=token_budget,
|
|
)
|
|
try:
|
|
result = asyncio.run(
|
|
call_research_tool(
|
|
question=question,
|
|
depth=depth,
|
|
max_iterations=max_iterations,
|
|
token_budget=token_budget,
|
|
)
|
|
)
|
|
except Exception as e:
|
|
log.error("ask_failed", question=question, error=str(e), exc_info=True)
|
|
console.print(f"[bold red]Error:[/bold red] {e}")
|
|
sys.exit(1)
|
|
log.info(
|
|
"ask_completed",
|
|
trace_id=result.trace_id,
|
|
confidence=result.confidence,
|
|
citations=len(result.citations),
|
|
tokens_used=result.cost_metadata.tokens_used,
|
|
wall_time_sec=result.cost_metadata.wall_time_sec,
|
|
)
|
|
render_result(result, console)
|
|
|
|
|
|
def _resolve_trace_path(trace_id: str, trace_dir: Optional[str]) -> Path:
|
|
"""Resolve the JSONL path for a trace_id."""
|
|
base = Path(os.path.expanduser(trace_dir or DEFAULT_TRACE_DIR))
|
|
return base / f"{trace_id}.jsonl"
|
|
|
|
|
|
def render_trace(entries: list[dict], trace_id: str, console: Console) -> None:
|
|
"""Pretty-print a list of trace entries."""
|
|
console.print(
|
|
Panel(
|
|
f"[bold]trace_id:[/bold] {trace_id}\n[bold]steps:[/bold] {len(entries)}",
|
|
title="[cyan]Replay[/cyan]",
|
|
border_style="cyan",
|
|
)
|
|
)
|
|
|
|
if not entries:
|
|
console.print("[dim]Trace file is empty.[/dim]")
|
|
return
|
|
|
|
table = Table(show_lines=True, expand=True)
|
|
table.add_column("#", style="dim", width=4)
|
|
table.add_column("Action", style="magenta")
|
|
table.add_column("Decision", overflow="fold")
|
|
table.add_column("Details", overflow="fold")
|
|
table.add_column("Hash", style="dim", overflow="fold")
|
|
|
|
reserved = {"step", "action", "decision", "timestamp", "content_hash"}
|
|
for e in entries:
|
|
step = str(e.get("step", "?"))
|
|
action = str(e.get("action", ""))
|
|
decision = str(e.get("decision", ""))
|
|
content_hash = str(e.get("content_hash", "") or "")
|
|
extras = {k: v for k, v in e.items() if k not in reserved}
|
|
details = "\n".join(f"{k}: {v}" for k, v in extras.items())
|
|
table.add_row(step, action, decision, details, content_hash)
|
|
|
|
console.print(table)
|
|
|
|
|
|
@cli.command()
|
|
@click.argument("trace_id")
|
|
@click.option(
|
|
"--trace-dir",
|
|
default=None,
|
|
help=f"Trace directory (default: {DEFAULT_TRACE_DIR}).",
|
|
)
|
|
def replay(trace_id: str, trace_dir: Optional[str]) -> None:
|
|
"""Replay a prior research run by TRACE_ID."""
|
|
console = Console()
|
|
path = _resolve_trace_path(trace_id, trace_dir)
|
|
if not path.exists():
|
|
console.print(
|
|
f"[bold red]Error:[/bold red] no trace file found for "
|
|
f"trace_id [bold]{trace_id}[/bold] at {path}"
|
|
)
|
|
sys.exit(1)
|
|
|
|
entries: list[dict] = []
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
for lineno, line in enumerate(f, 1):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
entries.append(json.loads(line))
|
|
except json.JSONDecodeError as e:
|
|
console.print(
|
|
f"[bold red]Error:[/bold red] invalid JSON on line {lineno}: {e}"
|
|
)
|
|
sys.exit(1)
|
|
|
|
render_trace(entries, trace_id, console)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# costs command
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
_RELATIVE_RE = re.compile(r"^(\d+)([dwhm])$")
|
|
|
|
|
|
def _parse_when(value: str) -> datetime:
|
|
"""Parse an ISO date or a relative shorthand like '7d', '24h'."""
|
|
m = _RELATIVE_RE.match(value)
|
|
if m:
|
|
n = int(m.group(1))
|
|
unit = m.group(2)
|
|
delta = {
|
|
"h": timedelta(hours=n),
|
|
"d": timedelta(days=n),
|
|
"w": timedelta(weeks=n),
|
|
"m": timedelta(days=30 * n),
|
|
}[unit]
|
|
return datetime.now(timezone.utc) - delta
|
|
# Otherwise treat as ISO date / datetime
|
|
dt = datetime.fromisoformat(value)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return dt
|
|
|
|
|
|
def _load_ledger(path: Path) -> list[dict]:
|
|
if not path.exists():
|
|
return []
|
|
entries: list[dict] = []
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
for lineno, line in enumerate(f, 1):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
entries.append(json.loads(line))
|
|
except json.JSONDecodeError:
|
|
# Skip a corrupt line rather than blow up the whole report
|
|
continue
|
|
return entries
|
|
|
|
|
|
def _filter_entries(
|
|
entries: list[dict],
|
|
since: Optional[datetime],
|
|
until: Optional[datetime],
|
|
model: Optional[str],
|
|
) -> list[dict]:
|
|
out = []
|
|
for e in entries:
|
|
ts_str = e.get("timestamp", "")
|
|
try:
|
|
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
|
except ValueError:
|
|
continue
|
|
if since and ts < since:
|
|
continue
|
|
if until and ts > until:
|
|
continue
|
|
if model and e.get("model_id") != model:
|
|
continue
|
|
out.append(e)
|
|
return out
|
|
|
|
|
|
def render_costs(entries: list[dict], console: Console) -> None:
|
|
"""Render a cost summary from filtered ledger entries."""
|
|
if not entries:
|
|
console.print("[dim]No cost data yet.[/dim]")
|
|
return
|
|
|
|
total_calls = len(entries)
|
|
total_tokens = sum(e.get("tokens_used", 0) for e in entries)
|
|
total_input = sum(e.get("tokens_input") or 0 for e in entries)
|
|
total_output = sum(e.get("tokens_output") or 0 for e in entries)
|
|
total_tavily = sum(e.get("tavily_searches", 0) for e in entries)
|
|
total_spend = sum(
|
|
e.get("estimated_cost_usd") or 0.0 for e in entries
|
|
)
|
|
unknown_cost_calls = sum(
|
|
1 for e in entries if e.get("estimated_cost_usd") is None
|
|
)
|
|
|
|
# Summary panel
|
|
summary = Text()
|
|
summary.append(f"Calls: {total_calls}\n", style="bold")
|
|
summary.append(f"Total spend: ${total_spend:.4f}\n", style="bold green")
|
|
summary.append(f"Total tokens: {total_tokens:,} ")
|
|
summary.append(f"(in {total_input:,} / out {total_output:,})\n", style="dim")
|
|
summary.append(f"Tavily searches: {total_tavily}\n")
|
|
if unknown_cost_calls:
|
|
summary.append(
|
|
f"Calls with unknown model price: {unknown_cost_calls}\n",
|
|
style="yellow",
|
|
)
|
|
console.print(Panel(summary, title="Cost Summary", border_style="green"))
|
|
|
|
# Per-day breakdown
|
|
per_day: dict[str, dict] = defaultdict(lambda: {"calls": 0, "tokens": 0, "spend": 0.0})
|
|
for e in entries:
|
|
day = e.get("timestamp", "")[:10]
|
|
per_day[day]["calls"] += 1
|
|
per_day[day]["tokens"] += e.get("tokens_used", 0)
|
|
per_day[day]["spend"] += e.get("estimated_cost_usd") or 0.0
|
|
day_table = Table(title="Per Day", show_lines=False, expand=True)
|
|
day_table.add_column("Date", style="dim")
|
|
day_table.add_column("Calls", justify="right")
|
|
day_table.add_column("Tokens", justify="right")
|
|
day_table.add_column("Spend (USD)", justify="right", style="green")
|
|
for day in sorted(per_day.keys()):
|
|
d = per_day[day]
|
|
day_table.add_row(
|
|
day, str(d["calls"]), f"{d['tokens']:,}", f"${d['spend']:.4f}"
|
|
)
|
|
console.print(day_table)
|
|
|
|
# Per-model breakdown
|
|
per_model: dict[str, dict] = defaultdict(
|
|
lambda: {"calls": 0, "tokens": 0, "spend": 0.0}
|
|
)
|
|
for e in entries:
|
|
m = e.get("model_id", "(unknown)")
|
|
per_model[m]["calls"] += 1
|
|
per_model[m]["tokens"] += e.get("tokens_used", 0)
|
|
per_model[m]["spend"] += e.get("estimated_cost_usd") or 0.0
|
|
model_table = Table(title="Per Model", show_lines=False, expand=True)
|
|
model_table.add_column("Model")
|
|
model_table.add_column("Calls", justify="right")
|
|
model_table.add_column("Tokens", justify="right")
|
|
model_table.add_column("Spend (USD)", justify="right", style="green")
|
|
for m in sorted(per_model.keys()):
|
|
d = per_model[m]
|
|
model_table.add_row(
|
|
m, str(d["calls"]), f"{d['tokens']:,}", f"${d['spend']:.4f}"
|
|
)
|
|
console.print(model_table)
|
|
|
|
# Highest-cost call
|
|
costed = [e for e in entries if e.get("estimated_cost_usd") is not None]
|
|
if costed:
|
|
top = max(costed, key=lambda e: e["estimated_cost_usd"])
|
|
top_text = Text()
|
|
top_text.append(f"trace_id: {top.get('trace_id', '?')}\n")
|
|
top_text.append(f"question: {top.get('question', '')[:120]}\n")
|
|
top_text.append(f"model: {top.get('model_id', '?')}\n")
|
|
top_text.append(f"tokens: {top.get('tokens_used', 0):,}\n")
|
|
top_text.append(
|
|
f"spend: ${top.get('estimated_cost_usd', 0):.4f}\n",
|
|
style="bold green",
|
|
)
|
|
console.print(
|
|
Panel(top_text, title="Highest-Cost Call", border_style="yellow")
|
|
)
|
|
|
|
|
|
@cli.command()
|
|
@click.option(
|
|
"--since",
|
|
default=None,
|
|
help="Filter by start time. ISO date or relative (e.g. 7d, 24h, 2w).",
|
|
)
|
|
@click.option(
|
|
"--until",
|
|
default=None,
|
|
help="Filter by end time. ISO date or relative.",
|
|
)
|
|
@click.option(
|
|
"--model",
|
|
default=None,
|
|
help="Filter to a specific model_id.",
|
|
)
|
|
@click.option(
|
|
"--json",
|
|
"as_json",
|
|
is_flag=True,
|
|
default=False,
|
|
help="Emit raw filtered ledger entries as JSON instead of the table.",
|
|
)
|
|
@click.option(
|
|
"--ledger",
|
|
default=None,
|
|
help=f"Override ledger path (default: {DEFAULT_LEDGER_PATH}).",
|
|
)
|
|
def costs(
|
|
since: Optional[str],
|
|
until: Optional[str],
|
|
model: Optional[str],
|
|
as_json: bool,
|
|
ledger: Optional[str],
|
|
) -> None:
|
|
"""Show cost summary from the research ledger."""
|
|
console = Console()
|
|
path = Path(os.path.expanduser(ledger or DEFAULT_LEDGER_PATH))
|
|
entries = _load_ledger(path)
|
|
|
|
since_dt = _parse_when(since) if since else None
|
|
until_dt = _parse_when(until) if until else None
|
|
filtered = _filter_entries(entries, since_dt, until_dt, model)
|
|
|
|
if as_json:
|
|
for e in filtered:
|
|
click.echo(json.dumps(e))
|
|
return
|
|
|
|
render_costs(filtered, console)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cli()
|