luminos/luminos_lib/ai.py
Jeff Smith 79bb10b9dc fix(ai): match target root dir by basename in _apply_plan() (#76)
The planner sees basename(target) in the tree output (e.g. "luminos_lib")
and uses that as the path in its plan. But _apply_plan() mapped the
target root to "." via os.path.relpath(), so the planner's path never
matched and the allocation was silently dropped.

Fix: register both "." and basename(target) as aliases for the target
root in the lookup table. Also log a warning when plan paths don't
match any known directory, so future mismatches are visible.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 20:38:55 -06:00

2060 lines
68 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""AI-powered directory analysis using a multi-pass, cache-driven agent loop.
Architecture:
1. Discover all directories under the target
2. Sort leaves-first (deepest directories first)
3. Run an isolated agent loop per directory (max 10 turns each)
4. Cache every file and directory summary to disk
5. Run a final synthesis pass reading only directory cache entries
Uses the Anthropic SDK for streaming, automatic retries, and token counting.
Uses tree-sitter for AST parsing and python-magic for file classification.
"""
import json
import os
import subprocess
import sys
from collections import namedtuple
from datetime import datetime, timezone
import anthropic
import magic
from luminos_lib.ast_parser import parse_structure
from luminos_lib.cache import _CacheManager, _get_investigation_id
from luminos_lib.prompts import (
_DIR_SYSTEM_PROMPT,
_PLANNING_SYSTEM_PROMPT,
_SURVEY_SYSTEM_PROMPT,
_SYNTHESIS_SYSTEM_PROMPT,
)
from luminos_lib.tree import build_tree, render_tree
# Model identifier passed to the streaming API call.
MODEL = "claude-sonnet-4-20250514"

# Context budget. The early-exit check compares ONE call's input_tokens
# (the context window actually in use on that call, never the running sum
# over a loop) against CONTEXT_BUDGET. Sonnet 4 has a 200k-token window;
# 30% of it is held back for the response and for the tool result that is
# about to be appended.
MAX_CONTEXT = 200_000
CONTEXT_BUDGET = int(MAX_CONTEXT * 0.70)

# Claude Sonnet pricing, in dollars per one million tokens.
INPUT_PRICE_PER_M = 3.00
OUTPUT_PRICE_PER_M = 15.00

# Directory names that are never investigated. An entry beginning with "*"
# is a suffix pattern (see _should_skip_dir).
_SKIP_DIRS = {
    ".git", ".svn", ".hg",
    "__pycache__", ".mypy_cache", ".pytest_cache", ".tox",
    ".venv", "venv", ".env",
    "node_modules", "dist", "build",
    ".eggs", "*.egg-info",
}

# Binaries the run_command tool may execute.
_COMMAND_WHITELIST = {
    "wc", "file", "grep", "head", "tail", "stat", "du", "find",
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_api_key():
    """Return ANTHROPIC_API_KEY from the environment ("" when unset).

    When the variable is missing or empty a warning is written to stderr
    and the empty string is returned.
    """
    api_key = os.environ.get("ANTHROPIC_API_KEY", "")
    if api_key:
        return api_key
    print("Warning: ANTHROPIC_API_KEY not set. Skipping AI analysis.",
          file=sys.stderr)
    return api_key
def _path_is_safe(path, target):
    """Return True if *path* resolves to *target* or somewhere inside it.

    Both arguments are resolved with os.path.realpath first, so symlinks
    and ".." segments cannot be used to escape *target*.

    The containment test uses os.path.commonpath rather than a
    ``startswith(target + os.sep)`` prefix check: the prefix form turns a
    root target ("/") into "//" and therefore rejects every child of it.
    """
    real = os.path.realpath(path)
    target_real = os.path.realpath(target)
    try:
        return os.path.commonpath([real, target_real]) == target_real
    except ValueError:
        # commonpath raises when the paths cannot share a prefix
        # (e.g. different drives on Windows) — that is "outside".
        return False
def _now_iso():
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def _should_skip_dir(name):
    """Return True if directory *name* is on the skip list.

    A name matches either exactly, or by suffix against any _SKIP_DIRS
    entry that begins with "*" (e.g. "*.egg-info").
    """
    if name in _SKIP_DIRS:
        return True
    return any(
        entry.startswith("*") and name.endswith(entry[1:])
        for entry in _SKIP_DIRS
    )
# ---------------------------------------------------------------------------
# Token tracker
# ---------------------------------------------------------------------------
class _TokenTracker:
    """Accumulate token usage reported by API calls.

    Two different things are measured:

    - running totals (total_*, loop_*) — what gets billed
    - last_input — the context-window size of the most recent call, used
      to notice when the model's context limit is getting close

    The running input total must NOT be used as a stand-in for context
    size: every turn's input_tokens already contains the whole message
    history, so adding turns together counts the same tokens repeatedly.
    Budget decisions read last_input; billing reads the totals. (See #44.)
    """

    def __init__(self):
        # Whole-run counters; never reset.
        self.total_input = 0
        self.total_output = 0
        # Per-loop counters and the latest context size.
        self.reset_loop()

    def record(self, usage):
        """Fold one API call's usage object into every counter.

        Attributes missing from *usage* count as 0.
        """
        tokens_in = getattr(usage, "input_tokens", 0)
        tokens_out = getattr(usage, "output_tokens", 0)
        self.total_input += tokens_in
        self.total_output += tokens_out
        self.loop_input += tokens_in
        self.loop_output += tokens_out
        self.last_input = tokens_in
        self._loop_turns += 1

    def reset_loop(self):
        """Zero the per-loop counters (run between directory loops)."""
        self.loop_input = 0
        self.loop_output = 0
        self.last_input = 0
        self._loop_turns = 0

    @property
    def loop_total(self):
        """Input plus output tokens spent in the current loop."""
        return self.loop_input + self.loop_output

    def budget_exceeded(self):
        """Return True when the latest call's context is over CONTEXT_BUDGET."""
        return self.last_input > CONTEXT_BUDGET

    def summary(self):
        """Return a one-line usage report with an approximate dollar cost."""
        input_cost = self.total_input * INPUT_PRICE_PER_M / 1_000_000
        output_cost = self.total_output * OUTPUT_PRICE_PER_M / 1_000_000
        dollars = input_cost + output_cost
        return (f"{self.total_input:,} input / {self.total_output:,} output "
                f"(approx ${dollars:.2f})")
# ---------------------------------------------------------------------------
# Tool definitions
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Tool registry
#
# Every tool is declared through register_tool(). A registration files the
# tool's schema under one or more scopes (_DIR_TOOLS / _SYNTHESIS_TOOLS /
# _SURVEY_TOOLS / _PLANNING_TOOLS) and, when a handler is supplied, records
# that handler in _TOOL_DISPATCH, which _execute_tool() consults.
#
# submit_report, submit_survey and submit_plan are registered schema-only:
# they have no _TOOL_DISPATCH entry (the dir loop body intercepts
# submit_report itself).
# ---------------------------------------------------------------------------
_TOOL_DISPATCH = {}
_DIR_TOOLS, _SYNTHESIS_TOOLS, _SURVEY_TOOLS, _PLANNING_TOOLS = [], [], [], []
# Scope name -> the list object above; register_tool() appends through this.
_TOOL_REGISTRIES = dict(
    dir=_DIR_TOOLS,
    synthesis=_SYNTHESIS_TOOLS,
    survey=_SURVEY_TOOLS,
    planning=_PLANNING_TOOLS,
)
def register_tool(name, description, schema, scopes, handler=None):
    """Add a tool schema to each scope in *scopes*; optionally bind a handler.

    The same tool name may be registered more than once with a different
    schema per scope (submit_report does this for the dir and synthesis
    loops). Handlers are keyed by name only, so pass handler= on one
    registration and leave it out on the others.

    Raises KeyError if a scope name is not a key of _TOOL_REGISTRIES.
    """
    entry = dict(name=name, description=description, input_schema=schema)
    for scope_name in scopes:
        registry = _TOOL_REGISTRIES[scope_name]
        registry.append(entry)
    if handler is None:
        return
    _TOOL_DISPATCH[name] = handler
# ---------------------------------------------------------------------------
# Tool implementations
# ---------------------------------------------------------------------------
def _tool_read_file(args, target, _cache):
    """Tool handler: return up to ``max_bytes`` bytes of a file as text.

    args:
        path      -- absolute, or relative to *target*
        max_bytes -- read limit in bytes (default 4096); a value that is
                     not a positive integer falls back to the default
    Returns the decoded content, "(empty file)", or an "Error: ..." string.
    A truncation notice is appended when the file is larger than the limit.

    The file is read in binary mode so the limit and the truncation notice
    are both expressed in bytes (a text-mode read counts characters, which
    disagrees with os.path.getsize for multi-byte content).
    """
    path = args.get("path", "")
    try:
        max_bytes = int(args.get("max_bytes", 4096))
    except (TypeError, ValueError):
        max_bytes = 4096
    if max_bytes <= 0:
        # read(0) would misreport "(empty file)"; read(-1) would be unbounded.
        max_bytes = 4096
    if not os.path.isabs(path):
        path = os.path.join(target, path)
    if not _path_is_safe(path, target):
        return f"Error: path '{path}' is outside the target directory."
    try:
        file_size = os.path.getsize(path)
        with open(path, "rb") as f:
            raw = f.read(max_bytes)
    except OSError as e:
        return f"Error reading file: {e}"
    if not raw:
        return "(empty file)"
    content = raw.decode("utf-8", errors="replace")
    # Keep the newline normalisation a text-mode read used to provide.
    content = content.replace("\r\n", "\n").replace("\r", "\n")
    if file_size > max_bytes:
        content += (
            f"\n\n[TRUNCATED — showed {max_bytes} of {file_size} bytes. "
            f"Call again with a larger max_bytes or use "
            f"run_command('tail -n ... {os.path.relpath(path, target)}') "
            f"to see the rest.]"
        )
    return content
def _tool_list_directory(args, target, _cache):
    """Tool handler: list a directory with sizes and MIME types.

    args:
        path        -- absolute, or relative to *target* (defaults to target)
        show_hidden -- include dot-entries when true (default False)
    Returns one line per entry, "(empty directory)", or an "Error: ..."
    string. Entries that cannot be stat'ed are listed as "(stat failed)".
    """
    path = args.get("path", target)
    show_hidden = args.get("show_hidden", False)
    if not os.path.isabs(path):
        path = os.path.join(target, path)
    if not _path_is_safe(path, target):
        return f"Error: path '{path}' is outside the target directory."
    if not os.path.isdir(path):
        return f"Error: '{path}' is not a directory."
    try:
        rows = []
        for name in sorted(os.listdir(path)):
            if name.startswith(".") and not show_hidden:
                continue
            full = os.path.join(path, name)
            try:
                info = os.stat(full)
                if os.path.isdir(full):
                    rows.append(f" {name}/ (dir)")
                    continue
                mime = magic.from_file(full, mime=True)
                suffix = f" [{mime}]" if mime else ""
                rows.append(f" {name} ({info.st_size} bytes){suffix}")
            except OSError:
                rows.append(f" {name} (stat failed)")
        if not rows:
            return "(empty directory)"
        return "\n".join(rows)
    except OSError as e:
        return f"Error listing directory: {e}"
# find(1) actions that run programs or write/delete files. They are refused
# so that a whitelisted `find` cannot be used to break the read-only promise.
_FIND_UNSAFE_ACTIONS = frozenset({
    "-exec", "-execdir", "-ok", "-okdir", "-delete",
    "-fprint", "-fprint0", "-fprintf", "-fls",
})
# Characters the shell treats as control/redirection operators.
_SHELL_OPERATOR_CHARS = frozenset("();<>|&")


def _validate_command(command):
    """Return an "Error: ..." string if *command* must not run, else None.

    Accepted shape: one or more whitelisted binaries joined by "|", with
    optional "N>/dev/null" / "N>&1" redirections. Everything else that lets
    the shell start another program or write a file is refused: ";", "&&",
    "||", "&", other redirections, subshells, command substitution and
    embedded newlines. The program name must be a bare whitelisted name; a
    path such as "./grep" is refused because the target directory is
    untrusted content.

    The check is deliberately conservative: a quoted argument that consists
    only of operator characters (e.g. "|") is refused as well.
    """
    import shlex  # local: this module's import block does not provide it

    if not command.strip():
        return "Error: empty command."
    if any(mark in command for mark in ("`", "$(", "\n", "\r")):
        return ("Error: command substitution and multi-line commands "
                "are not allowed.")
    lexer = shlex.shlex(command, posix=True, punctuation_chars=True)
    lexer.whitespace_split = True
    # shlex would drop everything after '#', even mid-word where the shell
    # does not, which could hide an operator from this check.
    lexer.commenters = ""
    try:
        tokens = list(lexer)
    except ValueError as e:
        return f"Error: could not parse command: {e}"

    binary = None  # program of the pipeline stage being scanned
    idx = 0
    while idx < len(tokens):
        tok = tokens[idx]
        following = tokens[idx + 1] if idx + 1 < len(tokens) else None
        if binary is None:
            if tok not in _COMMAND_WHITELIST:
                return (
                    f"Error: '{tok}' is not allowed. "
                    f"Whitelist: {', '.join(sorted(_COMMAND_WHITELIST))}"
                )
            binary = tok
        elif tok == "|":
            binary = None  # next token must be a whitelisted binary again
        elif tok in (">", ">>") and following == "/dev/null":
            idx += 1
        elif tok == ">&" and following in ("1", "2"):
            idx += 1
        elif tok and set(tok) <= _SHELL_OPERATOR_CHARS:
            return f"Error: shell operator '{tok}' is not allowed."
        elif binary == "find" and tok in _FIND_UNSAFE_ACTIONS:
            return f"Error: find action '{tok}' is not allowed."
        idx += 1
    if binary is None:
        return "Error: pipeline is missing a command after '|'."
    return None


def _tool_run_command(args, target, _cache):
    """Tool handler: run a read-only command line inside *target*.

    args:
        command -- the command line; validated by _validate_command()
    Returns stdout (with stderr appended on a non-zero exit), "(no output)",
    or an "Error: ..." string. The command is killed after 15 seconds.

    The command still runs through the shell so pipes, globs and quoting
    keep working; _validate_command() is what limits it to whitelisted
    binaries. Previously only the first word was checked, so anything
    after ";", "&&" or inside "$(...)" ran unchecked.

    NOTE(review): path arguments are not confined to *target* here.
    """
    command = args.get("command", "")
    problem = _validate_command(command)
    if problem is not None:
        return problem
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True, text=True,
            timeout=15, cwd=target,
        )
    except subprocess.TimeoutExpired:
        return "Error: command timed out after 15 seconds."
    except OSError as e:
        return f"Error running command: {e}"
    output = result.stdout
    if result.returncode != 0 and result.stderr:
        output += f"\n(stderr: {result.stderr.strip()})"
    return output.strip() if output.strip() else "(no output)"
def _tool_parse_structure(args, target, _cache):
    """Tool handler: run parse_structure() on a file inside *target*.

    args["path"] may be absolute or relative to *target*. Returns whatever
    parse_structure() returns, or an "Error: ..." string when the path
    resolves outside the target directory.
    """
    requested = args.get("path", "")
    if os.path.isabs(requested):
        full_path = requested
    else:
        full_path = os.path.join(target, requested)
    if _path_is_safe(full_path, target):
        return parse_structure(full_path)
    return f"Error: path '{full_path}' is outside the target directory."
def _tool_write_cache(args, _target, cache):
    """Tool handler: store a summary entry via cache.write_entry().

    args: cache_type ("file" or "dir"), path, data. Returns the value of
    cache.write_entry(), or an "Error: ..." string for a bad cache_type.
    """
    kind = args.get("cache_type", "")
    if kind not in ("file", "dir"):
        return "Error: cache_type must be 'file' or 'dir'."
    return cache.write_entry(kind, args.get("path", ""), args.get("data", {}))
def _tool_read_cache(args, _target, cache):
    """Tool handler: return a cached entry as indented JSON text.

    args: cache_type ("file" or "dir"), path. Returns the string "null"
    when cache.read_entry() yields None, or an "Error: ..." string for a
    bad cache_type.
    """
    kind = args.get("cache_type", "")
    lookup_path = args.get("path", "")
    if kind not in ("file", "dir"):
        return "Error: cache_type must be 'file' or 'dir'."
    found = cache.read_entry(kind, lookup_path)
    return "null" if found is None else json.dumps(found, indent=2)
def _tool_list_cache(args, _target, cache):
    """Tool handler: list cached entry paths of one type, one per line.

    args: cache_type ("file" or "dir"). Returns "(no cached entries)" when
    cache.list_entries() is empty, or an "Error: ..." string for a bad
    cache_type.
    """
    kind = args.get("cache_type", "")
    if kind not in ("file", "dir"):
        return "Error: cache_type must be 'file' or 'dir'."
    cached_paths = cache.list_entries(kind)
    if cached_paths:
        return "\n".join(cached_paths)
    return "(no cached entries)"
def _tool_think(args, _target, _cache):
    """Tool handler: echo the agent's reasoning to stderr; returns "ok".

    Prints the observation, hypothesis and next_action arguments (empty
    string for any that are missing).
    """
    print(" [AI] THINK", file=sys.stderr)
    for field in ("observation", "hypothesis", "next_action"):
        print(f" {field}: {args.get(field, '')}", file=sys.stderr)
    return "ok"
def _tool_checkpoint(args, _target, _cache):
    """Tool handler: echo the agent's progress summary to stderr; returns "ok".

    Prints the learned, still_unknown and next_phase arguments (empty
    string for any that are missing).
    """
    print(" [AI] CHECKPOINT", file=sys.stderr)
    for field in ("learned", "still_unknown", "next_phase"):
        print(f" {field}: {args.get(field, '')}", file=sys.stderr)
    return "ok"
def _tool_flag(args, _target, cache):
    """Tool handler: report a notable finding and append it to flags.jsonl.

    args:
        path     -- what the finding is about (default "general")
        finding  -- description of the finding
        severity -- "info" | "concern" | "critical" (default "info"; a
                    missing or null value is treated as "info")
    The finding is printed to stderr and appended as one JSON line to
    <cache.root>/flags.jsonl. Always returns "ok": persisting is
    best-effort, but a failed write is reported on stderr instead of being
    dropped silently.
    """
    path = args.get("path", "general")
    finding = args.get("finding", "")
    # Coerce so a null/non-string severity cannot crash .upper() below.
    severity = str(args.get("severity") or "info")
    print(f" [AI] FLAG [{severity.upper()}] {path}", file=sys.stderr)
    print(f" {finding}", file=sys.stderr)
    flags_path = os.path.join(cache.root, "flags.jsonl")
    entry = {"path": path, "finding": finding, "severity": severity}
    try:
        with open(flags_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")
    except OSError as e:
        print(f" [AI] Warning: could not persist flag to {flags_path}: {e}",
              file=sys.stderr)
    return "ok"
# ---------------------------------------------------------------------------
# Tool registrations
#
# Order within each scope is preserved to keep the agent-visible tool list
# stable. Tools that appear in two scopes (flag) and tools whose schema
# differs by scope (submit_report) are registered once per scope.
# ---------------------------------------------------------------------------
# Description and input schema for the `flag` tool, defined once because
# the tool is registered in both the dir and the synthesis scope.
_FLAG_DESCRIPTION = (
    "Mark a file, directory, or finding as notable or anomalous. Call this "
    "immediately when you discover something surprising, concerning, or "
    "important — do not save it for the report."
)
_FLAG_SCHEMA = {
    "type": "object",
    "properties": {
        "path": {"type": "string",
                 "description": "Relative path, or 'general'."},
        "finding": {"type": "string", "description": "What you found."},
        "severity": {"type": "string",
                     "enum": ["info", "concern", "critical"],
                     "description": "info | concern | critical"},
    },
    "required": ["path", "finding", "severity"],
}
# --- Dir loop tools ---
# read_file — dir scope, dispatched to _tool_read_file().
register_tool(
    "read_file",
    "Read and return the contents of a file. "
    "Path must be inside the target directory.",
    {
        "type": "object",
        "properties": {
            "path": {"type": "string",
                     "description": "Absolute or relative path to the file."},
            "max_bytes": {"type": "integer",
                          "description": "Maximum bytes to read (default 4096)."},
        },
        "required": ["path"],
    },
    scopes=["dir"],
    handler=_tool_read_file,
)
# list_directory — dir scope, dispatched to _tool_list_directory().
register_tool(
    "list_directory",
    "List the contents of a directory with file sizes and types.",
    {
        "type": "object",
        "properties": {
            "path": {
                "type": "string",
                "description": "Absolute or relative path to the directory.",
            },
            "show_hidden": {"type": "boolean",
                            "description": "Include hidden files (default false)."},
        },
        "required": ["path"],
    },
    scopes=["dir"],
    handler=_tool_list_directory,
)
# run_command — dir scope, dispatched to _tool_run_command().
register_tool(
    "run_command",
    "Run a read-only shell command. "
    "Allowed binaries: wc, file, grep, head, tail, stat, du, find.",
    {
        "type": "object",
        "properties": {
            "command": {"type": "string",
                        "description": "The shell command to execute."},
        },
        "required": ["command"],
    },
    scopes=["dir"],
    handler=_tool_run_command,
)
# parse_structure — dir scope, dispatched to _tool_parse_structure().
register_tool(
    "parse_structure",
    "Parse a source file using tree-sitter and return its structural skeleton: "
    "functions, classes, imports, and code metrics. "
    "Supported: Python, JavaScript, TypeScript, Rust, Go.",
    {
        "type": "object",
        "properties": {
            "path": {"type": "string",
                     "description": "Path to the source file to parse."},
        },
        "required": ["path"],
    },
    scopes=["dir"],
    handler=_tool_parse_structure,
)
# write_cache — dir scope, dispatched to _tool_write_cache().
# The confidence range in the `data` description previously read "0.01.0"
# (separator lost); it now reads "0.0-1.0", matching submit_report's
# completeness range.
register_tool(
    name="write_cache",
    description=(
        "Write a summary cache entry for a file or directory. The data "
        "must NOT contain raw file contents — summaries only."
    ),
    schema={
        "type": "object",
        "properties": {
            "cache_type": {
                "type": "string",
                "enum": ["file", "dir"],
                "description": "'file' or 'dir'.",
            },
            "path": {
                "type": "string",
                "description": "The path being cached.",
            },
            "data": {
                "type": "object",
                "description": (
                    "Cache entry. Files: {path, relative_path, size_bytes, "
                    "category, summary, notable, notable_reason, "
                    "confidence, confidence_reason, cached_at}. "
                    "Dirs: {path, relative_path, child_count, summary, "
                    "dominant_category, notable_files, "
                    "confidence, confidence_reason, cached_at}. "
                    "Always set confidence (0.0-1.0); see system prompt "
                    "for calibration. Set confidence_reason only when "
                    "confidence < 0.7."
                ),
            },
        },
        "required": ["cache_type", "path", "data"],
    },
    scopes=["dir"],
    handler=_tool_write_cache,
)
# think — dir scope, dispatched to _tool_think().
register_tool(
    "think",
    "Record your reasoning before choosing which file or directory to "
    "investigate next. Call this when deciding what to look at — not "
    "before every individual tool call.",
    {
        "type": "object",
        "properties": {
            "observation": {"type": "string",
                            "description": "What you have observed so far."},
            "hypothesis": {
                "type": "string",
                "description": "Your hypothesis about the directory.",
            },
            "next_action": {
                "type": "string",
                "description": "What you plan to investigate next and why.",
            },
        },
        "required": ["observation", "hypothesis", "next_action"],
    },
    scopes=["dir"],
    handler=_tool_think,
)
# checkpoint — dir scope, dispatched to _tool_checkpoint().
register_tool(
    "checkpoint",
    "Summarize what you have learned so far about this directory and what "
    "you still need to determine. Call this after completing a significant "
    "cluster of files — not after every file.",
    {
        "type": "object",
        "properties": {
            "learned": {"type": "string",
                        "description": "What you have learned so far."},
            "still_unknown": {"type": "string",
                              "description": "What you still need to determine."},
            "next_phase": {"type": "string",
                           "description": "What you will investigate next."},
        },
        "required": ["learned", "still_unknown", "next_phase"],
    },
    scopes=["dir"],
    handler=_tool_checkpoint,
)
# flag — dir scope; this registration also binds the handler, _tool_flag().
register_tool(
    "flag", _FLAG_DESCRIPTION, _FLAG_SCHEMA,
    scopes=["dir"],
    handler=_tool_flag,
)
# submit_report — dir scope, schema only (no handler is registered).
register_tool(
    "submit_report",
    "Submit the directory summary. This ends the investigation loop.",
    {
        "type": "object",
        "properties": {
            "summary": {"type": "string",
                        "description": "1-3 sentence summary of the directory."},
            "completeness": {
                "type": "number",
                "description": (
                    "Self-rated investigation completeness (0.0-1.0). 1.0 = "
                    "examined every relevant file thoroughly. 0.5 = examined "
                    "about half, or skimmed most. < 0.3 = barely scratched "
                    "the surface."
                ),
            },
        },
        "required": ["summary", "completeness"],
    },
    scopes=["dir"],
)
# --- Synthesis tools ---
# read_cache — synthesis scope, dispatched to _tool_read_cache().
register_tool(
    "read_cache",
    "Read a previously cached summary for a file or directory.",
    {
        "type": "object",
        "properties": {
            "cache_type": {"type": "string", "enum": ["file", "dir"]},
            "path": {"type": "string", "description": "The path to look up."},
        },
        "required": ["cache_type", "path"],
    },
    scopes=["synthesis"],
    handler=_tool_read_cache,
)
# list_cache — synthesis scope, dispatched to _tool_list_cache().
register_tool(
    "list_cache",
    "List all cached entry paths of a given type.",
    {
        "type": "object",
        "properties": {
            "cache_type": {"type": "string", "enum": ["file", "dir"]},
        },
        "required": ["cache_type"],
    },
    scopes=["synthesis"],
    handler=_tool_list_cache,
)
# flag — synthesis scope. No handler= here: handlers are keyed by tool name
# and the dir-scope registration of `flag` supplies it.
register_tool(
    "flag", _FLAG_DESCRIPTION, _FLAG_SCHEMA,
    scopes=["synthesis"],
)
# submit_report — synthesis scope, schema only (different fields from the
# dir-scope submit_report).
register_tool(
    "submit_report",
    "Submit the final analysis report.",
    {
        "type": "object",
        "properties": {
            "brief": {"type": "string", "description": "2-4 sentence summary."},
            "detailed": {"type": "string", "description": "Thorough breakdown."},
        },
        "required": ["brief", "detailed"],
    },
    scopes=["synthesis"],
)
# --- Survey tools ---
# submit_survey — survey scope, schema only (no handler is registered).
# The confidence description previously read "0.01.0 confidence ..."
# (range separator lost); it now reads "0.0-1.0".
register_tool(
    name="submit_survey",
    description=(
        "Submit the reconnaissance survey. Call exactly once."
    ),
    schema={
        "type": "object",
        "properties": {
            "description": {
                "type": "string",
                "description": "Plain-language description of the target.",
            },
            "approach": {
                "type": "string",
                "description": "Recommended analytical approach.",
            },
            "relevant_tools": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Tool names the dir loop should lean on.",
            },
            "skip_tools": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Tool names whose use would be wrong here.",
            },
            "domain_notes": {
                "type": "string",
                "description": "Short actionable hint, or empty string.",
            },
            "confidence": {
                "type": "number",
                "description": "0.0-1.0 confidence in this survey.",
            },
        },
        "required": [
            "description", "approach", "relevant_tools",
            "skip_tools", "domain_notes", "confidence",
        ],
    },
    scopes=["survey"],
)
# --- Planning tools ---
# submit_plan — planning scope, schema only (no handler is registered).
# Three directory bands (priority / shallow / skip), each a list of
# {path, reason} objects; priority entries also carry suggested_turns.
register_tool(
    "submit_plan",
    "Submit the investigation plan. Call exactly once.",
    {
        "type": "object",
        "properties": {
            "priority_dirs": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string",
                                 "description": "Relative directory path."},
                        "reason": {
                            "type": "string",
                            "description": "Why this dir deserves deep investigation.",
                        },
                        "suggested_turns": {
                            "type": "integer",
                            "description": "Suggested turns (15-20).",
                        },
                    },
                    "required": ["path", "reason", "suggested_turns"],
                },
                "description": "Directories to investigate deeply.",
            },
            "shallow_dirs": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string",
                                 "description": "Relative directory path."},
                        "reason": {
                            "type": "string",
                            "description": "Why a shallow pass is sufficient.",
                        },
                    },
                    "required": ["path", "reason"],
                },
                "description": "Directories needing only a quick pass.",
            },
            "skip_dirs": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string",
                                 "description": "Relative directory path."},
                        "reason": {
                            "type": "string",
                            "description": "Why this dir should be skipped.",
                        },
                    },
                    "required": ["path", "reason"],
                },
                "description": "Directories to skip entirely.",
            },
            "investigation_order": {
                "type": "string",
                "enum": ["leaf-first", "priority-first"],
                "description": (
                    "leaf-first or priority-first (leaf-first within bands)."
                ),
            },
            "notes": {
                "type": "string",
                "description": (
                    "Cross-cutting notes for per-directory agents, or empty."
                ),
            },
        },
        "required": [
            "priority_dirs", "shallow_dirs", "skip_dirs",
            "investigation_order", "notes",
        ],
    },
    scopes=["planning"],
)
def _execute_tool(name, args, target, cache, dir_rel, turn, verbose=False):
    """Run the handler registered for tool *name* and return its result.

    name/args -- tool name and argument dict from the model (None args are
                 treated as an empty dict)
    target, cache -- passed through to the handler
    dir_rel, turn -- recorded with the call via cache.log_turn()
    verbose -- when true, print a preview of the result to stderr

    An unknown tool name yields an "Error: ..." string without logging.
    A handler that raises no longer aborts the whole run: the exception is
    turned into an "Error: ..." result so the model can see it and adjust.
    The "data" argument is left out of the logged args.
    """
    handler = _TOOL_DISPATCH.get(name)
    if handler is None:
        return f"Error: unknown tool '{name}'."
    args = args or {}
    try:
        result = handler(args, target, cache)
    except Exception as e:  # tool boundary: arguments come from the model
        result = f"Error: tool '{name}' failed: {type(e).__name__}: {e}"
    cache.log_turn(dir_rel, turn, name,
                   {k: v for k, v in args.items() if k != "data"},
                   len(result))
    if verbose:
        preview = result[:200] + "..." if len(result) > 200 else result
        print(f" [AI] <- {len(result)} chars: {preview}", file=sys.stderr)
    return result
