M1.3: Inner agent loop #5

Merged
archeious merged 1 commit from feat/agent-loop into main 2026-04-08 20:29:41 +00:00
2 changed files with 967 additions and 0 deletions
Showing only changes of commit 7cb3fde90e - Show all commits

601
researchers/web/agent.py Normal file
View file

@ -0,0 +1,601 @@
"""Web researcher agent — the inner agentic loop.
Takes a question, runs a plan -> search -> fetch -> iterate -> synthesize loop
using Claude as the reasoning engine and Tavily/httpx as tools.
Returns a ResearchResult conforming to the v1 contract.
"""
import asyncio
import json
import time
from typing import Optional
from anthropic import Anthropic
from researchers.web.models import (
Citation,
ConfidenceFactors,
CostMetadata,
DiscoveryEvent,
Gap,
GapCategory,
ResearchConstraints,
ResearchResult,
)
from researchers.web.tools import SearchResult, fetch_url, tavily_search
from researchers.web.trace import TraceLogger
# System prompt driving the tool-use research loop.
# Fix: the em-dash after "Marchwarden" was lost to an encoding mangle
# ("You are a Marchwarden a research specialist"); restored to match the
# em-dash style used elsewhere in this prompt ("say so — never fabricate").
SYSTEM_PROMPT = """\
You are a Marchwarden — a research specialist stationed at the frontier of knowledge. \
Your job is to investigate a question thoroughly using web search and URL fetching, \
then produce a grounded, evidence-based answer.
## Your process
1. **Plan**: Decide what to search for. Break complex questions into sub-queries.
2. **Search**: Use the web_search tool to find relevant sources.
3. **Fetch**: Use the fetch_url tool to get full content from promising URLs.
4. **Iterate**: If you don't have enough evidence, search again with refined queries.
5. **Stop**: When you have sufficient evidence OR you've exhausted your budget.
## Rules
- Every claim must be traceable to a source you actually fetched.
- If you can't find information, say so — never fabricate.
- If sources contradict each other, note the contradiction.
- If the question requires expertise outside web search (academic papers, databases, \
legal documents), note it as a discovery for another researcher.
- Be efficient. Don't fetch URLs that are clearly irrelevant from their title/snippet.
- Prefer authoritative sources (.gov, .edu, established organizations) over blogs/forums.
"""
# Prompt template for the final synthesis call.  Filled via str.format with
# `evidence`, `question`, and `context` — the literal JSON braces in the
# schema example are escaped as {{ }} so .format leaves them intact.  The
# field names/enums must stay in sync with the models in
# researchers.web.models (Citation, Gap, DiscoveryEvent, ConfidenceFactors).
SYNTHESIS_PROMPT = """\
Based on the evidence gathered, produce a structured research result as JSON.
## Evidence gathered
{evidence}
## Original question
{question}
## Context from caller
{context}
## Instructions
Produce a JSON object with these exact fields:
{{
"answer": "Your synthesized answer. Every claim must trace to a citation.",
"citations": [
{{
"source": "web",
"locator": "the exact URL",
"title": "page title",
"snippet": "your 50-200 char summary of why this source is relevant",
"raw_excerpt": "verbatim 100-500 char excerpt from the source that supports your claim",
"confidence": 0.0-1.0
}}
],
"gaps": [
{{
"topic": "what wasn't resolved",
"category": "source_not_found|access_denied|budget_exhausted|contradictory_sources|scope_exceeded",
"detail": "human-readable explanation"
}}
],
"discovery_events": [
{{
"type": "related_research|new_source|contradiction",
"suggested_researcher": "arxiv|database|legal|null",
"query": "suggested query for that researcher",
"reason": "why this matters",
"source_locator": "URL where you found this, or null"
}}
],
"confidence": 0.0-1.0,
"confidence_factors": {{
"num_corroborating_sources": 0,
"source_authority": "high|medium|low",
"contradiction_detected": false,
"query_specificity_match": 0.0-1.0,
"budget_exhausted": false,
"recency": "current|recent|dated|null"
}}
}}
Respond with ONLY the JSON object, no markdown fences, no explanation.
"""
# Tool definitions for Claude's tool_use API.
# The `name` values here must match the dispatch strings in
# WebResearcher._execute_tool ("web_search" and "fetch_url").
TOOLS = [
    # Tavily-backed search; results are also captured as evidence by the agent.
    {
        "name": "web_search",
        "description": (
            "Search the web for information. Returns titles, URLs, snippets, "
            "and sometimes full page content. Use this to find sources."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query.",
                },
                "max_results": {
                    "type": "integer",
                    "description": "Number of results (1-10). Default 5.",
                    "default": 5,
                },
            },
            "required": ["query"],
        },
    },
    # Full-page fetch via httpx; the agent truncates returned text for the LLM.
    {
        "name": "fetch_url",
        "description": (
            "Fetch the full text content of a URL. Use this when a search result "
            "looks promising but the snippet isn't enough. Returns extracted text."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "The URL to fetch.",
                },
            },
            "required": ["url"],
        },
    },
]
class WebResearcher:
    """Agentic web researcher that searches, fetches, and synthesizes.

    Drives a Claude tool-use loop (web_search / fetch_url), accumulating
    raw evidence as it goes, then makes one final synthesis call whose JSON
    output is parsed into a ResearchResult.  Every step is logged through a
    TraceLogger so a run can be replayed/audited afterwards.
    """

    def __init__(
        self,
        anthropic_api_key: str,
        tavily_api_key: str,
        model_id: str = "claude-sonnet-4-5-20250514",
        trace_dir: Optional[str] = None,
    ):
        # The Anthropic client is created eagerly; the Tavily key is only
        # forwarded to tavily_search() when a web_search tool call happens.
        self.client = Anthropic(api_key=anthropic_api_key)
        self.tavily_api_key = tavily_api_key
        self.model_id = model_id
        # None lets TraceLogger choose its default trace location.
        self.trace_dir = trace_dir

    async def research(
        self,
        question: str,
        context: Optional[str] = None,
        depth: str = "balanced",
        constraints: Optional[ResearchConstraints] = None,
    ) -> ResearchResult:
        """Run a full research loop on a question.

        Args:
            question: The question to investigate.
            context: What the caller already knows (optional).
            depth: "shallow", "balanced", or "deep".
            constraints: Budget and iteration limits.

        Returns:
            A ResearchResult conforming to the v1 contract.
        """
        constraints = constraints or ResearchConstraints()
        trace = TraceLogger(trace_dir=self.trace_dir)
        start_time = time.time()
        total_tokens = 0
        iterations = 0
        # Accumulates search results and fetched pages; mutated in place by
        # _execute_tool and consumed by _synthesize.
        evidence: list[dict] = []
        budget_exhausted = False
        trace.log_step(
            "start",
            decision=f"Beginning research: depth={depth}",
            question=question,
            context=context or "",
            max_iterations=constraints.max_iterations,
            token_budget=constraints.token_budget,
        )
        # Build initial message
        user_message = f"Research this question: {question}"
        if context:
            user_message += f"\n\nContext from the caller: {context}"
        user_message += f"\n\nResearch depth: {depth}"
        messages = [{"role": "user", "content": user_message}]
        # --- Tool-use loop ---
        while iterations < constraints.max_iterations:
            iterations += 1
            trace.log_step(
                "iteration_start",
                decision=f"Starting iteration {iterations}/{constraints.max_iterations}",
                tokens_so_far=total_tokens,
            )
            # NOTE(review): messages.create is the synchronous SDK call inside
            # an async method — it blocks the event loop for the duration of
            # the model round-trip.  Consider AsyncAnthropic or
            # asyncio.to_thread; confirm this blocking is acceptable.
            response = self.client.messages.create(
                model=self.model_id,
                max_tokens=4096,
                system=SYSTEM_PROMPT,
                messages=messages,
                tools=TOOLS,
            )
            # Track tokens (input + output of every loop call).
            total_tokens += response.usage.input_tokens + response.usage.output_tokens
            # Check if the model wants to use tools
            tool_calls = [b for b in response.content if b.type == "tool_use"]
            if not tool_calls:
                # Model is done researching — extract any final text
                text_blocks = [b.text for b in response.content if b.type == "text"]
                if text_blocks:
                    trace.log_step(
                        "agent_message",
                        decision="Agent finished tool use",
                        message=text_blocks[0][:500],
                    )
                break
            # Process each tool call sequentially (tools may await I/O).
            tool_results = []
            for tool_call in tool_calls:
                result_content = await self._execute_tool(
                    tool_call.name,
                    tool_call.input,
                    evidence,
                    trace,
                    constraints,
                )
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": tool_call.id,
                        "content": result_content,
                    }
                )
            # Append assistant response + tool results to conversation, in
            # the assistant-then-user order the Messages API requires.
            messages.append({"role": "assistant", "content": response.content})
            messages.append({"role": "user", "content": tool_results})
            # Check token budget.  The check runs only after a completed
            # iteration, so the final iteration may overshoot token_budget.
            if total_tokens >= constraints.token_budget:
                budget_exhausted = True
                trace.log_step(
                    "budget_exhausted",
                    decision=f"Token budget reached: {total_tokens}/{constraints.token_budget}",
                )
                break
        # --- Synthesis step ---
        trace.log_step(
            "synthesis_start",
            decision="Beginning synthesis of gathered evidence",
            evidence_count=len(evidence),
            iterations_run=iterations,
            tokens_used=total_tokens,
        )
        result = await self._synthesize(
            question=question,
            context=context,
            evidence=evidence,
            trace=trace,
            total_tokens=total_tokens,
            iterations=iterations,
            start_time=start_time,
            budget_exhausted=budget_exhausted,
        )
        trace.log_step(
            "complete",
            decision="Research complete",
            confidence=result.confidence,
            citation_count=len(result.citations),
            gap_count=len(result.gaps),
            discovery_count=len(result.discovery_events),
        )
        trace.close()
        return result

    async def _execute_tool(
        self,
        tool_name: str,
        tool_input: dict,
        evidence: list[dict],
        trace: TraceLogger,
        constraints: ResearchConstraints,
    ) -> str:
        """Execute a tool call and return the result as a string.

        Side effect: appends any gathered material to `evidence` (the list is
        shared with the caller and mutated in place).
        """
        if tool_name == "web_search":
            query = tool_input.get("query", "")
            # Clamp the model-requested result count to the configured cap.
            max_results = min(
                tool_input.get("max_results", 5),
                constraints.max_sources,
            )
            trace.log_step(
                "web_search",
                decision=f"Searching: {query}",
                query=query,
                max_results=max_results,
            )
            results = tavily_search(
                api_key=self.tavily_api_key,
                query=query,
                max_results=max_results,
            )
            # Store evidence
            for r in results:
                ev = {
                    "type": "search_result",
                    "url": r.url,
                    "title": r.title,
                    "content": r.content,
                    "raw_content": r.raw_content,
                    "content_hash": r.content_hash,
                    "score": r.score,
                }
                evidence.append(ev)
            trace.log_step(
                "web_search_complete",
                decision=f"Got {len(results)} results",
                result_count=len(results),
                urls=[r.url for r in results],
            )
            # Return results as text for the LLM
            return _format_search_results(results)
        elif tool_name == "fetch_url":
            url = tool_input.get("url", "")
            trace.log_step(
                "fetch_url",
                decision=f"Fetching: {url}",
                url=url,
            )
            result = await fetch_url(url)
            trace.log_step(
                "fetch_url_complete",
                decision="Fetch succeeded" if result.success else f"Fetch failed: {result.error}",
                url=url,
                content_hash=result.content_hash,
                content_length=result.content_length,
                success=result.success,
            )
            if result.success:
                # Store evidence (kept at 10k chars for the record vs 8k
                # returned to the LLM below).
                evidence.append(
                    {
                        "type": "fetched_page",
                        "url": url,
                        "content": result.text[:10000],
                        "content_hash": result.content_hash,
                        "content_length": result.content_length,
                    }
                )
                # Return truncated text for the LLM
                return result.text[:8000]
            else:
                # NOTE(review): fetch failures are traced but never recorded
                # as a Gap in the final result — confirm that is intentional.
                return f"Failed to fetch URL: {result.error}"
        # Defensive: the model should only call tools declared in TOOLS.
        return f"Unknown tool: {tool_name}"

    async def _synthesize(
        self,
        question: str,
        context: Optional[str],
        evidence: list[dict],
        trace: TraceLogger,
        total_tokens: int,
        iterations: int,
        start_time: float,
        budget_exhausted: bool,
    ) -> ResearchResult:
        """Ask the LLM to synthesize evidence into a ResearchResult.

        Falls back to _fallback_result when the model's output is not valid
        JSON or the parsed data cannot be turned into the contract models.
        """
        # Format evidence for the synthesis prompt
        evidence_text = ""
        for i, ev in enumerate(evidence, 1):
            if ev["type"] == "search_result":
                # Prefer the full page text when the search API returned it.
                content = ev.get("raw_content") or ev.get("content", "")
                evidence_text += (
                    f"\n--- Source {i} (search result) ---\n"
                    f"URL: {ev['url']}\n"
                    f"Title: {ev['title']}\n"
                    f"Content hash: {ev['content_hash']}\n"
                    f"Content: {content[:3000]}\n"
                )
            elif ev["type"] == "fetched_page":
                evidence_text += (
                    f"\n--- Source {i} (fetched page) ---\n"
                    f"URL: {ev['url']}\n"
                    f"Content hash: {ev['content_hash']}\n"
                    f"Content: {ev['content'][:3000]}\n"
                )
        prompt = SYNTHESIS_PROMPT.format(
            evidence=evidence_text or "(No evidence gathered)",
            question=question,
            context=context or "(No additional context)",
        )
        # Synthesis call runs without tools or the loop system prompt.
        response = self.client.messages.create(
            model=self.model_id,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}],
        )
        total_tokens += response.usage.input_tokens + response.usage.output_tokens
        wall_time = time.time() - start_time
        # Parse the JSON response
        raw_text = response.content[0].text.strip()
        # Strip markdown fences if the model added them despite instructions
        if raw_text.startswith("```"):
            raw_text = raw_text.split("\n", 1)[1] if "\n" in raw_text else raw_text[3:]
        if raw_text.endswith("```"):
            raw_text = raw_text[:-3].strip()
        try:
            data = json.loads(raw_text)
        except json.JSONDecodeError:
            trace.log_step(
                "synthesis_error",
                decision="Failed to parse synthesis JSON, returning fallback",
                raw_response=raw_text[:1000],
            )
            return self._fallback_result(
                question, evidence, trace, total_tokens, iterations,
                wall_time, budget_exhausted,
            )
        trace.log_step(
            "synthesis_complete",
            decision="Parsed synthesis JSON successfully",
        )
        # Build the ResearchResult from parsed JSON; every field access is
        # defensive (.get with defaults) since the model controls the shape.
        try:
            citations = [
                Citation(
                    source=c.get("source", "web"),
                    locator=c.get("locator", ""),
                    title=c.get("title"),
                    snippet=c.get("snippet"),
                    raw_excerpt=c.get("raw_excerpt", ""),
                    confidence=c.get("confidence", 0.5),
                )
                for c in data.get("citations", [])
            ]
            gaps = [
                Gap(
                    topic=g.get("topic", ""),
                    category=GapCategory(g.get("category", "source_not_found")),
                    detail=g.get("detail", ""),
                )
                for g in data.get("gaps", [])
            ]
            discovery_events = [
                DiscoveryEvent(
                    type=d.get("type", "related_research"),
                    suggested_researcher=d.get("suggested_researcher"),
                    query=d.get("query", ""),
                    reason=d.get("reason", ""),
                    source_locator=d.get("source_locator"),
                )
                for d in data.get("discovery_events", [])
            ]
            cf = data.get("confidence_factors", {})
            confidence_factors = ConfidenceFactors(
                num_corroborating_sources=cf.get("num_corroborating_sources", 0),
                source_authority=cf.get("source_authority", "low"),
                contradiction_detected=cf.get("contradiction_detected", False),
                query_specificity_match=cf.get("query_specificity_match", 0.5),
                # The loop's own budget flag wins over the model's claim.
                budget_exhausted=budget_exhausted or cf.get("budget_exhausted", False),
                recency=cf.get("recency"),
            )
            return ResearchResult(
                answer=data.get("answer", "No answer could be synthesized."),
                citations=citations,
                gaps=gaps,
                discovery_events=discovery_events,
                confidence=data.get("confidence", 0.5),
                confidence_factors=confidence_factors,
                cost_metadata=CostMetadata(
                    tokens_used=total_tokens,
                    iterations_run=iterations,
                    wall_time_sec=wall_time,
                    budget_exhausted=budget_exhausted,
                    model_id=self.model_id,
                ),
                trace_id=trace.trace_id,
            )
        except Exception as e:
            # Broad catch is deliberate: any model-shape surprise (bad enum
            # value, wrong type) degrades to the fallback instead of raising.
            trace.log_step(
                "synthesis_build_error",
                decision=f"Failed to build ResearchResult: {e}",
            )
            return self._fallback_result(
                question, evidence, trace, total_tokens, iterations,
                wall_time, budget_exhausted,
            )

    def _fallback_result(
        self,
        question: str,
        evidence: list[dict],
        trace: TraceLogger,
        total_tokens: int,
        iterations: int,
        wall_time: float,
        budget_exhausted: bool,
    ) -> ResearchResult:
        """Produce a minimal valid ResearchResult when synthesis fails.

        Keeps the contract intact (low confidence, a single gap explaining
        the failure) so callers never see an exception from the agent.
        """
        return ResearchResult(
            answer=f"Research on '{question}' completed but synthesis failed. {len(evidence)} sources were gathered.",
            citations=[],
            gaps=[
                Gap(
                    topic="synthesis",
                    category=GapCategory.BUDGET_EXHAUSTED
                    if budget_exhausted
                    else GapCategory.SOURCE_NOT_FOUND,
                    detail="The synthesis step failed to produce structured output.",
                )
            ],
            discovery_events=[],
            confidence=0.1,
            confidence_factors=ConfidenceFactors(
                num_corroborating_sources=0,
                source_authority="low",
                contradiction_detected=False,
                query_specificity_match=0.0,
                budget_exhausted=budget_exhausted,
                recency=None,
            ),
            cost_metadata=CostMetadata(
                tokens_used=total_tokens,
                iterations_run=iterations,
                wall_time_sec=wall_time,
                budget_exhausted=budget_exhausted,
                model_id=self.model_id,
            ),
            trace_id=trace.trace_id,
        )
