luminos/luminos_lib/ai.py
Jeff Smith 907dcf0a37 refactor: replace single-shot API with multi-pass agentic investigation
Rewrites ai.py from a single Claude API call into a multi-pass,
cache-driven agent architecture:

- Per-directory isolated agent loops (max 10 turns each) with context
  discarded between directories
- Leaves-first processing order so child summaries inform parents
- Disk cache (/tmp/luminos/{uuid}/) persists across runs for resumability
- Investigation ID persistence keyed by target realpath
- Separate synthesis pass reads only directory-level cache entries
- Replaces urllib with Anthropic SDK (streaming, automatic retries)
- Token counting with 70% context budget threshold for early exit
- parse_structure tool via tree-sitter (Python, JS, Rust, Go)
- python-magic integration for MIME-aware directory listings
- Cost tracking printed at end of investigation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 12:13:55 -06:00

1363 lines
46 KiB
Python

"""AI-powered directory analysis using a multi-pass, cache-driven agent loop.
Architecture:
1. Discover all directories under the target
2. Sort leaves-first (deepest directories first)
3. Run an isolated agent loop per directory (max 10 turns each)
4. Cache every file and directory summary to disk
5. Run a final synthesis pass reading only directory cache entries
Uses the Anthropic SDK for streaming, automatic retries, and token counting.
Uses tree-sitter for AST parsing and python-magic for file classification.
"""
import hashlib
import json
import os
import shlex
import subprocess
import sys
import uuid
from datetime import datetime, timezone

import anthropic
import magic
import tree_sitter
import tree_sitter_python
import tree_sitter_javascript
import tree_sitter_rust
import tree_sitter_go

