marchwarden/researchers/web/agent.py
Jeff Smith 6ff1a6af3d Enforce token_budget before each iteration (#17)
The loop previously checked the token budget at the *bottom* of each
iteration, after the LLM call and tool work had already happened. By
the time the cap was caught the budget had been exceeded and the
overshoot was unbounded by the iteration's cost.

Move the check to the *top* of the loop so a new iteration is never
started past the budget. Document the policy explicitly: token_budget
is a soft cap on the tool-use loop only; the synthesis call is always
allowed to complete so callers get a structured ResearchResult rather
than a fallback stub. Capping synthesis is a separate, larger design
question (would require splitting the budget between loop and
synthesis up-front).

Verified: token_budget=5000, max_iterations=10 now stops after 2
iterations with budget_exhausted=True and a complete answer with
10 citations.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 15:29:22 -06:00

635 lines
21 KiB
Python

"""Web researcher agent — the inner agentic loop.
Takes a question, runs a plan→search→fetch→iterate→synthesize loop
using Claude as the reasoning engine and Tavily/httpx as tools.
Returns a ResearchResult conforming to the v1 contract.
"""
import asyncio
import json
import time
from typing import Optional
from anthropic import Anthropic
from researchers.web.models import (
Citation,
ConfidenceFactors,
CostMetadata,
DiscoveryEvent,
Gap,
GapCategory,
OpenQuestion,
ResearchConstraints,
ResearchResult,
)
from researchers.web.tools import SearchResult, fetch_url, tavily_search
from researchers.web.trace import TraceLogger
SYSTEM_PROMPT = """\
You are a Marchwarden — a research specialist stationed at the frontier of knowledge. \
Your job is to investigate a question thoroughly using web search and URL fetching, \
then produce a grounded, evidence-based answer.
## Your process
1. **Plan**: Decide what to search for. Break complex questions into sub-queries.
2. **Search**: Use the web_search tool to find relevant sources.
3. **Fetch**: Use the fetch_url tool to get full content from promising URLs.
4. **Iterate**: If you don't have enough evidence, search again with refined queries.
5. **Stop**: When you have sufficient evidence OR you've exhausted your budget.
## Rules
- Every claim must be traceable to a source you actually fetched.
- If you can't find information, say so — never fabricate.
- If sources contradict each other, note the contradiction.
- If the question requires expertise outside web search (academic papers, databases, \
legal documents), note it as a discovery for another researcher.
- Be efficient. Don't fetch URLs that are clearly irrelevant from their title/snippet.
- Prefer authoritative sources (.gov, .edu, established organizations) over blogs/forums.
"""
SYNTHESIS_PROMPT = """\
Based on the evidence gathered, produce a structured research result as JSON.
## Evidence gathered
{evidence}
## Original question
{question}
## Context from caller
{context}
## Instructions
Produce a JSON object with these exact fields:
{{
"answer": "Your synthesized answer. Every claim must trace to a citation.",
"citations": [
{{
"source": "web",
"locator": "the exact URL",
"title": "page title",
"snippet": "your 50-200 char summary of why this source is relevant",
"raw_excerpt": "verbatim 100-500 char excerpt from the source that supports your claim",
"confidence": 0.0-1.0
}}
],
"gaps": [
{{
"topic": "what wasn't resolved",
"category": "source_not_found|access_denied|budget_exhausted|contradictory_sources|scope_exceeded",
"detail": "human-readable explanation"
}}
],
"discovery_events": [
{{
"type": "related_research|new_source|contradiction",
"suggested_researcher": "arxiv|database|legal|null",
"query": "suggested query for that researcher",
"reason": "why this matters",
"source_locator": "URL where you found this, or null"
}}
],
"open_questions": [
{{
"question": "A follow-up question that emerged from the research",
"context": "What evidence prompted this question",
"priority": "high|medium|low",
"source_locator": "URL where this question arose, or null"
}}
],
"confidence": 0.0-1.0,
"confidence_factors": {{
"num_corroborating_sources": 0,
"source_authority": "high|medium|low",
"contradiction_detected": false,
"query_specificity_match": 0.0-1.0,
"budget_exhausted": false,
"recency": "current|recent|dated|null"
}}
}}
Respond with ONLY the JSON object, no markdown fences, no explanation.
"""
# Tool definitions for Claude's tool_use API
TOOLS = [
{
"name": "web_search",
"description": (
"Search the web for information. Returns titles, URLs, snippets, "
"and sometimes full page content. Use this to find sources."
),
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query.",
},
"max_results": {
"type": "integer",
"description": "Number of results (1-10). Default 5.",
"default": 5,
},
},
"required": ["query"],
},
},
{
"name": "fetch_url",
"description": (
"Fetch the full text content of a URL. Use this when a search result "
"looks promising but the snippet isn't enough. Returns extracted text."
),
"input_schema": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL to fetch.",
},
},
"required": ["url"],
},
},
]
class WebResearcher:
"""Agentic web researcher that searches, fetches, and synthesizes."""
def __init__(
self,
anthropic_api_key: str,
tavily_api_key: str,
model_id: str = "claude-sonnet-4-6",
trace_dir: Optional[str] = None,
):
self.client = Anthropic(api_key=anthropic_api_key)
self.tavily_api_key = tavily_api_key
self.model_id = model_id
self.trace_dir = trace_dir
async def research(
self,
question: str,
context: Optional[str] = None,
depth: str = "balanced",
constraints: Optional[ResearchConstraints] = None,
) -> ResearchResult:
"""Run a full research loop on a question.
Args:
question: The question to investigate.
context: What the caller already knows (optional).
depth: "shallow", "balanced", or "deep".
constraints: Budget and iteration limits.
Returns:
A ResearchResult conforming to the v1 contract.
"""
constraints = constraints or ResearchConstraints()
trace = TraceLogger(trace_dir=self.trace_dir)
start_time = time.time()
total_tokens = 0
iterations = 0
evidence: list[dict] = []
budget_exhausted = False
trace.log_step(
"start",
decision=f"Beginning research: depth={depth}",
question=question,
context=context or "",
max_iterations=constraints.max_iterations,
token_budget=constraints.token_budget,
)
# Build initial message
user_message = f"Research this question: {question}"
if context:
user_message += f"\n\nContext from the caller: {context}"
user_message += f"\n\nResearch depth: {depth}"
messages = [{"role": "user", "content": user_message}]
# --- Tool-use loop ---
# Budget policy: the loop honors token_budget as a soft cap. Before
# starting a new iteration we check whether we've already hit the
# budget; if so we stop and let synthesis run on whatever evidence
# we already have. Synthesis tokens are tracked but not capped here
# — the synthesis call is always allowed to complete so the caller
# gets a structured result rather than a stub.
while iterations < constraints.max_iterations:
if total_tokens >= constraints.token_budget:
budget_exhausted = True
trace.log_step(
"budget_exhausted",
decision=(
f"Token budget reached before iteration "
f"{iterations + 1}: {total_tokens}/{constraints.token_budget}"
),
)
break
iterations += 1
trace.log_step(
"iteration_start",
decision=f"Starting iteration {iterations}/{constraints.max_iterations}",
tokens_so_far=total_tokens,
)
response = self.client.messages.create(
model=self.model_id,
max_tokens=4096,
system=SYSTEM_PROMPT,
messages=messages,
tools=TOOLS,
)
# Track tokens
total_tokens += response.usage.input_tokens + response.usage.output_tokens
# Check if the model wants to use tools
tool_calls = [b for b in response.content if b.type == "tool_use"]
if not tool_calls:
# Model is done researching — extract any final text
text_blocks = [b.text for b in response.content if b.type == "text"]
if text_blocks:
trace.log_step(
"agent_message",
decision="Agent finished tool use",
message=text_blocks[0][:500],
)
break
# Process each tool call
tool_results = []
for tool_call in tool_calls:
result_content = await self._execute_tool(
tool_call.name,
tool_call.input,
evidence,
trace,
constraints,
)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_call.id,
"content": result_content,
}
)
# Append assistant response + tool results to conversation
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
# --- Synthesis step ---
trace.log_step(
"synthesis_start",
decision="Beginning synthesis of gathered evidence",
evidence_count=len(evidence),
iterations_run=iterations,
tokens_used=total_tokens,
)
result = await self._synthesize(
question=question,
context=context,
evidence=evidence,
trace=trace,
total_tokens=total_tokens,
iterations=iterations,
start_time=start_time,
budget_exhausted=budget_exhausted,
)
trace.log_step(
"complete",
decision="Research complete",
confidence=result.confidence,
citation_count=len(result.citations),
gap_count=len(result.gaps),
discovery_count=len(result.discovery_events),
)
trace.close()
return result
async def _execute_tool(
self,
tool_name: str,
tool_input: dict,
evidence: list[dict],
trace: TraceLogger,
constraints: ResearchConstraints,
) -> str:
"""Execute a tool call and return the result as a string."""
if tool_name == "web_search":
query = tool_input.get("query", "")
max_results = min(
tool_input.get("max_results", 5),
constraints.max_sources,
)
trace.log_step(
"web_search",
decision=f"Searching: {query}",
query=query,
max_results=max_results,
)
results = tavily_search(
api_key=self.tavily_api_key,
query=query,
max_results=max_results,
)
# Store evidence
for r in results:
ev = {
"type": "search_result",
"url": r.url,
"title": r.title,
"content": r.content,
"raw_content": r.raw_content,
"content_hash": r.content_hash,
"score": r.score,
}
evidence.append(ev)
trace.log_step(
"web_search_complete",
decision=f"Got {len(results)} results",
result_count=len(results),
urls=[r.url for r in results],
)
# Return results as text for the LLM
return _format_search_results(results)
elif tool_name == "fetch_url":
url = tool_input.get("url", "")
trace.log_step(
"fetch_url",
decision=f"Fetching: {url}",
url=url,
)
result = await fetch_url(url)
trace.log_step(
"fetch_url_complete",
decision="Fetch succeeded" if result.success else f"Fetch failed: {result.error}",
url=url,
content_hash=result.content_hash,
content_length=result.content_length,
success=result.success,
)
if result.success:
# Store evidence
evidence.append(
{
"type": "fetched_page",
"url": url,
"content": result.text[:10000],
"content_hash": result.content_hash,
"content_length": result.content_length,
}
)
# Return truncated text for the LLM
return result.text[:8000]
else:
return f"Failed to fetch URL: {result.error}"
return f"Unknown tool: {tool_name}"
async def _synthesize(
self,
question: str,
context: Optional[str],
evidence: list[dict],
trace: TraceLogger,
total_tokens: int,
iterations: int,
start_time: float,
budget_exhausted: bool,
) -> ResearchResult:
"""Ask the LLM to synthesize evidence into a ResearchResult."""
# Format evidence for the synthesis prompt
evidence_text = ""
for i, ev in enumerate(evidence, 1):
if ev["type"] == "search_result":
content = ev.get("raw_content") or ev.get("content", "")
evidence_text += (
f"\n--- Source {i} (search result) ---\n"
f"URL: {ev['url']}\n"
f"Title: {ev['title']}\n"
f"Content hash: {ev['content_hash']}\n"
f"Content: {content[:3000]}\n"
)
elif ev["type"] == "fetched_page":
evidence_text += (
f"\n--- Source {i} (fetched page) ---\n"
f"URL: {ev['url']}\n"
f"Content hash: {ev['content_hash']}\n"
f"Content: {ev['content'][:3000]}\n"
)
prompt = SYNTHESIS_PROMPT.format(
evidence=evidence_text or "(No evidence gathered)",
question=question,
context=context or "(No additional context)",
)
response = self.client.messages.create(
model=self.model_id,
max_tokens=16384,
messages=[{"role": "user", "content": prompt}],
)
total_tokens += response.usage.input_tokens + response.usage.output_tokens
wall_time = time.time() - start_time
# Parse the JSON response
raw_text = response.content[0].text.strip()
stop_reason = response.stop_reason
# Strip markdown fences if the model added them despite instructions
if raw_text.startswith("```"):
raw_text = raw_text.split("\n", 1)[1] if "\n" in raw_text else raw_text[3:]
if raw_text.endswith("```"):
raw_text = raw_text[:-3].strip()
try:
data = json.loads(raw_text)
except json.JSONDecodeError as parse_err:
trace.log_step(
"synthesis_error",
decision=(
f"Failed to parse synthesis JSON ({parse_err}); "
f"stop_reason={stop_reason}"
),
stop_reason=stop_reason,
parse_error=str(parse_err),
raw_response=raw_text,
)
return self._fallback_result(
question, evidence, trace, total_tokens, iterations,
wall_time, budget_exhausted,
)
trace.log_step(
"synthesis_complete",
decision="Parsed synthesis JSON successfully",
)
# Build the ResearchResult from parsed JSON
try:
citations = [
Citation(
source=c.get("source", "web"),
locator=c.get("locator", ""),
title=c.get("title"),
snippet=c.get("snippet"),
raw_excerpt=c.get("raw_excerpt", ""),
confidence=c.get("confidence", 0.5),
)
for c in data.get("citations", [])
]
gaps = [
Gap(
topic=g.get("topic", ""),
category=GapCategory(g.get("category", "source_not_found")),
detail=g.get("detail", ""),
)
for g in data.get("gaps", [])
]
discovery_events = [
DiscoveryEvent(
type=d.get("type", "related_research"),
suggested_researcher=d.get("suggested_researcher"),
query=d.get("query", ""),
reason=d.get("reason", ""),
source_locator=d.get("source_locator"),
)
for d in data.get("discovery_events", [])
]
open_questions = [
OpenQuestion(
question=q.get("question", ""),
context=q.get("context", ""),
priority=q.get("priority", "medium"),
source_locator=q.get("source_locator"),
)
for q in data.get("open_questions", [])
]
cf = data.get("confidence_factors", {})
confidence_factors = ConfidenceFactors(
num_corroborating_sources=cf.get("num_corroborating_sources", 0),
source_authority=cf.get("source_authority", "low"),
contradiction_detected=cf.get("contradiction_detected", False),
query_specificity_match=cf.get("query_specificity_match", 0.5),
budget_exhausted=budget_exhausted or cf.get("budget_exhausted", False),
recency=cf.get("recency"),
)
return ResearchResult(
answer=data.get("answer", "No answer could be synthesized."),
citations=citations,
gaps=gaps,
discovery_events=discovery_events,
open_questions=open_questions,
confidence=data.get("confidence", 0.5),
confidence_factors=confidence_factors,
cost_metadata=CostMetadata(
tokens_used=total_tokens,
iterations_run=iterations,
wall_time_sec=wall_time,
budget_exhausted=budget_exhausted,
model_id=self.model_id,
),
trace_id=trace.trace_id,
)
except Exception as e:
trace.log_step(
"synthesis_build_error",
decision=f"Failed to build ResearchResult: {e}",
)
return self._fallback_result(
question, evidence, trace, total_tokens, iterations,
wall_time, budget_exhausted,
)
def _fallback_result(
self,
question: str,
evidence: list[dict],
trace: TraceLogger,
total_tokens: int,
iterations: int,
wall_time: float,
budget_exhausted: bool,
) -> ResearchResult:
"""Produce a minimal valid ResearchResult when synthesis fails."""
return ResearchResult(
answer=f"Research on '{question}' completed but synthesis failed. {len(evidence)} sources were gathered.",
citations=[],
gaps=[
Gap(
topic="synthesis",
category=GapCategory.BUDGET_EXHAUSTED
if budget_exhausted
else GapCategory.SOURCE_NOT_FOUND,
detail="The synthesis step failed to produce structured output.",
)
],
discovery_events=[],
confidence=0.1,
confidence_factors=ConfidenceFactors(
num_corroborating_sources=0,
source_authority="low",
contradiction_detected=False,
query_specificity_match=0.0,
budget_exhausted=budget_exhausted,
recency=None,
),
cost_metadata=CostMetadata(
tokens_used=total_tokens,
iterations_run=iterations,
wall_time_sec=wall_time,
budget_exhausted=budget_exhausted,
model_id=self.model_id,
),
trace_id=trace.trace_id,
)
def _format_search_results(results: list[SearchResult]) -> str:
"""Format search results as readable text for the LLM."""
parts = []
for i, r in enumerate(results, 1):
content = r.raw_content or r.content
parts.append(
f"Result {i}:\n"
f" Title: {r.title}\n"
f" URL: {r.url}\n"
f" Relevance: {r.score:.2f}\n"
f" Content: {content[:2000]}\n"
)
return "\n".join(parts) if parts else "No results found."