# ---------------------------------------------------------------------------
# Streaming API caller
# ---------------------------------------------------------------------------
def _call_api_streaming(client, system, messages, tools, tracker):
    """Call Claude through the streaming API and record its token usage.

    The stream is consumed to the end and the final message is taken from
    it; nothing is printed here (callers print the tool calls from the
    returned blocks).

    Returns (content_blocks, usage): the response's content block list and
    its usage object. The usage is also passed to tracker.record().
    Exceptions raised by the client propagate; the dir loop catches
    anthropic.APIError around this call.
    """
    with client.messages.stream(
        model=MODEL,
        max_tokens=4096,
        system=system,
        messages=messages,
        tools=tools,
    ) as stream:
        # Drain the event stream, as the original loop did. Its per-event
        # bookkeeping (a current-tool variable that was never read) is gone.
        for _event in stream:
            pass
        response = stream.get_final_message()
    tracker.record(response.usage)
    return response.content, response.usage
# ---------------------------------------------------------------------------
# Directory discovery
# ---------------------------------------------------------------------------
def _discover_directories(target, show_hidden=False, exclude=None):
    """Return every directory under *target* (itself included), deepest first.

    Subdirectories are pruned from the walk when _should_skip_dir() matches,
    when the name is in *exclude*, or when the name starts with "." and
    *show_hidden* is false. Paths are rooted at realpath(target) and
    ordered by descending separator count, then alphabetically.
    """
    excluded = set(exclude or [])

    def _keep(dirname):
        if _should_skip_dir(dirname) or dirname in excluded:
            return False
        return show_hidden or not dirname.startswith(".")

    found = []
    for current, children, _files in os.walk(os.path.realpath(target),
                                             topdown=True):
        # In-place edit: os.walk only descends into what is left here.
        children[:] = [child for child in children if _keep(child)]
        found.append(current)
    return sorted(found, key=lambda p: (-p.count(os.sep), p))