from luminos_lib.capabilities import check_ai_dependencies
MODEL = "claude-sonnet-4-20250514"
CACHE_ROOT = "/tmp/luminos"
INVESTIGATIONS_PATH = os.path.join(CACHE_ROOT, "investigations.json")
# Context budget: trigger early exit at 70% of Sonnet's context window.
MAX_CONTEXT = 180_000
CONTEXT_BUDGET = int(MAX_CONTEXT * 0.70)
# Pricing per 1M tokens (Claude Sonnet).
INPUT_PRICE_PER_M = 3.00
OUTPUT_PRICE_PER_M = 15.00
# Directories to always skip during investigation.
_SKIP_DIRS = {
".git", "__pycache__", "node_modules", ".tox", ".mypy_cache",
".pytest_cache", ".venv", "venv", ".env", "dist", "build",
".eggs", "*.egg-info", ".svn", ".hg",
}
# Commands the run_command tool is allowed to execute.
_COMMAND_WHITELIST = {"wc", "file", "grep", "head", "tail", "stat", "du", "find"}
# tree-sitter language registry: extension → (grammar_module, language_name)
_TS_LANGUAGES = {
".py": (tree_sitter_python, "python"),
".js": (tree_sitter_javascript, "javascript"),
".jsx": (tree_sitter_javascript, "javascript"),
".mjs": (tree_sitter_javascript, "javascript"),
".rs": (tree_sitter_rust, "rust"),
".go": (tree_sitter_go, "go"),
}
# Precompute Language objects once.
_TS_LANG_CACHE = {}
def _get_ts_parser(ext):
"""Return a (Parser, language_name) tuple for a file extension, or None."""
entry = _TS_LANGUAGES.get(ext)
if entry is None:
return None
module, lang_name = entry
if lang_name not in _TS_LANG_CACHE:
_TS_LANG_CACHE[lang_name] = tree_sitter.Language(module.language())
lang = _TS_LANG_CACHE[lang_name]
parser = tree_sitter.Parser(lang)
return parser, lang_name
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_api_key():
"""Read the Anthropic API key from the environment."""
key = os.environ.get("ANTHROPIC_API_KEY", "")
if not key:
print("Warning: ANTHROPIC_API_KEY not set. Skipping AI analysis.",
file=sys.stderr)
return key
def _path_is_safe(path, target):
"""Return True if *path* resolves to somewhere inside *target*."""
real = os.path.realpath(path)
target_real = os.path.realpath(target)
return real == target_real or real.startswith(target_real + os.sep)
def _sha256_path(path):
"""Return a hex SHA-256 of a path string, used as cache key."""
return hashlib.sha256(path.encode("utf-8")).hexdigest()
def _now_iso():
return datetime.now(timezone.utc).isoformat()
def _should_skip_dir(name):
"""Return True if a directory name matches the skip list."""
if name in _SKIP_DIRS:
return True
for pattern in _SKIP_DIRS:
if pattern.startswith("*") and name.endswith(pattern[1:]):
return True
return False
# ---------------------------------------------------------------------------
# Token tracker
# ---------------------------------------------------------------------------
class _TokenTracker:
"""Track cumulative token usage across API calls."""
def __init__(self):
self.total_input = 0
self.total_output = 0
self.loop_input = 0
self.loop_output = 0
def record(self, usage):
"""Record usage from a single API call."""
inp = getattr(usage, "input_tokens", 0)
out = getattr(usage, "output_tokens", 0)
self.total_input += inp
self.total_output += out
self.loop_input += inp
self.loop_output += out
def reset_loop(self):
"""Reset per-loop counters (called between directory loops)."""
self.loop_input = 0
self.loop_output = 0
@property
def loop_total(self):
return self.loop_input + self.loop_output
def budget_exceeded(self):
return self.loop_total > CONTEXT_BUDGET
def summary(self):
cost_in = self.total_input * INPUT_PRICE_PER_M / 1_000_000
cost_out = self.total_output * OUTPUT_PRICE_PER_M / 1_000_000
cost = cost_in + cost_out
return (f"{self.total_input:,} input / {self.total_output:,} output "
f"(approx ${cost:.2f})")
# ---------------------------------------------------------------------------
# Investigation ID persistence
# ---------------------------------------------------------------------------
def _load_investigations():
try:
with open(INVESTIGATIONS_PATH) as f:
return json.load(f)
except (OSError, json.JSONDecodeError):
return {}
def _save_investigations(data):
os.makedirs(CACHE_ROOT, exist_ok=True)
with open(INVESTIGATIONS_PATH, "w") as f:
json.dump(data, f, indent=2)
def _get_investigation_id(target, fresh=False):
target_real = os.path.realpath(target)
investigations = _load_investigations()
if not fresh and target_real in investigations:
inv_id = investigations[target_real]
cache_dir = os.path.join(CACHE_ROOT, inv_id)
if os.path.isdir(cache_dir):
return inv_id, False
inv_id = str(uuid.uuid4())
investigations[target_real] = inv_id
_save_investigations(investigations)
return inv_id, True
# ---------------------------------------------------------------------------
# Cache manager
# ---------------------------------------------------------------------------
class _CacheManager:
"""Manages the /tmp/luminos/{investigation_id}/ cache tree."""
def __init__(self, investigation_id, target):
self.investigation_id = investigation_id
self.target = os.path.realpath(target)
self.root = os.path.join(CACHE_ROOT, investigation_id)
self.files_dir = os.path.join(self.root, "files")
self.dirs_dir = os.path.join(self.root, "dirs")
self.log_path = os.path.join(self.root, "investigation.log")
self.meta_path = os.path.join(self.root, "meta.json")
os.makedirs(self.files_dir, exist_ok=True)
os.makedirs(self.dirs_dir, exist_ok=True)
def write_meta(self, model, start_time):
data = {
"investigation_id": self.investigation_id,
"target": self.target,
"start_time": start_time,
"model": model,
"directories_investigated": 0,
"total_turns": 0,
}
with open(self.meta_path, "w") as f:
json.dump(data, f, indent=2)
def update_meta(self, **kwargs):
try:
with open(self.meta_path) as f:
data = json.load(f)
except (OSError, json.JSONDecodeError):
data = {}
data.update(kwargs)
with open(self.meta_path, "w") as f:
json.dump(data, f, indent=2)
def log_turn(self, directory, turn, tool_name, tool_args, result_len):
entry = {
"directory": directory,
"turn": turn,
"timestamp": _now_iso(),
"tool": tool_name,
"args": tool_args,
"result_length": result_len,
}
with open(self.log_path, "a") as f:
f.write(json.dumps(entry) + "\n")
def _cache_path(self, cache_type, path):
subdir = self.files_dir if cache_type == "file" else self.dirs_dir
return os.path.join(subdir, _sha256_path(path) + ".json")
def _cache_safe(self, cache_file):
real = os.path.realpath(cache_file)
root_real = os.path.realpath(self.root)
return real.startswith(root_real + os.sep)
def write_entry(self, cache_type, path, data):
cache_file = self._cache_path(cache_type, path)
if not self._cache_safe(cache_file):
return "Error: cache path escapes cache root."
required = {"path", "summary", "cached_at"}
if cache_type == "file":
required |= {"relative_path", "size_bytes", "category"}
elif cache_type == "dir":
required |= {"relative_path", "child_count", "dominant_category"}
missing = required - set(data.keys())
if missing:
return f"Error: missing required fields: {', '.join(sorted(missing))}"
if "content" in data or "contents" in data or "raw" in data:
return "Error: cache entries must not contain raw file contents."
try:
with open(cache_file, "w") as f:
json.dump(data, f, indent=2)
return "ok"
except OSError as e:
return f"Error writing cache: {e}"
def read_entry(self, cache_type, path):
cache_file = self._cache_path(cache_type, path)
if not self._cache_safe(cache_file):
return None
try:
with open(cache_file) as f:
return json.load(f)
except (OSError, json.JSONDecodeError):
return None
def has_entry(self, cache_type, path):
cache_file = self._cache_path(cache_type, path)
return os.path.exists(cache_file)
def list_entries(self, cache_type):
subdir = self.files_dir if cache_type == "file" else self.dirs_dir
result = []
try:
for name in sorted(os.listdir(subdir)):
if not name.endswith(".json"):
continue
fpath = os.path.join(subdir, name)
try:
with open(fpath) as f:
data = json.load(f)
result.append(data.get("relative_path", data.get("path", name)))
except (OSError, json.JSONDecodeError):
continue
except OSError:
pass
return result
def read_all_entries(self, cache_type):
subdir = self.files_dir if cache_type == "file" else self.dirs_dir
result = []
try:
for name in sorted(os.listdir(subdir)):
if not name.endswith(".json"):
continue
fpath = os.path.join(subdir, name)
try:
with open(fpath) as f:
result.append(json.load(f))
except (OSError, json.JSONDecodeError):
continue
except OSError:
pass
return result
# ---------------------------------------------------------------------------
# Tool definitions
# ---------------------------------------------------------------------------
_DIR_TOOLS = [
{
"name": "read_file",
"description": (
"Read and return the contents of a file. Path must be inside "
"the target directory."
),
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute or relative path to the file.",
},
"max_bytes": {
"type": "integer",
"description": "Maximum bytes to read (default 4096).",
},
},
"required": ["path"],
},
},
{
"name": "list_directory",
"description": (
"List the contents of a directory with file sizes and types."
),
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute or relative path to the directory.",
},
"show_hidden": {
"type": "boolean",
"description": "Include hidden files (default false).",
},
},
"required": ["path"],
},
},
{
"name": "run_command",
"description": (
"Run a read-only shell command. Allowed binaries: "
"wc, file, grep, head, tail, stat, du, find."
),
"input_schema": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute.",
},
},
"required": ["command"],
},
},
{
"name": "parse_structure",
"description": (
"Parse a source file using tree-sitter and return its structural "
"skeleton: functions, classes, imports, and code metrics. "
"Supported: Python, JavaScript, TypeScript, Rust, Go."
),
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the source file to parse.",
},
},
"required": ["path"],
},
},
{
"name": "write_cache",
"description": (
"Write a summary cache entry for a file or directory. The data "
"must NOT contain raw file contents — summaries only."
),
"input_schema": {
"type": "object",
"properties": {
"cache_type": {
"type": "string",
"enum": ["file", "dir"],
"description": "'file' or 'dir'.",
},
"path": {
"type": "string",
"description": "The path being cached.",
},
"data": {
"type": "object",
"description": (
"Cache entry. Files: {path, relative_path, size_bytes, "
"category, summary, notable, notable_reason, cached_at}. "
"Dirs: {path, relative_path, child_count, summary, "
"dominant_category, notable_files, cached_at}."
),
},
},
"required": ["cache_type", "path", "data"],
},
},
{
"name": "submit_report",
"description": (
"Submit the directory summary. This ends the investigation loop."
),
"input_schema": {
"type": "object",
"properties": {
"summary": {
"type": "string",
"description": "1-3 sentence summary of the directory.",
},
},
"required": ["summary"],
},
},
]
_SYNTHESIS_TOOLS = [
{
"name": "read_cache",
"description": "Read a previously cached summary for a file or directory.",
"input_schema": {
"type": "object",
"properties": {
"cache_type": {
"type": "string",
"enum": ["file", "dir"],
},
"path": {
"type": "string",
"description": "The path to look up.",
},
},
"required": ["cache_type", "path"],
},
},
{
"name": "list_cache",
"description": "List all cached entry paths of a given type.",
"input_schema": {
"type": "object",
"properties": {
"cache_type": {
"type": "string",
"enum": ["file", "dir"],
},
},
"required": ["cache_type"],
},
},
{
"name": "submit_report",
"description": "Submit the final analysis report.",
"input_schema": {
"type": "object",
"properties": {
"brief": {
"type": "string",
"description": "2-4 sentence summary.",
},
"detailed": {
"type": "string",
"description": "Thorough breakdown.",
},
},
"required": ["brief", "detailed"],
},
},
]
# ---------------------------------------------------------------------------
# Tool implementations
# ---------------------------------------------------------------------------
def _tool_read_file(args, target, _cache):
path = args.get("path", "")
max_bytes = args.get("max_bytes", 4096)
if not os.path.isabs(path):
path = os.path.join(target, path)
if not _path_is_safe(path, target):
return f"Error: path '{path}' is outside the target directory."
try:
file_size = os.path.getsize(path)
with open(path, "r", errors="replace") as f:
content = f.read(max_bytes)
if not content:
return "(empty file)"
if file_size > max_bytes:
content += (
f"\n\n[TRUNCATED — showed {max_bytes} of {file_size} bytes. "
f"Call again with a larger max_bytes or use "
f"run_command('tail -n ... {os.path.relpath(path, target)}') "
f"to see the rest.]"
)
return content
except OSError as e:
return f"Error reading file: {e}"
def _tool_list_directory(args, target, _cache):
path = args.get("path", target)
show_hidden = args.get("show_hidden", False)
if not os.path.isabs(path):
path = os.path.join(target, path)
if not _path_is_safe(path, target):
return f"Error: path '{path}' is outside the target directory."
if not os.path.isdir(path):
return f"Error: '{path}' is not a directory."
try:
entries = sorted(os.listdir(path))
lines = []
for name in entries:
if not show_hidden and name.startswith("."):
continue
full = os.path.join(path, name)
try:
st = os.stat(full)
mime = magic.from_file(full, mime=True) if not os.path.isdir(full) else None
if os.path.isdir(full):
lines.append(f" {name}/ (dir)")
else:
mime_str = f" [{mime}]" if mime else ""
lines.append(f" {name} ({st.st_size} bytes){mime_str}")
except OSError:
lines.append(f" {name} (stat failed)")
return "\n".join(lines) if lines else "(empty directory)"
except OSError as e:
return f"Error listing directory: {e}"
def _tool_run_command(args, target, _cache):
command = args.get("command", "")
parts = command.split()
if not parts:
return "Error: empty command."
binary = os.path.basename(parts[0])
if binary not in _COMMAND_WHITELIST:
return (
f"Error: '{binary}' is not allowed. "
f"Whitelist: {', '.join(sorted(_COMMAND_WHITELIST))}"
)
try:
result = subprocess.run(
command, shell=True, capture_output=True, text=True,
timeout=15, cwd=target,
)
output = result.stdout
if result.returncode != 0 and result.stderr:
output += f"\n(stderr: {result.stderr.strip()})"
return output.strip() if output.strip() else "(no output)"
except subprocess.TimeoutExpired:
return "Error: command timed out after 15 seconds."
except OSError as e:
return f"Error running command: {e}"
def _tool_parse_structure(args, target, _cache):
path = args.get("path", "")
if not os.path.isabs(path):
path = os.path.join(target, path)
if not _path_is_safe(path, target):
return f"Error: path '{path}' is outside the target directory."
if not os.path.isfile(path):
return f"Error: '{path}' is not a file."
ext = os.path.splitext(path)[1].lower()
ts = _get_ts_parser(ext)
if ts is None:
return f"Error: no grammar for extension '{ext}'. Supported: {', '.join(sorted(_TS_LANGUAGES.keys()))}"
parser, lang_name = ts
try:
with open(path, "rb") as f:
source = f.read()
except OSError as e:
return f"Error reading file: {e}"
tree = parser.parse(source)
root = tree.root_node
source_text = source.decode("utf-8", errors="replace")
lines = source_text.split("\n")
line_count = len(lines)
functions = []
classes = []
imports = []
has_docstrings = False
comment_lines = 0
def _walk(node):
nonlocal has_docstrings, comment_lines
for child in node.children:
ntype = child.type
# Comments
if ntype in ("comment", "line_comment", "block_comment"):
comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
# Python
if lang_name == "python":
if ntype == "function_definition":
functions.append(_py_func_sig(child))
elif ntype == "class_definition":
classes.append(_py_class(child))
elif ntype in ("import_statement", "import_from_statement"):
imports.append(child.text.decode("utf-8", errors="replace").strip())
elif ntype == "expression_statement":
first = child.children[0] if child.children else None
if first and first.type == "string":
has_docstrings = True
# JavaScript
elif lang_name == "javascript":
if ntype in ("function_declaration", "arrow_function",
"function"):
functions.append(_js_func_sig(child))
elif ntype == "class_declaration":
classes.append(_js_class(child))
elif ntype in ("import_statement",):
imports.append(child.text.decode("utf-8", errors="replace").strip())
# Rust
elif lang_name == "rust":
if ntype == "function_item":
functions.append(_rust_func_sig(child))
elif ntype in ("struct_item", "enum_item", "impl_item"):
classes.append(_rust_struct(child))
elif ntype == "use_declaration":
imports.append(child.text.decode("utf-8", errors="replace").strip())
# Go
elif lang_name == "go":
if ntype == "function_declaration":
functions.append(_go_func_sig(child))
elif ntype == "type_declaration":
classes.append(_go_type(child))
elif ntype == "import_declaration":
imports.append(child.text.decode("utf-8", errors="replace").strip())
_walk(child)
_walk(root)
code_lines = max(1, line_count - comment_lines)
result = {
"language": lang_name,
"functions": functions[:50],
"classes": classes[:30],
"imports": imports[:30],
"line_count": line_count,
"has_docstrings": has_docstrings,
"has_comments": comment_lines > 0,
"comment_to_code_ratio": round(comment_lines / code_lines, 2),
}
return json.dumps(result, indent=2)
# --- tree-sitter extraction helpers ---
def _child_by_type(node, *types):
for c in node.children:
if c.type in types:
return c
return None
def _text(node):
return node.text.decode("utf-8", errors="replace") if node else ""
def _py_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "parameters"))
ret = _child_by_type(node, "type")
sig = f"{name}{params}"
if ret:
sig += f" -> {_text(ret)}"
return sig
def _py_class(node):
name = _text(_child_by_type(node, "identifier"))
methods = []
body = _child_by_type(node, "block")
if body:
for child in body.children:
if child.type == "function_definition":
methods.append(_py_func_sig(child))
return {"name": name, "methods": methods[:20]}
def _js_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "formal_parameters"))
return f"{name}{params}" if name else f"(anonymous){params}"
def _js_class(node):
name = _text(_child_by_type(node, "identifier"))
methods = []
body = _child_by_type(node, "class_body")
if body:
for child in body.children:
if child.type == "method_definition":
mname = _text(_child_by_type(child, "property_identifier"))
mparams = _text(_child_by_type(child, "formal_parameters"))
methods.append(f"{mname}{mparams}")
return {"name": name, "methods": methods[:20]}
def _rust_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "parameters"))
ret = _child_by_type(node, "type_identifier", "generic_type",
"reference_type", "scoped_type_identifier")
sig = f"{name}{params}"
if ret:
sig += f" -> {_text(ret)}"
return sig
def _rust_struct(node):
name = _text(_child_by_type(node, "type_identifier"))
return {"name": name or _text(node)[:60], "methods": []}
def _go_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "parameter_list"))
return f"{name}{params}"
def _go_type(node):
spec = _child_by_type(node, "type_spec")
name = _text(_child_by_type(spec, "type_identifier")) if spec else ""
return {"name": name or _text(node)[:60], "methods": []}
def _tool_write_cache(args, _target, cache):
cache_type = args.get("cache_type", "")
path = args.get("path", "")
data = args.get("data", {})
if cache_type not in ("file", "dir"):
return "Error: cache_type must be 'file' or 'dir'."
return cache.write_entry(cache_type, path, data)
def _tool_read_cache(args, _target, cache):
cache_type = args.get("cache_type", "")
path = args.get("path", "")
if cache_type not in ("file", "dir"):
return "Error: cache_type must be 'file' or 'dir'."
entry = cache.read_entry(cache_type, path)
if entry is None:
return "null"
return json.dumps(entry, indent=2)
def _tool_list_cache(args, _target, cache):
cache_type = args.get("cache_type", "")
if cache_type not in ("file", "dir"):
return "Error: cache_type must be 'file' or 'dir'."
paths = cache.list_entries(cache_type)
if not paths:
return "(no cached entries)"
return "\n".join(paths)
_TOOL_DISPATCH = {
"read_file": _tool_read_file,
"list_directory": _tool_list_directory,
"run_command": _tool_run_command,
"parse_structure": _tool_parse_structure,
"write_cache": _tool_write_cache,
"read_cache": _tool_read_cache,
"list_cache": _tool_list_cache,
}
def _execute_tool(name, args, target, cache, dir_rel, turn, verbose=False):
"""Execute a tool by name and return the result string."""
handler = _TOOL_DISPATCH.get(name)
if handler is None:
return f"Error: unknown tool '{name}'."
result = handler(args, target, cache)
cache.log_turn(dir_rel, turn, name,
{k: v for k, v in args.items() if k != "data"},
len(result))
if verbose:
preview = result[:200] + "..." if len(result) > 200 else result
print(f" [AI] <- {len(result)} chars: {preview}", file=sys.stderr)
return result
# ---------------------------------------------------------------------------
# Streaming API caller
# ---------------------------------------------------------------------------
def _call_api_streaming(client, system, messages, tools, tracker):
"""Call Claude via streaming. Print tool decisions in real-time.
Returns (content_blocks, usage) where content_blocks is the list of
content blocks from the response.
"""
with client.messages.stream(
model=MODEL,
max_tokens=4096,
system=system,
messages=messages,
tools=tools,
) as stream:
# Print tool call names as they arrive
current_tool = None
for event in stream:
if event.type == "content_block_start":
block = event.content_block
if block.type == "tool_use":
current_tool = block.name
# We'll print the full args after the block is complete
elif event.type == "content_block_stop":
current_tool = None
response = stream.get_final_message()
tracker.record(response.usage)
return response.content, response.usage
# ---------------------------------------------------------------------------
# Directory discovery
# ---------------------------------------------------------------------------
def _discover_directories(target, show_hidden=False):
"""Walk the target and return all directories sorted leaves-first."""
dirs = []
target_real = os.path.realpath(target)
for root, subdirs, _files in os.walk(target_real, topdown=True):
subdirs[:] = [
d for d in subdirs
if not _should_skip_dir(d)
and (show_hidden or not d.startswith("."))
]
dirs.append(root)
dirs.sort(key=lambda d: (-d.count(os.sep), d))
return dirs
# ---------------------------------------------------------------------------
# Per-directory agent loop
# ---------------------------------------------------------------------------
_DIR_SYSTEM_PROMPT = """\
You are an expert analyst investigating a SINGLE directory on a file system.
Do NOT assume the type of content before investigating. Discover what this
directory contains from what you find.
## Your Task
Investigate the directory: {dir_path}
(relative to target: {dir_rel})
You must:
1. Read the important files in THIS directory (not subdirectories)
2. For each file you read, call write_cache to save a summary
3. Call write_cache for the directory itself with a synthesis
4. Call submit_report with a 1-3 sentence summary
## Tools
parse_structure gives you the skeleton of a file. It does NOT replace \
reading the file. Use parse_structure first to understand structure, then \
use read_file if you need to verify intent, check for anomalies, or \
understand content that structure cannot capture (comments, documentation, \
data files, config values). A file where structure and content appear to \
contradict each other is always worth reading in full.
## Efficiency Rules
- Batch multiple tool calls in a single turn whenever possible
- Skip binary/compiled/generated files (.pyc, .class, .o, .min.js, etc.)
- Skip files >100KB unless uniquely important
- Prioritize: README, index, main, config, schema, manifest files
- For source files: try parse_structure first, then read_file if needed
- If read_file returns truncated content, use a larger max_bytes or
run_command('tail ...') — NEVER retry the identical call
- You have only {max_turns} turns — be efficient
## Cache Schemas
File: {{path, relative_path, size_bytes, category, summary, notable,
notable_reason, cached_at}}
Dir: {{path, relative_path, child_count, summary, dominant_category,
notable_files, cached_at}}
category values: source, config, data, document, media, archive, unknown
## Context
{context}
## Child Directory Summaries (already investigated)
{child_summaries}"""
def _build_dir_context(dir_path):
lines = []
try:
entries = sorted(os.listdir(dir_path))
for name in entries:
if name.startswith("."):
continue
full = os.path.join(dir_path, name)
try:
st = os.stat(full)
if os.path.isdir(full):
lines.append(f" {name}/ (dir)")
else:
mime = magic.from_file(full, mime=True)
lines.append(f" {name} ({st.st_size} bytes) [{mime}]")
except OSError:
lines.append(f" {name} (stat failed)")
except OSError:
lines.append(" (could not list directory)")
return "Directory contents:\n" + "\n".join(lines) if lines else "(empty)"
def _get_child_summaries(dir_path, cache):
parts = []
try:
for name in sorted(os.listdir(dir_path)):
child = os.path.join(dir_path, name)
if not os.path.isdir(child):
continue
entry = cache.read_entry("dir", child)
if entry:
rel = entry.get("relative_path", name)
summary = entry.get("summary", "(no summary)")
parts.append(f"- {rel}/: {summary}")
except OSError:
pass
return "\n".join(parts) if parts else "(none — this is a leaf directory)"
def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=10,
verbose=False):
"""Run an isolated agent loop for a single directory."""
dir_rel = os.path.relpath(dir_path, target)
if dir_rel == ".":
dir_rel = os.path.basename(target)
context = _build_dir_context(dir_path)
child_summaries = _get_child_summaries(dir_path, cache)
system = _DIR_SYSTEM_PROMPT.format(
dir_path=dir_path,
dir_rel=dir_rel,
max_turns=max_turns,
context=context,
child_summaries=child_summaries,
)
messages = [
{
"role": "user",
"content": (
"Investigate this directory now. Use parse_structure for "
"source files, read_file for others, cache summaries, and "
"call submit_report. Batch tool calls for efficiency."
),
},
]
tracker.reset_loop()
summary = None
for turn in range(max_turns):
# Check context budget
if tracker.budget_exceeded():
print(f" [AI] Context budget reached — exiting early "
f"({tracker.loop_total:,} tokens used)", file=sys.stderr)
break
try:
content_blocks, usage = _call_api_streaming(
client, system, messages, _DIR_TOOLS, tracker,
)
except anthropic.APIError as e:
print(f" [AI] API error: {e}", file=sys.stderr)
break
# Print tool decisions now that we have the full response
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
for tu in tool_uses:
arg_summary = ", ".join(
f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
) if tu.input else ""
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
messages.append({
"role": "assistant",
"content": [_block_to_dict(b) for b in content_blocks],
})
if not tool_uses:
messages.append({
"role": "user",
"content": "Please call submit_report with your summary.",
})
continue
tool_results = []
done = False
for tu in tool_uses:
if tu.name == "submit_report":
summary = tu.input.get("summary", "")
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": "Summary submitted.",
})
done = True
else:
result_text = _execute_tool(
tu.name, tu.input, target, cache, dir_rel,
turn + 1, verbose=verbose,
)
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": result_text,
})
messages.append({"role": "user", "content": tool_results})
if done:
break
else:
print(f" [AI] Warning: max turns reached for {dir_rel}",
file=sys.stderr)
return summary
def _block_to_dict(block):
"""Convert an SDK content block to a plain dict for message history."""
if block.type == "text":
return {"type": "text", "text": block.text}
elif block.type == "tool_use":
return {"type": "tool_use", "id": block.id,
"name": block.name, "input": block.input}
return {"type": block.type}
# ---------------------------------------------------------------------------
# Synthesis pass
# ---------------------------------------------------------------------------
_SYNTHESIS_SYSTEM_PROMPT = """\
You are an expert analyst synthesizing a final report about a directory tree.
ALL directory summaries are provided below — you do NOT need to call
list_cache or read_cache. Just read the summaries and call submit_report
immediately in your first turn.
Do NOT assume the type of content. Let the summaries speak for themselves.
## Your Goal
Produce two outputs via the submit_report tool:
1. **brief**: A 2-4 sentence summary of what this directory tree is.
2. **detailed**: A thorough breakdown covering purpose, structure, key
components, technologies, notable patterns, and any concerns.
## Rules
- ALL summaries are below — call submit_report directly
- Be specific — reference actual directory and file names
- Do NOT call list_cache or read_cache
## Target
{target}
## Directory Summaries
{summaries_text}"""
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
    """Run the final synthesis pass. Returns (brief, detailed).

    Inlines every cached directory summary into the system prompt so the
    model can normally answer with a single submit_report call.  If the API
    errors out or the turn budget is exhausted, falls back to a best-effort
    report assembled straight from the cache instead of returning nothing.

    Args:
        client: Anthropic SDK client.
        target: path of the directory tree under investigation.
        cache: cache manager holding directory-level ("dir") entries.
        tracker: token-usage tracker shared across API calls.
        max_turns: safety cap on agent turns (the prompt asks for one).
        verbose: forwarded to _execute_tool for extra logging.
    """
    dir_entries = cache.read_all_entries("dir")
    # Render each directory entry as a small markdown section.
    summary_lines = []
    for entry in dir_entries:
        rel = entry.get("relative_path", "?")
        summary = entry.get("summary", "(no summary)")
        dominant = entry.get("dominant_category", "?")
        notable = entry.get("notable_files", [])
        summary_lines.append(f"### {rel}/")
        summary_lines.append(f"Category: {dominant}")
        summary_lines.append(f"Summary: {summary}")
        if notable:
            summary_lines.append(f"Notable files: {', '.join(notable)}")
        summary_lines.append("")
    summaries_text = "\n".join(summary_lines) if summary_lines else "(none)"
    system = _SYNTHESIS_SYSTEM_PROMPT.format(
        target=target,
        summaries_text=summaries_text,
    )
    messages = [
        {
            "role": "user",
            "content": (
                "All directory summaries are in the system prompt above. "
                "Synthesize them into a cohesive report and call "
                "submit_report immediately — no other tool calls needed."
            ),
        },
    ]
    brief, detailed = "", ""
    for turn in range(max_turns):
        try:
            content_blocks, _usage = _call_api_streaming(
                client, system, messages, _SYNTHESIS_TOOLS, tracker,
            )
        except anthropic.APIError as e:
            print(f" [AI] API error: {e}", file=sys.stderr)
            # FIX: previously this bailed out with empty strings even though
            # cached summaries exist; degrade to the cache-built report.
            brief, detailed = _synthesize_from_cache(cache)
            break
        tool_uses = [b for b in content_blocks if b.type == "tool_use"]
        for tu in tool_uses:
            # Log each tool call, eliding any bulky "data" argument.
            arg_summary = ", ".join(
                f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
            ) if tu.input else ""
            print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
        messages.append({
            "role": "assistant",
            "content": [_block_to_dict(b) for b in content_blocks],
        })
        if not tool_uses:
            # Text-only reply: nudge the model toward the required tool.
            messages.append({
                "role": "user",
                "content": "Please call submit_report with your analysis.",
            })
            continue
        tool_results = []
        done = False
        for tu in tool_uses:
            if tu.name == "submit_report":
                brief = tu.input.get("brief", "")
                detailed = tu.input.get("detailed", "")
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Report submitted. Thank you.",
                })
                done = True
            else:
                result_text = _execute_tool(
                    tu.name, tu.input, target, cache, "(synthesis)",
                    turn + 1, verbose=verbose,
                )
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": result_text,
                })
        messages.append({"role": "user", "content": tool_results})
        if done:
            break
    else:
        # Turn budget exhausted without a submit_report call.
        print(" [AI] Warning: synthesis ran out of turns.", file=sys.stderr)
        brief, detailed = _synthesize_from_cache(cache)
    return brief, detailed
