marchwarden/researchers/web/agent.py
Jeff Smith ae48acd421 depth flag now drives constraint defaults (#30)
Previously the depth parameter (shallow/balanced/deep) was passed
only as a text hint inside the agent's user message, with no
mechanical effect on iterations, token budget, or source count.
The flag was effectively cosmetic — the LLM was expected to
"interpret" it.

Add DEPTH_PRESETS table and constraints_for_depth() helper in
researchers.web.models:

  shallow:  2 iters,  5,000 tokens,  5 sources
  balanced: 5 iters, 20,000 tokens, 10 sources  (= historical defaults)
  deep:     8 iters, 60,000 tokens, 20 sources

Wired through the stack:

- WebResearcher.research(): when constraints is None, builds from
  the depth preset instead of bare ResearchConstraints()
- MCP server `research` tool: max_iterations and token_budget now
  default to None; constraints are built via constraints_for_depth
  with explicit values overriding the preset
- CLI `ask` command: --max-iterations and --budget default to None;
  the CLI only forwards them to the MCP tool when set, so unset
  flags fall through to the depth preset

balanced is unchanged from the historical defaults so existing
callers see no behavior difference. Explicit --max-iterations /
--budget always win over the preset.

Tests cover each preset's values, balanced backward-compat,
unknown depth fallback, full override, and partial override.
116/116 tests passing. Live-verified: --depth shallow on a simple
question now caps at 2 iterations and stays under budget.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 16:27:38 -06:00

729 lines
25 KiB
Python

"""Web researcher agent — the inner agentic loop.
Takes a question, runs a plan→search→fetch→iterate→synthesize loop
using Claude as the reasoning engine and Tavily/httpx as tools.
Returns a ResearchResult conforming to the v1 contract.
"""
import asyncio
import json
import time
from typing import Optional
import structlog
from anthropic import Anthropic
from obs import get_logger
from obs.costs import CostLedger
from researchers.web.models import (
Citation,
ConfidenceFactors,
CostMetadata,
DiscoveryEvent,
Gap,
GapCategory,
OpenQuestion,
ResearchConstraints,
ResearchResult,
constraints_for_depth,
)
from researchers.web.tools import SearchResult, fetch_url, tavily_search
from researchers.web.trace import TraceLogger
log = get_logger("marchwarden.researcher.web")
# System prompt for the tool-use loop: sets the agent persona, the
# plan→search→fetch→iterate→stop process, and grounding rules (trace
# every claim, never fabricate, flag contradictions, prefer
# authoritative sources). Content is runtime behavior — do not edit
# casually; the agent's stopping and sourcing discipline depends on it.
SYSTEM_PROMPT = """\
You are a Marchwarden — a research specialist stationed at the frontier of knowledge. \
Your job is to investigate a question thoroughly using web search and URL fetching, \
then produce a grounded, evidence-based answer.
## Your process
1. **Plan**: Decide what to search for. Break complex questions into sub-queries.
2. **Search**: Use the web_search tool to find relevant sources.
3. **Fetch**: Use the fetch_url tool to get full content from promising URLs.
4. **Iterate**: If you don't have enough evidence, search again with refined queries.
5. **Stop**: When you have sufficient evidence OR you've exhausted your budget.
## Rules
- Every claim must be traceable to a source you actually fetched.
- If you can't find information, say so — never fabricate.
- If sources contradict each other, note the contradiction.
- If the question requires expertise outside web search (academic papers, databases, \
legal documents), note it as a discovery for another researcher.
- Be efficient. Don't fetch URLs that are clearly irrelevant from their title/snippet.
- Prefer authoritative sources (.gov, .edu, established organizations) over blogs/forums.
"""
# Synthesis prompt: formatted with {evidence}/{question}/{context} and
# sent as a single (tool-free) message. The doubled braces are literal
# JSON braces that survive str.format — the schema shown here must stay
# in sync with the Citation/Gap/DiscoveryEvent/OpenQuestion/
# ConfidenceFactors models that _synthesize parses the reply into.
SYNTHESIS_PROMPT = """\
Based on the evidence gathered, produce a structured research result as JSON.
## Evidence gathered
{evidence}
## Original question
{question}
## Context from caller
{context}
## Instructions
Produce a JSON object with these exact fields:
{{
"answer": "Your synthesized answer. Every claim must trace to a citation.",
"citations": [
{{
"source": "web",
"locator": "the exact URL",
"title": "page title",
"snippet": "your 50-200 char summary of why this source is relevant",
"raw_excerpt": "verbatim 100-500 char excerpt from the source that supports your claim",
"confidence": 0.0-1.0
}}
],
"gaps": [
{{
"topic": "what wasn't resolved",
"category": "source_not_found|access_denied|budget_exhausted|contradictory_sources|scope_exceeded",
"detail": "human-readable explanation"
}}
],
"discovery_events": [
{{
"type": "related_research|new_source|contradiction",
"suggested_researcher": "arxiv|database|legal|null",
"query": "suggested query for that researcher",
"reason": "why this matters",
"source_locator": "URL where you found this, or null"
}}
],
"open_questions": [
{{
"question": "A follow-up question that emerged from the research",
"context": "What evidence prompted this question",
"priority": "high|medium|low",
"source_locator": "URL where this question arose, or null"
}}
],
"confidence": 0.0-1.0,
"confidence_factors": {{
"num_corroborating_sources": 0,
"source_authority": "high|medium|low",
"contradiction_detected": false,
"query_specificity_match": 0.0-1.0,
"budget_exhausted": false,
"recency": "current|recent|dated|null"
}}
}}
Respond with ONLY the JSON object, no markdown fences, no explanation.
"""
# Tool definitions for Claude's tool_use API
# (Anthropic Messages API `tools` parameter). The `name` values must
# match the dispatch in WebResearcher._execute_tool.
TOOLS = [
    {
        "name": "web_search",
        "description": (
            "Search the web for information. Returns titles, URLs, snippets, "
            "and sometimes full page content. Use this to find sources."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query.",
                },
                "max_results": {
                    "type": "integer",
                    "description": "Number of results (1-10). Default 5.",
                    "default": 5,
                },
            },
            "required": ["query"],
        },
    },
    {
        "name": "fetch_url",
        "description": (
            "Fetch the full text content of a URL. Use this when a search result "
            "looks promising but the snippet isn't enough. Returns extracted text."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "The URL to fetch.",
                },
            },
            "required": ["url"],
        },
    },
]
class WebResearcher:
    """Agentic web researcher that searches, fetches, and synthesizes.

    Drives an Anthropic tool-use loop (web_search / fetch_url tools)
    until the model stops requesting tools or the iteration/token budget
    is exhausted, then runs a final synthesis call that converts the
    gathered evidence into a structured ResearchResult.
    """

    def __init__(
        self,
        anthropic_api_key: str,
        tavily_api_key: str,
        model_id: str = "claude-sonnet-4-6",
        trace_dir: Optional[str] = None,
        cost_ledger: Optional[CostLedger] = None,
    ):
        """Initialize the researcher.

        Args:
            anthropic_api_key: Key used to construct the Anthropic client.
            tavily_api_key: Key forwarded to tavily_search for web queries.
            model_id: Claude model used for both the loop and synthesis.
            trace_dir: Directory handed to TraceLogger (None = its default).
            cost_ledger: Optional ledger override; see comment below.
        """
        self.client = Anthropic(api_key=anthropic_api_key)
        self.tavily_api_key = tavily_api_key
        self.model_id = model_id
        self.trace_dir = trace_dir
        # Lazy default — only constructed if no override is given. Tests
        # inject a CostLedger pointed at a tmp path to avoid touching
        # the real ledger file.
        self.cost_ledger = cost_ledger

    async def research(
        self,
        question: str,
        context: Optional[str] = None,
        depth: str = "balanced",
        constraints: Optional[ResearchConstraints] = None,
    ) -> ResearchResult:
        """Run a full research loop on a question.

        Args:
            question: The question to investigate.
            context: What the caller already knows (optional).
            depth: "shallow", "balanced", or "deep".
            constraints: Budget and iteration limits.

        Returns:
            A ResearchResult conforming to the v1 contract.
        """
        # If the caller didn't supply explicit constraints, build them
        # from the depth preset (Issue #30). Callers that DO pass a
        # ResearchConstraints are taken at their word — explicit wins.
        constraints = constraints or constraints_for_depth(depth)
        trace = TraceLogger(trace_dir=self.trace_dir)
        start_time = time.time()
        total_tokens = 0
        tokens_input = 0
        tokens_output = 0
        iterations = 0
        evidence: list[dict] = []
        budget_exhausted = False
        tavily_searches = 0
        # Bind trace context so every downstream log call automatically
        # carries trace_id and researcher. Cleared in the finally block.
        # BUGFIX: the clear previously ran only on the success path, so an
        # exception raised anywhere in the loop or synthesis leaked
        # trace_id/researcher into every subsequent unrelated log line.
        structlog.contextvars.bind_contextvars(
            trace_id=trace.trace_id,
            researcher="web",
        )
        try:
            log.info(
                "research_started",
                question=question,
                depth=depth,
                max_iterations=constraints.max_iterations,
                token_budget=constraints.token_budget,
                model_id=self.model_id,
            )
            trace.log_step(
                "start",
                decision=f"Beginning research: depth={depth}",
                question=question,
                context=context or "",
                max_iterations=constraints.max_iterations,
                token_budget=constraints.token_budget,
            )
            # Build initial message
            user_message = f"Research this question: {question}"
            if context:
                user_message += f"\n\nContext from the caller: {context}"
            user_message += f"\n\nResearch depth: {depth}"
            messages = [{"role": "user", "content": user_message}]
            # --- Tool-use loop ---
            # Budget policy: the loop honors token_budget as a soft cap. Before
            # starting a new iteration we check whether we've already hit the
            # budget; if so we stop and let synthesis run on whatever evidence
            # we already have. Synthesis tokens are tracked but not capped here
            # — the synthesis call is always allowed to complete so the caller
            # gets a structured result rather than a stub.
            while iterations < constraints.max_iterations:
                if total_tokens >= constraints.token_budget:
                    budget_exhausted = True
                    trace.log_step(
                        "budget_exhausted",
                        decision=(
                            f"Token budget reached before iteration "
                            f"{iterations + 1}: {total_tokens}/{constraints.token_budget}"
                        ),
                    )
                    break
                iterations += 1
                trace.log_step(
                    "iteration_start",
                    decision=f"Starting iteration {iterations}/{constraints.max_iterations}",
                    tokens_so_far=total_tokens,
                )
                # NOTE(review): this Anthropic client is synchronous, so the
                # call blocks the event loop for its full duration inside an
                # async method — consider AsyncAnthropic or asyncio.to_thread.
                response = self.client.messages.create(
                    model=self.model_id,
                    max_tokens=4096,
                    system=SYSTEM_PROMPT,
                    messages=messages,
                    tools=TOOLS,
                )
                # Track tokens
                tokens_input += response.usage.input_tokens
                tokens_output += response.usage.output_tokens
                total_tokens += response.usage.input_tokens + response.usage.output_tokens
                # Check if the model wants to use tools
                tool_calls = [b for b in response.content if b.type == "tool_use"]
                tavily_searches += sum(
                    1 for tc in tool_calls if tc.name == "web_search"
                )
                if not tool_calls:
                    # Model is done researching — extract any final text
                    text_blocks = [b.text for b in response.content if b.type == "text"]
                    if text_blocks:
                        trace.log_step(
                            "agent_message",
                            decision="Agent finished tool use",
                            message=text_blocks[0][:500],
                        )
                    break
                # Process each tool call
                tool_results = []
                for tool_call in tool_calls:
                    result_content = await self._execute_tool(
                        tool_call.name,
                        tool_call.input,
                        evidence,
                        trace,
                        constraints,
                    )
                    tool_results.append(
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_call.id,
                            "content": result_content,
                        }
                    )
                # Append assistant response + tool results to conversation
                messages.append({"role": "assistant", "content": response.content})
                messages.append({"role": "user", "content": tool_results})
            # --- Synthesis step ---
            trace.log_step(
                "synthesis_start",
                decision="Beginning synthesis of gathered evidence",
                evidence_count=len(evidence),
                iterations_run=iterations,
                tokens_used=total_tokens,
            )
            result, synth_in, synth_out = await self._synthesize(
                question=question,
                context=context,
                evidence=evidence,
                trace=trace,
                total_tokens=total_tokens,
                iterations=iterations,
                start_time=start_time,
                budget_exhausted=budget_exhausted,
            )
            tokens_input += synth_in
            tokens_output += synth_out
            trace.log_step(
                "complete",
                decision="Research complete",
                confidence=result.confidence,
                citation_count=len(result.citations),
                gap_count=len(result.gaps),
                discovery_count=len(result.discovery_events),
            )
            trace.close()
            log.info(
                "research_completed",
                confidence=result.confidence,
                citations=len(result.citations),
                gaps=len(result.gaps),
                discovery_events=len(result.discovery_events),
                tokens_used=result.cost_metadata.tokens_used,
                iterations_run=result.cost_metadata.iterations_run,
                wall_time_sec=result.cost_metadata.wall_time_sec,
                budget_exhausted=result.cost_metadata.budget_exhausted,
            )
            # Append to the operational cost ledger. Construct on first use
            # so test injection (cost_ledger=...) and the env override
            # (MARCHWARDEN_COST_LEDGER) both work without forcing every
            # caller to build a CostLedger explicitly.
            try:
                ledger = self.cost_ledger or CostLedger()
                ledger.record(
                    trace_id=result.trace_id,
                    question=question,
                    model_id=self.model_id,
                    tokens_used=result.cost_metadata.tokens_used,
                    tokens_input=tokens_input,
                    tokens_output=tokens_output,
                    iterations_run=result.cost_metadata.iterations_run,
                    wall_time_sec=result.cost_metadata.wall_time_sec,
                    tavily_searches=tavily_searches,
                    budget_exhausted=result.cost_metadata.budget_exhausted,
                    confidence=result.confidence,
                )
            except Exception as ledger_err:
                # Never let a ledger failure poison a successful research call.
                log.warning("cost_ledger_write_failed", error=str(ledger_err))
            return result
        finally:
            # Always unbind, even when the loop or synthesis raises, so the
            # trace context can't bleed into logs of unrelated requests.
            structlog.contextvars.clear_contextvars()

    async def _execute_tool(
        self,
        tool_name: str,
        tool_input: dict,
        evidence: list[dict],
        trace: TraceLogger,
        constraints: ResearchConstraints,
    ) -> str:
        """Execute a tool call and return the result as a string.

        Side effect: appends search results / fetched pages to ``evidence``
        (mutated in place) so _synthesize can see everything gathered.
        """
        if tool_name == "web_search":
            query = tool_input.get("query", "")
            # Clamp the model-requested result count to the constraint cap.
            max_results = min(
                tool_input.get("max_results", 5),
                constraints.max_sources,
            )
            trace.log_step(
                "web_search",
                decision=f"Searching: {query}",
                query=query,
                max_results=max_results,
            )
            results = tavily_search(
                api_key=self.tavily_api_key,
                query=query,
                max_results=max_results,
            )
            # Store evidence
            for r in results:
                ev = {
                    "type": "search_result",
                    "url": r.url,
                    "title": r.title,
                    "content": r.content,
                    "raw_content": r.raw_content,
                    "content_hash": r.content_hash,
                    "score": r.score,
                }
                evidence.append(ev)
            trace.log_step(
                "web_search_complete",
                decision=f"Got {len(results)} results",
                result_count=len(results),
                urls=[r.url for r in results],
            )
            # Return results as text for the LLM
            return _format_search_results(results)
        elif tool_name == "fetch_url":
            url = tool_input.get("url", "")
            trace.log_step(
                "fetch_url",
                decision=f"Fetching: {url}",
                url=url,
            )
            result = await fetch_url(url)
            trace.log_step(
                "fetch_url_complete",
                decision="Fetch succeeded" if result.success else f"Fetch failed: {result.error}",
                url=url,
                content_hash=result.content_hash,
                content_length=result.content_length,
                success=result.success,
            )
            if result.success:
                # Store evidence (truncated harder than the LLM echo below
                # to bound memory across many fetches).
                evidence.append(
                    {
                        "type": "fetched_page",
                        "url": url,
                        "content": result.text[:10000],
                        "content_hash": result.content_hash,
                        "content_length": result.content_length,
                    }
                )
                # Return truncated text for the LLM
                return result.text[:8000]
            else:
                return f"Failed to fetch URL: {result.error}"
        return f"Unknown tool: {tool_name}"

    async def _synthesize(
        self,
        question: str,
        context: Optional[str],
        evidence: list[dict],
        trace: TraceLogger,
        total_tokens: int,
        iterations: int,
        start_time: float,
        budget_exhausted: bool,
    ) -> tuple[ResearchResult, int, int]:
        """Ask the LLM to synthesize evidence into a ResearchResult.

        Returns ``(result, synthesis_input_tokens, synthesis_output_tokens)``
        so the caller can track per-call token splits for cost estimation.
        Falls back to _fallback_result when the model's JSON is unparseable
        or the parsed data fails model validation.
        """
        # Format evidence for the synthesis prompt
        evidence_text = ""
        for i, ev in enumerate(evidence, 1):
            if ev["type"] == "search_result":
                # Prefer full page text when Tavily returned it.
                content = ev.get("raw_content") or ev.get("content", "")
                evidence_text += (
                    f"\n--- Source {i} (search result) ---\n"
                    f"URL: {ev['url']}\n"
                    f"Title: {ev['title']}\n"
                    f"Content hash: {ev['content_hash']}\n"
                    f"Content: {content[:3000]}\n"
                )
            elif ev["type"] == "fetched_page":
                evidence_text += (
                    f"\n--- Source {i} (fetched page) ---\n"
                    f"URL: {ev['url']}\n"
                    f"Content hash: {ev['content_hash']}\n"
                    f"Content: {ev['content'][:3000]}\n"
                )
        prompt = SYNTHESIS_PROMPT.format(
            evidence=evidence_text or "(No evidence gathered)",
            question=question,
            context=context or "(No additional context)",
        )
        # NOTE(review): synchronous call in an async method — blocks the
        # event loop; same caveat as the loop call in research().
        response = self.client.messages.create(
            model=self.model_id,
            max_tokens=16384,
            messages=[{"role": "user", "content": prompt}],
        )
        synth_in = response.usage.input_tokens
        synth_out = response.usage.output_tokens
        total_tokens += synth_in + synth_out
        wall_time = time.time() - start_time
        # Parse the JSON response
        raw_text = response.content[0].text.strip()
        stop_reason = response.stop_reason
        # Strip markdown fences if the model added them despite instructions
        if raw_text.startswith("```"):
            raw_text = raw_text.split("\n", 1)[1] if "\n" in raw_text else raw_text[3:]
        if raw_text.endswith("```"):
            raw_text = raw_text[:-3].strip()
        try:
            data = json.loads(raw_text)
        except json.JSONDecodeError as parse_err:
            trace.log_step(
                "synthesis_error",
                decision=(
                    f"Failed to parse synthesis JSON ({parse_err}); "
                    f"stop_reason={stop_reason}"
                ),
                stop_reason=stop_reason,
                parse_error=str(parse_err),
                raw_response=raw_text,
            )
            return (
                self._fallback_result(
                    question, evidence, trace, total_tokens, iterations,
                    wall_time, budget_exhausted,
                ),
                synth_in,
                synth_out,
            )
        trace.log_step(
            "synthesis_complete",
            decision="Parsed synthesis JSON successfully",
        )
        # Build the ResearchResult from parsed JSON. Every field access is
        # defensive (.get with defaults) because the model controls the shape.
        try:
            citations = [
                Citation(
                    source=c.get("source", "web"),
                    locator=c.get("locator", ""),
                    title=c.get("title"),
                    snippet=c.get("snippet"),
                    raw_excerpt=c.get("raw_excerpt", ""),
                    confidence=c.get("confidence", 0.5),
                )
                for c in data.get("citations", [])
            ]
            gaps = [
                Gap(
                    topic=g.get("topic", ""),
                    category=GapCategory(g.get("category", "source_not_found")),
                    detail=g.get("detail", ""),
                )
                for g in data.get("gaps", [])
            ]
            discovery_events = [
                DiscoveryEvent(
                    type=d.get("type", "related_research"),
                    suggested_researcher=d.get("suggested_researcher"),
                    query=d.get("query", ""),
                    reason=d.get("reason", ""),
                    source_locator=d.get("source_locator"),
                )
                for d in data.get("discovery_events", [])
            ]
            open_questions = [
                OpenQuestion(
                    question=q.get("question", ""),
                    context=q.get("context", ""),
                    priority=q.get("priority", "medium"),
                    source_locator=q.get("source_locator"),
                )
                for q in data.get("open_questions", [])
            ]
            cf = data.get("confidence_factors", {})
            confidence_factors = ConfidenceFactors(
                num_corroborating_sources=cf.get("num_corroborating_sources", 0),
                source_authority=cf.get("source_authority", "low"),
                contradiction_detected=cf.get("contradiction_detected", False),
                query_specificity_match=cf.get("query_specificity_match", 0.5),
                # Loop-level exhaustion always wins over the model's opinion.
                budget_exhausted=budget_exhausted or cf.get("budget_exhausted", False),
                recency=cf.get("recency"),
            )
            return (
                ResearchResult(
                    answer=data.get("answer", "No answer could be synthesized."),
                    citations=citations,
                    gaps=gaps,
                    discovery_events=discovery_events,
                    open_questions=open_questions,
                    confidence=data.get("confidence", 0.5),
                    confidence_factors=confidence_factors,
                    cost_metadata=CostMetadata(
                        tokens_used=total_tokens,
                        iterations_run=iterations,
                        wall_time_sec=wall_time,
                        budget_exhausted=budget_exhausted,
                        model_id=self.model_id,
                    ),
                    trace_id=trace.trace_id,
                ),
                synth_in,
                synth_out,
            )
        except Exception as e:
            trace.log_step(
                "synthesis_build_error",
                decision=f"Failed to build ResearchResult: {e}",
            )
            return (
                self._fallback_result(
                    question, evidence, trace, total_tokens, iterations,
                    wall_time, budget_exhausted,
                ),
                synth_in,
                synth_out,
            )

    def _fallback_result(
        self,
        question: str,
        evidence: list[dict],
        trace: TraceLogger,
        total_tokens: int,
        iterations: int,
        wall_time: float,
        budget_exhausted: bool,
    ) -> ResearchResult:
        """Produce a minimal valid ResearchResult when synthesis fails.

        Confidence is pinned low (0.1) and a single gap records whether the
        failure coincided with budget exhaustion.
        """
        return ResearchResult(
            answer=f"Research on '{question}' completed but synthesis failed. {len(evidence)} sources were gathered.",
            citations=[],
            gaps=[
                Gap(
                    topic="synthesis",
                    category=GapCategory.BUDGET_EXHAUSTED
                    if budget_exhausted
                    else GapCategory.SOURCE_NOT_FOUND,
                    detail="The synthesis step failed to produce structured output.",
                )
            ],
            discovery_events=[],
            confidence=0.1,
            confidence_factors=ConfidenceFactors(
                num_corroborating_sources=0,
                source_authority="low",
                contradiction_detected=False,
                query_specificity_match=0.0,
                budget_exhausted=budget_exhausted,
                recency=None,
            ),
            cost_metadata=CostMetadata(
                tokens_used=total_tokens,
                iterations_run=iterations,
                wall_time_sec=wall_time,
                budget_exhausted=budget_exhausted,
                model_id=self.model_id,
            ),
            trace_id=trace.trace_id,
        )
def _format_search_results(results: list[SearchResult]) -> str:
    """Render search results into a plain-text digest for the LLM."""
    if not results:
        return "No results found."
    blocks: list[str] = []
    for idx, res in enumerate(results, start=1):
        body = res.raw_content or res.content
        blocks.append(
            f"Result {idx}:\n"
            f" Title: {res.title}\n"
            f" URL: {res.url}\n"
            f" Relevance: {res.score:.2f}\n"
            f" Content: {body[:2000]}\n"
        )
    return "\n".join(blocks)