def _format_search_results(results: list[SearchResult]) -> str:
    """Render a list of search results as plain text for the LLM."""
    if not results:
        return "No results found."
    rendered = []
    for idx, hit in enumerate(results, 1):
        body = hit.raw_content or hit.content
        rendered.append(
            f"Result {idx}:\n"
            f" Title: {hit.title}\n"
            f" URL: {hit.url}\n"
            f" Relevance: {hit.score:.2f}\n"
            f" Content: {body[:2000]}\n"
        )
    return "\n".join(rendered)

366
tests/test_agent.py Normal file
View file

@ -0,0 +1,366 @@
"""Tests for the web researcher agent."""
import json
import tempfile
from types import SimpleNamespace
from unittest.mock import MagicMock, patch, AsyncMock
import pytest
from researchers.web.agent import WebResearcher, _format_search_results
from researchers.web.models import ResearchConstraints, ResearchResult
from researchers.web.tools import SearchResult
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_anthropic_response(content_blocks, input_tokens=100, output_tokens=200):
"""Build a mock Anthropic messages.create response."""
resp = MagicMock()
resp.content = content_blocks
resp.usage = SimpleNamespace(input_tokens=input_tokens, output_tokens=output_tokens)
return resp
def _text_block(text):
block = MagicMock()
block.type = "text"
block.text = text
return block
def _tool_use_block(name, tool_input, tool_id="tool_1"):
block = MagicMock()
block.type = "tool_use"
block.name = name
block.input = tool_input
block.id = tool_id
return block
# Canonical well-formed synthesis output used across tests: one citation,
# one gap, one discovery event, plus confidence factors.  Built with
# json.dumps so the string is guaranteed parseable by the agent's
# json.loads in _synthesize.
VALID_SYNTHESIS_JSON = json.dumps(
    {
        "answer": "Utah is ideal for cool-season crops at high elevation.",
        "citations": [
            {
                "source": "web",
                "locator": "https://example.com/utah-crops",
                "title": "Utah Crop Guide",
                "snippet": "Cool-season crops thrive above 7000 ft.",
                "raw_excerpt": "In Utah's high-elevation gardens, cool-season vegetables such as peas, lettuce, and potatoes consistently outperform warm-season crops.",
                "confidence": 0.9,
            }
        ],
        "gaps": [
            {
                "topic": "pest management",
                "category": "source_not_found",
                "detail": "No pest data found.",
            }
        ],
        "discovery_events": [
            {
                "type": "related_research",
                "suggested_researcher": "database",
                "query": "Utah soil salinity data",
                "reason": "Multiple sources reference USU studies",
                "source_locator": "https://example.com/ref",
            }
        ],
        "confidence": 0.82,
        "confidence_factors": {
            "num_corroborating_sources": 3,
            "source_authority": "high",
            "contradiction_detected": False,
            "query_specificity_match": 0.85,
            "budget_exhausted": False,
            "recency": "current",
        },
    }
)
# ---------------------------------------------------------------------------
# _format_search_results
# ---------------------------------------------------------------------------
class TestFormatSearchResults:
    """Unit tests for the module-level _format_search_results helper."""

    @staticmethod
    def _result(content, raw_content, score=0.9):
        # Factory for a SearchResult with the fields the formatter reads.
        return SearchResult(
            url="https://example.com",
            title="Test",
            content=content,
            raw_content=raw_content,
            score=score,
            content_hash="sha256:abc",
        )

    def test_formats_results(self):
        rendered = _format_search_results(
            [self._result("Short summary", "Full text here", score=0.95)]
        )
        assert "Test" in rendered
        assert "https://example.com" in rendered
        assert "0.95" in rendered
        assert "Full text here" in rendered

    def test_prefers_raw_content(self):
        rendered = _format_search_results(
            [self._result("Short", "Much longer raw content")]
        )
        assert "Much longer raw content" in rendered

    def test_falls_back_to_content(self):
        rendered = _format_search_results(
            [self._result("Only short content", None)]
        )
        assert "Only short content" in rendered

    def test_empty_results(self):
        assert "No results" in _format_search_results([])