def _synthesize_from_cache(cache):
"""Build a best-effort report from cached directory summaries."""
dir_entries = cache.read_all_entries("dir")
if not dir_entries:
return ("(AI analysis incomplete — no data was cached)", "")
brief_parts = []
detail_parts = []
for entry in dir_entries:
rel = entry.get("relative_path", "?")
summary = entry.get("summary", "")
if summary:
detail_parts.append(f"**{rel}/**: {summary}")
brief_parts.append(summary)
brief = brief_parts[0] if brief_parts else "(AI analysis incomplete)"
detailed = "\n\n".join(detail_parts) if detail_parts else ""
return brief, detailed
# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------
def _run_investigation(client, target, report, show_hidden=False,
                       fresh=False, verbose=False):
    """Orchestrate the multi-pass investigation. Returns (brief, detailed).

    Pipeline: resolve or resume an investigation ID, discover directories,
    run an isolated agent loop for each directory not already cached, then
    run a synthesis pass over the cached directory summaries.

    Args:
        client: Anthropic SDK client shared across all passes.
        target: root directory being analyzed.
        report: NOTE(review): unused in this function body — presumably kept
            for interface symmetry with callers; confirm.
        show_hidden: include hidden entries during directory discovery.
        fresh: force a new investigation ID instead of resuming.
        verbose: forwarded to the per-directory agent loop.
    """
    investigation_id, is_new = _get_investigation_id(target, fresh=fresh)
    cache = _CacheManager(investigation_id, target)
    tracker = _TokenTracker()
    if is_new:
        cache.write_meta(MODEL, _now_iso())
    print(f" [AI] Investigation ID: {investigation_id}"
          f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
    print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
    all_dirs = _discover_directories(target, show_hidden=show_hidden)
    # Partition discovered dirs into already-cached (skipped) vs. work items.
    to_investigate = []
    cached_count = 0
    for d in all_dirs:
        if cache.has_entry("dir", d):
            cached_count += 1
            rel = os.path.relpath(d, target)
            print(f" [AI] Skipping (cached): {rel}/", file=sys.stderr)
        else:
            to_investigate.append(d)
    total = len(to_investigate)
    if cached_count:
        print(f" [AI] Directories cached: {cached_count}", file=sys.stderr)
    print(f" [AI] Directories to investigate: {total}", file=sys.stderr)
    for i, dir_path in enumerate(to_investigate, 1):
        dir_rel = os.path.relpath(dir_path, target)
        if dir_rel == ".":
            # Display-only normalization for the root directory; the cache
            # entry written below still stores "." as relative_path.
            dir_rel = os.path.basename(target)
        print(f" [AI] Investigating: {dir_rel}/ ({i}/{total})",
              file=sys.stderr)
        summary = _run_dir_loop(
            client, target, cache, tracker, dir_path, verbose=verbose,
        )
        # Fallback record: only written when the loop returned a summary but
        # no "dir" entry was cached for this path during the loop itself.
        if summary and not cache.has_entry("dir", dir_path):
            cache.write_entry("dir", dir_path, {
                "path": dir_path,
                "relative_path": os.path.relpath(dir_path, target),
                # NOTE(review): hidden children are excluded from the count
                # even when show_hidden=True — confirm this is intended.
                "child_count": len([
                    n for n in os.listdir(dir_path)
                    if not n.startswith(".")
                ]) if os.path.isdir(dir_path) else 0,
                "summary": summary,
                "dominant_category": "unknown",
                "notable_files": [],
                "cached_at": _now_iso(),
            })
    cache.update_meta(
        directories_investigated=total + cached_count,
        end_time=_now_iso(),
    )
    print(" [AI] Synthesis pass...", file=sys.stderr)
    brief, detailed = _run_synthesis(
        client, target, cache, tracker, verbose=verbose,
    )
    print(f" [AI] Total tokens used: {tracker.summary()}", file=sys.stderr)
    return brief, detailed
# ---------------------------------------------------------------------------
# Cache cleanup
# ---------------------------------------------------------------------------
def clear_cache():
    """Remove all investigation caches under /tmp/luminos/."""
    import shutil

    # Nothing to do when the cache root was never created.
    if not os.path.isdir(CACHE_ROOT):
        print(f"No cache to clear ({CACHE_ROOT} does not exist).",
              file=sys.stderr)
        return
    shutil.rmtree(CACHE_ROOT)
    print(f"Cleared cache: {CACHE_ROOT}", file=sys.stderr)
# ---------------------------------------------------------------------------
# Public interface
# ---------------------------------------------------------------------------
def analyze_directory(report, target, verbose_tools=False, fresh=False):
    """Run AI analysis on the directory. Returns (brief, detailed) strings.

    Returns ("", "") if the API key is missing or the investigation fails.
    NOTE: when required AI dependencies are not met this exits the process
    with status 1 (via sys.exit) rather than returning.

    Args:
        report: passed through to _run_investigation.
        target: directory to analyze.
        verbose_tools: enable extra per-tool logging on stderr.
        fresh: start a new investigation instead of resuming a cached one.
    """
    if not check_ai_dependencies():
        sys.exit(1)
    api_key = _get_api_key()
    if not api_key:
        # Missing key is a soft failure: AI analysis is simply skipped.
        return "", ""
    print(" [AI] Starting multi-pass investigation...", file=sys.stderr)
    client = anthropic.Anthropic(api_key=api_key)
    try:
        brief, detailed = _run_investigation(
            client, target, report, fresh=fresh, verbose=verbose_tools,
        )
    # Broad catch is a deliberate top-level boundary: AI analysis is
    # best-effort and must not crash the caller.
    except Exception as e:
        print(f"Warning: AI analysis failed: {e}", file=sys.stderr)
        return "", ""
    if not brief and not detailed:
        print(" [AI] Warning: agent produced no output.", file=sys.stderr)
    print(" [AI] Investigation complete.", file=sys.stderr)
    return brief, detailed