The planner sees basename(target) in the tree output (e.g. "luminos_lib") and uses that as the path in its plan. But _apply_plan() mapped the target root to "." via os.path.relpath(), so the planner's path never matched and the allocation was silently dropped. Fix: register both "." and basename(target) as aliases for the target root in the lookup table. Also log a warning when plan paths don't match any known directory, so future mismatches are visible. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2060 lines
68 KiB
Python
2060 lines
68 KiB
Python
"""AI-powered directory analysis using a multi-pass, cache-driven agent loop.
|
||
|
||
Architecture:
|
||
1. Discover all directories under the target
|
||
2. Sort leaves-first (deepest directories first)
|
||
3. Run an isolated agent loop per directory (max 10 turns each)
|
||
4. Cache every file and directory summary to disk
|
||
5. Run a final synthesis pass reading only directory cache entries
|
||
|
||
Uses the Anthropic SDK for streaming, automatic retries, and token counting.
|
||
Uses tree-sitter for AST parsing and python-magic for file classification.
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
from collections import namedtuple
|
||
from datetime import datetime, timezone
|
||
|
||
import anthropic
|
||
import magic
|
||
from luminos_lib.ast_parser import parse_structure
|
||
from luminos_lib.cache import _CacheManager, _get_investigation_id
|
||
from luminos_lib.prompts import (
|
||
_DIR_SYSTEM_PROMPT,
|
||
_PLANNING_SYSTEM_PROMPT,
|
||
_SURVEY_SYSTEM_PROMPT,
|
||
_SYNTHESIS_SYSTEM_PROMPT,
|
||
)
|
||
from luminos_lib.tree import build_tree, render_tree
|
||
|
||
MODEL = "claude-sonnet-4-20250514"
|
||
|
||
# Context budget: trigger early exit when a single API call's input_tokens
|
||
# (the actual size of the context window in use, NOT the cumulative sum
|
||
# across turns) approaches the model's real context limit. Sonnet 4 has
|
||
# a 200k context window; we leave a 30% safety margin for the response
|
||
# and any tool result we're about to append.
|
||
MAX_CONTEXT = 200_000
|
||
CONTEXT_BUDGET = int(MAX_CONTEXT * 0.70)
|
||
|
||
# Pricing per 1M tokens (Claude Sonnet).
|
||
INPUT_PRICE_PER_M = 3.00
|
||
OUTPUT_PRICE_PER_M = 15.00
|
||
|
||
# Directories to always skip during investigation.
|
||
_SKIP_DIRS = {
|
||
".git", "__pycache__", "node_modules", ".tox", ".mypy_cache",
|
||
".pytest_cache", ".venv", "venv", ".env", "dist", "build",
|
||
".eggs", "*.egg-info", ".svn", ".hg",
|
||
}
|
||
|
||
# Commands the run_command tool is allowed to execute.
|
||
_COMMAND_WHITELIST = {"wc", "file", "grep", "head", "tail", "stat", "du", "find"}
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _get_api_key():
|
||
"""Read the Anthropic API key from the environment."""
|
||
key = os.environ.get("ANTHROPIC_API_KEY", "")
|
||
if not key:
|
||
print("Warning: ANTHROPIC_API_KEY not set. Skipping AI analysis.",
|
||
file=sys.stderr)
|
||
return key
|
||
|
||
|
||
def _path_is_safe(path, target):
|
||
"""Return True if *path* resolves to somewhere inside *target*."""
|
||
real = os.path.realpath(path)
|
||
target_real = os.path.realpath(target)
|
||
return real == target_real or real.startswith(target_real + os.sep)
|
||
|
||
|
||
def _now_iso():
|
||
return datetime.now(timezone.utc).isoformat()
|
||
|
||
|
||
def _should_skip_dir(name):
|
||
"""Return True if a directory name matches the skip list."""
|
||
if name in _SKIP_DIRS:
|
||
return True
|
||
for pattern in _SKIP_DIRS:
|
||
if pattern.startswith("*") and name.endswith(pattern[1:]):
|
||
return True
|
||
return False
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Token tracker
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class _TokenTracker:
|
||
"""Track token usage across API calls.
|
||
|
||
Two distinct quantities are tracked:
|
||
- cumulative totals (total_*, loop_*) — for cost reporting
|
||
- last_input — the size of the context window on the most recent
|
||
call, used to detect approaching the model's context limit
|
||
|
||
Cumulative input is NOT a meaningful proxy for context size: each
|
||
turn's input_tokens already includes the full message history, so
|
||
summing across turns double-counts everything. Use last_input for
|
||
budget decisions, totals for billing. (See #44.)
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.total_input = 0
|
||
self.total_output = 0
|
||
self.loop_input = 0
|
||
self.loop_output = 0
|
||
self.last_input = 0
|
||
self._loop_turns = 0
|
||
|
||
def record(self, usage):
|
||
"""Record usage from a single API call."""
|
||
inp = getattr(usage, "input_tokens", 0)
|
||
out = getattr(usage, "output_tokens", 0)
|
||
self.total_input += inp
|
||
self.total_output += out
|
||
self.loop_input += inp
|
||
self.loop_output += out
|
||
self.last_input = inp
|
||
self._loop_turns += 1
|
||
|
||
def reset_loop(self):
|
||
"""Reset per-loop counters (called between directory loops)."""
|
||
self.loop_input = 0
|
||
self.loop_output = 0
|
||
self.last_input = 0
|
||
self._loop_turns = 0
|
||
|
||
@property
|
||
def loop_total(self):
|
||
return self.loop_input + self.loop_output
|
||
|
||
def budget_exceeded(self):
|
||
"""True when the most recent call's context exceeded the budget."""
|
||
return self.last_input > CONTEXT_BUDGET
|
||
|
||
def summary(self):
|
||
cost_in = self.total_input * INPUT_PRICE_PER_M / 1_000_000
|
||
cost_out = self.total_output * OUTPUT_PRICE_PER_M / 1_000_000
|
||
cost = cost_in + cost_out
|
||
return (f"{self.total_input:,} input / {self.total_output:,} output "
|
||
f"(approx ${cost:.2f})")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Tool definitions
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Tool registry
|
||
#
|
||
# Tools are declared once via register_tool() at the bottom of the tool
|
||
# implementations section. Each registration lands its schema in one or
|
||
# more scope lists (_DIR_TOOLS / _SYNTHESIS_TOOLS / _SURVEY_TOOLS) and
|
||
# its handler in _TOOL_DISPATCH (used by _execute_tool()).
|
||
#
|
||
# Tools intercepted by the loop body — submit_report and submit_survey —
|
||
# register their schema only and have no handler entry.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_DIR_TOOLS = []
|
||
_SYNTHESIS_TOOLS = []
|
||
_SURVEY_TOOLS = []
|
||
_PLANNING_TOOLS = []
|
||
_TOOL_DISPATCH = {}
|
||
|
||
_TOOL_REGISTRIES = {
|
||
"dir": _DIR_TOOLS,
|
||
"synthesis": _SYNTHESIS_TOOLS,
|
||
"survey": _SURVEY_TOOLS,
|
||
"planning": _PLANNING_TOOLS,
|
||
}
|
||
|
||
|
||
def register_tool(name, description, schema, scopes, handler=None):
|
||
"""Register a tool's schema in one or more loop scopes and its handler.
|
||
|
||
A single tool can be registered multiple times with different schemas
|
||
in different scopes (submit_report has different schemas for the dir
|
||
and synthesis loops). The handler is global — pass handler= once and
|
||
omit it on subsequent registrations under the same name.
|
||
"""
|
||
schema_entry = {
|
||
"name": name,
|
||
"description": description,
|
||
"input_schema": schema,
|
||
}
|
||
for scope in scopes:
|
||
_TOOL_REGISTRIES[scope].append(schema_entry)
|
||
if handler is not None:
|
||
_TOOL_DISPATCH[name] = handler
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Tool implementations
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _tool_read_file(args, target, _cache):
|
||
path = args.get("path", "")
|
||
max_bytes = args.get("max_bytes", 4096)
|
||
if not os.path.isabs(path):
|
||
path = os.path.join(target, path)
|
||
if not _path_is_safe(path, target):
|
||
return f"Error: path '{path}' is outside the target directory."
|
||
try:
|
||
file_size = os.path.getsize(path)
|
||
with open(path, "r", errors="replace") as f:
|
||
content = f.read(max_bytes)
|
||
if not content:
|
||
return "(empty file)"
|
||
if file_size > max_bytes:
|
||
content += (
|
||
f"\n\n[TRUNCATED — showed {max_bytes} of {file_size} bytes. "
|
||
f"Call again with a larger max_bytes or use "
|
||
f"run_command('tail -n ... {os.path.relpath(path, target)}') "
|
||
f"to see the rest.]"
|
||
)
|
||
return content
|
||
except OSError as e:
|
||
return f"Error reading file: {e}"
|
||
|
||
|
||
def _tool_list_directory(args, target, _cache):
|
||
path = args.get("path", target)
|
||
show_hidden = args.get("show_hidden", False)
|
||
if not os.path.isabs(path):
|
||
path = os.path.join(target, path)
|
||
if not _path_is_safe(path, target):
|
||
return f"Error: path '{path}' is outside the target directory."
|
||
if not os.path.isdir(path):
|
||
return f"Error: '{path}' is not a directory."
|
||
try:
|
||
entries = sorted(os.listdir(path))
|
||
lines = []
|
||
for name in entries:
|
||
if not show_hidden and name.startswith("."):
|
||
continue
|
||
full = os.path.join(path, name)
|
||
try:
|
||
st = os.stat(full)
|
||
mime = magic.from_file(full, mime=True) if not os.path.isdir(full) else None
|
||
if os.path.isdir(full):
|
||
lines.append(f" {name}/ (dir)")
|
||
else:
|
||
mime_str = f" [{mime}]" if mime else ""
|
||
lines.append(f" {name} ({st.st_size} bytes){mime_str}")
|
||
except OSError:
|
||
lines.append(f" {name} (stat failed)")
|
||
return "\n".join(lines) if lines else "(empty directory)"
|
||
except OSError as e:
|
||
return f"Error listing directory: {e}"
|
||
|
||
|
||
def _tool_run_command(args, target, _cache):
|
||
command = args.get("command", "")
|
||
parts = command.split()
|
||
if not parts:
|
||
return "Error: empty command."
|
||
binary = os.path.basename(parts[0])
|
||
if binary not in _COMMAND_WHITELIST:
|
||
return (
|
||
f"Error: '{binary}' is not allowed. "
|
||
f"Whitelist: {', '.join(sorted(_COMMAND_WHITELIST))}"
|
||
)
|
||
try:
|
||
result = subprocess.run(
|
||
command, shell=True, capture_output=True, text=True,
|
||
timeout=15, cwd=target,
|
||
)
|
||
output = result.stdout
|
||
if result.returncode != 0 and result.stderr:
|
||
output += f"\n(stderr: {result.stderr.strip()})"
|
||
return output.strip() if output.strip() else "(no output)"
|
||
except subprocess.TimeoutExpired:
|
||
return "Error: command timed out after 15 seconds."
|
||
except OSError as e:
|
||
return f"Error running command: {e}"
|
||
|
||
|
||
def _tool_parse_structure(args, target, _cache):
|
||
path = args.get("path", "")
|
||
if not os.path.isabs(path):
|
||
path = os.path.join(target, path)
|
||
if not _path_is_safe(path, target):
|
||
return f"Error: path '{path}' is outside the target directory."
|
||
return parse_structure(path)
|
||
|
||
|
||
def _tool_write_cache(args, _target, cache):
|
||
cache_type = args.get("cache_type", "")
|
||
path = args.get("path", "")
|
||
data = args.get("data", {})
|
||
if cache_type not in ("file", "dir"):
|
||
return "Error: cache_type must be 'file' or 'dir'."
|
||
return cache.write_entry(cache_type, path, data)
|
||
|
||
|
||
def _tool_read_cache(args, _target, cache):
|
||
cache_type = args.get("cache_type", "")
|
||
path = args.get("path", "")
|
||
if cache_type not in ("file", "dir"):
|
||
return "Error: cache_type must be 'file' or 'dir'."
|
||
entry = cache.read_entry(cache_type, path)
|
||
if entry is None:
|
||
return "null"
|
||
return json.dumps(entry, indent=2)
|
||
|
||
|
||
def _tool_list_cache(args, _target, cache):
|
||
cache_type = args.get("cache_type", "")
|
||
if cache_type not in ("file", "dir"):
|
||
return "Error: cache_type must be 'file' or 'dir'."
|
||
paths = cache.list_entries(cache_type)
|
||
if not paths:
|
||
return "(no cached entries)"
|
||
return "\n".join(paths)
|
||
|
||
|
||
def _tool_think(args, _target, _cache):
|
||
obs = args.get("observation", "")
|
||
hyp = args.get("hypothesis", "")
|
||
nxt = args.get("next_action", "")
|
||
print(f" [AI] THINK", file=sys.stderr)
|
||
print(f" observation: {obs}", file=sys.stderr)
|
||
print(f" hypothesis: {hyp}", file=sys.stderr)
|
||
print(f" next_action: {nxt}", file=sys.stderr)
|
||
return "ok"
|
||
|
||
|
||
def _tool_checkpoint(args, _target, _cache):
|
||
learned = args.get("learned", "")
|
||
unknown = args.get("still_unknown", "")
|
||
phase = args.get("next_phase", "")
|
||
print(f" [AI] CHECKPOINT", file=sys.stderr)
|
||
print(f" learned: {learned}", file=sys.stderr)
|
||
print(f" still_unknown: {unknown}", file=sys.stderr)
|
||
print(f" next_phase: {phase}", file=sys.stderr)
|
||
return "ok"
|
||
|
||
|
||
def _tool_flag(args, _target, cache):
|
||
path = args.get("path", "general")
|
||
finding = args.get("finding", "")
|
||
severity = args.get("severity", "info")
|
||
print(f" [AI] FLAG [{severity.upper()}] {path}", file=sys.stderr)
|
||
print(f" {finding}", file=sys.stderr)
|
||
flags_path = os.path.join(cache.root, "flags.jsonl")
|
||
entry = {"path": path, "finding": finding, "severity": severity}
|
||
try:
|
||
with open(flags_path, "a") as f:
|
||
f.write(json.dumps(entry) + "\n")
|
||
except OSError:
|
||
pass
|
||
return "ok"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Tool registrations
|
||
#
|
||
# Order within each scope is preserved to keep the agent-visible tool list
|
||
# stable. Tools that appear in two scopes (flag) and tools whose schema
|
||
# differs by scope (submit_report) are registered once per scope.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_FLAG_DESCRIPTION = (
|
||
"Mark a file, directory, or finding as notable or anomalous. "
|
||
"Call this immediately when you discover something surprising, "
|
||
"concerning, or important — do not save it for the report."
|
||
)
|
||
_FLAG_SCHEMA = {
|
||
"type": "object",
|
||
"properties": {
|
||
"path": {
|
||
"type": "string",
|
||
"description": "Relative path, or 'general'.",
|
||
},
|
||
"finding": {
|
||
"type": "string",
|
||
"description": "What you found.",
|
||
},
|
||
"severity": {
|
||
"type": "string",
|
||
"enum": ["info", "concern", "critical"],
|
||
"description": "info | concern | critical",
|
||
},
|
||
},
|
||
"required": ["path", "finding", "severity"],
|
||
}
|
||
|
||
|
||
# --- Dir loop tools ---
|
||
|
||
register_tool(
|
||
name="read_file",
|
||
description=(
|
||
"Read and return the contents of a file. Path must be inside "
|
||
"the target directory."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"path": {
|
||
"type": "string",
|
||
"description": "Absolute or relative path to the file.",
|
||
},
|
||
"max_bytes": {
|
||
"type": "integer",
|
||
"description": "Maximum bytes to read (default 4096).",
|
||
},
|
||
},
|
||
"required": ["path"],
|
||
},
|
||
scopes=["dir"],
|
||
handler=_tool_read_file,
|
||
)
|
||
|
||
register_tool(
|
||
name="list_directory",
|
||
description=(
|
||
"List the contents of a directory with file sizes and types."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"path": {
|
||
"type": "string",
|
||
"description": "Absolute or relative path to the directory.",
|
||
},
|
||
"show_hidden": {
|
||
"type": "boolean",
|
||
"description": "Include hidden files (default false).",
|
||
},
|
||
},
|
||
"required": ["path"],
|
||
},
|
||
scopes=["dir"],
|
||
handler=_tool_list_directory,
|
||
)
|
||
|
||
register_tool(
|
||
name="run_command",
|
||
description=(
|
||
"Run a read-only shell command. Allowed binaries: "
|
||
"wc, file, grep, head, tail, stat, du, find."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"command": {
|
||
"type": "string",
|
||
"description": "The shell command to execute.",
|
||
},
|
||
},
|
||
"required": ["command"],
|
||
},
|
||
scopes=["dir"],
|
||
handler=_tool_run_command,
|
||
)
|
||
|
||
register_tool(
|
||
name="parse_structure",
|
||
description=(
|
||
"Parse a source file using tree-sitter and return its structural "
|
||
"skeleton: functions, classes, imports, and code metrics. "
|
||
"Supported: Python, JavaScript, TypeScript, Rust, Go."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"path": {
|
||
"type": "string",
|
||
"description": "Path to the source file to parse.",
|
||
},
|
||
},
|
||
"required": ["path"],
|
||
},
|
||
scopes=["dir"],
|
||
handler=_tool_parse_structure,
|
||
)
|
||
|
||
register_tool(
|
||
name="write_cache",
|
||
description=(
|
||
"Write a summary cache entry for a file or directory. The data "
|
||
"must NOT contain raw file contents — summaries only."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"cache_type": {
|
||
"type": "string",
|
||
"enum": ["file", "dir"],
|
||
"description": "'file' or 'dir'.",
|
||
},
|
||
"path": {
|
||
"type": "string",
|
||
"description": "The path being cached.",
|
||
},
|
||
"data": {
|
||
"type": "object",
|
||
"description": (
|
||
"Cache entry. Files: {path, relative_path, size_bytes, "
|
||
"category, summary, notable, notable_reason, "
|
||
"confidence, confidence_reason, cached_at}. "
|
||
"Dirs: {path, relative_path, child_count, summary, "
|
||
"dominant_category, notable_files, "
|
||
"confidence, confidence_reason, cached_at}. "
|
||
"Always set confidence (0.0–1.0); see system prompt "
|
||
"for calibration. Set confidence_reason only when "
|
||
"confidence < 0.7."
|
||
),
|
||
},
|
||
},
|
||
"required": ["cache_type", "path", "data"],
|
||
},
|
||
scopes=["dir"],
|
||
handler=_tool_write_cache,
|
||
)
|
||
|
||
register_tool(
|
||
name="think",
|
||
description=(
|
||
"Record your reasoning before choosing which file or directory "
|
||
"to investigate next. Call this when deciding what to look at "
|
||
"— not before every individual tool call."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"observation": {
|
||
"type": "string",
|
||
"description": "What you have observed so far.",
|
||
},
|
||
"hypothesis": {
|
||
"type": "string",
|
||
"description": "Your hypothesis about the directory.",
|
||
},
|
||
"next_action": {
|
||
"type": "string",
|
||
"description": "What you plan to investigate next and why.",
|
||
},
|
||
},
|
||
"required": ["observation", "hypothesis", "next_action"],
|
||
},
|
||
scopes=["dir"],
|
||
handler=_tool_think,
|
||
)
|
||
|
||
register_tool(
|
||
name="checkpoint",
|
||
description=(
|
||
"Summarize what you have learned so far about this directory "
|
||
"and what you still need to determine. Call this after completing "
|
||
"a significant cluster of files — not after every file."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"learned": {
|
||
"type": "string",
|
||
"description": "What you have learned so far.",
|
||
},
|
||
"still_unknown": {
|
||
"type": "string",
|
||
"description": "What you still need to determine.",
|
||
},
|
||
"next_phase": {
|
||
"type": "string",
|
||
"description": "What you will investigate next.",
|
||
},
|
||
},
|
||
"required": ["learned", "still_unknown", "next_phase"],
|
||
},
|
||
scopes=["dir"],
|
||
handler=_tool_checkpoint,
|
||
)
|
||
|
||
register_tool(
|
||
name="flag",
|
||
description=_FLAG_DESCRIPTION,
|
||
schema=_FLAG_SCHEMA,
|
||
scopes=["dir"],
|
||
handler=_tool_flag,
|
||
)
|
||
|
||
register_tool(
|
||
name="submit_report",
|
||
description=(
|
||
"Submit the directory summary. This ends the investigation loop."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"summary": {
|
||
"type": "string",
|
||
"description": "1-3 sentence summary of the directory.",
|
||
},
|
||
"completeness": {
|
||
"type": "number",
|
||
"description": (
|
||
"Self-rated investigation completeness (0.0-1.0). "
|
||
"1.0 = examined every relevant file thoroughly. "
|
||
"0.5 = examined about half, or skimmed most. "
|
||
"< 0.3 = barely scratched the surface."
|
||
),
|
||
},
|
||
},
|
||
"required": ["summary", "completeness"],
|
||
},
|
||
scopes=["dir"],
|
||
)
|
||
|
||
|
||
# --- Synthesis tools ---
|
||
|
||
register_tool(
|
||
name="read_cache",
|
||
description="Read a previously cached summary for a file or directory.",
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"cache_type": {
|
||
"type": "string",
|
||
"enum": ["file", "dir"],
|
||
},
|
||
"path": {
|
||
"type": "string",
|
||
"description": "The path to look up.",
|
||
},
|
||
},
|
||
"required": ["cache_type", "path"],
|
||
},
|
||
scopes=["synthesis"],
|
||
handler=_tool_read_cache,
|
||
)
|
||
|
||
register_tool(
|
||
name="list_cache",
|
||
description="List all cached entry paths of a given type.",
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"cache_type": {
|
||
"type": "string",
|
||
"enum": ["file", "dir"],
|
||
},
|
||
},
|
||
"required": ["cache_type"],
|
||
},
|
||
scopes=["synthesis"],
|
||
handler=_tool_list_cache,
|
||
)
|
||
|
||
register_tool(
|
||
name="flag",
|
||
description=_FLAG_DESCRIPTION,
|
||
schema=_FLAG_SCHEMA,
|
||
scopes=["synthesis"],
|
||
)
|
||
|
||
register_tool(
|
||
name="submit_report",
|
||
description="Submit the final analysis report.",
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"brief": {
|
||
"type": "string",
|
||
"description": "2-4 sentence summary.",
|
||
},
|
||
"detailed": {
|
||
"type": "string",
|
||
"description": "Thorough breakdown.",
|
||
},
|
||
},
|
||
"required": ["brief", "detailed"],
|
||
},
|
||
scopes=["synthesis"],
|
||
)
|
||
|
||
|
||
# --- Survey tools ---
|
||
|
||
register_tool(
|
||
name="submit_survey",
|
||
description=(
|
||
"Submit the reconnaissance survey. Call exactly once."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"description": {
|
||
"type": "string",
|
||
"description": "Plain-language description of the target.",
|
||
},
|
||
"approach": {
|
||
"type": "string",
|
||
"description": "Recommended analytical approach.",
|
||
},
|
||
"relevant_tools": {
|
||
"type": "array",
|
||
"items": {"type": "string"},
|
||
"description": "Tool names the dir loop should lean on.",
|
||
},
|
||
"skip_tools": {
|
||
"type": "array",
|
||
"items": {"type": "string"},
|
||
"description": "Tool names whose use would be wrong here.",
|
||
},
|
||
"domain_notes": {
|
||
"type": "string",
|
||
"description": "Short actionable hint, or empty string.",
|
||
},
|
||
"confidence": {
|
||
"type": "number",
|
||
"description": "0.0–1.0 confidence in this survey.",
|
||
},
|
||
},
|
||
"required": [
|
||
"description", "approach", "relevant_tools",
|
||
"skip_tools", "domain_notes", "confidence",
|
||
],
|
||
},
|
||
scopes=["survey"],
|
||
)
|
||
|
||
# --- Planning tools ---
|
||
|
||
register_tool(
|
||
name="submit_plan",
|
||
description=(
|
||
"Submit the investigation plan. Call exactly once."
|
||
),
|
||
schema={
|
||
"type": "object",
|
||
"properties": {
|
||
"priority_dirs": {
|
||
"type": "array",
|
||
"items": {
|
||
"type": "object",
|
||
"properties": {
|
||
"path": {
|
||
"type": "string",
|
||
"description": "Relative directory path.",
|
||
},
|
||
"reason": {
|
||
"type": "string",
|
||
"description": "Why this dir deserves deep investigation.",
|
||
},
|
||
"suggested_turns": {
|
||
"type": "integer",
|
||
"description": "Suggested turns (15-20).",
|
||
},
|
||
},
|
||
"required": ["path", "reason", "suggested_turns"],
|
||
},
|
||
"description": "Directories to investigate deeply.",
|
||
},
|
||
"shallow_dirs": {
|
||
"type": "array",
|
||
"items": {
|
||
"type": "object",
|
||
"properties": {
|
||
"path": {
|
||
"type": "string",
|
||
"description": "Relative directory path.",
|
||
},
|
||
"reason": {
|
||
"type": "string",
|
||
"description": "Why a shallow pass is sufficient.",
|
||
},
|
||
},
|
||
"required": ["path", "reason"],
|
||
},
|
||
"description": "Directories needing only a quick pass.",
|
||
},
|
||
"skip_dirs": {
|
||
"type": "array",
|
||
"items": {
|
||
"type": "object",
|
||
"properties": {
|
||
"path": {
|
||
"type": "string",
|
||
"description": "Relative directory path.",
|
||
},
|
||
"reason": {
|
||
"type": "string",
|
||
"description": "Why this dir should be skipped.",
|
||
},
|
||
},
|
||
"required": ["path", "reason"],
|
||
},
|
||
"description": "Directories to skip entirely.",
|
||
},
|
||
"investigation_order": {
|
||
"type": "string",
|
||
"enum": ["leaf-first", "priority-first"],
|
||
"description": "leaf-first or priority-first (leaf-first within bands).",
|
||
},
|
||
"notes": {
|
||
"type": "string",
|
||
"description": "Cross-cutting notes for per-directory agents, or empty.",
|
||
},
|
||
},
|
||
"required": [
|
||
"priority_dirs", "shallow_dirs", "skip_dirs",
|
||
"investigation_order", "notes",
|
||
],
|
||
},
|
||
scopes=["planning"],
|
||
)
|
||
|
||
|
||
def _execute_tool(name, args, target, cache, dir_rel, turn, verbose=False):
|
||
"""Execute a tool by name and return the result string."""
|
||
handler = _TOOL_DISPATCH.get(name)
|
||
if handler is None:
|
||
return f"Error: unknown tool '{name}'."
|
||
result = handler(args, target, cache)
|
||
|
||
cache.log_turn(dir_rel, turn, name,
|
||
{k: v for k, v in args.items() if k != "data"},
|
||
len(result))
|
||
|
||
if verbose:
|
||
preview = result[:200] + "..." if len(result) > 200 else result
|
||
print(f" [AI] <- {len(result)} chars: {preview}", file=sys.stderr)
|
||
|
||
return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Streaming API caller
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _call_api_streaming(client, system, messages, tools, tracker):
|
||
"""Call Claude via streaming. Print tool decisions in real-time.
|
||
|
||
Returns (content_blocks, usage) where content_blocks is the list of
|
||
content blocks from the response.
|
||
"""
|
||
with client.messages.stream(
|
||
model=MODEL,
|
||
max_tokens=4096,
|
||
system=system,
|
||
messages=messages,
|
||
tools=tools,
|
||
) as stream:
|
||
# Print tool call names as they arrive
|
||
current_tool = None
|
||
for event in stream:
|
||
if event.type == "content_block_start":
|
||
block = event.content_block
|
||
if block.type == "tool_use":
|
||
current_tool = block.name
|
||
# We'll print the full args after the block is complete
|
||
elif event.type == "content_block_stop":
|
||
current_tool = None
|
||
|
||
response = stream.get_final_message()
|
||
|
||
tracker.record(response.usage)
|
||
return response.content, response.usage
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Directory discovery
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _discover_directories(target, show_hidden=False, exclude=None):
|
||
"""Walk the target and return all directories sorted leaves-first."""
|
||
extra = set(exclude or [])
|
||
dirs = []
|
||
target_real = os.path.realpath(target)
|
||
for root, subdirs, _files in os.walk(target_real, topdown=True):
|
||
subdirs[:] = [
|
||
d for d in subdirs
|
||
if not _should_skip_dir(d)
|
||
and d not in extra
|
||
and (show_hidden or not d.startswith("."))
|
||
]
|
||
dirs.append(root)
|
||
dirs.sort(key=lambda d: (-d.count(os.sep), d))
|
||
return dirs
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Per-directory agent loop
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _build_dir_context(dir_path):
|
||
lines = []
|
||
try:
|
||
entries = sorted(os.listdir(dir_path))
|
||
for name in entries:
|
||
if name.startswith("."):
|
||
continue
|
||
full = os.path.join(dir_path, name)
|
||
try:
|
||
st = os.stat(full)
|
||
if os.path.isdir(full):
|
||
lines.append(f" {name}/ (dir)")
|
||
else:
|
||
mime = magic.from_file(full, mime=True)
|
||
lines.append(f" {name} ({st.st_size} bytes) [{mime}]")
|
||
except OSError:
|
||
lines.append(f" {name} (stat failed)")
|
||
except OSError:
|
||
lines.append(" (could not list directory)")
|
||
return "Directory contents:\n" + "\n".join(lines) if lines else "(empty)"
|
||
|
||
|
||
def _get_child_summaries(dir_path, cache):
|
||
parts = []
|
||
try:
|
||
for name in sorted(os.listdir(dir_path)):
|
||
child = os.path.join(dir_path, name)
|
||
if not os.path.isdir(child):
|
||
continue
|
||
entry = cache.read_entry("dir", child)
|
||
if entry:
|
||
rel = entry.get("relative_path", name)
|
||
summary = entry.get("summary", "(no summary)")
|
||
parts.append(f"- {rel}/: {summary}")
|
||
except OSError:
|
||
pass
|
||
if parts:
|
||
return "\n".join(parts)
|
||
# Distinguish actual leaves from parents whose children haven't been
|
||
# investigated yet. The old placeholder claimed "leaf directory" even
|
||
# when children existed but were not yet cached, which silently
|
||
# degraded parent context.
|
||
try:
|
||
has_subdirs = any(
|
||
os.path.isdir(os.path.join(dir_path, name))
|
||
for name in os.listdir(dir_path)
|
||
if not name.startswith(".")
|
||
)
|
||
except OSError:
|
||
has_subdirs = False
|
||
if has_subdirs:
|
||
return "(child directories exist but have not been investigated yet)"
|
||
return "(none: this is a leaf directory)"
|
||
|
||
|
||
_SURVEY_CONFIDENCE_THRESHOLD = 0.5
|
||
_PROTECTED_DIR_TOOLS = {"submit_report"}
|
||
|
||
# Survey-skip thresholds. Skip the survey only when BOTH are below.
|
||
# See #46 for the plan to revisit these with empirical data.
|
||
_SURVEY_MIN_FILES = 5
|
||
_SURVEY_MIN_DIRS = 2
|
||
|
||
|
||
def _default_survey():
|
||
"""Synthetic survey for targets too small to justify the API call.
|
||
|
||
confidence=0.0 ensures _filter_dir_tools() never enforces skip_tools
|
||
based on this synthetic value — the dir loop keeps its full toolbox.
|
||
"""
|
||
return {
|
||
"description": "Small target — survey skipped.",
|
||
"approach": (
|
||
"The target is small enough to investigate exhaustively. "
|
||
"Read every file directly."
|
||
),
|
||
"relevant_tools": [],
|
||
"skip_tools": [],
|
||
"domain_notes": "",
|
||
"confidence": 0.0,
|
||
}
|
||
|
||
|
||
def _format_survey_block(survey):
|
||
"""Render survey output as a labeled text block for the dir prompt."""
|
||
if not survey:
|
||
return "(no survey available)"
|
||
lines = [
|
||
f"Description: {survey.get('description', '')}",
|
||
f"Approach: {survey.get('approach', '')}",
|
||
]
|
||
notes = survey.get("domain_notes", "")
|
||
if notes:
|
||
lines.append(f"Domain notes: {notes}")
|
||
relevant = survey.get("relevant_tools") or []
|
||
if relevant:
|
||
lines.append(f"Relevant tools (lean on these): {', '.join(relevant)}")
|
||
skip = survey.get("skip_tools") or []
|
||
if skip:
|
||
lines.append(f"Skip tools (already removed from your toolbox): "
|
||
f"{', '.join(skip)}")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _filter_dir_tools(survey):
|
||
"""Return _DIR_TOOLS with skip_tools removed, gated on confidence.
|
||
|
||
- Returns full list if survey is None or confidence < threshold.
|
||
- Always preserves control-flow tools in _PROTECTED_DIR_TOOLS.
|
||
- Tool names in skip_tools that don't match anything are silently ignored.
|
||
"""
|
||
if not survey:
|
||
return list(_DIR_TOOLS)
|
||
try:
|
||
confidence = float(survey.get("confidence", 0.0) or 0.0)
|
||
except (TypeError, ValueError):
|
||
confidence = 0.0
|
||
if confidence < _SURVEY_CONFIDENCE_THRESHOLD:
|
||
return list(_DIR_TOOLS)
|
||
skip = set(survey.get("skip_tools") or []) - _PROTECTED_DIR_TOOLS
|
||
if not skip:
|
||
return list(_DIR_TOOLS)
|
||
return [t for t in _DIR_TOOLS if t["name"] not in skip]
|
||
|
||
|
||
_DirLoopContext = namedtuple(
|
||
"_DirLoopContext", ["dir_rel", "system", "dir_tools", "messages"],
|
||
)
|
||
|
||
|
||
def _build_dir_loop_context(dir_path, target, cache, survey, max_turns):
|
||
"""Assemble the static inputs the dir loop needs before its first turn.
|
||
|
||
Pure data assembly: reads the cache for child summaries, builds the
|
||
formatted system prompt, filters the tool list, and returns the seed
|
||
user message. No writes.
|
||
"""
|
||
dir_rel = os.path.relpath(dir_path, target)
|
||
if dir_rel == ".":
|
||
dir_rel = os.path.basename(target)
|
||
|
||
context = _build_dir_context(dir_path)
|
||
child_summaries = _get_child_summaries(dir_path, cache)
|
||
survey_context = _format_survey_block(survey)
|
||
dir_tools = _filter_dir_tools(survey)
|
||
|
||
system = _DIR_SYSTEM_PROMPT.format(
|
||
dir_path=dir_path,
|
||
dir_rel=dir_rel,
|
||
max_turns=max_turns,
|
||
context=context,
|
||
child_summaries=child_summaries,
|
||
survey_context=survey_context,
|
||
)
|
||
|
||
messages = [
|
||
{
|
||
"role": "user",
|
||
"content": (
|
||
"Investigate this directory now. Use parse_structure for "
|
||
"source files, read_file for others, cache summaries, and "
|
||
"call submit_report. Batch tool calls for efficiency."
|
||
),
|
||
},
|
||
]
|
||
|
||
return _DirLoopContext(
|
||
dir_rel=dir_rel, system=system, dir_tools=dir_tools, messages=messages,
|
||
)
|
||
|
||
|
||
def _flush_partial_dir_entry(dir_path, target, cache):
|
||
"""Write a partial dir cache entry from any already-cached file entries.
|
||
|
||
Called when the per-loop context budget is exceeded before the agent
|
||
reaches submit_report. Idempotent: returns "" without writing if a dir
|
||
entry already exists. Returns the partial summary string (empty if no
|
||
file entries were available to synthesize from).
|
||
"""
|
||
if cache.has_entry("dir", dir_path):
|
||
return ""
|
||
|
||
dir_real = os.path.realpath(dir_path)
|
||
file_entries = [
|
||
e for e in cache.read_all_entries("file")
|
||
if os.path.realpath(e.get("path", "")).startswith(dir_real + os.sep)
|
||
or os.path.dirname(
|
||
os.path.join(target, e.get("relative_path", ""))
|
||
) == dir_real
|
||
]
|
||
|
||
if file_entries:
|
||
file_summaries = [
|
||
e["summary"] for e in file_entries if e.get("summary")
|
||
]
|
||
notable = [
|
||
e.get("relative_path", e.get("path", ""))
|
||
for e in file_entries if e.get("notable")
|
||
]
|
||
partial_summary = " ".join(file_summaries)
|
||
cache.write_entry("dir", dir_path, {
|
||
"path": dir_path,
|
||
"relative_path": os.path.relpath(dir_path, target),
|
||
"child_count": len([
|
||
n for n in os.listdir(dir_path)
|
||
if not n.startswith(".")
|
||
]) if os.path.isdir(dir_path) else 0,
|
||
"summary": partial_summary,
|
||
"dominant_category": "unknown",
|
||
"notable_files": notable,
|
||
"partial": True,
|
||
"partial_reason": "context budget reached",
|
||
"cached_at": _now_iso(),
|
||
})
|
||
return partial_summary
|
||
|
||
cache.write_entry("dir", dir_path, {
|
||
"path": dir_path,
|
||
"relative_path": os.path.relpath(dir_path, target),
|
||
"child_count": 0,
|
||
"summary": ("Investigation incomplete — context budget "
|
||
"reached before any files were processed."),
|
||
"dominant_category": "unknown",
|
||
"notable_files": [],
|
||
"partial": True,
|
||
"partial_reason": (
|
||
"context budget reached before files processed"),
|
||
"cached_at": _now_iso(),
|
||
})
|
||
return ""
|
||
|
||
|
||
def _handle_turn_response(content_blocks, messages, target, cache, dir_rel,
|
||
turn, verbose):
|
||
"""Process one turn's response: print, append, dispatch tools.
|
||
|
||
Mutates `messages` in place: appends the assistant message, then either
|
||
a "please call submit_report" nudge (no tool_uses) or the tool_results
|
||
user message. Recognizes submit_report as the loop's done signal and
|
||
extracts its summary. Returns (done, summary).
|
||
"""
|
||
for b in content_blocks:
|
||
if b.type == "text" and b.text.strip():
|
||
for line in b.text.strip().split("\n"):
|
||
print(f" [AI] {line}", file=sys.stderr)
|
||
|
||
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
|
||
for tu in tool_uses:
|
||
arg_summary = ", ".join(
|
||
f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
|
||
) if tu.input else ""
|
||
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
|
||
|
||
messages.append({
|
||
"role": "assistant",
|
||
"content": [_block_to_dict(b) for b in content_blocks],
|
||
})
|
||
|
||
if not tool_uses:
|
||
messages.append({
|
||
"role": "user",
|
||
"content": "Please call submit_report with your summary.",
|
||
})
|
||
return False, None, None
|
||
|
||
tool_results = []
|
||
done = False
|
||
summary = None
|
||
completeness = None
|
||
for tu in tool_uses:
|
||
if tu.name == "submit_report":
|
||
summary = tu.input.get("summary", "")
|
||
try:
|
||
completeness = float(tu.input.get("completeness", 0) or 0)
|
||
except (TypeError, ValueError):
|
||
completeness = None
|
||
tool_results.append({
|
||
"type": "tool_result",
|
||
"tool_use_id": tu.id,
|
||
"content": "Summary submitted.",
|
||
})
|
||
done = True
|
||
else:
|
||
result_text = _execute_tool(
|
||
tu.name, tu.input, target, cache, dir_rel,
|
||
turn + 1, verbose=verbose,
|
||
)
|
||
tool_results.append({
|
||
"type": "tool_result",
|
||
"tool_use_id": tu.id,
|
||
"content": result_text,
|
||
})
|
||
|
||
messages.append({"role": "user", "content": tool_results})
|
||
return done, summary, completeness
|
||
|
||
|
||
def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
|
||
verbose=False, survey=None):
|
||
"""Run an isolated agent loop for a single directory.
|
||
|
||
Returns (summary, completeness) where completeness is the agent's
|
||
self-rated investigation thoroughness (0.0-1.0), or None if not reported.
|
||
"""
|
||
ctx = _build_dir_loop_context(
|
||
dir_path, target, cache, survey, max_turns,
|
||
)
|
||
summary = None
|
||
completeness = None
|
||
|
||
for turn in range(max_turns):
|
||
if tracker.budget_exceeded():
|
||
print(f" [AI] Context budget reached — exiting early "
|
||
f"(context size {tracker.last_input:,} > "
|
||
f"{CONTEXT_BUDGET:,} budget; "
|
||
f"loop spend {tracker.loop_total:,} tokens)",
|
||
file=sys.stderr)
|
||
partial = _flush_partial_dir_entry(dir_path, target, cache)
|
||
if partial and not summary:
|
||
summary = partial
|
||
break
|
||
|
||
try:
|
||
content_blocks, _usage = _call_api_streaming(
|
||
client, ctx.system, ctx.messages, ctx.dir_tools, tracker,
|
||
)
|
||
except anthropic.APIError as e:
|
||
print(f" [AI] API error: {e}", file=sys.stderr)
|
||
break
|
||
|
||
done, turn_summary, turn_completeness = _handle_turn_response(
|
||
content_blocks, ctx.messages, target, cache,
|
||
ctx.dir_rel, turn, verbose,
|
||
)
|
||
if turn_summary is not None:
|
||
summary = turn_summary
|
||
if turn_completeness is not None:
|
||
completeness = turn_completeness
|
||
if done:
|
||
break
|
||
else:
|
||
print(f" [AI] Warning: max turns reached for {ctx.dir_rel}",
|
||
file=sys.stderr)
|
||
|
||
return summary, completeness
|
||
|
||
|
||
def _block_to_dict(block):
|
||
"""Convert an SDK content block to a plain dict for message history."""
|
||
if block.type == "text":
|
||
return {"type": "text", "text": block.text}
|
||
elif block.type == "tool_use":
|
||
return {"type": "tool_use", "id": block.id,
|
||
"name": block.name, "input": block.input}
|
||
return {"type": block.type}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Synthesis pass
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _format_survey_signals(signals):
|
||
"""Render the survey_signals dict as a labeled text block."""
|
||
if not signals or not signals.get("total_files"):
|
||
return "(no files classified)"
|
||
|
||
lines = [f"Total files: {signals.get('total_files', 0)}", ""]
|
||
|
||
ext_hist = signals.get("extension_histogram") or {}
|
||
if ext_hist:
|
||
lines.append("Extensions (top, by count):")
|
||
for ext, n in ext_hist.items():
|
||
lines.append(f" {ext}: {n}")
|
||
lines.append("")
|
||
|
||
descs = signals.get("file_descriptions") or {}
|
||
if descs:
|
||
lines.append("file --brief output (top, by count):")
|
||
for desc, n in descs.items():
|
||
lines.append(f" {desc}: {n}")
|
||
lines.append("")
|
||
|
||
samples = signals.get("filename_samples") or []
|
||
if samples:
|
||
lines.append("Filename samples (evenly drawn):")
|
||
for name in samples:
|
||
lines.append(f" {name}")
|
||
|
||
return "\n".join(lines).rstrip()
|
||
|
||
|
||
def _run_survey(client, target, report, tracker, max_turns=3, verbose=False):
|
||
"""Run the reconnaissance survey pass.
|
||
|
||
Returns a survey dict on success, or None on failure / out-of-turns.
|
||
Survey is advisory — callers must treat None as "no survey context".
|
||
"""
|
||
signals = report.get("survey_signals") or {}
|
||
survey_signals_text = _format_survey_signals(signals)
|
||
|
||
try:
|
||
tree_node = build_tree(target, max_depth=2)
|
||
tree_preview = render_tree(tree_node)
|
||
except Exception:
|
||
tree_preview = "(tree unavailable)"
|
||
|
||
tool_names = [t["name"] for t in _DIR_TOOLS if t["name"] != "submit_report"]
|
||
available_tools = ", ".join(tool_names)
|
||
|
||
system = _SURVEY_SYSTEM_PROMPT.format(
|
||
target=target,
|
||
survey_signals=survey_signals_text,
|
||
tree_preview=tree_preview,
|
||
available_tools=available_tools,
|
||
)
|
||
|
||
messages = [
|
||
{
|
||
"role": "user",
|
||
"content": (
|
||
"All inputs are in the system prompt above. Call "
|
||
"submit_survey now — no other tool calls needed."
|
||
),
|
||
},
|
||
]
|
||
|
||
survey = None
|
||
|
||
for turn in range(max_turns):
|
||
try:
|
||
content_blocks, _usage = _call_api_streaming(
|
||
client, system, messages, _SURVEY_TOOLS, tracker,
|
||
)
|
||
except anthropic.APIError as e:
|
||
print(f" [AI] API error: {e}", file=sys.stderr)
|
||
return None
|
||
|
||
for b in content_blocks:
|
||
if b.type == "text" and b.text.strip():
|
||
for line in b.text.strip().split("\n"):
|
||
print(f" [AI] {line}", file=sys.stderr)
|
||
|
||
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
|
||
for tu in tool_uses:
|
||
arg_summary = ", ".join(
|
||
f"{k}={v!r}" for k, v in tu.input.items()
|
||
) if tu.input else ""
|
||
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
|
||
|
||
messages.append({
|
||
"role": "assistant",
|
||
"content": [_block_to_dict(b) for b in content_blocks],
|
||
})
|
||
|
||
if not tool_uses:
|
||
messages.append({
|
||
"role": "user",
|
||
"content": "Please call submit_survey.",
|
||
})
|
||
continue
|
||
|
||
tool_results = []
|
||
done = False
|
||
for tu in tool_uses:
|
||
if tu.name == "submit_survey":
|
||
survey = {
|
||
"description": tu.input.get("description", ""),
|
||
"approach": tu.input.get("approach", ""),
|
||
"relevant_tools": tu.input.get("relevant_tools", []) or [],
|
||
"skip_tools": tu.input.get("skip_tools", []) or [],
|
||
"domain_notes": tu.input.get("domain_notes", ""),
|
||
"confidence": float(tu.input.get("confidence", 0.0) or 0.0),
|
||
}
|
||
tool_results.append({
|
||
"type": "tool_result",
|
||
"tool_use_id": tu.id,
|
||
"content": "Survey received. Thank you.",
|
||
})
|
||
done = True
|
||
else:
|
||
tool_results.append({
|
||
"type": "tool_result",
|
||
"tool_use_id": tu.id,
|
||
"content": "Unknown tool. Call submit_survey.",
|
||
"is_error": True,
|
||
})
|
||
|
||
messages.append({"role": "user", "content": tool_results})
|
||
|
||
if done:
|
||
break
|
||
else:
|
||
print(" [AI] Warning: survey ran out of turns.", file=sys.stderr)
|
||
|
||
return survey
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Planning pass
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Turn allocation defaults.
|
||
_DEFAULT_TURNS = 10
|
||
_SHALLOW_TURNS = 5
|
||
_MAX_TURNS_CEILING = 25
|
||
_BASE_TURNS_PER_DIR = 10
|
||
|
||
|
||
def _default_plan():
|
||
"""Fallback plan when planning is skipped or fails.
|
||
|
||
All directories get default turns, leaf-first order, no overrides.
|
||
"""
|
||
return {
|
||
"priority_dirs": [],
|
||
"shallow_dirs": [],
|
||
"skip_dirs": [],
|
||
"investigation_order": "leaf-first",
|
||
"notes": "",
|
||
}
|
||
|
||
|
||
def _run_planning(client, target, survey, report, all_dirs, tracker,
|
||
cached_dirs=None, max_turns=3, verbose=False):
|
||
"""Run the planning pass. Returns a plan dict or None on failure.
|
||
|
||
The planning pass decides where to invest investigation depth.
|
||
It runs after the survey and before the per-directory loops.
|
||
"""
|
||
cached_dirs = cached_dirs or []
|
||
dir_count = len(all_dirs)
|
||
global_budget = _BASE_TURNS_PER_DIR * dir_count
|
||
|
||
survey_context = _format_survey_block(survey) if survey else "(no survey available)"
|
||
|
||
try:
|
||
tree_node = build_tree(target, max_depth=6)
|
||
tree_text = render_tree(tree_node)
|
||
except Exception:
|
||
tree_text = "(tree unavailable)"
|
||
|
||
signals = report.get("survey_signals") or {}
|
||
file_signals = _format_survey_signals(signals)
|
||
|
||
cached_rel = []
|
||
for d in cached_dirs:
|
||
cached_rel.append(os.path.relpath(d, target))
|
||
cached_text = ", ".join(cached_rel) if cached_rel else "(none)"
|
||
|
||
system = _PLANNING_SYSTEM_PROMPT.format(
|
||
target=target,
|
||
survey_context=survey_context,
|
||
tree_text=tree_text,
|
||
file_signals=file_signals,
|
||
dir_count=dir_count,
|
||
cached_dirs=cached_text,
|
||
default_turns=_DEFAULT_TURNS,
|
||
global_budget=global_budget,
|
||
)
|
||
|
||
messages = [
|
||
{
|
||
"role": "user",
|
||
"content": (
|
||
"All inputs are in the system prompt above. Call "
|
||
"submit_plan now."
|
||
),
|
||
},
|
||
]
|
||
|
||
plan = None
|
||
|
||
for turn in range(max_turns):
|
||
try:
|
||
content_blocks, _usage = _call_api_streaming(
|
||
client, system, messages, _PLANNING_TOOLS, tracker,
|
||
)
|
||
except anthropic.APIError as e:
|
||
print(f" [AI] API error: {e}", file=sys.stderr)
|
||
return None
|
||
|
||
for b in content_blocks:
|
||
if b.type == "text" and b.text.strip():
|
||
for line in b.text.strip().split("\n"):
|
||
print(f" [AI] {line}", file=sys.stderr)
|
||
|
||
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
|
||
for tu in tool_uses:
|
||
arg_summary = ", ".join(
|
||
f"{k}={v!r}" for k, v in tu.input.items()
|
||
) if tu.input else ""
|
||
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
|
||
|
||
messages.append({
|
||
"role": "assistant",
|
||
"content": [_block_to_dict(b) for b in content_blocks],
|
||
})
|
||
|
||
if not tool_uses:
|
||
messages.append({
|
||
"role": "user",
|
||
"content": "Please call submit_plan.",
|
||
})
|
||
continue
|
||
|
||
tool_results = []
|
||
done = False
|
||
for tu in tool_uses:
|
||
if tu.name == "submit_plan":
|
||
plan = {
|
||
"priority_dirs": tu.input.get("priority_dirs", []) or [],
|
||
"shallow_dirs": tu.input.get("shallow_dirs", []) or [],
|
||
"skip_dirs": tu.input.get("skip_dirs", []) or [],
|
||
"investigation_order": tu.input.get(
|
||
"investigation_order", "leaf-first"
|
||
),
|
||
"notes": tu.input.get("notes", ""),
|
||
}
|
||
tool_results.append({
|
||
"type": "tool_result",
|
||
"tool_use_id": tu.id,
|
||
"content": "Plan received. Thank you.",
|
||
})
|
||
done = True
|
||
else:
|
||
tool_results.append({
|
||
"type": "tool_result",
|
||
"tool_use_id": tu.id,
|
||
"content": "Unknown tool. Call submit_plan.",
|
||
"is_error": True,
|
||
})
|
||
|
||
messages.append({"role": "user", "content": tool_results})
|
||
|
||
if done:
|
||
break
|
||
else:
|
||
print(" [AI] Warning: planning ran out of turns.", file=sys.stderr)
|
||
|
||
return plan
|
||
|
||
|
||
def _apply_plan(all_dirs, to_investigate, plan, target):
|
||
"""Apply the plan to produce an ordered dir list and turn map.
|
||
|
||
Returns (ordered_dirs, turn_map) where:
|
||
- ordered_dirs: list of absolute dir paths in investigation order
|
||
- turn_map: dict of {abs_dir_path: max_turns}
|
||
|
||
Pure function: no I/O, no cache, no API calls.
|
||
"""
|
||
if plan is None:
|
||
return list(to_investigate), {}
|
||
|
||
# Build lookup from relative path to absolute path.
|
||
# The target root maps to "." via relpath, but the planner sees
|
||
# basename(target) in the tree output and uses that as the path.
|
||
# Register both so either form matches (#76).
|
||
rel_to_abs = {}
|
||
for d in all_dirs:
|
||
rel = os.path.relpath(d, target)
|
||
rel_to_abs[rel] = d
|
||
if rel == ".":
|
||
rel_to_abs[os.path.basename(d)] = d
|
||
|
||
# Classify directories by tier.
|
||
skip_set = set()
|
||
priority_set = set()
|
||
shallow_set = set()
|
||
turn_map = {}
|
||
unmatched = []
|
||
|
||
for entry in plan.get("skip_dirs", []):
|
||
rel = entry.get("path", "")
|
||
if rel in rel_to_abs:
|
||
skip_set.add(rel_to_abs[rel])
|
||
else:
|
||
unmatched.append(rel)
|
||
|
||
for entry in plan.get("priority_dirs", []):
|
||
rel = entry.get("path", "")
|
||
suggested = entry.get("suggested_turns", 15)
|
||
capped = min(suggested, _MAX_TURNS_CEILING)
|
||
if rel in rel_to_abs:
|
||
abs_path = rel_to_abs[rel]
|
||
priority_set.add(abs_path)
|
||
turn_map[abs_path] = capped
|
||
else:
|
||
unmatched.append(rel)
|
||
|
||
for entry in plan.get("shallow_dirs", []):
|
||
rel = entry.get("path", "")
|
||
if rel in rel_to_abs:
|
||
abs_path = rel_to_abs[rel]
|
||
shallow_set.add(abs_path)
|
||
turn_map[abs_path] = _SHALLOW_TURNS
|
||
else:
|
||
unmatched.append(rel)
|
||
|
||
if unmatched:
|
||
print(
|
||
f" [AI] Warning: plan referenced unknown dirs: "
|
||
f"{', '.join(unmatched)}",
|
||
file=sys.stderr,
|
||
)
|
||
|
||
# Remove skipped dirs from the investigation list.
|
||
remaining = [d for d in to_investigate if d not in skip_set]
|
||
|
||
# Order by bands. Both strategies preserve leaf-first within bands.
|
||
order = plan.get("investigation_order", "leaf-first")
|
||
|
||
if order == "priority-first":
|
||
priority_band = [d for d in remaining if d in priority_set]
|
||
shallow_band = [d for d in remaining if d in shallow_set]
|
||
default_band = [
|
||
d for d in remaining
|
||
if d not in priority_set and d not in shallow_set
|
||
]
|
||
ordered = priority_band + default_band + shallow_band
|
||
else:
|
||
# leaf-first: keep the original order (already leaf-first from
|
||
# _discover_directories), just remove skipped dirs.
|
||
ordered = remaining
|
||
|
||
return ordered, turn_map
|
||
|
||
|
||
def _write_plan_evaluation(cache, plan, turn_utilization):
|
||
"""Write plan_evaluation.json comparing plan predictions to actual results.
|
||
|
||
This is the planning pass's report card: did we allocate turns well?
|
||
"""
|
||
# Build a lookup of what the plan predicted per dir.
|
||
predicted = {}
|
||
for entry in (plan or {}).get("priority_dirs", []):
|
||
predicted[entry["path"]] = {
|
||
"tier": "priority",
|
||
"suggested_turns": entry.get("suggested_turns", 15),
|
||
}
|
||
for entry in (plan or {}).get("shallow_dirs", []):
|
||
predicted[entry["path"]] = {
|
||
"tier": "shallow",
|
||
"suggested_turns": _SHALLOW_TURNS,
|
||
}
|
||
for entry in (plan or {}).get("skip_dirs", []):
|
||
predicted[entry["path"]] = {
|
||
"tier": "skip",
|
||
"suggested_turns": 0,
|
||
}
|
||
|
||
# Compare predictions to actual turn utilization.
|
||
per_dir = []
|
||
total_allocated = 0
|
||
total_used = 0
|
||
for record in turn_utilization:
|
||
dir_rel = record["dir"]
|
||
allocated = record["turns_allocated"]
|
||
used = record["turns_used"]
|
||
total_allocated += allocated
|
||
total_used += used
|
||
|
||
pred = predicted.get(dir_rel, {})
|
||
entry = {
|
||
"dir": dir_rel,
|
||
"planned_tier": pred.get("tier", "default"),
|
||
"turns_allocated": allocated,
|
||
"turns_used": used,
|
||
"utilization": round(used / allocated, 2) if allocated else 0,
|
||
}
|
||
|
||
# Include completeness from turn utilization record (#74).
|
||
record_completeness = record.get("completeness")
|
||
if record_completeness is not None:
|
||
entry["completeness"] = record_completeness
|
||
|
||
# Read confidence from the cached dir entry if available.
|
||
dir_entry = cache.read_entry("dir", os.path.join(
|
||
cache.target, dir_rel,
|
||
))
|
||
if dir_entry:
|
||
entry["confidence"] = dir_entry.get("confidence")
|
||
|
||
per_dir.append(entry)
|
||
|
||
evaluation = {
|
||
"plan_order": (plan or {}).get("investigation_order", "leaf-first"),
|
||
"total_dirs_investigated": len(turn_utilization),
|
||
"total_turns_allocated": total_allocated,
|
||
"total_turns_used": total_used,
|
||
"overall_utilization": (
|
||
round(total_used / total_allocated, 2) if total_allocated else 0
|
||
),
|
||
"per_directory": per_dir,
|
||
"evaluated_at": _now_iso(),
|
||
}
|
||
|
||
try:
|
||
eval_path = os.path.join(cache.root, "plan_evaluation.json")
|
||
with open(eval_path, "w") as f:
|
||
json.dump(evaluation, f, indent=2)
|
||
print(
|
||
f" [AI] Plan evaluation: {total_used}/{total_allocated} turns used "
|
||
f"({evaluation['overall_utilization']:.0%} utilization)",
|
||
file=sys.stderr,
|
||
)
|
||
except OSError:
|
||
pass
|
||
|
||
|
||
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
|
||
"""Run the final synthesis pass. Returns (brief, detailed)."""
|
||
dir_entries = cache.read_all_entries("dir")
|
||
|
||
summary_lines = []
|
||
for entry in dir_entries:
|
||
rel = entry.get("relative_path", "?")
|
||
summary = entry.get("summary", "(no summary)")
|
||
dominant = entry.get("dominant_category", "?")
|
||
notable = entry.get("notable_files", [])
|
||
summary_lines.append(f"### {rel}/")
|
||
summary_lines.append(f"Category: {dominant}")
|
||
summary_lines.append(f"Summary: {summary}")
|
||
if notable:
|
||
summary_lines.append(f"Notable files: {', '.join(notable)}")
|
||
summary_lines.append("")
|
||
|
||
summaries_text = "\n".join(summary_lines) if summary_lines else "(none)"
|
||
|
||
system = _SYNTHESIS_SYSTEM_PROMPT.format(
|
||
target=target,
|
||
summaries_text=summaries_text,
|
||
)
|
||
|
||
messages = [
|
||
{
|
||
"role": "user",
|
||
"content": (
|
||
"All directory summaries are in the system prompt above. "
|
||
"Synthesize them into a cohesive report and call "
|
||
"submit_report immediately — no other tool calls needed."
|
||
),
|
||
},
|
||
]
|
||
|
||
brief, detailed = "", ""
|
||
|
||
for turn in range(max_turns):
|
||
try:
|
||
content_blocks, usage = _call_api_streaming(
|
||
client, system, messages, _SYNTHESIS_TOOLS, tracker,
|
||
)
|
||
except anthropic.APIError as e:
|
||
print(f" [AI] API error: {e}", file=sys.stderr)
|
||
break
|
||
|
||
# Print text blocks to stderr
|
||
for b in content_blocks:
|
||
if b.type == "text" and b.text.strip():
|
||
for line in b.text.strip().split("\n"):
|
||
print(f" [AI] {line}", file=sys.stderr)
|
||
|
||
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
|
||
for tu in tool_uses:
|
||
arg_summary = ", ".join(
|
||
f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
|
||
) if tu.input else ""
|
||
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
|
||
|
||
messages.append({
|
||
"role": "assistant",
|
||
"content": [_block_to_dict(b) for b in content_blocks],
|
||
})
|
||
|
||
if not tool_uses:
|
||
messages.append({
|
||
"role": "user",
|
||
"content": "Please call submit_report with your analysis.",
|
||
})
|
||
continue
|
||
|
||
tool_results = []
|
||
done = False
|
||
for tu in tool_uses:
|
||
if tu.name == "submit_report":
|
||
brief = tu.input.get("brief", "")
|
||
detailed = tu.input.get("detailed", "")
|
||
tool_results.append({
|
||
"type": "tool_result",
|
||
"tool_use_id": tu.id,
|
||
"content": "Report submitted. Thank you.",
|
||
})
|
||
done = True
|
||
else:
|
||
result_text = _execute_tool(
|
||
tu.name, tu.input, target, cache, "(synthesis)",
|
||
turn + 1, verbose=verbose,
|
||
)
|
||
tool_results.append({
|
||
"type": "tool_result",
|
||
"tool_use_id": tu.id,
|
||
"content": result_text,
|
||
})
|
||
|
||
messages.append({"role": "user", "content": tool_results})
|
||
|
||
if done:
|
||
break
|
||
else:
|
||
print(" [AI] Warning: synthesis ran out of turns.", file=sys.stderr)
|
||
brief, detailed = _synthesize_from_cache(cache)
|
||
|
||
return brief, detailed
|
||
|
||
|
||
def _synthesize_from_cache(cache):
|
||
"""Build a best-effort report from cached directory summaries."""
|
||
dir_entries = cache.read_all_entries("dir")
|
||
if not dir_entries:
|
||
return ("(AI analysis incomplete — no data was cached)", "")
|
||
|
||
brief_parts = []
|
||
detail_parts = []
|
||
for entry in dir_entries:
|
||
rel = entry.get("relative_path", "?")
|
||
summary = entry.get("summary", "")
|
||
if summary:
|
||
detail_parts.append(f"**{rel}/**: {summary}")
|
||
brief_parts.append(summary)
|
||
|
||
brief = brief_parts[0] if brief_parts else "(AI analysis incomplete)"
|
||
detailed = "\n\n".join(detail_parts) if detail_parts else ""
|
||
return brief, detailed
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main orchestrator
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _run_investigation(client, target, report, show_hidden=False,
|
||
fresh=False, verbose=False, exclude=None):
|
||
"""Orchestrate the multi-pass investigation. Returns (brief, detailed, flags)."""
|
||
investigation_id, is_new = _get_investigation_id(target, fresh=fresh)
|
||
cache = _CacheManager(investigation_id, target)
|
||
tracker = _TokenTracker()
|
||
|
||
if is_new:
|
||
cache.write_meta(MODEL, _now_iso())
|
||
|
||
print(f" [AI] Investigation ID: {investigation_id}"
|
||
f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
|
||
print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
|
||
|
||
all_dirs = _discover_directories(target, show_hidden=show_hidden,
|
||
exclude=exclude)
|
||
|
||
total_files = sum((report.get("file_categories") or {}).values())
|
||
total_dirs = len(all_dirs)
|
||
if total_files < _SURVEY_MIN_FILES and total_dirs < _SURVEY_MIN_DIRS:
|
||
print(
|
||
f" [AI] Survey skipped — {total_files} files, {total_dirs} dirs "
|
||
f"(below threshold).",
|
||
file=sys.stderr,
|
||
)
|
||
survey = _default_survey()
|
||
else:
|
||
print(" [AI] Survey pass...", file=sys.stderr)
|
||
survey = _run_survey(client, target, report, tracker, verbose=verbose)
|
||
if survey:
|
||
print(
|
||
f" [AI] Survey: {survey['description']} "
|
||
f"(confidence {survey['confidence']:.2f})",
|
||
file=sys.stderr,
|
||
)
|
||
if survey.get("domain_notes"):
|
||
print(f" [AI] Survey notes: {survey['domain_notes']}", file=sys.stderr)
|
||
if survey.get("relevant_tools"):
|
||
print(
|
||
f" [AI] Survey relevant_tools: {', '.join(survey['relevant_tools'])}",
|
||
file=sys.stderr,
|
||
)
|
||
if survey.get("skip_tools"):
|
||
print(
|
||
f" [AI] Survey skip_tools: {', '.join(survey['skip_tools'])}",
|
||
file=sys.stderr,
|
||
)
|
||
else:
|
||
print(" [AI] Survey unavailable — proceeding without it.", file=sys.stderr)
|
||
|
||
to_investigate = []
|
||
cached_dirs = []
|
||
for d in all_dirs:
|
||
if cache.has_entry("dir", d):
|
||
cached_dirs.append(d)
|
||
rel = os.path.relpath(d, target)
|
||
print(f" [AI] Skipping (cached): {rel}/", file=sys.stderr)
|
||
else:
|
||
to_investigate.append(d)
|
||
|
||
cached_count = len(cached_dirs)
|
||
if cached_count:
|
||
print(f" [AI] Directories cached: {cached_count}", file=sys.stderr)
|
||
print(f" [AI] Directories to investigate: {len(to_investigate)}",
|
||
file=sys.stderr)
|
||
|
||
# Planning pass: decide where to invest depth.
|
||
if total_files < _SURVEY_MIN_FILES and total_dirs < _SURVEY_MIN_DIRS:
|
||
print(" [AI] Planning skipped (small target).", file=sys.stderr)
|
||
plan = _default_plan()
|
||
else:
|
||
plan_path = os.path.join(cache.root, "plan.json")
|
||
if not fresh and os.path.exists(plan_path):
|
||
try:
|
||
with open(plan_path) as f:
|
||
plan = json.load(f)
|
||
print(" [AI] Plan loaded from cache.", file=sys.stderr)
|
||
except (OSError, json.JSONDecodeError):
|
||
plan = None
|
||
else:
|
||
plan = None
|
||
|
||
if plan is None:
|
||
print(" [AI] Planning pass...", file=sys.stderr)
|
||
plan = _run_planning(
|
||
client, target, survey, report, all_dirs, tracker,
|
||
cached_dirs=cached_dirs, verbose=verbose,
|
||
)
|
||
if plan is None:
|
||
print(" [AI] Planning failed, using defaults.",
|
||
file=sys.stderr)
|
||
plan = _default_plan()
|
||
else:
|
||
# Save plan to cache (#11).
|
||
try:
|
||
with open(os.path.join(cache.root, "plan.json"), "w") as f:
|
||
json.dump(plan, f, indent=2)
|
||
except OSError:
|
||
pass
|
||
|
||
ordered, turn_map = _apply_plan(all_dirs, to_investigate, plan, target)
|
||
|
||
# Log plan summary.
|
||
skip_count = len(to_investigate) - len(ordered)
|
||
priority_count = sum(
|
||
1 for d in ordered if turn_map.get(d, _DEFAULT_TURNS) > _DEFAULT_TURNS
|
||
)
|
||
if skip_count or priority_count:
|
||
print(
|
||
f" [AI] Plan: {priority_count} priority, "
|
||
f"{skip_count} skipped, "
|
||
f"{len(ordered) - priority_count} default/shallow",
|
||
file=sys.stderr,
|
||
)
|
||
if plan.get("notes"):
|
||
print(f" [AI] Plan notes: {plan['notes']}", file=sys.stderr)
|
||
|
||
total = len(ordered)
|
||
turn_utilization = []
|
||
|
||
for i, dir_path in enumerate(ordered, 1):
|
||
dir_rel = os.path.relpath(dir_path, target)
|
||
if dir_rel == ".":
|
||
dir_rel = os.path.basename(target)
|
||
max_turns = turn_map.get(dir_path, _DEFAULT_TURNS)
|
||
print(
|
||
f" [AI] Investigating: {dir_rel}/ ({i}/{total}, "
|
||
f"{max_turns} turns)",
|
||
file=sys.stderr,
|
||
)
|
||
|
||
tracker.reset_loop()
|
||
summary, completeness = _run_dir_loop(
|
||
client, target, cache, tracker, dir_path,
|
||
max_turns=max_turns, verbose=verbose, survey=survey,
|
||
)
|
||
|
||
# Track turn utilization for quality metrics (#74).
|
||
turns_used = tracker._loop_turns
|
||
turn_utilization.append({
|
||
"dir": dir_rel,
|
||
"turns_allocated": max_turns,
|
||
"turns_used": turns_used,
|
||
"completeness": completeness,
|
||
})
|
||
|
||
if summary and not cache.has_entry("dir", dir_path):
|
||
entry = {
|
||
"path": dir_path,
|
||
"relative_path": os.path.relpath(dir_path, target),
|
||
"child_count": len([
|
||
n for n in os.listdir(dir_path)
|
||
if not n.startswith(".")
|
||
]) if os.path.isdir(dir_path) else 0,
|
||
"summary": summary,
|
||
"dominant_category": "unknown",
|
||
"notable_files": [],
|
||
"cached_at": _now_iso(),
|
||
}
|
||
if completeness is not None:
|
||
entry["completeness"] = completeness
|
||
cache.write_entry("dir", dir_path, entry)
|
||
|
||
cache.update_meta(
|
||
directories_investigated=total + cached_count,
|
||
end_time=_now_iso(),
|
||
)
|
||
|
||
# Emit plan evaluation (#74).
|
||
_write_plan_evaluation(cache, plan, turn_utilization)
|
||
|
||
print(" [AI] Synthesis pass...", file=sys.stderr)
|
||
brief, detailed = _run_synthesis(
|
||
client, target, cache, tracker, verbose=verbose,
|
||
)
|
||
|
||
# Read flags from flags.jsonl
|
||
flags = []
|
||
flags_path = os.path.join(cache.root, "flags.jsonl")
|
||
try:
|
||
with open(flags_path) as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if line:
|
||
flags.append(json.loads(line))
|
||
except (OSError, json.JSONDecodeError):
|
||
pass
|
||
|
||
print(f" [AI] Total tokens used: {tracker.summary()}", file=sys.stderr)
|
||
|
||
return brief, detailed, flags
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Public interface
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def analyze_directory(report, target, verbose_tools=False, fresh=False,
|
||
exclude=None):
|
||
"""Run AI analysis on the directory. Returns (brief, detailed, flags).
|
||
|
||
Returns ("", "", []) if the API key is missing.
|
||
"""
|
||
api_key = _get_api_key()
|
||
if not api_key:
|
||
return "", "", []
|
||
|
||
print(" [AI] Starting multi-pass investigation...", file=sys.stderr)
|
||
|
||
client = anthropic.Anthropic(api_key=api_key)
|
||
|
||
try:
|
||
brief, detailed, flags = _run_investigation(
|
||
client, target, report, fresh=fresh, verbose=verbose_tools,
|
||
exclude=exclude,
|
||
)
|
||
except Exception as e:
|
||
print(f"Warning: AI analysis failed: {e}", file=sys.stderr)
|
||
return "", "", []
|
||
|
||
if not brief and not detailed:
|
||
print(" [AI] Warning: agent produced no output.", file=sys.stderr)
|
||
|
||
print(" [AI] Investigation complete.", file=sys.stderr)
|
||
return brief, detailed, flags
|