# ---------------------------------------------------------------------------
# WebResearcher — mocked tool loop
# ---------------------------------------------------------------------------
class TestWebResearcher:
    """End-to-end loop tests with the Anthropic client and tools mocked.

    Pattern: `researcher.client.messages.create` is replaced with a MagicMock
    whose side_effect yields the scripted sequence of responses (loop calls
    first, synthesis call last), so the exact ordering of agent API calls is
    pinned by each test.
    """

    @pytest.mark.asyncio
    async def test_simple_research_loop(self):
        """Test a complete loop: one search → LLM stops → synthesis."""
        with tempfile.TemporaryDirectory() as tmp:
            researcher = WebResearcher(
                anthropic_api_key="fake",
                tavily_api_key="fake",
                model_id="claude-test",
                trace_dir=tmp,
            )
            # First call: LLM requests a web_search
            search_response = _make_anthropic_response(
                [_tool_use_block("web_search", {"query": "Utah crops"})],
            )
            # Second call: LLM is done (text only, no tools)
            done_response = _make_anthropic_response(
                [_text_block("I have enough information.")],
            )
            # Third call: synthesis
            synthesis_response = _make_anthropic_response(
                [_text_block(VALID_SYNTHESIS_JSON)],
            )
            researcher.client.messages.create = MagicMock(
                side_effect=[search_response, done_response, synthesis_response]
            )
            with patch("researchers.web.agent.tavily_search") as mock_search:
                mock_search.return_value = [
                    SearchResult(
                        url="https://example.com/utah",
                        title="Utah Gardening",
                        content="Cool-season crops work well.",
                        raw_content="Full content about Utah gardening.",
                        score=0.95,
                        content_hash="sha256:abc123",
                    )
                ]
                result = await researcher.research(
                    "What are ideal crops for Utah?",
                    constraints=ResearchConstraints(max_iterations=3),
                )
                # The parsed result should mirror VALID_SYNTHESIS_JSON exactly.
                assert isinstance(result, ResearchResult)
                assert "Utah" in result.answer
                assert len(result.citations) == 1
                assert result.citations[0].locator == "https://example.com/utah-crops"
                assert result.citations[0].raw_excerpt.startswith("In Utah")
                assert len(result.gaps) == 1
                assert result.gaps[0].category == "source_not_found"
                assert len(result.discovery_events) == 1
                assert result.confidence == 0.82
                assert result.confidence_factors.num_corroborating_sources == 3
                assert result.cost_metadata.model_id == "claude-test"
                assert result.cost_metadata.tokens_used > 0
                assert result.trace_id is not None

    @pytest.mark.asyncio
    async def test_budget_exhaustion(self):
        """Test that the loop stops when token budget is reached."""
        with tempfile.TemporaryDirectory() as tmp:
            researcher = WebResearcher(
                anthropic_api_key="fake",
                tavily_api_key="fake",
                model_id="claude-test",
                trace_dir=tmp,
            )
            # Each response uses 600 tokens — budget is 1000
            search_response = _make_anthropic_response(
                [_tool_use_block("web_search", {"query": "test"}, "t1")],
                input_tokens=400,
                output_tokens=200,
            )
            # Second search pushes over budget (600 + 600 = 1200 > 1000)
            search_response_2 = _make_anthropic_response(
                [_tool_use_block("web_search", {"query": "test2"}, "t2")],
                input_tokens=400,
                output_tokens=200,
            )
            synthesis_response = _make_anthropic_response(
                [_text_block(VALID_SYNTHESIS_JSON)],
                input_tokens=200,
                output_tokens=100,
            )
            researcher.client.messages.create = MagicMock(
                side_effect=[search_response, search_response_2, synthesis_response]
            )
            with patch("researchers.web.agent.tavily_search") as mock_search:
                mock_search.return_value = [
                    SearchResult(
                        url="https://example.com",
                        title="Test",
                        content="Content",
                        raw_content=None,
                        score=0.9,
                        content_hash="sha256:abc",
                    )
                ]
                result = await researcher.research(
                    "test question",
                    constraints=ResearchConstraints(
                        max_iterations=5,
                        token_budget=1000,
                    ),
                )
                assert result.cost_metadata.budget_exhausted is True

    @pytest.mark.asyncio
    async def test_synthesis_failure_returns_fallback(self):
        """If synthesis JSON is unparseable, return a valid fallback."""
        with tempfile.TemporaryDirectory() as tmp:
            researcher = WebResearcher(
                anthropic_api_key="fake",
                tavily_api_key="fake",
                model_id="claude-test",
                trace_dir=tmp,
            )
            # LLM immediately stops (no tools)
            done_response = _make_anthropic_response(
                [_text_block("Nothing to search.")],
            )
            # Synthesis returns garbage
            bad_synthesis = _make_anthropic_response(
                [_text_block("This is not valid JSON at all!!!")],
            )
            researcher.client.messages.create = MagicMock(
                side_effect=[done_response, bad_synthesis]
            )
            result = await researcher.research("test question")
            # Fallback contract: low confidence, explanatory answer, one gap.
            assert isinstance(result, ResearchResult)
            assert "synthesis failed" in result.answer.lower()
            assert result.confidence == 0.1
            assert len(result.gaps) == 1

    @pytest.mark.asyncio
    async def test_trace_file_created(self):
        """Verify trace file is created and has entries."""
        with tempfile.TemporaryDirectory() as tmp:
            researcher = WebResearcher(
                anthropic_api_key="fake",
                tavily_api_key="fake",
                model_id="claude-test",
                trace_dir=tmp,
            )
            done_response = _make_anthropic_response(
                [_text_block("Done.")],
            )
            synthesis_response = _make_anthropic_response(
                [_text_block(VALID_SYNTHESIS_JSON)],
            )
            researcher.client.messages.create = MagicMock(
                side_effect=[done_response, synthesis_response]
            )
            result = await researcher.research("test")
            # Check trace file exists by re-opening the same trace_id.
            from researchers.web.trace import TraceLogger
            trace = TraceLogger(trace_id=result.trace_id, trace_dir=tmp)
            entries = trace.read_entries()
            assert len(entries) >= 3  # start, iteration_start, synthesis, complete
            assert entries[0]["action"] == "start"
            actions = [e["action"] for e in entries]
            assert "complete" in actions

    @pytest.mark.asyncio
    async def test_fetch_url_tool(self):
        """Test that fetch_url tool calls work in the loop."""
        with tempfile.TemporaryDirectory() as tmp:
            researcher = WebResearcher(
                anthropic_api_key="fake",
                tavily_api_key="fake",
                model_id="claude-test",
                trace_dir=tmp,
            )
            # LLM requests fetch_url
            fetch_response = _make_anthropic_response(
                [_tool_use_block("fetch_url", {"url": "https://example.com/page"})],
            )
            done_response = _make_anthropic_response(
                [_text_block("Got it.")],
            )
            synthesis_response = _make_anthropic_response(
                [_text_block(VALID_SYNTHESIS_JSON)],
            )
            researcher.client.messages.create = MagicMock(
                side_effect=[fetch_response, done_response, synthesis_response]
            )
            # patch() auto-detects that fetch_url is an async function and
            # substitutes an AsyncMock, so `await fetch_url(url)` works.
            with patch("researchers.web.agent.fetch_url") as mock_fetch:
                from researchers.web.tools import FetchResult
                mock_fetch.return_value = FetchResult(
                    url="https://example.com/page",
                    text="Fetched page content about Utah gardening.",
                    content_hash="sha256:def456",
                    content_length=42,
                    success=True,
                )
                result = await researcher.research("test question")
                assert isinstance(result, ResearchResult)
                mock_fetch.assert_called_once_with("https://example.com/page")