# ---------------------------------------------------------------------------
# Per-directory agent loop
# ---------------------------------------------------------------------------
def _build_dir_context(dir_path):
    """Return a text listing of *dir_path* for the dir-loop system prompt.

    Dot-entries are omitted. Subdirectories are shown as "name/ (dir)",
    files with their size and MIME type, and entries that cannot be
    stat'ed as "(stat failed)". Returns "(empty)" when nothing is listed.
    """
    rows = []
    try:
        visible = [
            name for name in sorted(os.listdir(dir_path))
            if not name.startswith(".")
        ]
    except OSError:
        visible = []
        rows.append(" (could not list directory)")
    for name in visible:
        full = os.path.join(dir_path, name)
        try:
            size = os.stat(full).st_size
            if os.path.isdir(full):
                rows.append(f" {name}/ (dir)")
            else:
                mime = magic.from_file(full, mime=True)
                rows.append(f" {name} ({size} bytes) [{mime}]")
        except OSError:
            rows.append(f" {name} (stat failed)")
    if not rows:
        return "(empty)"
    return "Directory contents:\n" + "\n".join(rows)
def _get_child_summaries(dir_path, cache):
    """Return cached summaries of *dir_path*'s immediate subdirectories.

    One "- <relative_path>/: <summary>" line is produced per subdirectory
    that has a "dir" cache entry. When there are none, a placeholder says
    whether the directory is a real leaf or merely has children that are
    not cached yet, so a parent is never mislabelled as a leaf.
    """
    bullets = []
    try:
        for name in sorted(os.listdir(dir_path)):
            child = os.path.join(dir_path, name)
            if os.path.isdir(child):
                entry = cache.read_entry("dir", child)
                if entry:
                    rel = entry.get("relative_path", name)
                    text = entry.get("summary", "(no summary)")
                    bullets.append(f"- {rel}/: {text}")
    except OSError:
        pass
    if bullets:
        return "\n".join(bullets)

    # Nothing cached: tell a true leaf apart from uninvestigated children.
    leaf_note = "(none: this is a leaf directory)"
    try:
        names = os.listdir(dir_path)
    except OSError:
        return leaf_note
    for name in names:
        if name.startswith("."):
            continue
        if os.path.isdir(os.path.join(dir_path, name)):
            return "(child directories exist but have not been investigated yet)"
    return leaf_note
