luminos/luminos_lib/ai.py
Jeff Smith fecb24d6e1 feat(ai): add _run_survey() and submit_survey tool (#5)
Adds the reconnaissance survey pass: a fast, ≤3-turn LLM call that
characterizes the target before any directory investigation begins.
The survey receives the file-type distribution (from the base scan),
a top-2-level tree preview, and the list of available dir-loop tools,
and returns description / approach / relevant_tools / skip_tools /
domain_notes / confidence via a single submit_survey tool call.

Wired into _run_investigation() before the directory loop. Output is
logged but not yet consumed — that wiring is #6. Survey failure is
non-fatal: if the call errors or runs out of turns, the investigation
proceeds without survey context.

Also adds a Band-Aid to _SURVEY_SYSTEM_PROMPT warning the LLM that
the file-type histogram is biased toward source code (the underlying
classifier has no concept of mail, notebooks, ledgers, etc.) and to
trust the tree preview when they conflict. The proper fix is #42.
2026-04-06 21:49:59 -06:00

1309 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""AI-powered directory analysis using a multi-pass, cache-driven agent loop.
Architecture:
1. Discover all directories under the target
2. Sort leaves-first (deepest directories first)
3. Run an isolated agent loop per directory (max 10 turns each)
4. Cache every file and directory summary to disk
5. Run a final synthesis pass reading only directory cache entries
Uses the Anthropic SDK for streaming, automatic retries, and token counting.
Uses tree-sitter for AST parsing and python-magic for file classification.
"""
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
import anthropic
import magic
from luminos_lib.ast_parser import parse_structure
from luminos_lib.cache import _CacheManager, _get_investigation_id
from luminos_lib.capabilities import check_ai_dependencies
from luminos_lib.prompts import (
_DIR_SYSTEM_PROMPT,
_SURVEY_SYSTEM_PROMPT,
_SYNTHESIS_SYSTEM_PROMPT,
)
from luminos_lib.tree import build_tree, render_tree
MODEL = "claude-sonnet-4-20250514"
# Context budget: trigger early exit at 70% of Sonnet's context window.
MAX_CONTEXT = 180_000
CONTEXT_BUDGET = int(MAX_CONTEXT * 0.70)
# Pricing per 1M tokens (Claude Sonnet).
INPUT_PRICE_PER_M = 3.00
OUTPUT_PRICE_PER_M = 15.00
# Directories to always skip during investigation.
_SKIP_DIRS = {
".git", "__pycache__", "node_modules", ".tox", ".mypy_cache",
".pytest_cache", ".venv", "venv", ".env", "dist", "build",
".eggs", "*.egg-info", ".svn", ".hg",
}
# Commands the run_command tool is allowed to execute.
_COMMAND_WHITELIST = {"wc", "file", "grep", "head", "tail", "stat", "du", "find"}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_api_key():
"""Read the Anthropic API key from the environment."""
key = os.environ.get("ANTHROPIC_API_KEY", "")
if not key:
print("Warning: ANTHROPIC_API_KEY not set. Skipping AI analysis.",
file=sys.stderr)
return key
def _path_is_safe(path, target):
"""Return True if *path* resolves to somewhere inside *target*."""
real = os.path.realpath(path)
target_real = os.path.realpath(target)
return real == target_real or real.startswith(target_real + os.sep)
def _now_iso():
return datetime.now(timezone.utc).isoformat()
def _should_skip_dir(name):
"""Return True if a directory name matches the skip list."""
if name in _SKIP_DIRS:
return True
for pattern in _SKIP_DIRS:
if pattern.startswith("*") and name.endswith(pattern[1:]):
return True
return False
# ---------------------------------------------------------------------------
# Token tracker
# ---------------------------------------------------------------------------
class _TokenTracker:
"""Track cumulative token usage across API calls."""
def __init__(self):
self.total_input = 0
self.total_output = 0
self.loop_input = 0
self.loop_output = 0
def record(self, usage):
"""Record usage from a single API call."""
inp = getattr(usage, "input_tokens", 0)
out = getattr(usage, "output_tokens", 0)
self.total_input += inp
self.total_output += out
self.loop_input += inp
self.loop_output += out
def reset_loop(self):
"""Reset per-loop counters (called between directory loops)."""
self.loop_input = 0
self.loop_output = 0
@property
def loop_total(self):
return self.loop_input + self.loop_output
def budget_exceeded(self):
return self.loop_total > CONTEXT_BUDGET
def summary(self):
cost_in = self.total_input * INPUT_PRICE_PER_M / 1_000_000
cost_out = self.total_output * OUTPUT_PRICE_PER_M / 1_000_000
cost = cost_in + cost_out
return (f"{self.total_input:,} input / {self.total_output:,} output "
f"(approx ${cost:.2f})")
# ---------------------------------------------------------------------------
# Tool definitions
# ---------------------------------------------------------------------------
_DIR_TOOLS = [
{
"name": "read_file",
"description": (
"Read and return the contents of a file. Path must be inside "
"the target directory."
),
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute or relative path to the file.",
},
"max_bytes": {
"type": "integer",
"description": "Maximum bytes to read (default 4096).",
},
},
"required": ["path"],
},
},
{
"name": "list_directory",
"description": (
"List the contents of a directory with file sizes and types."
),
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute or relative path to the directory.",
},
"show_hidden": {
"type": "boolean",
"description": "Include hidden files (default false).",
},
},
"required": ["path"],
},
},
{
"name": "run_command",
"description": (
"Run a read-only shell command. Allowed binaries: "
"wc, file, grep, head, tail, stat, du, find."
),
"input_schema": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute.",
},
},
"required": ["command"],
},
},
{
"name": "parse_structure",
"description": (
"Parse a source file using tree-sitter and return its structural "
"skeleton: functions, classes, imports, and code metrics. "
"Supported: Python, JavaScript, TypeScript, Rust, Go."
),
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the source file to parse.",
},
},
"required": ["path"],
},
},
{
"name": "write_cache",
"description": (
"Write a summary cache entry for a file or directory. The data "
"must NOT contain raw file contents — summaries only."
),
"input_schema": {
"type": "object",
"properties": {
"cache_type": {
"type": "string",
"enum": ["file", "dir"],
"description": "'file' or 'dir'.",
},
"path": {
"type": "string",
"description": "The path being cached.",
},
"data": {
"type": "object",
"description": (
"Cache entry. Files: {path, relative_path, size_bytes, "
"category, summary, notable, notable_reason, cached_at}. "
"Dirs: {path, relative_path, child_count, summary, "
"dominant_category, notable_files, cached_at}."
),
},
},
"required": ["cache_type", "path", "data"],
},
},
{
"name": "think",
"description": (
"Record your reasoning before choosing which file or directory "
"to investigate next. Call this when deciding what to look at "
"— not before every individual tool call."
),
"input_schema": {
"type": "object",
"properties": {
"observation": {
"type": "string",
"description": "What you have observed so far.",
},
"hypothesis": {
"type": "string",
"description": "Your hypothesis about the directory.",
},
"next_action": {
"type": "string",
"description": "What you plan to investigate next and why.",
},
},
"required": ["observation", "hypothesis", "next_action"],
},
},
{
"name": "checkpoint",
"description": (
"Summarize what you have learned so far about this directory "
"and what you still need to determine. Call this after completing "
"a significant cluster of files — not after every file."
),
"input_schema": {
"type": "object",
"properties": {
"learned": {
"type": "string",
"description": "What you have learned so far.",
},
"still_unknown": {
"type": "string",
"description": "What you still need to determine.",
},
"next_phase": {
"type": "string",
"description": "What you will investigate next.",
},
},
"required": ["learned", "still_unknown", "next_phase"],
},
},
{
"name": "flag",
"description": (
"Mark a file, directory, or finding as notable or anomalous. "
"Call this immediately when you discover something surprising, "
"concerning, or important — do not save it for the report."
),
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path, or 'general'.",
},
"finding": {
"type": "string",
"description": "What you found.",
},
"severity": {
"type": "string",
"enum": ["info", "concern", "critical"],
"description": "info | concern | critical",
},
},
"required": ["path", "finding", "severity"],
},
},
{
"name": "submit_report",
"description": (
"Submit the directory summary. This ends the investigation loop."
),
"input_schema": {
"type": "object",
"properties": {
"summary": {
"type": "string",
"description": "1-3 sentence summary of the directory.",
},
},
"required": ["summary"],
},
},
]
_SURVEY_TOOLS = [
{
"name": "submit_survey",
"description": (
"Submit the reconnaissance survey. Call exactly once."
),
"input_schema": {
"type": "object",
"properties": {
"description": {
"type": "string",
"description": "Plain-language description of the target.",
},
"approach": {
"type": "string",
"description": "Recommended analytical approach.",
},
"relevant_tools": {
"type": "array",
"items": {"type": "string"},
"description": "Tool names the dir loop should lean on.",
},
"skip_tools": {
"type": "array",
"items": {"type": "string"},
"description": "Tool names whose use would be wrong here.",
},
"domain_notes": {
"type": "string",
"description": "Short actionable hint, or empty string.",
},
"confidence": {
"type": "number",
"description": "0.01.0 confidence in this survey.",
},
},
"required": [
"description", "approach", "relevant_tools",
"skip_tools", "domain_notes", "confidence",
],
},
},
]
_SYNTHESIS_TOOLS = [
{
"name": "read_cache",
"description": "Read a previously cached summary for a file or directory.",
"input_schema": {
"type": "object",
"properties": {
"cache_type": {
"type": "string",
"enum": ["file", "dir"],
},
"path": {
"type": "string",
"description": "The path to look up.",
},
},
"required": ["cache_type", "path"],
},
},
{
"name": "list_cache",
"description": "List all cached entry paths of a given type.",
"input_schema": {
"type": "object",
"properties": {
"cache_type": {
"type": "string",
"enum": ["file", "dir"],
},
},
"required": ["cache_type"],
},
},
{
"name": "flag",
"description": (
"Mark a file, directory, or finding as notable or anomalous. "
"Call this immediately when you discover something surprising, "
"concerning, or important — do not save it for the report."
),
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path, or 'general'.",
},
"finding": {
"type": "string",
"description": "What you found.",
},
"severity": {
"type": "string",
"enum": ["info", "concern", "critical"],
"description": "info | concern | critical",
},
},
"required": ["path", "finding", "severity"],
},
},
{
"name": "submit_report",
"description": "Submit the final analysis report.",
"input_schema": {
"type": "object",
"properties": {
"brief": {
"type": "string",
"description": "2-4 sentence summary.",
},
"detailed": {
"type": "string",
"description": "Thorough breakdown.",
},
},
"required": ["brief", "detailed"],
},
},
]
# ---------------------------------------------------------------------------
# Tool implementations
# ---------------------------------------------------------------------------
def _tool_read_file(args, target, _cache):
path = args.get("path", "")
max_bytes = args.get("max_bytes", 4096)
if not os.path.isabs(path):
path = os.path.join(target, path)
if not _path_is_safe(path, target):
return f"Error: path '{path}' is outside the target directory."
try:
file_size = os.path.getsize(path)
with open(path, "r", errors="replace") as f:
content = f.read(max_bytes)
if not content:
return "(empty file)"
if file_size > max_bytes:
content += (
f"\n\n[TRUNCATED — showed {max_bytes} of {file_size} bytes. "
f"Call again with a larger max_bytes or use "
f"run_command('tail -n ... {os.path.relpath(path, target)}') "
f"to see the rest.]"
)
return content
except OSError as e:
return f"Error reading file: {e}"
def _tool_list_directory(args, target, _cache):
path = args.get("path", target)
show_hidden = args.get("show_hidden", False)
if not os.path.isabs(path):
path = os.path.join(target, path)
if not _path_is_safe(path, target):
return f"Error: path '{path}' is outside the target directory."
if not os.path.isdir(path):
return f"Error: '{path}' is not a directory."
try:
entries = sorted(os.listdir(path))
lines = []
for name in entries:
if not show_hidden and name.startswith("."):
continue
full = os.path.join(path, name)
try:
st = os.stat(full)
mime = magic.from_file(full, mime=True) if not os.path.isdir(full) else None
if os.path.isdir(full):
lines.append(f" {name}/ (dir)")
else:
mime_str = f" [{mime}]" if mime else ""
lines.append(f" {name} ({st.st_size} bytes){mime_str}")
except OSError:
lines.append(f" {name} (stat failed)")
return "\n".join(lines) if lines else "(empty directory)"
except OSError as e:
return f"Error listing directory: {e}"
def _tool_run_command(args, target, _cache):
command = args.get("command", "")
parts = command.split()
if not parts:
return "Error: empty command."
binary = os.path.basename(parts[0])
if binary not in _COMMAND_WHITELIST:
return (
f"Error: '{binary}' is not allowed. "
f"Whitelist: {', '.join(sorted(_COMMAND_WHITELIST))}"
)
try:
result = subprocess.run(
command, shell=True, capture_output=True, text=True,
timeout=15, cwd=target,
)
output = result.stdout
if result.returncode != 0 and result.stderr:
output += f"\n(stderr: {result.stderr.strip()})"
return output.strip() if output.strip() else "(no output)"
except subprocess.TimeoutExpired:
return "Error: command timed out after 15 seconds."
except OSError as e:
return f"Error running command: {e}"
def _tool_parse_structure(args, target, _cache):
path = args.get("path", "")
if not os.path.isabs(path):
path = os.path.join(target, path)
if not _path_is_safe(path, target):
return f"Error: path '{path}' is outside the target directory."
return parse_structure(path)
def _tool_write_cache(args, _target, cache):
cache_type = args.get("cache_type", "")
path = args.get("path", "")
data = args.get("data", {})
if cache_type not in ("file", "dir"):
return "Error: cache_type must be 'file' or 'dir'."
return cache.write_entry(cache_type, path, data)
def _tool_read_cache(args, _target, cache):
cache_type = args.get("cache_type", "")
path = args.get("path", "")
if cache_type not in ("file", "dir"):
return "Error: cache_type must be 'file' or 'dir'."
entry = cache.read_entry(cache_type, path)
if entry is None:
return "null"
return json.dumps(entry, indent=2)
def _tool_list_cache(args, _target, cache):
cache_type = args.get("cache_type", "")
if cache_type not in ("file", "dir"):
return "Error: cache_type must be 'file' or 'dir'."
paths = cache.list_entries(cache_type)
if not paths:
return "(no cached entries)"
return "\n".join(paths)
def _tool_think(args, _target, _cache):
obs = args.get("observation", "")
hyp = args.get("hypothesis", "")
nxt = args.get("next_action", "")
print(f" [AI] THINK", file=sys.stderr)
print(f" observation: {obs}", file=sys.stderr)
print(f" hypothesis: {hyp}", file=sys.stderr)
print(f" next_action: {nxt}", file=sys.stderr)
return "ok"
def _tool_checkpoint(args, _target, _cache):
learned = args.get("learned", "")
unknown = args.get("still_unknown", "")
phase = args.get("next_phase", "")
print(f" [AI] CHECKPOINT", file=sys.stderr)
print(f" learned: {learned}", file=sys.stderr)
print(f" still_unknown: {unknown}", file=sys.stderr)
print(f" next_phase: {phase}", file=sys.stderr)
return "ok"
def _tool_flag(args, _target, cache):
path = args.get("path", "general")
finding = args.get("finding", "")
severity = args.get("severity", "info")
print(f" [AI] FLAG [{severity.upper()}] {path}", file=sys.stderr)
print(f" {finding}", file=sys.stderr)
flags_path = os.path.join(cache.root, "flags.jsonl")
entry = {"path": path, "finding": finding, "severity": severity}
try:
with open(flags_path, "a") as f:
f.write(json.dumps(entry) + "\n")
except OSError:
pass
return "ok"
_TOOL_DISPATCH = {
"read_file": _tool_read_file,
"list_directory": _tool_list_directory,
"run_command": _tool_run_command,
"parse_structure": _tool_parse_structure,
"write_cache": _tool_write_cache,
"read_cache": _tool_read_cache,
"list_cache": _tool_list_cache,
"think": _tool_think,
"checkpoint": _tool_checkpoint,
"flag": _tool_flag,
}
def _execute_tool(name, args, target, cache, dir_rel, turn, verbose=False):
"""Execute a tool by name and return the result string."""
handler = _TOOL_DISPATCH.get(name)
if handler is None:
return f"Error: unknown tool '{name}'."
result = handler(args, target, cache)
cache.log_turn(dir_rel, turn, name,
{k: v for k, v in args.items() if k != "data"},
len(result))
if verbose:
preview = result[:200] + "..." if len(result) > 200 else result
print(f" [AI] <- {len(result)} chars: {preview}", file=sys.stderr)
return result
# ---------------------------------------------------------------------------
# Streaming API caller
# ---------------------------------------------------------------------------
def _call_api_streaming(client, system, messages, tools, tracker):
"""Call Claude via streaming. Print tool decisions in real-time.
Returns (content_blocks, usage) where content_blocks is the list of
content blocks from the response.
"""
with client.messages.stream(
model=MODEL,
max_tokens=4096,
system=system,
messages=messages,
tools=tools,
) as stream:
# Print tool call names as they arrive
current_tool = None
for event in stream:
if event.type == "content_block_start":
block = event.content_block
if block.type == "tool_use":
current_tool = block.name
# We'll print the full args after the block is complete
elif event.type == "content_block_stop":
current_tool = None
response = stream.get_final_message()
tracker.record(response.usage)
return response.content, response.usage
# ---------------------------------------------------------------------------
# Directory discovery
# ---------------------------------------------------------------------------
def _discover_directories(target, show_hidden=False, exclude=None):
"""Walk the target and return all directories sorted leaves-first."""
extra = set(exclude or [])
dirs = []
target_real = os.path.realpath(target)
for root, subdirs, _files in os.walk(target_real, topdown=True):
subdirs[:] = [
d for d in subdirs
if not _should_skip_dir(d)
and d not in extra
and (show_hidden or not d.startswith("."))
]
dirs.append(root)
dirs.sort(key=lambda d: (-d.count(os.sep), d))
return dirs
# ---------------------------------------------------------------------------
# Per-directory agent loop
# ---------------------------------------------------------------------------
def _build_dir_context(dir_path):
lines = []
try:
entries = sorted(os.listdir(dir_path))
for name in entries:
if name.startswith("."):
continue
full = os.path.join(dir_path, name)
try:
st = os.stat(full)
if os.path.isdir(full):
lines.append(f" {name}/ (dir)")
else:
mime = magic.from_file(full, mime=True)
lines.append(f" {name} ({st.st_size} bytes) [{mime}]")
except OSError:
lines.append(f" {name} (stat failed)")
except OSError:
lines.append(" (could not list directory)")
return "Directory contents:\n" + "\n".join(lines) if lines else "(empty)"
def _get_child_summaries(dir_path, cache):
parts = []
try:
for name in sorted(os.listdir(dir_path)):
child = os.path.join(dir_path, name)
if not os.path.isdir(child):
continue
entry = cache.read_entry("dir", child)
if entry:
rel = entry.get("relative_path", name)
summary = entry.get("summary", "(no summary)")
parts.append(f"- {rel}/: {summary}")
except OSError:
pass
return "\n".join(parts) if parts else "(none — this is a leaf directory)"
def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
verbose=False):
"""Run an isolated agent loop for a single directory."""
dir_rel = os.path.relpath(dir_path, target)
if dir_rel == ".":
dir_rel = os.path.basename(target)
context = _build_dir_context(dir_path)
child_summaries = _get_child_summaries(dir_path, cache)
system = _DIR_SYSTEM_PROMPT.format(
dir_path=dir_path,
dir_rel=dir_rel,
max_turns=max_turns,
context=context,
child_summaries=child_summaries,
)
messages = [
{
"role": "user",
"content": (
"Investigate this directory now. Use parse_structure for "
"source files, read_file for others, cache summaries, and "
"call submit_report. Batch tool calls for efficiency."
),
},
]
tracker.reset_loop()
summary = None
for turn in range(max_turns):
# Check context budget
if tracker.budget_exceeded():
print(f" [AI] Context budget reached — exiting early "
f"({tracker.loop_total:,} tokens used)", file=sys.stderr)
# Flush a partial directory summary from cached file entries
if not cache.has_entry("dir", dir_path):
dir_real = os.path.realpath(dir_path)
file_entries = [
e for e in cache.read_all_entries("file")
if os.path.realpath(e.get("path", "")).startswith(
dir_real + os.sep)
or os.path.dirname(
os.path.join(target, e.get("relative_path", ""))
) == dir_real
]
if file_entries:
file_summaries = [
e["summary"] for e in file_entries if e.get("summary")
]
notable = [
e.get("relative_path", e.get("path", ""))
for e in file_entries if e.get("notable")
]
partial_summary = " ".join(file_summaries)
cache.write_entry("dir", dir_path, {
"path": dir_path,
"relative_path": os.path.relpath(dir_path, target),
"child_count": len([
n for n in os.listdir(dir_path)
if not n.startswith(".")
]) if os.path.isdir(dir_path) else 0,
"summary": partial_summary,
"dominant_category": "unknown",
"notable_files": notable,
"partial": True,
"partial_reason": "context budget reached",
"cached_at": _now_iso(),
})
if not summary:
summary = partial_summary
else:
cache.write_entry("dir", dir_path, {
"path": dir_path,
"relative_path": os.path.relpath(dir_path, target),
"child_count": 0,
"summary": ("Investigation incomplete — context budget "
"reached before any files were processed."),
"dominant_category": "unknown",
"notable_files": [],
"partial": True,
"partial_reason": (
"context budget reached before files processed"),
"cached_at": _now_iso(),
})
break
try:
content_blocks, usage = _call_api_streaming(
client, system, messages, _DIR_TOOLS, tracker,
)
except anthropic.APIError as e:
print(f" [AI] API error: {e}", file=sys.stderr)
break
# Print text blocks (step numbering, reasoning) to stderr
for b in content_blocks:
if b.type == "text" and b.text.strip():
for line in b.text.strip().split("\n"):
print(f" [AI] {line}", file=sys.stderr)
# Print tool decisions now that we have the full response
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
for tu in tool_uses:
arg_summary = ", ".join(
f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
) if tu.input else ""
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
messages.append({
"role": "assistant",
"content": [_block_to_dict(b) for b in content_blocks],
})
if not tool_uses:
messages.append({
"role": "user",
"content": "Please call submit_report with your summary.",
})
continue
tool_results = []
done = False
for tu in tool_uses:
if tu.name == "submit_report":
summary = tu.input.get("summary", "")
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": "Summary submitted.",
})
done = True
else:
result_text = _execute_tool(
tu.name, tu.input, target, cache, dir_rel,
turn + 1, verbose=verbose,
)
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": result_text,
})
messages.append({"role": "user", "content": tool_results})
if done:
break
else:
print(f" [AI] Warning: max turns reached for {dir_rel}",
file=sys.stderr)
return summary
def _block_to_dict(block):
"""Convert an SDK content block to a plain dict for message history."""
if block.type == "text":
return {"type": "text", "text": block.text}
elif block.type == "tool_use":
return {"type": "tool_use", "id": block.id,
"name": block.name, "input": block.input}
return {"type": block.type}
# ---------------------------------------------------------------------------
# Synthesis pass
# ---------------------------------------------------------------------------
def _run_survey(client, target, report, tracker, max_turns=3, verbose=False):
"""Run the reconnaissance survey pass.
Returns a survey dict on success, or None on failure / out-of-turns.
Survey is advisory — callers must treat None as "no survey context".
"""
categories = report.get("file_categories", {}) or {}
if categories:
ftd_lines = [
f" {cat}: {n}"
for cat, n in sorted(categories.items(), key=lambda kv: -kv[1])
]
file_type_distribution = "\n".join(ftd_lines)
else:
file_type_distribution = " (no files classified)"
try:
tree_node = build_tree(target, max_depth=2)
tree_preview = render_tree(tree_node)
except Exception:
tree_preview = "(tree unavailable)"
tool_names = [t["name"] for t in _DIR_TOOLS if t["name"] != "submit_report"]
available_tools = ", ".join(tool_names)
system = _SURVEY_SYSTEM_PROMPT.format(
target=target,
file_type_distribution=file_type_distribution,
tree_preview=tree_preview,
available_tools=available_tools,
)
messages = [
{
"role": "user",
"content": (
"All inputs are in the system prompt above. Call "
"submit_survey now — no other tool calls needed."
),
},
]
survey = None
for turn in range(max_turns):
try:
content_blocks, _usage = _call_api_streaming(
client, system, messages, _SURVEY_TOOLS, tracker,
)
except anthropic.APIError as e:
print(f" [AI] API error: {e}", file=sys.stderr)
return None
for b in content_blocks:
if b.type == "text" and b.text.strip():
for line in b.text.strip().split("\n"):
print(f" [AI] {line}", file=sys.stderr)
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
for tu in tool_uses:
arg_summary = ", ".join(
f"{k}={v!r}" for k, v in tu.input.items()
) if tu.input else ""
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
messages.append({
"role": "assistant",
"content": [_block_to_dict(b) for b in content_blocks],
})
if not tool_uses:
messages.append({
"role": "user",
"content": "Please call submit_survey.",
})
continue
tool_results = []
done = False
for tu in tool_uses:
if tu.name == "submit_survey":
survey = {
"description": tu.input.get("description", ""),
"approach": tu.input.get("approach", ""),
"relevant_tools": tu.input.get("relevant_tools", []) or [],
"skip_tools": tu.input.get("skip_tools", []) or [],
"domain_notes": tu.input.get("domain_notes", ""),
"confidence": float(tu.input.get("confidence", 0.0) or 0.0),
}
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": "Survey received. Thank you.",
})
done = True
else:
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": "Unknown tool. Call submit_survey.",
"is_error": True,
})
messages.append({"role": "user", "content": tool_results})
if done:
break
else:
print(" [AI] Warning: survey ran out of turns.", file=sys.stderr)
return survey
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
"""Run the final synthesis pass. Returns (brief, detailed)."""
dir_entries = cache.read_all_entries("dir")
summary_lines = []
for entry in dir_entries:
rel = entry.get("relative_path", "?")
summary = entry.get("summary", "(no summary)")
dominant = entry.get("dominant_category", "?")
notable = entry.get("notable_files", [])
summary_lines.append(f"### {rel}/")
summary_lines.append(f"Category: {dominant}")
summary_lines.append(f"Summary: {summary}")
if notable:
summary_lines.append(f"Notable files: {', '.join(notable)}")
summary_lines.append("")
summaries_text = "\n".join(summary_lines) if summary_lines else "(none)"
system = _SYNTHESIS_SYSTEM_PROMPT.format(
target=target,
summaries_text=summaries_text,
)
messages = [
{
"role": "user",
"content": (
"All directory summaries are in the system prompt above. "
"Synthesize them into a cohesive report and call "
"submit_report immediately — no other tool calls needed."
),
},
]
brief, detailed = "", ""
for turn in range(max_turns):
try:
content_blocks, usage = _call_api_streaming(
client, system, messages, _SYNTHESIS_TOOLS, tracker,
)
except anthropic.APIError as e:
print(f" [AI] API error: {e}", file=sys.stderr)
break
# Print text blocks to stderr
for b in content_blocks:
if b.type == "text" and b.text.strip():
for line in b.text.strip().split("\n"):
print(f" [AI] {line}", file=sys.stderr)
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
for tu in tool_uses:
arg_summary = ", ".join(
f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
) if tu.input else ""
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
messages.append({
"role": "assistant",
"content": [_block_to_dict(b) for b in content_blocks],
})
if not tool_uses:
messages.append({
"role": "user",
"content": "Please call submit_report with your analysis.",
})
continue
tool_results = []
done = False
for tu in tool_uses:
if tu.name == "submit_report":
brief = tu.input.get("brief", "")
detailed = tu.input.get("detailed", "")
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": "Report submitted. Thank you.",
})
done = True
else:
result_text = _execute_tool(
tu.name, tu.input, target, cache, "(synthesis)",
turn + 1, verbose=verbose,
)
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": result_text,
})
messages.append({"role": "user", "content": tool_results})
if done:
break
else:
print(" [AI] Warning: synthesis ran out of turns.", file=sys.stderr)
brief, detailed = _synthesize_from_cache(cache)
return brief, detailed
def _synthesize_from_cache(cache):
"""Build a best-effort report from cached directory summaries."""
dir_entries = cache.read_all_entries("dir")
if not dir_entries:
return ("(AI analysis incomplete — no data was cached)", "")
brief_parts = []
detail_parts = []
for entry in dir_entries:
rel = entry.get("relative_path", "?")
summary = entry.get("summary", "")
if summary:
detail_parts.append(f"**{rel}/**: {summary}")
brief_parts.append(summary)
brief = brief_parts[0] if brief_parts else "(AI analysis incomplete)"
detailed = "\n\n".join(detail_parts) if detail_parts else ""
return brief, detailed
# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------
def _run_investigation(client, target, report, show_hidden=False,
fresh=False, verbose=False, exclude=None):
"""Orchestrate the multi-pass investigation. Returns (brief, detailed, flags)."""
investigation_id, is_new = _get_investigation_id(target, fresh=fresh)
cache = _CacheManager(investigation_id, target)
tracker = _TokenTracker()
if is_new:
cache.write_meta(MODEL, _now_iso())
print(f" [AI] Investigation ID: {investigation_id}"
f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
print(" [AI] Survey pass...", file=sys.stderr)
survey = _run_survey(client, target, report, tracker, verbose=verbose)
if survey:
print(
f" [AI] Survey: {survey['description']} "
f"(confidence {survey['confidence']:.2f})",
file=sys.stderr,
)
if survey.get("domain_notes"):
print(f" [AI] Survey notes: {survey['domain_notes']}", file=sys.stderr)
if survey.get("relevant_tools"):
print(
f" [AI] Survey relevant_tools: {', '.join(survey['relevant_tools'])}",
file=sys.stderr,
)
if survey.get("skip_tools"):
print(
f" [AI] Survey skip_tools: {', '.join(survey['skip_tools'])}",
file=sys.stderr,
)
else:
print(" [AI] Survey unavailable — proceeding without it.", file=sys.stderr)
all_dirs = _discover_directories(target, show_hidden=show_hidden,
exclude=exclude)
to_investigate = []
cached_count = 0
for d in all_dirs:
if cache.has_entry("dir", d):
cached_count += 1
rel = os.path.relpath(d, target)
print(f" [AI] Skipping (cached): {rel}/", file=sys.stderr)
else:
to_investigate.append(d)
total = len(to_investigate)
if cached_count:
print(f" [AI] Directories cached: {cached_count}", file=sys.stderr)
print(f" [AI] Directories to investigate: {total}", file=sys.stderr)
for i, dir_path in enumerate(to_investigate, 1):
dir_rel = os.path.relpath(dir_path, target)
if dir_rel == ".":
dir_rel = os.path.basename(target)
print(f" [AI] Investigating: {dir_rel}/ ({i}/{total})",
file=sys.stderr)
summary = _run_dir_loop(
client, target, cache, tracker, dir_path, verbose=verbose,
)
if summary and not cache.has_entry("dir", dir_path):
cache.write_entry("dir", dir_path, {
"path": dir_path,
"relative_path": os.path.relpath(dir_path, target),
"child_count": len([
n for n in os.listdir(dir_path)
if not n.startswith(".")
]) if os.path.isdir(dir_path) else 0,
"summary": summary,
"dominant_category": "unknown",
"notable_files": [],
"cached_at": _now_iso(),
})
cache.update_meta(
directories_investigated=total + cached_count,
end_time=_now_iso(),
)
print(" [AI] Synthesis pass...", file=sys.stderr)
brief, detailed = _run_synthesis(
client, target, cache, tracker, verbose=verbose,
)
# Read flags from flags.jsonl
flags = []
flags_path = os.path.join(cache.root, "flags.jsonl")
try:
with open(flags_path) as f:
for line in f:
line = line.strip()
if line:
flags.append(json.loads(line))
except (OSError, json.JSONDecodeError):
pass
print(f" [AI] Total tokens used: {tracker.summary()}", file=sys.stderr)
return brief, detailed, flags
# ---------------------------------------------------------------------------
# Public interface
# ---------------------------------------------------------------------------
def analyze_directory(report, target, verbose_tools=False, fresh=False,
exclude=None):
"""Run AI analysis on the directory. Returns (brief, detailed, flags).
Returns ("", "", []) if the API key is missing or dependencies are not met.
"""
if not check_ai_dependencies():
sys.exit(1)
api_key = _get_api_key()
if not api_key:
return "", "", []
print(" [AI] Starting multi-pass investigation...", file=sys.stderr)
client = anthropic.Anthropic(api_key=api_key)
try:
brief, detailed, flags = _run_investigation(
client, target, report, fresh=fresh, verbose=verbose_tools,
exclude=exclude,
)
except Exception as e:
print(f"Warning: AI analysis failed: {e}", file=sys.stderr)
return "", "", []
if not brief and not detailed:
print(" [AI] Warning: agent produced no output.", file=sys.stderr)
print(" [AI] Investigation complete.", file=sys.stderr)
return brief, detailed, flags