"""Web researcher agent — the inner agentic loop. Takes a question, runs a plan→search→fetch→iterate→synthesize loop using Claude as the reasoning engine and Tavily/httpx as tools. Returns a ResearchResult conforming to the v1 contract. """ import asyncio import json import time from typing import Optional from anthropic import Anthropic from researchers.web.models import ( Citation, ConfidenceFactors, CostMetadata, DiscoveryEvent, Gap, GapCategory, OpenQuestion, ResearchConstraints, ResearchResult, ) from researchers.web.tools import SearchResult, fetch_url, tavily_search from researchers.web.trace import TraceLogger SYSTEM_PROMPT = """\ You are a Marchwarden — a research specialist stationed at the frontier of knowledge. \ Your job is to investigate a question thoroughly using web search and URL fetching, \ then produce a grounded, evidence-based answer. ## Your process 1. **Plan**: Decide what to search for. Break complex questions into sub-queries. 2. **Search**: Use the web_search tool to find relevant sources. 3. **Fetch**: Use the fetch_url tool to get full content from promising URLs. 4. **Iterate**: If you don't have enough evidence, search again with refined queries. 5. **Stop**: When you have sufficient evidence OR you've exhausted your budget. ## Rules - Every claim must be traceable to a source you actually fetched. - If you can't find information, say so — never fabricate. - If sources contradict each other, note the contradiction. - If the question requires expertise outside web search (academic papers, databases, \ legal documents), note it as a discovery for another researcher. - Be efficient. Don't fetch URLs that are clearly irrelevant from their title/snippet. - Prefer authoritative sources (.gov, .edu, established organizations) over blogs/forums. """ SYNTHESIS_PROMPT = """\ Based on the evidence gathered, produce a structured research result as JSON. ## Evidence gathered {evidence} ## Original question {question} ## Context from caller {context} ## Instructions Produce a JSON object with these exact fields: {{ "answer": "Your synthesized answer. Every claim must trace to a citation.", "citations": [ {{ "source": "web", "locator": "the exact URL", "title": "page title", "snippet": "your 50-200 char summary of why this source is relevant", "raw_excerpt": "verbatim 100-500 char excerpt from the source that supports your claim", "confidence": 0.0-1.0 }} ], "gaps": [ {{ "topic": "what wasn't resolved", "category": "source_not_found|access_denied|budget_exhausted|contradictory_sources|scope_exceeded", "detail": "human-readable explanation" }} ], "discovery_events": [ {{ "type": "related_research|new_source|contradiction", "suggested_researcher": "arxiv|database|legal|null", "query": "suggested query for that researcher", "reason": "why this matters", "source_locator": "URL where you found this, or null" }} ], "open_questions": [ {{ "question": "A follow-up question that emerged from the research", "context": "What evidence prompted this question", "priority": "high|medium|low", "source_locator": "URL where this question arose, or null" }} ], "confidence": 0.0-1.0, "confidence_factors": {{ "num_corroborating_sources": 0, "source_authority": "high|medium|low", "contradiction_detected": false, "query_specificity_match": 0.0-1.0, "budget_exhausted": false, "recency": "current|recent|dated|null" }} }} Respond with ONLY the JSON object, no markdown fences, no explanation. """ # Tool definitions for Claude's tool_use API TOOLS = [ { "name": "web_search", "description": ( "Search the web for information. Returns titles, URLs, snippets, " "and sometimes full page content. Use this to find sources." ), "input_schema": { "type": "object", "properties": { "query": { "type": "string", "description": "The search query.", }, "max_results": { "type": "integer", "description": "Number of results (1-10). Default 5.", "default": 5, }, }, "required": ["query"], }, }, { "name": "fetch_url", "description": ( "Fetch the full text content of a URL. Use this when a search result " "looks promising but the snippet isn't enough. Returns extracted text." ), "input_schema": { "type": "object", "properties": { "url": { "type": "string", "description": "The URL to fetch.", }, }, "required": ["url"], }, }, ] class WebResearcher: """Agentic web researcher that searches, fetches, and synthesizes.""" def __init__( self, anthropic_api_key: str, tavily_api_key: str, model_id: str = "claude-sonnet-4-6", trace_dir: Optional[str] = None, ): self.client = Anthropic(api_key=anthropic_api_key) self.tavily_api_key = tavily_api_key self.model_id = model_id self.trace_dir = trace_dir async def research( self, question: str, context: Optional[str] = None, depth: str = "balanced", constraints: Optional[ResearchConstraints] = None, ) -> ResearchResult: """Run a full research loop on a question. Args: question: The question to investigate. context: What the caller already knows (optional). depth: "shallow", "balanced", or "deep". constraints: Budget and iteration limits. Returns: A ResearchResult conforming to the v1 contract. """ constraints = constraints or ResearchConstraints() trace = TraceLogger(trace_dir=self.trace_dir) start_time = time.time() total_tokens = 0 iterations = 0 evidence: list[dict] = [] budget_exhausted = False trace.log_step( "start", decision=f"Beginning research: depth={depth}", question=question, context=context or "", max_iterations=constraints.max_iterations, token_budget=constraints.token_budget, ) # Build initial message user_message = f"Research this question: {question}" if context: user_message += f"\n\nContext from the caller: {context}" user_message += f"\n\nResearch depth: {depth}" messages = [{"role": "user", "content": user_message}] # --- Tool-use loop --- # Budget policy: the loop honors token_budget as a soft cap. Before # starting a new iteration we check whether we've already hit the # budget; if so we stop and let synthesis run on whatever evidence # we already have. Synthesis tokens are tracked but not capped here # — the synthesis call is always allowed to complete so the caller # gets a structured result rather than a stub. while iterations < constraints.max_iterations: if total_tokens >= constraints.token_budget: budget_exhausted = True trace.log_step( "budget_exhausted", decision=( f"Token budget reached before iteration " f"{iterations + 1}: {total_tokens}/{constraints.token_budget}" ), ) break iterations += 1 trace.log_step( "iteration_start", decision=f"Starting iteration {iterations}/{constraints.max_iterations}", tokens_so_far=total_tokens, ) response = self.client.messages.create( model=self.model_id, max_tokens=4096, system=SYSTEM_PROMPT, messages=messages, tools=TOOLS, ) # Track tokens total_tokens += response.usage.input_tokens + response.usage.output_tokens # Check if the model wants to use tools tool_calls = [b for b in response.content if b.type == "tool_use"] if not tool_calls: # Model is done researching — extract any final text text_blocks = [b.text for b in response.content if b.type == "text"] if text_blocks: trace.log_step( "agent_message", decision="Agent finished tool use", message=text_blocks[0][:500], ) break # Process each tool call tool_results = [] for tool_call in tool_calls: result_content = await self._execute_tool( tool_call.name, tool_call.input, evidence, trace, constraints, ) tool_results.append( { "type": "tool_result", "tool_use_id": tool_call.id, "content": result_content, } ) # Append assistant response + tool results to conversation messages.append({"role": "assistant", "content": response.content}) messages.append({"role": "user", "content": tool_results}) # --- Synthesis step --- trace.log_step( "synthesis_start", decision="Beginning synthesis of gathered evidence", evidence_count=len(evidence), iterations_run=iterations, tokens_used=total_tokens, ) result = await self._synthesize( question=question, context=context, evidence=evidence, trace=trace, total_tokens=total_tokens, iterations=iterations, start_time=start_time, budget_exhausted=budget_exhausted, ) trace.log_step( "complete", decision="Research complete", confidence=result.confidence, citation_count=len(result.citations), gap_count=len(result.gaps), discovery_count=len(result.discovery_events), ) trace.close() return result async def _execute_tool( self, tool_name: str, tool_input: dict, evidence: list[dict], trace: TraceLogger, constraints: ResearchConstraints, ) -> str: """Execute a tool call and return the result as a string.""" if tool_name == "web_search": query = tool_input.get("query", "") max_results = min( tool_input.get("max_results", 5), constraints.max_sources, ) trace.log_step( "web_search", decision=f"Searching: {query}", query=query, max_results=max_results, ) results = tavily_search( api_key=self.tavily_api_key, query=query, max_results=max_results, ) # Store evidence for r in results: ev = { "type": "search_result", "url": r.url, "title": r.title, "content": r.content, "raw_content": r.raw_content, "content_hash": r.content_hash, "score": r.score, } evidence.append(ev) trace.log_step( "web_search_complete", decision=f"Got {len(results)} results", result_count=len(results), urls=[r.url for r in results], ) # Return results as text for the LLM return _format_search_results(results) elif tool_name == "fetch_url": url = tool_input.get("url", "") trace.log_step( "fetch_url", decision=f"Fetching: {url}", url=url, ) result = await fetch_url(url) trace.log_step( "fetch_url_complete", decision="Fetch succeeded" if result.success else f"Fetch failed: {result.error}", url=url, content_hash=result.content_hash, content_length=result.content_length, success=result.success, ) if result.success: # Store evidence evidence.append( { "type": "fetched_page", "url": url, "content": result.text[:10000], "content_hash": result.content_hash, "content_length": result.content_length, } ) # Return truncated text for the LLM return result.text[:8000] else: return f"Failed to fetch URL: {result.error}" return f"Unknown tool: {tool_name}" async def _synthesize( self, question: str, context: Optional[str], evidence: list[dict], trace: TraceLogger, total_tokens: int, iterations: int, start_time: float, budget_exhausted: bool, ) -> ResearchResult: """Ask the LLM to synthesize evidence into a ResearchResult.""" # Format evidence for the synthesis prompt evidence_text = "" for i, ev in enumerate(evidence, 1): if ev["type"] == "search_result": content = ev.get("raw_content") or ev.get("content", "") evidence_text += ( f"\n--- Source {i} (search result) ---\n" f"URL: {ev['url']}\n" f"Title: {ev['title']}\n" f"Content hash: {ev['content_hash']}\n" f"Content: {content[:3000]}\n" ) elif ev["type"] == "fetched_page": evidence_text += ( f"\n--- Source {i} (fetched page) ---\n" f"URL: {ev['url']}\n" f"Content hash: {ev['content_hash']}\n" f"Content: {ev['content'][:3000]}\n" ) prompt = SYNTHESIS_PROMPT.format( evidence=evidence_text or "(No evidence gathered)", question=question, context=context or "(No additional context)", ) response = self.client.messages.create( model=self.model_id, max_tokens=16384, messages=[{"role": "user", "content": prompt}], ) total_tokens += response.usage.input_tokens + response.usage.output_tokens wall_time = time.time() - start_time # Parse the JSON response raw_text = response.content[0].text.strip() stop_reason = response.stop_reason # Strip markdown fences if the model added them despite instructions if raw_text.startswith("```"): raw_text = raw_text.split("\n", 1)[1] if "\n" in raw_text else raw_text[3:] if raw_text.endswith("```"): raw_text = raw_text[:-3].strip() try: data = json.loads(raw_text) except json.JSONDecodeError as parse_err: trace.log_step( "synthesis_error", decision=( f"Failed to parse synthesis JSON ({parse_err}); " f"stop_reason={stop_reason}" ), stop_reason=stop_reason, parse_error=str(parse_err), raw_response=raw_text, ) return self._fallback_result( question, evidence, trace, total_tokens, iterations, wall_time, budget_exhausted, ) trace.log_step( "synthesis_complete", decision="Parsed synthesis JSON successfully", ) # Build the ResearchResult from parsed JSON try: citations = [ Citation( source=c.get("source", "web"), locator=c.get("locator", ""), title=c.get("title"), snippet=c.get("snippet"), raw_excerpt=c.get("raw_excerpt", ""), confidence=c.get("confidence", 0.5), ) for c in data.get("citations", []) ] gaps = [ Gap( topic=g.get("topic", ""), category=GapCategory(g.get("category", "source_not_found")), detail=g.get("detail", ""), ) for g in data.get("gaps", []) ] discovery_events = [ DiscoveryEvent( type=d.get("type", "related_research"), suggested_researcher=d.get("suggested_researcher"), query=d.get("query", ""), reason=d.get("reason", ""), source_locator=d.get("source_locator"), ) for d in data.get("discovery_events", []) ] open_questions = [ OpenQuestion( question=q.get("question", ""), context=q.get("context", ""), priority=q.get("priority", "medium"), source_locator=q.get("source_locator"), ) for q in data.get("open_questions", []) ] cf = data.get("confidence_factors", {}) confidence_factors = ConfidenceFactors( num_corroborating_sources=cf.get("num_corroborating_sources", 0), source_authority=cf.get("source_authority", "low"), contradiction_detected=cf.get("contradiction_detected", False), query_specificity_match=cf.get("query_specificity_match", 0.5), budget_exhausted=budget_exhausted or cf.get("budget_exhausted", False), recency=cf.get("recency"), ) return ResearchResult( answer=data.get("answer", "No answer could be synthesized."), citations=citations, gaps=gaps, discovery_events=discovery_events, open_questions=open_questions, confidence=data.get("confidence", 0.5), confidence_factors=confidence_factors, cost_metadata=CostMetadata( tokens_used=total_tokens, iterations_run=iterations, wall_time_sec=wall_time, budget_exhausted=budget_exhausted, model_id=self.model_id, ), trace_id=trace.trace_id, ) except Exception as e: trace.log_step( "synthesis_build_error", decision=f"Failed to build ResearchResult: {e}", ) return self._fallback_result( question, evidence, trace, total_tokens, iterations, wall_time, budget_exhausted, ) def _fallback_result( self, question: str, evidence: list[dict], trace: TraceLogger, total_tokens: int, iterations: int, wall_time: float, budget_exhausted: bool, ) -> ResearchResult: """Produce a minimal valid ResearchResult when synthesis fails.""" return ResearchResult( answer=f"Research on '{question}' completed but synthesis failed. {len(evidence)} sources were gathered.", citations=[], gaps=[ Gap( topic="synthesis", category=GapCategory.BUDGET_EXHAUSTED if budget_exhausted else GapCategory.SOURCE_NOT_FOUND, detail="The synthesis step failed to produce structured output.", ) ], discovery_events=[], confidence=0.1, confidence_factors=ConfidenceFactors( num_corroborating_sources=0, source_authority="low", contradiction_detected=False, query_specificity_match=0.0, budget_exhausted=budget_exhausted, recency=None, ), cost_metadata=CostMetadata( tokens_used=total_tokens, iterations_run=iterations, wall_time_sec=wall_time, budget_exhausted=budget_exhausted, model_id=self.model_id, ), trace_id=trace.trace_id, ) def _format_search_results(results: list[SearchResult]) -> str: """Format search results as readable text for the LLM.""" parts = [] for i, r in enumerate(results, 1): content = r.raw_content or r.content parts.append( f"Result {i}:\n" f" Title: {r.title}\n" f" URL: {r.url}\n" f" Relevance: {r.score:.2f}\n" f" Content: {content[:2000]}\n" ) return "\n".join(parts) if parts else "No results found."