# Survey-skip thresholds. The survey is skipped only when BOTH counts are
# below these values. See #46 for the plan to revisit them with data.
_SURVEY_MIN_FILES = 5
_SURVEY_MIN_DIRS = 2

# A survey's skip_tools list is honoured only at or above this confidence.
_SURVEY_CONFIDENCE_THRESHOLD = 0.5
# Tools that _filter_dir_tools() never removes, whatever the survey says.
_PROTECTED_DIR_TOOLS = {"submit_report"}
def _default_survey():
    """Return a stand-in survey for targets too small to warrant an API call.

    confidence is 0.0, which is below the threshold _filter_dir_tools()
    requires before it honours skip_tools — so the dir loop keeps its full
    toolbox when this synthetic survey is in use.
    """
    approach = (
        "The target is small enough to investigate exhaustively. "
        "Read every file directly."
    )
    return dict(
        description="Small target — survey skipped.",
        approach=approach,
        relevant_tools=[],
        skip_tools=[],
        domain_notes="",
        confidence=0.0,
    )
def _format_survey_block(survey):
    """Format a survey dict as labelled lines for the dir prompt.

    Description and Approach are always present; Domain notes, Relevant
    tools and Skip tools appear only when non-empty. A falsy *survey*
    yields "(no survey available)".
    """
    if not survey:
        return "(no survey available)"
    domain_notes = survey.get("domain_notes", "")
    lean_on = survey.get("relevant_tools") or []
    removed = survey.get("skip_tools") or []

    out = [
        f"Description: {survey.get('description', '')}",
        f"Approach: {survey.get('approach', '')}",
    ]
    if domain_notes:
        out.append(f"Domain notes: {domain_notes}")
    if lean_on:
        out.append(f"Relevant tools (lean on these): {', '.join(lean_on)}")
    if removed:
        out.append(
            f"Skip tools (already removed from your toolbox): "
            f"{', '.join(removed)}"
        )
    return "\n".join(out)
def _filter_dir_tools(survey):
    """Return a copy of _DIR_TOOLS minus the survey's skip_tools.

    - The full list comes back when there is no survey, when its confidence
      is below _SURVEY_CONFIDENCE_THRESHOLD (unparseable counts as 0.0), or
      when nothing is left to skip.
    - Names in _PROTECTED_DIR_TOOLS are never removed.
    - skip_tools names that match no tool have no effect.
    """
    everything = list(_DIR_TOOLS)
    if not survey:
        return everything
    try:
        confidence = float(survey.get("confidence", 0.0) or 0.0)
    except (TypeError, ValueError):
        confidence = 0.0
    if confidence < _SURVEY_CONFIDENCE_THRESHOLD:
        return everything
    banned = set(survey.get("skip_tools") or []) - _PROTECTED_DIR_TOOLS
    if not banned:
        return everything
    return [tool for tool in everything if tool["name"] not in banned]
# Static inputs for one directory loop, assembled before its first turn:
# the display path, the formatted system prompt, the filtered tool list and
# the seed message list.
_DirLoopContext = namedtuple(
    "_DirLoopContext", "dir_rel system dir_tools messages",
)
def _build_dir_loop_context(dir_path, target, cache, survey, max_turns):
    """Gather everything the dir loop needs before its first API call.

    Read-only assembly: looks up child summaries in the cache, fills in the
    system prompt template, filters the tool list by the survey, and
    prepares the opening user message. Nothing is written.

    Returns a _DirLoopContext. dir_rel is *dir_path* relative to *target*;
    for the target itself it is the target's basename rather than ".".
    """
    rel = os.path.relpath(dir_path, target)
    dir_rel = os.path.basename(target) if rel == "." else rel

    listing = _build_dir_context(dir_path)
    children = _get_child_summaries(dir_path, cache)
    survey_text = _format_survey_block(survey)
    tools = _filter_dir_tools(survey)

    system_prompt = _DIR_SYSTEM_PROMPT.format(
        dir_path=dir_path,
        dir_rel=dir_rel,
        max_turns=max_turns,
        context=listing,
        child_summaries=children,
        survey_context=survey_text,
    )
    opening = {
        "role": "user",
        "content": (
            "Investigate this directory now. Use parse_structure for source "
            "files, read_file for others, cache summaries, and call "
            "submit_report. Batch tool calls for efficiency."
        ),
    }
    return _DirLoopContext(
        dir_rel=dir_rel,
        system=system_prompt,
        dir_tools=tools,
        messages=[opening],
    )
def _flush_partial_dir_entry(dir_path, target, cache):
    """Write a partial "dir" cache entry built from cached file entries.

    Used when the context budget runs out before the agent calls
    submit_report. Does nothing and returns "" if a dir entry already
    exists. Otherwise an entry marked partial=True is written, and the
    joined file summaries are returned ("" when no file entries belong to
    this directory, in which case a placeholder entry is written).
    """
    if cache.has_entry("dir", dir_path):
        return ""

    dir_real = os.path.realpath(dir_path)
    prefix = dir_real + os.sep

    def _belongs_here(entry):
        # Match on the resolved absolute path, or on the parent directory
        # of the entry's relative_path joined onto the target.
        if os.path.realpath(entry.get("path", "")).startswith(prefix):
            return True
        via_rel = os.path.join(target, entry.get("relative_path", ""))
        return os.path.dirname(via_rel) == dir_real

    file_entries = [
        e for e in cache.read_all_entries("file") if _belongs_here(e)
    ]

    if not file_entries:
        cache.write_entry("dir", dir_path, {
            "path": dir_path,
            "relative_path": os.path.relpath(dir_path, target),
            "child_count": 0,
            "summary": ("Investigation incomplete — context budget "
                        "reached before any files were processed."),
            "dominant_category": "unknown",
            "notable_files": [],
            "partial": True,
            "partial_reason": (
                "context budget reached before files processed"),
            "cached_at": _now_iso(),
        })
        return ""

    partial_summary = " ".join(
        e["summary"] for e in file_entries if e.get("summary")
    )
    notable = [
        e.get("relative_path", e.get("path", ""))
        for e in file_entries if e.get("notable")
    ]
    rel_path = os.path.relpath(dir_path, target)
    if os.path.isdir(dir_path):
        child_count = len(
            [n for n in os.listdir(dir_path) if not n.startswith(".")]
        )
    else:
        child_count = 0
    cache.write_entry("dir", dir_path, {
        "path": dir_path,
        "relative_path": rel_path,
        "child_count": child_count,
        "summary": partial_summary,
        "dominant_category": "unknown",
        "notable_files": notable,
        "partial": True,
        "partial_reason": "context budget reached",
        "cached_at": _now_iso(),
    })
    return partial_summary
def _handle_turn_response(content_blocks, messages, target, cache, dir_rel,
                          turn, verbose):
    """Handle one assistant turn: echo it, record it, run its tool calls.

    `messages` is mutated in place. The assistant message is appended
    first; it is followed either by a nudge asking for submit_report (when
    the turn contained no tool calls) or by a user message carrying one
    tool_result per tool call, in call order.

    submit_report is not dispatched: it marks the loop as done and its
    summary / completeness arguments are extracted here.

    Returns (done, summary, completeness). summary and completeness are
    None unless submit_report was called; completeness is also None when
    its value cannot be converted to float.
    """
    for block in content_blocks:
        if block.type != "text":
            continue
        text = block.text.strip()
        if not text:
            continue
        for line in text.split("\n"):
            print(f" [AI] {line}", file=sys.stderr)

    tool_uses = [b for b in content_blocks if b.type == "tool_use"]
    for call in tool_uses:
        shown = ""
        if call.input:
            # "data" payloads are left out of the echo.
            shown = ", ".join(
                f"{k}={v!r}" for k, v in call.input.items() if k != "data"
            )
        print(f" [AI] -> {call.name}({shown})", file=sys.stderr)

    messages.append({
        "role": "assistant",
        "content": [_block_to_dict(b) for b in content_blocks],
    })

    if not tool_uses:
        messages.append({
            "role": "user",
            "content": "Please call submit_report with your summary.",
        })
        return False, None, None

    done, summary, completeness = False, None, None
    tool_results = []
    for call in tool_uses:
        if call.name != "submit_report":
            outcome = _execute_tool(
                call.name, call.input, target, cache, dir_rel,
                turn + 1, verbose=verbose,
            )
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": outcome,
            })
            continue
        summary = call.input.get("summary", "")
        try:
            completeness = float(call.input.get("completeness", 0) or 0)
        except (TypeError, ValueError):
            completeness = None
        tool_results.append({
            "type": "tool_result",
            "tool_use_id": call.id,
            "content": "Summary submitted.",
        })
        done = True

    messages.append({"role": "user", "content": tool_results})
    return done, summary, completeness
def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
                  verbose=False, survey=None):
    """Drive the per-directory agent conversation for `dir_path`.

    Each turn calls the API, hands the response to `_handle_turn_response`
    and stops as soon as the agent submits its report. The loop also stops
    early when the token tracker reports the context budget exceeded (after
    flushing a partial cache entry) or when the API raises an error.

    Returns (summary, completeness) where completeness is the agent's
    self-rated investigation thoroughness, or None if not reported.
    `summary` is None when nothing was submitted and no partial entry
    was produced.
    """
    ctx = _build_dir_loop_context(dir_path, target, cache, survey, max_turns)

    final_summary = None
    final_completeness = None
    ran_out_of_turns = True  # cleared on every early exit

    for turn_index in range(max_turns):
        if tracker.budget_exceeded():
            budget_msg = (
                f" [AI] Context budget reached — exiting early "
                f"(context size {tracker.last_input:,} > "
                f"{CONTEXT_BUDGET:,} budget; "
                f"loop spend {tracker.loop_total:,} tokens)"
            )
            print(budget_msg, file=sys.stderr)
            fallback = _flush_partial_dir_entry(dir_path, target, cache)
            if fallback and not final_summary:
                final_summary = fallback
            ran_out_of_turns = False
            break

        try:
            content_blocks, _usage = _call_api_streaming(
                client, ctx.system, ctx.messages, ctx.dir_tools, tracker,
            )
        except anthropic.APIError as e:
            print(f" [AI] API error: {e}", file=sys.stderr)
            ran_out_of_turns = False
            break

        finished, submitted_summary, submitted_completeness = (
            _handle_turn_response(
                content_blocks, ctx.messages, target, cache,
                ctx.dir_rel, turn_index, verbose,
            )
        )
        # Keep the latest non-None values seen across turns.
        if submitted_summary is not None:
            final_summary = submitted_summary
        if submitted_completeness is not None:
            final_completeness = submitted_completeness
        if finished:
            ran_out_of_turns = False
            break

    if ran_out_of_turns:
        print(f" [AI] Warning: max turns reached for {ctx.dir_rel}",
              file=sys.stderr)

    return final_summary, final_completeness
def _block_to_dict(block):
    """Turn an SDK content block into a plain dict for the message history.

    Text blocks keep their text; tool_use blocks keep id, name and input.
    Any other block type is reduced to just its "type" field.
    """
    kind = block.type
    as_dict = {"type": kind}
    if kind == "text":
        as_dict["text"] = block.text
    elif kind == "tool_use":
        as_dict.update(id=block.id, name=block.name, input=block.input)
    return as_dict
# ---------------------------------------------------------------------------
# Survey pass
# ---------------------------------------------------------------------------
def _format_survey_signals(signals):
    """Format the survey_signals dict as a labeled, human-readable text block.

    Returns a placeholder string when `signals` is empty or reports no
    files. Otherwise emits the total file count followed by whichever of
    the extension histogram, file-description counts and filename samples
    are present.
    """
    if not signals or not signals.get("total_files"):
        return "(no files classified)"

    out = [f"Total files: {signals.get('total_files', 0)}", ""]

    # Two "label: count" tables share one layout; each ends with a blank line.
    counted_sections = (
        ("extension_histogram", "Extensions (top, by count):"),
        ("file_descriptions", "file --brief output (top, by count):"),
    )
    for key, heading in counted_sections:
        table = signals.get(key) or {}
        if not table:
            continue
        out.append(heading)
        out.extend(f" {label}: {count}" for label, count in table.items())
        out.append("")

    sample_names = signals.get("filename_samples") or []
    if sample_names:
        out.append("Filename samples (evenly drawn):")
        out.extend(f" {sample}" for sample in sample_names)

    # rstrip drops the trailing blank separator when later sections are absent.
    return "\n".join(out).rstrip()
def _run_survey(client, target, report, tracker, max_turns=3, verbose=False):
    """Run the reconnaissance survey pass.

    Builds the survey system prompt from the report's survey signals, a
    depth-2 tree preview and the names of the directory tools (except
    submit_report), then asks the model to call submit_survey, nudging it
    for up to `max_turns` turns.

    Args:
        client: API client handed to `_call_api_streaming`.
        target: Directory being analyzed.
        report: Report dict; only its "survey_signals" entry is read here.
        tracker: Token tracker handed to `_call_api_streaming`.
        max_turns: Maximum number of API calls to attempt.
        verbose: Accepted for signature parity with the other passes; not
            used in this function.

    Returns a survey dict on success, or None on failure / out-of-turns.
    Survey is advisory — callers must treat None as "no survey context".
    """
    signals = report.get("survey_signals") or {}
    survey_signals_text = _format_survey_signals(signals)

    # The tree preview is best-effort; any failure degrades to a placeholder.
    try:
        tree_node = build_tree(target, max_depth=2)
        tree_preview = render_tree(tree_node)
    except Exception:
        tree_preview = "(tree unavailable)"

    tool_names = [t["name"] for t in _DIR_TOOLS if t["name"] != "submit_report"]
    available_tools = ", ".join(tool_names)

    system = _SURVEY_SYSTEM_PROMPT.format(
        target=target,
        survey_signals=survey_signals_text,
        tree_preview=tree_preview,
        available_tools=available_tools,
    )
    messages = [
        {
            "role": "user",
            "content": (
                "All inputs are in the system prompt above. Call "
                "submit_survey now — no other tool calls needed."
            ),
        },
    ]

    survey = None
    for _turn in range(max_turns):
        try:
            content_blocks, _usage = _call_api_streaming(
                client, system, messages, _SURVEY_TOOLS, tracker,
            )
        except anthropic.APIError as e:
            print(f" [AI] API error: {e}", file=sys.stderr)
            return None

        for b in content_blocks:
            if b.type == "text" and b.text.strip():
                for line in b.text.strip().split("\n"):
                    print(f" [AI] {line}", file=sys.stderr)

        tool_uses = [b for b in content_blocks if b.type == "tool_use"]
        for tu in tool_uses:
            arg_summary = ", ".join(
                f"{k}={v!r}" for k, v in tu.input.items()
            ) if tu.input else ""
            print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)

        messages.append({
            "role": "assistant",
            "content": [_block_to_dict(b) for b in content_blocks],
        })

        if not tool_uses:
            messages.append({
                "role": "user",
                "content": "Please call submit_survey.",
            })
            continue

        tool_results = []
        done = False
        for tu in tool_uses:
            if tu.name == "submit_survey":
                args = tu.input or {}
                # The survey is advisory, so a malformed confidence value
                # falls back to 0.0 rather than aborting the investigation.
                try:
                    confidence = float(args.get("confidence", 0.0) or 0.0)
                except (TypeError, ValueError):
                    confidence = 0.0
                survey = {
                    "description": args.get("description", ""),
                    "approach": args.get("approach", ""),
                    "relevant_tools": args.get("relevant_tools", []) or [],
                    "skip_tools": args.get("skip_tools", []) or [],
                    "domain_notes": args.get("domain_notes", ""),
                    "confidence": confidence,
                }
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Survey received. Thank you.",
                })
                done = True
            else:
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Unknown tool. Call submit_survey.",
                    "is_error": True,
                })

        messages.append({"role": "user", "content": tool_results})
        if done:
            break
    else:
        print(" [AI] Warning: survey ran out of turns.", file=sys.stderr)

    return survey
# ---------------------------------------------------------------------------
# Planning pass
# ---------------------------------------------------------------------------
# Turn allocation defaults.
# Turns given to a directory the plan does not assign a value to; also
# reported to the planner as its per-directory default.
_DEFAULT_TURNS = 10
# Turns given to directories the plan marks as shallow.
_SHALLOW_TURNS = 5
# Upper bound applied to the planner's suggested_turns for priority dirs.
_MAX_TURNS_CEILING = 25
# Multiplied by the directory count to form the planner's global turn budget.
_BASE_TURNS_PER_DIR = 10
def _default_plan():
    """Return the neutral plan used when planning is skipped or fails.

    No directory is prioritized, made shallow or skipped, and the order
    stays leaf-first. A fresh dict (with fresh lists) is built on each call.
    """
    plan = {tier: [] for tier in ("priority_dirs", "shallow_dirs", "skip_dirs")}
    plan["investigation_order"] = "leaf-first"
    plan["notes"] = ""
    return plan
def _run_planning(client, target, survey, report, all_dirs, tracker,
                  cached_dirs=None, max_turns=3, verbose=False):
    """Ask the model for an investigation plan; return it or None.

    The planning pass decides where to invest investigation depth. The
    system prompt is assembled from the survey block, a depth-6 tree, the
    file signals, the already-cached directories and a global turn budget
    of `_BASE_TURNS_PER_DIR` turns per directory. The model is then nudged
    for up to `max_turns` turns to call submit_plan.

    Returns the submitted plan as a dict, or None when the API raises an
    error or the model never submits a plan.
    """
    already_cached = cached_dirs or []
    n_dirs = len(all_dirs)
    turn_budget = _BASE_TURNS_PER_DIR * n_dirs

    if survey:
        survey_context = _format_survey_block(survey)
    else:
        survey_context = "(no survey available)"

    # Tree rendering is best-effort; fall back to a placeholder on any error.
    try:
        tree_text = render_tree(build_tree(target, max_depth=6))
    except Exception:
        tree_text = "(tree unavailable)"

    file_signals = _format_survey_signals(report.get("survey_signals") or {})

    cached_rel = [os.path.relpath(d, target) for d in already_cached]
    cached_text = ", ".join(cached_rel) if cached_rel else "(none)"

    system = _PLANNING_SYSTEM_PROMPT.format(
        target=target,
        survey_context=survey_context,
        tree_text=tree_text,
        file_signals=file_signals,
        dir_count=n_dirs,
        cached_dirs=cached_text,
        default_turns=_DEFAULT_TURNS,
        global_budget=turn_budget,
    )
    opening = (
        "All inputs are in the system prompt above. Call "
        "submit_plan now."
    )
    messages = [{"role": "user", "content": opening}]

    plan = None
    submitted = False
    for _turn in range(max_turns):
        try:
            content_blocks, _usage = _call_api_streaming(
                client, system, messages, _PLANNING_TOOLS, tracker,
            )
        except anthropic.APIError as e:
            print(f" [AI] API error: {e}", file=sys.stderr)
            return None

        for block in content_blocks:
            if block.type == "text" and block.text.strip():
                for text_line in block.text.strip().split("\n"):
                    print(f" [AI] {text_line}", file=sys.stderr)

        calls = [block for block in content_blocks if block.type == "tool_use"]
        for call in calls:
            if call.input:
                arg_summary = ", ".join(
                    f"{k}={v!r}" for k, v in call.input.items()
                )
            else:
                arg_summary = ""
            print(f" [AI] -> {call.name}({arg_summary})", file=sys.stderr)

        messages.append({
            "role": "assistant",
            "content": [_block_to_dict(block) for block in content_blocks],
        })

        if not calls:
            messages.append({
                "role": "user",
                "content": "Please call submit_plan.",
            })
            continue

        replies = []
        for call in calls:
            if call.name != "submit_plan":
                replies.append({
                    "type": "tool_result",
                    "tool_use_id": call.id,
                    "content": "Unknown tool. Call submit_plan.",
                    "is_error": True,
                })
                continue
            plan = {
                "priority_dirs": call.input.get("priority_dirs", []) or [],
                "shallow_dirs": call.input.get("shallow_dirs", []) or [],
                "skip_dirs": call.input.get("skip_dirs", []) or [],
                "investigation_order": call.input.get(
                    "investigation_order", "leaf-first"
                ),
                "notes": call.input.get("notes", ""),
            }
            replies.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": "Plan received. Thank you.",
            })
            submitted = True

        messages.append({"role": "user", "content": replies})
        if submitted:
            break

    if not submitted:
        print(" [AI] Warning: planning ran out of turns.", file=sys.stderr)

    return plan
def _apply_plan(all_dirs, to_investigate, plan, target):
    """Apply the plan to produce an ordered dir list and turn map.

    Planner paths are matched against each directory's path relative to
    `target`. An exact match is tried first, then the `os.path.normpath`
    form, so "src/" and "./src" both resolve to "src". Entries whose path
    is missing, empty or not a string are reported as unmatched and are
    never resolved to the target root.

    Args:
        all_dirs: Absolute paths of every discovered directory.
        to_investigate: Subset of `all_dirs` still to be investigated, in
            the order to keep for leaf-first plans.
        plan: Plan dict with "skip_dirs", "priority_dirs", "shallow_dirs"
            (lists of dicts carrying a "path") and "investigation_order",
            or None to leave `to_investigate` untouched.
        target: Root directory the relative paths are computed against.

    Returns (ordered_dirs, turn_map) where:
    - ordered_dirs: list of absolute dir paths in investigation order
    - turn_map: dict of {abs_dir_path: max_turns}; priority dirs get their
      suggested turns clamped to 1.._MAX_TURNS_CEILING, shallow dirs get
      _SHALLOW_TURNS.

    No cache access and no API calls; the only side effect is a stderr
    warning when plan paths match no known directory.
    """
    if plan is None:
        return list(to_investigate), {}

    # Build lookup from relative path to absolute path.
    # The target root maps to "." via relpath, but the planner sees
    # basename(target) in the tree output and uses that as the path.
    # Register both so either form matches (#76).
    rel_to_abs = {}
    for d in all_dirs:
        rel = os.path.relpath(d, target)
        rel_to_abs[rel] = d
        if rel == ".":
            rel_to_abs[os.path.basename(d)] = d

    unmatched = []

    def resolve(entry):
        """Return the absolute dir for a plan entry, or None (and record it)."""
        raw = entry.get("path", "")
        if isinstance(raw, str) and raw:
            if raw in rel_to_abs:
                return rel_to_abs[raw]
            # Only non-empty paths reach this point: normpath("") is ".",
            # which would wrongly select the target root.
            normalized = os.path.normpath(raw)
            if normalized in rel_to_abs:
                return rel_to_abs[normalized]
        unmatched.append(raw if isinstance(raw, str) else repr(raw))
        return None

    # Classify directories by tier.
    skip_set = set()
    priority_set = set()
    shallow_set = set()
    turn_map = {}

    for entry in plan.get("skip_dirs") or []:
        abs_path = resolve(entry)
        if abs_path is not None:
            skip_set.add(abs_path)

    for entry in plan.get("priority_dirs") or []:
        abs_path = resolve(entry)
        if abs_path is None:
            continue
        # The turn count feeds range(); coerce to int and keep it >= 1 so a
        # float, string or zero from the planner cannot break the dir loop.
        try:
            suggested = int(entry.get("suggested_turns", 15))
        except (TypeError, ValueError, OverflowError):
            suggested = 15
        priority_set.add(abs_path)
        turn_map[abs_path] = max(1, min(suggested, _MAX_TURNS_CEILING))

    # Processed after priority: a dir listed in both ends up shallow in
    # turn_map.
    for entry in plan.get("shallow_dirs") or []:
        abs_path = resolve(entry)
        if abs_path is None:
            continue
        shallow_set.add(abs_path)
        turn_map[abs_path] = _SHALLOW_TURNS

    if unmatched:
        print(
            f" [AI] Warning: plan referenced unknown dirs: "
            f"{', '.join(unmatched)}",
            file=sys.stderr,
        )

    # Remove skipped dirs from the investigation list.
    remaining = [d for d in to_investigate if d not in skip_set]

    # Order by bands. Both strategies preserve leaf-first within bands.
    order = plan.get("investigation_order", "leaf-first")
    if order == "priority-first":
        priority_band = [d for d in remaining if d in priority_set]
        shallow_band = [d for d in remaining if d in shallow_set]
        default_band = [
            d for d in remaining
            if d not in priority_set and d not in shallow_set
        ]
        ordered = priority_band + default_band + shallow_band
    else:
        # leaf-first: keep the original order (already leaf-first from
        # _discover_directories), just remove skipped dirs.
        ordered = remaining

    return ordered, turn_map
def _write_plan_evaluation(cache, plan, turn_utilization):
    """Write plan_evaluation.json comparing plan predictions to actual results.

    This is the planning pass's report card: did we allocate turns well?

    Args:
        cache: Cache manager; `cache.root` is where the file is written,
            `cache.target` is the investigated root, and `cache.read_entry`
            supplies each directory's cached entry.
        plan: Plan dict (or None) whose tier lists hold dicts with "path".
        turn_utilization: Records with "dir", "turns_allocated",
            "turns_used" and optionally "completeness".

    The target root is recorded by the caller under basename(target), while
    the plan may name it "." and its cache entry is keyed by the target
    path itself; both alternatives are tried so the root is not left with
    tier "default" and no confidence. Write failures are reported on
    stderr and otherwise ignored.
    """
    plan = plan or {}

    # Build a lookup of what the plan predicted per dir. Entries without
    # a usable path are ignored instead of raising KeyError.
    predicted = {}
    for entry in plan.get("priority_dirs") or []:
        path = entry.get("path")
        if isinstance(path, str) and path:
            predicted[path] = {
                "tier": "priority",
                "suggested_turns": entry.get("suggested_turns", 15),
            }
    for entry in plan.get("shallow_dirs") or []:
        path = entry.get("path")
        if isinstance(path, str) and path:
            predicted[path] = {
                "tier": "shallow",
                "suggested_turns": _SHALLOW_TURNS,
            }
    for entry in plan.get("skip_dirs") or []:
        path = entry.get("path")
        if isinstance(path, str) and path:
            predicted[path] = {
                "tier": "skip",
                "suggested_turns": 0,
            }

    root_name = os.path.basename(cache.target)

    # Compare predictions to actual turn utilization.
    per_dir = []
    total_allocated = 0
    total_used = 0
    for record in turn_utilization:
        dir_rel = record["dir"]
        allocated = record["turns_allocated"]
        used = record["turns_used"]
        total_allocated += allocated
        total_used += used

        # A record named like the root may be the root itself.
        maybe_root = bool(root_name) and dir_rel == root_name

        pred = predicted.get(dir_rel)
        if pred is None and maybe_root:
            pred = predicted.get(".")
        pred = pred or {}

        entry = {
            "dir": dir_rel,
            "planned_tier": pred.get("tier", "default"),
            "turns_allocated": allocated,
            "turns_used": used,
            "utilization": round(used / allocated, 2) if allocated else 0,
        }
        # Include completeness from turn utilization record (#74).
        record_completeness = record.get("completeness")
        if record_completeness is not None:
            entry["completeness"] = record_completeness

        # Read confidence from the cached dir entry if available.
        dir_entry = cache.read_entry("dir", os.path.join(
            cache.target, dir_rel,
        ))
        if not dir_entry and maybe_root:
            # join(target, basename(target)) is not the root; retry with
            # the target path itself.
            dir_entry = cache.read_entry("dir", cache.target)
        if dir_entry:
            entry["confidence"] = dir_entry.get("confidence")
        per_dir.append(entry)

    evaluation = {
        "plan_order": plan.get("investigation_order", "leaf-first"),
        "total_dirs_investigated": len(turn_utilization),
        "total_turns_allocated": total_allocated,
        "total_turns_used": total_used,
        "overall_utilization": (
            round(total_used / total_allocated, 2) if total_allocated else 0
        ),
        "per_directory": per_dir,
        "evaluated_at": _now_iso(),
    }

    eval_path = os.path.join(cache.root, "plan_evaluation.json")
    try:
        with open(eval_path, "w", encoding="utf-8") as f:
            json.dump(evaluation, f, indent=2)
    except OSError as e:
        # The evaluation is a diagnostic artifact; never fail the run on it.
        print(f" [AI] Warning: could not write plan evaluation: {e}",
              file=sys.stderr)
        return

    print(
        f" [AI] Plan evaluation: {total_used}/{total_allocated} turns used "
        f"({evaluation['overall_utilization']:.0%} utilization)",
        file=sys.stderr,
    )
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
    """Run the final synthesis pass. Returns (brief, detailed).

    All cached directory entries are rendered into the system prompt and
    the model is asked to call submit_report with a "brief" and a
    "detailed" text, for up to `max_turns` turns. Other tool calls are
    dispatched through `_execute_tool`.

    When no report is submitted — the turns run out or the API raises an
    error — the result falls back to `_synthesize_from_cache`, so cached
    directory summaries are still returned.
    """
    dir_entries = cache.read_all_entries("dir")

    summary_lines = []
    for entry in dir_entries:
        rel = entry.get("relative_path", "?")
        summary = entry.get("summary", "(no summary)")
        dominant = entry.get("dominant_category", "?")
        notable = entry.get("notable_files", [])

        summary_lines.append(f"### {rel}/")
        summary_lines.append(f"Category: {dominant}")
        summary_lines.append(f"Summary: {summary}")
        if notable:
            # str() keeps a non-string item in a cached entry from raising.
            summary_lines.append(
                f"Notable files: {', '.join(str(n) for n in notable)}")
        summary_lines.append("")

    summaries_text = "\n".join(summary_lines) if summary_lines else "(none)"

    system = _SYNTHESIS_SYSTEM_PROMPT.format(
        target=target,
        summaries_text=summaries_text,
    )
    messages = [
        {
            "role": "user",
            "content": (
                "All directory summaries are in the system prompt above. "
                "Synthesize them into a cohesive report and call "
                "submit_report immediately — no other tool calls needed."
            ),
        },
    ]

    brief, detailed = "", ""
    submitted = False
    for turn in range(max_turns):
        try:
            content_blocks, _usage = _call_api_streaming(
                client, system, messages, _SYNTHESIS_TOOLS, tracker,
            )
        except anthropic.APIError as e:
            print(f" [AI] API error: {e}", file=sys.stderr)
            break

        # Print text blocks to stderr
        for b in content_blocks:
            if b.type == "text" and b.text.strip():
                for line in b.text.strip().split("\n"):
                    print(f" [AI] {line}", file=sys.stderr)

        tool_uses = [b for b in content_blocks if b.type == "tool_use"]
        for tu in tool_uses:
            arg_summary = ", ".join(
                f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
            ) if tu.input else ""
            print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)

        messages.append({
            "role": "assistant",
            "content": [_block_to_dict(b) for b in content_blocks],
        })

        if not tool_uses:
            messages.append({
                "role": "user",
                "content": "Please call submit_report with your analysis.",
            })
            continue

        tool_results = []
        for tu in tool_uses:
            if tu.name == "submit_report":
                report_args = tu.input or {}
                brief = report_args.get("brief", "")
                detailed = report_args.get("detailed", "")
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Report submitted. Thank you.",
                })
                submitted = True
            else:
                result_text = _execute_tool(
                    tu.name, tu.input, target, cache, "(synthesis)",
                    turn + 1, verbose=verbose,
                )
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": result_text,
                })

        messages.append({"role": "user", "content": tool_results})
        if submitted:
            break
    else:
        print(" [AI] Warning: synthesis ran out of turns.", file=sys.stderr)

    if not submitted:
        # Covers both turn exhaustion and an API error.
        brief, detailed = _synthesize_from_cache(cache)

    return brief, detailed
def _synthesize_from_cache(cache):
    """Assemble a fallback (brief, detailed) report from cached dir entries.

    `brief` is the first non-empty directory summary; `detailed` lists
    every non-empty summary under its relative path, separated by blank
    lines. Placeholder text is returned when nothing usable is cached.
    """
    entries = cache.read_all_entries("dir")
    if not entries:
        return ("(AI analysis incomplete — no data was cached)", "")

    # Keep (relative path, summary) pairs for entries that have a summary.
    described = [
        (item.get("relative_path", "?"), item.get("summary", ""))
        for item in entries
    ]
    described = [(rel, text) for rel, text in described if text]

    if not described:
        return "(AI analysis incomplete)", ""

    brief = described[0][1]
    detailed = "\n\n".join(f"**{rel}/**: {text}" for rel, text in described)
    return brief, detailed
# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------
def _run_investigation(client, target, report, show_hidden=False,
                       fresh=False, verbose=False, exclude=None):
    """Orchestrate the multi-pass investigation. Returns (brief, detailed, flags).

    Passes, in order:
      1. Survey — skipped for small targets (a default survey is used).
      2. Planning — skipped for small targets; otherwise loaded from the
         cached plan.json (unless `fresh`) or requested from the model.
      3. One agent loop per uncached directory, in plan order.
      4. Plan evaluation and final synthesis.

    Args:
        client: API client passed to every pass.
        target: Root directory under investigation.
        report: Report dict; "file_categories" and "survey_signals" are
            read from it.
        show_hidden: Forwarded to `_discover_directories`.
        fresh: Forwarded to `_get_investigation_id`; also disables reuse of
            a cached plan.json.
        verbose: Forwarded to the individual passes.
        exclude: Forwarded to `_discover_directories`.

    Returns:
        (brief, detailed, flags) where flags is the list of JSON objects
        read from the cache's flags.jsonl (empty when the file is absent).
    """
    investigation_id, is_new = _get_investigation_id(target, fresh=fresh)
    cache = _CacheManager(investigation_id, target)
    tracker = _TokenTracker()

    if is_new:
        cache.write_meta(MODEL, _now_iso())

    print(f" [AI] Investigation ID: {investigation_id}"
          f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
    print(f" [AI] Cache: {cache.root}/", file=sys.stderr)

    all_dirs = _discover_directories(target, show_hidden=show_hidden,
                                     exclude=exclude)

    total_files = sum((report.get("file_categories") or {}).values())
    total_dirs = len(all_dirs)
    # One threshold gates both the survey and the planning pass.
    small_target = (total_files < _SURVEY_MIN_FILES
                    and total_dirs < _SURVEY_MIN_DIRS)

    if small_target:
        print(
            f" [AI] Survey skipped — {total_files} files, {total_dirs} dirs "
            f"(below threshold).",
            file=sys.stderr,
        )
        survey = _default_survey()
    else:
        print(" [AI] Survey pass...", file=sys.stderr)
        survey = _run_survey(client, target, report, tracker, verbose=verbose)
        if survey:
            print(
                f" [AI] Survey: {survey['description']} "
                f"(confidence {survey['confidence']:.2f})",
                file=sys.stderr,
            )
            if survey.get("domain_notes"):
                print(f" [AI] Survey notes: {survey['domain_notes']}", file=sys.stderr)
            if survey.get("relevant_tools"):
                print(
                    f" [AI] Survey relevant_tools: {', '.join(survey['relevant_tools'])}",
                    file=sys.stderr,
                )
            if survey.get("skip_tools"):
                print(
                    f" [AI] Survey skip_tools: {', '.join(survey['skip_tools'])}",
                    file=sys.stderr,
                )
        else:
            print(" [AI] Survey unavailable — proceeding without it.", file=sys.stderr)

    # Split directories into already-cached and still-to-investigate.
    to_investigate = []
    cached_dirs = []
    for d in all_dirs:
        if cache.has_entry("dir", d):
            cached_dirs.append(d)
            rel = os.path.relpath(d, target)
            print(f" [AI] Skipping (cached): {rel}/", file=sys.stderr)
        else:
            to_investigate.append(d)

    cached_count = len(cached_dirs)
    if cached_count:
        print(f" [AI] Directories cached: {cached_count}", file=sys.stderr)
    print(f" [AI] Directories to investigate: {len(to_investigate)}",
          file=sys.stderr)

    # Planning pass: decide where to invest depth.
    if small_target:
        print(" [AI] Planning skipped (small target).", file=sys.stderr)
        plan = _default_plan()
    else:
        plan_path = os.path.join(cache.root, "plan.json")
        plan = None
        if not fresh and os.path.exists(plan_path):
            try:
                with open(plan_path) as f:
                    loaded = json.load(f)
            except (OSError, json.JSONDecodeError):
                loaded = None
            # Anything but a JSON object cannot be used as a plan; fall
            # through to a fresh planning pass.
            if isinstance(loaded, dict):
                plan = loaded
                print(" [AI] Plan loaded from cache.", file=sys.stderr)

        if plan is None:
            print(" [AI] Planning pass...", file=sys.stderr)
            plan = _run_planning(
                client, target, survey, report, all_dirs, tracker,
                cached_dirs=cached_dirs, verbose=verbose,
            )
            if plan is None:
                print(" [AI] Planning failed, using defaults.",
                      file=sys.stderr)
                plan = _default_plan()
            else:
                # Save plan to cache (#11). Best-effort: a failed write only
                # means the plan is recomputed on the next run.
                try:
                    with open(plan_path, "w") as f:
                        json.dump(plan, f, indent=2)
                except OSError:
                    pass

    ordered, turn_map = _apply_plan(all_dirs, to_investigate, plan, target)

    # Log plan summary.
    skip_count = len(to_investigate) - len(ordered)
    priority_count = sum(
        1 for d in ordered if turn_map.get(d, _DEFAULT_TURNS) > _DEFAULT_TURNS
    )
    if skip_count or priority_count:
        print(
            f" [AI] Plan: {priority_count} priority, "
            f"{skip_count} skipped, "
            f"{len(ordered) - priority_count} default/shallow",
            file=sys.stderr,
        )
    if plan.get("notes"):
        print(f" [AI] Plan notes: {plan['notes']}", file=sys.stderr)

    total = len(ordered)
    turn_utilization = []
    for i, dir_path in enumerate(ordered, 1):
        dir_rel = os.path.relpath(dir_path, target)
        if dir_rel == ".":
            dir_rel = os.path.basename(target)
        max_turns = turn_map.get(dir_path, _DEFAULT_TURNS)
        print(
            f" [AI] Investigating: {dir_rel}/ ({i}/{total}, "
            f"{max_turns} turns)",
            file=sys.stderr,
        )
        tracker.reset_loop()

        summary, completeness = _run_dir_loop(
            client, target, cache, tracker, dir_path,
            max_turns=max_turns, verbose=verbose, survey=survey,
        )

        # Track turn utilization for quality metrics (#74).
        turns_used = tracker._loop_turns
        turn_utilization.append({
            "dir": dir_rel,
            "turns_allocated": max_turns,
            "turns_used": turns_used,
            "completeness": completeness,
        })

        # If the loop produced a summary but no dir entry was written,
        # store a minimal one so synthesis still sees this directory.
        if summary and not cache.has_entry("dir", dir_path):
            # An unreadable or vanished directory counts as 0 children
            # instead of aborting the whole investigation.
            try:
                child_count = len([
                    n for n in os.listdir(dir_path)
                    if not n.startswith(".")
                ])
            except OSError:
                child_count = 0
            entry = {
                "path": dir_path,
                "relative_path": os.path.relpath(dir_path, target),
                "child_count": child_count,
                "summary": summary,
                "dominant_category": "unknown",
                "notable_files": [],
                "cached_at": _now_iso(),
            }
            if completeness is not None:
                entry["completeness"] = completeness
            cache.write_entry("dir", dir_path, entry)

    cache.update_meta(
        directories_investigated=total + cached_count,
        end_time=_now_iso(),
    )

    # Emit plan evaluation (#74).
    _write_plan_evaluation(cache, plan, turn_utilization)

    print(" [AI] Synthesis pass...", file=sys.stderr)
    brief, detailed = _run_synthesis(
        client, target, cache, tracker, verbose=verbose,
    )

    # Read flags from flags.jsonl
    flags = []
    flags_path = os.path.join(cache.root, "flags.jsonl")
    try:
        with open(flags_path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                # Parse per line so one corrupt record does not discard
                # the flags that follow it.
                try:
                    flags.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
    except OSError:
        # No flags file (or unreadable): report no flags.
        pass

    print(f" [AI] Total tokens used: {tracker.summary()}", file=sys.stderr)

    return brief, detailed, flags
# ---------------------------------------------------------------------------
# Public interface
# ---------------------------------------------------------------------------
def analyze_directory(report, target, verbose_tools=False, fresh=False,
                      exclude=None):
    """Public entry point: run the AI investigation for `target`.

    Returns (brief, detailed, flags). The result is ("", "", []) when no
    API key is available or when the investigation raises an exception
    (a warning is printed to stderr in the latter case).
    """
    key = _get_api_key()
    if not key:
        return "", "", []

    print(" [AI] Starting multi-pass investigation...", file=sys.stderr)
    client = anthropic.Anthropic(api_key=key)

    # Top-level boundary: an analysis failure never propagates to the caller.
    try:
        brief, detailed, flags = _run_investigation(
            client,
            target,
            report,
            fresh=fresh,
            verbose=verbose_tools,
            exclude=exclude,
        )
    except Exception as exc:
        print(f"Warning: AI analysis failed: {exc}", file=sys.stderr)
        return "", "", []

    if not (brief or detailed):
        print(" [AI] Warning: agent produced no output.", file=sys.stderr)

    print(" [AI] Investigation complete.", file=sys.stderr)
    return brief, detailed, flags