From 907dcf0a379f8a8b4741aaf19b2637a6537d65b0 Mon Sep 17 00:00:00 2001 From: Jeff Smith Date: Mon, 30 Mar 2026 12:13:55 -0600 Subject: [PATCH] refactor: replace single-shot API with multi-pass agentic investigation Rewrites ai.py from a single Claude API call into a multi-pass, cache-driven agent architecture: - Per-directory isolated agent loops (max 10 turns each) with context discarded between directories - Leaves-first processing order so child summaries inform parents - Disk cache (/tmp/luminos/{uuid}/) persists across runs for resumability - Investigation ID persistence keyed by target realpath - Separate synthesis pass reads only directory-level cache entries - Replaces urllib with Anthropic SDK (streaming, automatic retries) - Token counting with 70% context budget threshold for early exit - parse_structure tool via tree-sitter (Python, JS, Rust, Go) - python-magic integration for MIME-aware directory listings - Cost tracking printed at end of investigation Co-Authored-By: Claude Opus 4.6 (1M context) --- luminos_lib/ai.py | 1457 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 1319 insertions(+), 138 deletions(-) diff --git a/luminos_lib/ai.py b/luminos_lib/ai.py index b9d39f8..69108ec 100644 --- a/luminos_lib/ai.py +++ b/luminos_lib/ai.py @@ -1,16 +1,86 @@ -"""AI-powered directory analysis using the Claude API (stdlib only).""" +"""AI-powered directory analysis using a multi-pass, cache-driven agent loop. +Architecture: + 1. Discover all directories under the target + 2. Sort leaves-first (deepest directories first) + 3. Run an isolated agent loop per directory (max 10 turns each) + 4. Cache every file and directory summary to disk + 5. Run a final synthesis pass reading only directory cache entries + +Uses the Anthropic SDK for streaming, automatic retries, and token counting. +Uses tree-sitter for AST parsing and python-magic for file classification. 
+""" + +import hashlib import json import os +import subprocess import sys -import urllib.request -import urllib.error +import uuid +from datetime import datetime, timezone + +import anthropic +import magic +import tree_sitter +import tree_sitter_python +import tree_sitter_javascript +import tree_sitter_rust +import tree_sitter_go + +from luminos_lib.capabilities import check_ai_dependencies -API_URL = "https://api.anthropic.com/v1/messages" MODEL = "claude-sonnet-4-20250514" -MAX_FILE_SAMPLE_BYTES = 2048 -MAX_FILES_TO_SAMPLE = 30 +CACHE_ROOT = "/tmp/luminos" +INVESTIGATIONS_PATH = os.path.join(CACHE_ROOT, "investigations.json") +# Context budget: trigger early exit at 70% of Sonnet's context window. +MAX_CONTEXT = 180_000 +CONTEXT_BUDGET = int(MAX_CONTEXT * 0.70) + +# Pricing per 1M tokens (Claude Sonnet). +INPUT_PRICE_PER_M = 3.00 +OUTPUT_PRICE_PER_M = 15.00 + +# Directories to always skip during investigation. +_SKIP_DIRS = { + ".git", "__pycache__", "node_modules", ".tox", ".mypy_cache", + ".pytest_cache", ".venv", "venv", ".env", "dist", "build", + ".eggs", "*.egg-info", ".svn", ".hg", +} + +# Commands the run_command tool is allowed to execute. +_COMMAND_WHITELIST = {"wc", "file", "grep", "head", "tail", "stat", "du", "find"} + +# tree-sitter language registry: extension → (grammar_module, language_name) +_TS_LANGUAGES = { + ".py": (tree_sitter_python, "python"), + ".js": (tree_sitter_javascript, "javascript"), + ".jsx": (tree_sitter_javascript, "javascript"), + ".mjs": (tree_sitter_javascript, "javascript"), + ".rs": (tree_sitter_rust, "rust"), + ".go": (tree_sitter_go, "go"), +} + +# Precompute Language objects once. 
+_TS_LANG_CACHE = {} + + +def _get_ts_parser(ext): + """Return a (Parser, language_name) tuple for a file extension, or None.""" + entry = _TS_LANGUAGES.get(ext) + if entry is None: + return None + module, lang_name = entry + if lang_name not in _TS_LANG_CACHE: + _TS_LANG_CACHE[lang_name] = tree_sitter.Language(module.language()) + lang = _TS_LANG_CACHE[lang_name] + parser = tree_sitter.Parser(lang) + return parser, lang_name + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- def _get_api_key(): """Read the Anthropic API key from the environment.""" @@ -21,162 +91,1273 @@ def _get_api_key(): return key -def _sample_file(path, max_bytes=MAX_FILE_SAMPLE_BYTES): - """Read the first max_bytes of a text file. Returns None for binary.""" +def _path_is_safe(path, target): + """Return True if *path* resolves to somewhere inside *target*.""" + real = os.path.realpath(path) + target_real = os.path.realpath(target) + return real == target_real or real.startswith(target_real + os.sep) + + +def _sha256_path(path): + """Return a hex SHA-256 of a path string, used as cache key.""" + return hashlib.sha256(path.encode("utf-8")).hexdigest() + + +def _now_iso(): + return datetime.now(timezone.utc).isoformat() + + +def _should_skip_dir(name): + """Return True if a directory name matches the skip list.""" + if name in _SKIP_DIRS: + return True + for pattern in _SKIP_DIRS: + if pattern.startswith("*") and name.endswith(pattern[1:]): + return True + return False + + +# --------------------------------------------------------------------------- +# Token tracker +# --------------------------------------------------------------------------- + +class _TokenTracker: + """Track cumulative token usage across API calls.""" + + def __init__(self): + self.total_input = 0 + self.total_output = 0 + self.loop_input = 0 + self.loop_output = 0 + + def record(self, usage): + 
"""Record usage from a single API call.""" + inp = getattr(usage, "input_tokens", 0) + out = getattr(usage, "output_tokens", 0) + self.total_input += inp + self.total_output += out + self.loop_input += inp + self.loop_output += out + + def reset_loop(self): + """Reset per-loop counters (called between directory loops).""" + self.loop_input = 0 + self.loop_output = 0 + + @property + def loop_total(self): + return self.loop_input + self.loop_output + + def budget_exceeded(self): + return self.loop_total > CONTEXT_BUDGET + + def summary(self): + cost_in = self.total_input * INPUT_PRICE_PER_M / 1_000_000 + cost_out = self.total_output * OUTPUT_PRICE_PER_M / 1_000_000 + cost = cost_in + cost_out + return (f"{self.total_input:,} input / {self.total_output:,} output " + f"(approx ${cost:.2f})") + + +# --------------------------------------------------------------------------- +# Investigation ID persistence +# --------------------------------------------------------------------------- + +def _load_investigations(): try: - with open(path, "r", errors="replace") as f: - return f.read(max_bytes) - except (OSError, UnicodeDecodeError): - return None + with open(INVESTIGATIONS_PATH) as f: + return json.load(f) + except (OSError, json.JSONDecodeError): + return {} -def _build_context(report, target): - """Build a textual context from the scan report for the AI prompt.""" - parts = [] - - parts.append(f"Directory: {target}") - parts.append("") - - # Tree structure - tree_text = report.get("tree_rendered", "") - if tree_text: - parts.append("=== Directory tree ===") - parts.append(tree_text) - parts.append("") - - # File categories - cats = report.get("file_categories", {}) - if cats: - parts.append("=== File categories ===") - for cat, count in sorted(cats.items(), key=lambda x: -x[1]): - parts.append(f" {cat}: {count}") - parts.append("") - - # Languages - langs = report.get("languages", []) - loc = report.get("lines_of_code", {}) - if langs: - parts.append("=== Languages 
detected ===") - for lang in sorted(loc, key=loc.get, reverse=True): - parts.append(f" {lang}: {loc[lang]} lines") - parts.append("") - - # Sample file contents - classified = report.get("classified_files", []) - # Prioritize source and config files for sampling - priority = {"source": 0, "config": 1, "document": 2, "data": 3} - samplable = sorted(classified, - key=lambda f: priority.get(f["category"], 99)) - sampled = 0 - samples = [] - for f in samplable: - if sampled >= MAX_FILES_TO_SAMPLE: - break - content = _sample_file(f["path"]) - if content and content.strip(): - rel = os.path.relpath(f["path"], target) - samples.append(f"--- {rel} ---\n{content}") - sampled += 1 - - if samples: - parts.append("=== File samples (first ~2KB each) ===") - parts.append("\n\n".join(samples)) - - return "\n".join(parts) +def _save_investigations(data): + os.makedirs(CACHE_ROOT, exist_ok=True) + with open(INVESTIGATIONS_PATH, "w") as f: + json.dump(data, f, indent=2) -def _call_claude(api_key, context): - """Call the Claude API and return the response text.""" - prompt = ( - "You are analyzing a directory on a file system. Based on the tree " - "structure, file types, languages, and file content samples below, " - "produce two sections:\n\n" - "1. **BRIEF SUMMARY** (2-4 sentences): What is this directory? What is " - "its purpose? What kind of project or data does it contain?\n\n" - "2. **DETAILED BREAKDOWN**: A thorough analysis covering:\n" - " - The overall purpose and architecture of the project/directory\n" - " - Key components and what they do\n" - " - Technologies and frameworks in use\n" - " - Notable patterns, conventions, or design decisions\n" - " - Any potential concerns (e.g., missing tests, large binaries, " - "stale files)\n\n" - "Format your response exactly as:\n" - "BRIEF: \n\n" - "DETAILED:\n\n\n" - "Be specific and concrete — reference actual filenames and directories. " - "Do not hedge or use filler phrases." 
- ) +def _get_investigation_id(target, fresh=False): + target_real = os.path.realpath(target) + investigations = _load_investigations() + if not fresh and target_real in investigations: + inv_id = investigations[target_real] + cache_dir = os.path.join(CACHE_ROOT, inv_id) + if os.path.isdir(cache_dir): + return inv_id, False + inv_id = str(uuid.uuid4()) + investigations[target_real] = inv_id + _save_investigations(investigations) + return inv_id, True - body = json.dumps({ - "model": MODEL, - "max_tokens": 2048, - "messages": [ - {"role": "user", "content": f"{prompt}\n\n{context}"}, - ], - }).encode("utf-8") - req = urllib.request.Request( - API_URL, - data=body, - headers={ - "Content-Type": "application/json", - "x-api-key": api_key, - "anthropic-version": "2023-06-01", +# --------------------------------------------------------------------------- +# Cache manager +# --------------------------------------------------------------------------- + +class _CacheManager: + """Manages the /tmp/luminos/{investigation_id}/ cache tree.""" + + def __init__(self, investigation_id, target): + self.investigation_id = investigation_id + self.target = os.path.realpath(target) + self.root = os.path.join(CACHE_ROOT, investigation_id) + self.files_dir = os.path.join(self.root, "files") + self.dirs_dir = os.path.join(self.root, "dirs") + self.log_path = os.path.join(self.root, "investigation.log") + self.meta_path = os.path.join(self.root, "meta.json") + os.makedirs(self.files_dir, exist_ok=True) + os.makedirs(self.dirs_dir, exist_ok=True) + + def write_meta(self, model, start_time): + data = { + "investigation_id": self.investigation_id, + "target": self.target, + "start_time": start_time, + "model": model, + "directories_investigated": 0, + "total_turns": 0, + } + with open(self.meta_path, "w") as f: + json.dump(data, f, indent=2) + + def update_meta(self, **kwargs): + try: + with open(self.meta_path) as f: + data = json.load(f) + except (OSError, json.JSONDecodeError): + data = 
{} + data.update(kwargs) + with open(self.meta_path, "w") as f: + json.dump(data, f, indent=2) + + def log_turn(self, directory, turn, tool_name, tool_args, result_len): + entry = { + "directory": directory, + "turn": turn, + "timestamp": _now_iso(), + "tool": tool_name, + "args": tool_args, + "result_length": result_len, + } + with open(self.log_path, "a") as f: + f.write(json.dumps(entry) + "\n") + + def _cache_path(self, cache_type, path): + subdir = self.files_dir if cache_type == "file" else self.dirs_dir + return os.path.join(subdir, _sha256_path(path) + ".json") + + def _cache_safe(self, cache_file): + real = os.path.realpath(cache_file) + root_real = os.path.realpath(self.root) + return real.startswith(root_real + os.sep) + + def write_entry(self, cache_type, path, data): + cache_file = self._cache_path(cache_type, path) + if not self._cache_safe(cache_file): + return "Error: cache path escapes cache root." + required = {"path", "summary", "cached_at"} + if cache_type == "file": + required |= {"relative_path", "size_bytes", "category"} + elif cache_type == "dir": + required |= {"relative_path", "child_count", "dominant_category"} + missing = required - set(data.keys()) + if missing: + return f"Error: missing required fields: {', '.join(sorted(missing))}" + if "content" in data or "contents" in data or "raw" in data: + return "Error: cache entries must not contain raw file contents." 
+ try: + with open(cache_file, "w") as f: + json.dump(data, f, indent=2) + return "ok" + except OSError as e: + return f"Error writing cache: {e}" + + def read_entry(self, cache_type, path): + cache_file = self._cache_path(cache_type, path) + if not self._cache_safe(cache_file): + return None + try: + with open(cache_file) as f: + return json.load(f) + except (OSError, json.JSONDecodeError): + return None + + def has_entry(self, cache_type, path): + cache_file = self._cache_path(cache_type, path) + return os.path.exists(cache_file) + + def list_entries(self, cache_type): + subdir = self.files_dir if cache_type == "file" else self.dirs_dir + result = [] + try: + for name in sorted(os.listdir(subdir)): + if not name.endswith(".json"): + continue + fpath = os.path.join(subdir, name) + try: + with open(fpath) as f: + data = json.load(f) + result.append(data.get("relative_path", data.get("path", name))) + except (OSError, json.JSONDecodeError): + continue + except OSError: + pass + return result + + def read_all_entries(self, cache_type): + subdir = self.files_dir if cache_type == "file" else self.dirs_dir + result = [] + try: + for name in sorted(os.listdir(subdir)): + if not name.endswith(".json"): + continue + fpath = os.path.join(subdir, name) + try: + with open(fpath) as f: + result.append(json.load(f)) + except (OSError, json.JSONDecodeError): + continue + except OSError: + pass + return result + + +# --------------------------------------------------------------------------- +# Tool definitions +# --------------------------------------------------------------------------- + +_DIR_TOOLS = [ + { + "name": "read_file", + "description": ( + "Read and return the contents of a file. Path must be inside " + "the target directory." 
+ ), + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Absolute or relative path to the file.", + }, + "max_bytes": { + "type": "integer", + "description": "Maximum bytes to read (default 4096).", + }, + }, + "required": ["path"], }, - method="POST", - ) + }, + { + "name": "list_directory", + "description": ( + "List the contents of a directory with file sizes and types." + ), + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Absolute or relative path to the directory.", + }, + "show_hidden": { + "type": "boolean", + "description": "Include hidden files (default false).", + }, + }, + "required": ["path"], + }, + }, + { + "name": "run_command", + "description": ( + "Run a read-only shell command. Allowed binaries: " + "wc, file, grep, head, tail, stat, du, find." + ), + "input_schema": { + "type": "object", + "properties": { + "command": { + "type": "string", + "description": "The shell command to execute.", + }, + }, + "required": ["command"], + }, + }, + { + "name": "parse_structure", + "description": ( + "Parse a source file using tree-sitter and return its structural " + "skeleton: functions, classes, imports, and code metrics. " + "Supported: Python, JavaScript, TypeScript, Rust, Go." + ), + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Path to the source file to parse.", + }, + }, + "required": ["path"], + }, + }, + { + "name": "write_cache", + "description": ( + "Write a summary cache entry for a file or directory. The data " + "must NOT contain raw file contents — summaries only." 
+ ), + "input_schema": { + "type": "object", + "properties": { + "cache_type": { + "type": "string", + "enum": ["file", "dir"], + "description": "'file' or 'dir'.", + }, + "path": { + "type": "string", + "description": "The path being cached.", + }, + "data": { + "type": "object", + "description": ( + "Cache entry. Files: {path, relative_path, size_bytes, " + "category, summary, notable, notable_reason, cached_at}. " + "Dirs: {path, relative_path, child_count, summary, " + "dominant_category, notable_files, cached_at}." + ), + }, + }, + "required": ["cache_type", "path", "data"], + }, + }, + { + "name": "submit_report", + "description": ( + "Submit the directory summary. This ends the investigation loop." + ), + "input_schema": { + "type": "object", + "properties": { + "summary": { + "type": "string", + "description": "1-3 sentence summary of the directory.", + }, + }, + "required": ["summary"], + }, + }, +] + +_SYNTHESIS_TOOLS = [ + { + "name": "read_cache", + "description": "Read a previously cached summary for a file or directory.", + "input_schema": { + "type": "object", + "properties": { + "cache_type": { + "type": "string", + "enum": ["file", "dir"], + }, + "path": { + "type": "string", + "description": "The path to look up.", + }, + }, + "required": ["cache_type", "path"], + }, + }, + { + "name": "list_cache", + "description": "List all cached entry paths of a given type.", + "input_schema": { + "type": "object", + "properties": { + "cache_type": { + "type": "string", + "enum": ["file", "dir"], + }, + }, + "required": ["cache_type"], + }, + }, + { + "name": "submit_report", + "description": "Submit the final analysis report.", + "input_schema": { + "type": "object", + "properties": { + "brief": { + "type": "string", + "description": "2-4 sentence summary.", + }, + "detailed": { + "type": "string", + "description": "Thorough breakdown.", + }, + }, + "required": ["brief", "detailed"], + }, + }, +] + + +# 
--------------------------------------------------------------------------- +# Tool implementations +# --------------------------------------------------------------------------- + +def _tool_read_file(args, target, _cache): + path = args.get("path", "") + max_bytes = args.get("max_bytes", 4096) + if not os.path.isabs(path): + path = os.path.join(target, path) + if not _path_is_safe(path, target): + return f"Error: path '{path}' is outside the target directory." + try: + file_size = os.path.getsize(path) + with open(path, "r", errors="replace") as f: + content = f.read(max_bytes) + if not content: + return "(empty file)" + if file_size > max_bytes: + content += ( + f"\n\n[TRUNCATED — showed {max_bytes} of {file_size} bytes. " + f"Call again with a larger max_bytes or use " + f"run_command('tail -n ... {os.path.relpath(path, target)}') " + f"to see the rest.]" + ) + return content + except OSError as e: + return f"Error reading file: {e}" + + +def _tool_list_directory(args, target, _cache): + path = args.get("path", target) + show_hidden = args.get("show_hidden", False) + if not os.path.isabs(path): + path = os.path.join(target, path) + if not _path_is_safe(path, target): + return f"Error: path '{path}' is outside the target directory." + if not os.path.isdir(path): + return f"Error: '{path}' is not a directory." 
+ try: + entries = sorted(os.listdir(path)) + lines = [] + for name in entries: + if not show_hidden and name.startswith("."): + continue + full = os.path.join(path, name) + try: + st = os.stat(full) + mime = magic.from_file(full, mime=True) if not os.path.isdir(full) else None + if os.path.isdir(full): + lines.append(f" {name}/ (dir)") + else: + mime_str = f" [{mime}]" if mime else "" + lines.append(f" {name} ({st.st_size} bytes){mime_str}") + except OSError: + lines.append(f" {name} (stat failed)") + return "\n".join(lines) if lines else "(empty directory)" + except OSError as e: + return f"Error listing directory: {e}" + + +def _tool_run_command(args, target, _cache): + command = args.get("command", "") + parts = command.split() + if not parts: + return "Error: empty command." + binary = os.path.basename(parts[0]) + if binary not in _COMMAND_WHITELIST: + return ( + f"Error: '{binary}' is not allowed. " + f"Whitelist: {', '.join(sorted(_COMMAND_WHITELIST))}" + ) + try: + result = subprocess.run( + command, shell=True, capture_output=True, text=True, + timeout=15, cwd=target, + ) + output = result.stdout + if result.returncode != 0 and result.stderr: + output += f"\n(stderr: {result.stderr.strip()})" + return output.strip() if output.strip() else "(no output)" + except subprocess.TimeoutExpired: + return "Error: command timed out after 15 seconds." + except OSError as e: + return f"Error running command: {e}" + + +def _tool_parse_structure(args, target, _cache): + path = args.get("path", "") + if not os.path.isabs(path): + path = os.path.join(target, path) + if not _path_is_safe(path, target): + return f"Error: path '{path}' is outside the target directory." + if not os.path.isfile(path): + return f"Error: '{path}' is not a file." + + ext = os.path.splitext(path)[1].lower() + ts = _get_ts_parser(ext) + if ts is None: + return f"Error: no grammar for extension '{ext}'. 
Supported: {', '.join(sorted(_TS_LANGUAGES.keys()))}" + + parser, lang_name = ts try: - with urllib.request.urlopen(req, timeout=60) as resp: - data = json.loads(resp.read().decode("utf-8")) - # Extract text from the response - for block in data.get("content", []): - if block.get("type") == "text": - return block["text"] - return "" - except urllib.error.HTTPError as e: - body = e.read().decode("utf-8", errors="replace") - print(f"Warning: Claude API error {e.code}: {body}", file=sys.stderr) - return "" - except (urllib.error.URLError, OSError, json.JSONDecodeError) as e: - print(f"Warning: Claude API request failed: {e}", file=sys.stderr) - return "" + with open(path, "rb") as f: + source = f.read() + except OSError as e: + return f"Error reading file: {e}" + + tree = parser.parse(source) + root = tree.root_node + source_text = source.decode("utf-8", errors="replace") + lines = source_text.split("\n") + line_count = len(lines) + + functions = [] + classes = [] + imports = [] + has_docstrings = False + comment_lines = 0 + + def _walk(node): + nonlocal has_docstrings, comment_lines + for child in node.children: + ntype = child.type + + # Comments + if ntype in ("comment", "line_comment", "block_comment"): + comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1 + + # Python + if lang_name == "python": + if ntype == "function_definition": + functions.append(_py_func_sig(child)) + elif ntype == "class_definition": + classes.append(_py_class(child)) + elif ntype in ("import_statement", "import_from_statement"): + imports.append(child.text.decode("utf-8", errors="replace").strip()) + elif ntype == "expression_statement": + first = child.children[0] if child.children else None + if first and first.type == "string": + has_docstrings = True + + # JavaScript + elif lang_name == "javascript": + if ntype in ("function_declaration", "arrow_function", + "function"): + functions.append(_js_func_sig(child)) + elif ntype == "class_declaration": + 
classes.append(_js_class(child)) + elif ntype in ("import_statement",): + imports.append(child.text.decode("utf-8", errors="replace").strip()) + + # Rust + elif lang_name == "rust": + if ntype == "function_item": + functions.append(_rust_func_sig(child)) + elif ntype in ("struct_item", "enum_item", "impl_item"): + classes.append(_rust_struct(child)) + elif ntype == "use_declaration": + imports.append(child.text.decode("utf-8", errors="replace").strip()) + + # Go + elif lang_name == "go": + if ntype == "function_declaration": + functions.append(_go_func_sig(child)) + elif ntype == "type_declaration": + classes.append(_go_type(child)) + elif ntype == "import_declaration": + imports.append(child.text.decode("utf-8", errors="replace").strip()) + + _walk(child) + + _walk(root) + + code_lines = max(1, line_count - comment_lines) + result = { + "language": lang_name, + "functions": functions[:50], + "classes": classes[:30], + "imports": imports[:30], + "line_count": line_count, + "has_docstrings": has_docstrings, + "has_comments": comment_lines > 0, + "comment_to_code_ratio": round(comment_lines / code_lines, 2), + } + return json.dumps(result, indent=2) -def _parse_response(text): - """Parse the AI response into brief and detailed sections.""" - brief = "" - detailed = "" +# --- tree-sitter extraction helpers --- - if "BRIEF:" in text: - after_brief = text.split("BRIEF:", 1)[1] - if "DETAILED:" in after_brief: - brief = after_brief.split("DETAILED:", 1)[0].strip() - detailed = after_brief.split("DETAILED:", 1)[1].strip() - else: - brief = after_brief.strip() - elif "DETAILED:" in text: - detailed = text.split("DETAILED:", 1)[1].strip() +def _child_by_type(node, *types): + for c in node.children: + if c.type in types: + return c + return None + + +def _text(node): + return node.text.decode("utf-8", errors="replace") if node else "" + + +def _py_func_sig(node): + name = _text(_child_by_type(node, "identifier")) + params = _text(_child_by_type(node, "parameters")) + ret = 
_child_by_type(node, "type") + sig = f"{name}{params}" + if ret: + sig += f" -> {_text(ret)}" + return sig + + +def _py_class(node): + name = _text(_child_by_type(node, "identifier")) + methods = [] + body = _child_by_type(node, "block") + if body: + for child in body.children: + if child.type == "function_definition": + methods.append(_py_func_sig(child)) + return {"name": name, "methods": methods[:20]} + + +def _js_func_sig(node): + name = _text(_child_by_type(node, "identifier")) + params = _text(_child_by_type(node, "formal_parameters")) + return f"{name}{params}" if name else f"(anonymous){params}" + + +def _js_class(node): + name = _text(_child_by_type(node, "identifier")) + methods = [] + body = _child_by_type(node, "class_body") + if body: + for child in body.children: + if child.type == "method_definition": + mname = _text(_child_by_type(child, "property_identifier")) + mparams = _text(_child_by_type(child, "formal_parameters")) + methods.append(f"{mname}{mparams}") + return {"name": name, "methods": methods[:20]} + + +def _rust_func_sig(node): + name = _text(_child_by_type(node, "identifier")) + params = _text(_child_by_type(node, "parameters")) + ret = _child_by_type(node, "type_identifier", "generic_type", + "reference_type", "scoped_type_identifier") + sig = f"{name}{params}" + if ret: + sig += f" -> {_text(ret)}" + return sig + + +def _rust_struct(node): + name = _text(_child_by_type(node, "type_identifier")) + return {"name": name or _text(node)[:60], "methods": []} + + +def _go_func_sig(node): + name = _text(_child_by_type(node, "identifier")) + params = _text(_child_by_type(node, "parameter_list")) + return f"{name}{params}" + + +def _go_type(node): + spec = _child_by_type(node, "type_spec") + name = _text(_child_by_type(spec, "type_identifier")) if spec else "" + return {"name": name or _text(node)[:60], "methods": []} + + +def _tool_write_cache(args, _target, cache): + cache_type = args.get("cache_type", "") + path = args.get("path", "") + data = 
args.get("data", {}) + if cache_type not in ("file", "dir"): + return "Error: cache_type must be 'file' or 'dir'." + return cache.write_entry(cache_type, path, data) + + +def _tool_read_cache(args, _target, cache): + cache_type = args.get("cache_type", "") + path = args.get("path", "") + if cache_type not in ("file", "dir"): + return "Error: cache_type must be 'file' or 'dir'." + entry = cache.read_entry(cache_type, path) + if entry is None: + return "null" + return json.dumps(entry, indent=2) + + +def _tool_list_cache(args, _target, cache): + cache_type = args.get("cache_type", "") + if cache_type not in ("file", "dir"): + return "Error: cache_type must be 'file' or 'dir'." + paths = cache.list_entries(cache_type) + if not paths: + return "(no cached entries)" + return "\n".join(paths) + + +_TOOL_DISPATCH = { + "read_file": _tool_read_file, + "list_directory": _tool_list_directory, + "run_command": _tool_run_command, + "parse_structure": _tool_parse_structure, + "write_cache": _tool_write_cache, + "read_cache": _tool_read_cache, + "list_cache": _tool_list_cache, +} + + +def _execute_tool(name, args, target, cache, dir_rel, turn, verbose=False): + """Execute a tool by name and return the result string.""" + handler = _TOOL_DISPATCH.get(name) + if handler is None: + return f"Error: unknown tool '{name}'." + result = handler(args, target, cache) + + cache.log_turn(dir_rel, turn, name, + {k: v for k, v in args.items() if k != "data"}, + len(result)) + + if verbose: + preview = result[:200] + "..." if len(result) > 200 else result + print(f" [AI] <- {len(result)} chars: {preview}", file=sys.stderr) + + return result + + +# --------------------------------------------------------------------------- +# Streaming API caller +# --------------------------------------------------------------------------- + +def _call_api_streaming(client, system, messages, tools, tracker): + """Call Claude via streaming. Print tool decisions in real-time. 
+ + Returns (content_blocks, usage) where content_blocks is the list of + content blocks from the response. + """ + with client.messages.stream( + model=MODEL, + max_tokens=4096, + system=system, + messages=messages, + tools=tools, + ) as stream: + # Print tool call names as they arrive + current_tool = None + for event in stream: + if event.type == "content_block_start": + block = event.content_block + if block.type == "tool_use": + current_tool = block.name + # We'll print the full args after the block is complete + elif event.type == "content_block_stop": + current_tool = None + + response = stream.get_final_message() + + tracker.record(response.usage) + return response.content, response.usage + + +# --------------------------------------------------------------------------- +# Directory discovery +# --------------------------------------------------------------------------- + +def _discover_directories(target, show_hidden=False): + """Walk the target and return all directories sorted leaves-first.""" + dirs = [] + target_real = os.path.realpath(target) + for root, subdirs, _files in os.walk(target_real, topdown=True): + subdirs[:] = [ + d for d in subdirs + if not _should_skip_dir(d) + and (show_hidden or not d.startswith(".")) + ] + dirs.append(root) + dirs.sort(key=lambda d: (-d.count(os.sep), d)) + return dirs + + +# --------------------------------------------------------------------------- +# Per-directory agent loop +# --------------------------------------------------------------------------- + +_DIR_SYSTEM_PROMPT = """\ +You are an expert analyst investigating a SINGLE directory on a file system. +Do NOT assume the type of content before investigating. Discover what this +directory contains from what you find. + +## Your Task +Investigate the directory: {dir_path} +(relative to target: {dir_rel}) + +You must: +1. Read the important files in THIS directory (not subdirectories) +2. For each file you read, call write_cache to save a summary +3. 
Call write_cache for the directory itself with a synthesis +4. Call submit_report with a 1-3 sentence summary + +## Tools +parse_structure gives you the skeleton of a file. It does NOT replace \ +reading the file. Use parse_structure first to understand structure, then \ +use read_file if you need to verify intent, check for anomalies, or \ +understand content that structure cannot capture (comments, documentation, \ +data files, config values). A file where structure and content appear to \ +contradict each other is always worth reading in full. + +## Efficiency Rules +- Batch multiple tool calls in a single turn whenever possible +- Skip binary/compiled/generated files (.pyc, .class, .o, .min.js, etc.) +- Skip files >100KB unless uniquely important +- Prioritize: README, index, main, config, schema, manifest files +- For source files: try parse_structure first, then read_file if needed +- If read_file returns truncated content, use a larger max_bytes or + run_command('tail ...') — NEVER retry the identical call +- You have only {max_turns} turns — be efficient + +## Cache Schemas +File: {{path, relative_path, size_bytes, category, summary, notable, + notable_reason, cached_at}} +Dir: {{path, relative_path, child_count, summary, dominant_category, + notable_files, cached_at}} + +category values: source, config, data, document, media, archive, unknown + +## Context +{context} + +## Child Directory Summaries (already investigated) +{child_summaries}""" + + +def _build_dir_context(dir_path): + lines = [] + try: + entries = sorted(os.listdir(dir_path)) + for name in entries: + if name.startswith("."): + continue + full = os.path.join(dir_path, name) + try: + st = os.stat(full) + if os.path.isdir(full): + lines.append(f" {name}/ (dir)") + else: + mime = magic.from_file(full, mime=True) + lines.append(f" {name} ({st.st_size} bytes) [{mime}]") + except OSError: + lines.append(f" {name} (stat failed)") + except OSError: + lines.append(" (could not list directory)") + 
return "Directory contents:\n" + "\n".join(lines) if lines else "(empty)" + + +def _get_child_summaries(dir_path, cache): + parts = [] + try: + for name in sorted(os.listdir(dir_path)): + child = os.path.join(dir_path, name) + if not os.path.isdir(child): + continue + entry = cache.read_entry("dir", child) + if entry: + rel = entry.get("relative_path", name) + summary = entry.get("summary", "(no summary)") + parts.append(f"- {rel}/: {summary}") + except OSError: + pass + return "\n".join(parts) if parts else "(none — this is a leaf directory)" + + +def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=10, + verbose=False): + """Run an isolated agent loop for a single directory.""" + dir_rel = os.path.relpath(dir_path, target) + if dir_rel == ".": + dir_rel = os.path.basename(target) + + context = _build_dir_context(dir_path) + child_summaries = _get_child_summaries(dir_path, cache) + + system = _DIR_SYSTEM_PROMPT.format( + dir_path=dir_path, + dir_rel=dir_rel, + max_turns=max_turns, + context=context, + child_summaries=child_summaries, + ) + + messages = [ + { + "role": "user", + "content": ( + "Investigate this directory now. Use parse_structure for " + "source files, read_file for others, cache summaries, and " + "call submit_report. Batch tool calls for efficiency." 
+ ), + }, + ] + + tracker.reset_loop() + summary = None + + for turn in range(max_turns): + # Check context budget + if tracker.budget_exceeded(): + print(f" [AI] Context budget reached — exiting early " + f"({tracker.loop_total:,} tokens used)", file=sys.stderr) + break + + try: + content_blocks, usage = _call_api_streaming( + client, system, messages, _DIR_TOOLS, tracker, + ) + except anthropic.APIError as e: + print(f" [AI] API error: {e}", file=sys.stderr) + break + + # Print tool decisions now that we have the full response + tool_uses = [b for b in content_blocks if b.type == "tool_use"] + for tu in tool_uses: + arg_summary = ", ".join( + f"{k}={v!r}" for k, v in tu.input.items() if k != "data" + ) if tu.input else "" + print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr) + + messages.append({ + "role": "assistant", + "content": [_block_to_dict(b) for b in content_blocks], + }) + + if not tool_uses: + messages.append({ + "role": "user", + "content": "Please call submit_report with your summary.", + }) + continue + + tool_results = [] + done = False + for tu in tool_uses: + if tu.name == "submit_report": + summary = tu.input.get("summary", "") + tool_results.append({ + "type": "tool_result", + "tool_use_id": tu.id, + "content": "Summary submitted.", + }) + done = True + else: + result_text = _execute_tool( + tu.name, tu.input, target, cache, dir_rel, + turn + 1, verbose=verbose, + ) + tool_results.append({ + "type": "tool_result", + "tool_use_id": tu.id, + "content": result_text, + }) + + messages.append({"role": "user", "content": tool_results}) + + if done: + break else: - # Fallback: use the whole thing as brief - brief = text.strip() + print(f" [AI] Warning: max turns reached for {dir_rel}", + file=sys.stderr) + + return summary + + +def _block_to_dict(block): + """Convert an SDK content block to a plain dict for message history.""" + if block.type == "text": + return {"type": "text", "text": block.text} + elif block.type == "tool_use": + return 
# ---------------------------------------------------------------------------
# Synthesis pass
# ---------------------------------------------------------------------------

_SYNTHESIS_SYSTEM_PROMPT = """\
You are an expert analyst synthesizing a final report about a directory tree.
ALL directory summaries are provided below — you do NOT need to call
list_cache or read_cache. Just read the summaries and call submit_report
immediately in your first turn.

Do NOT assume the type of content. Let the summaries speak for themselves.

## Your Goal
Produce two outputs via the submit_report tool:
1. **brief**: A 2-4 sentence summary of what this directory tree is.
2. **detailed**: A thorough breakdown covering purpose, structure, key
   components, technologies, notable patterns, and any concerns.

## Rules
- ALL summaries are below — call submit_report directly
- Be specific — reference actual directory and file names
- Do NOT call list_cache or read_cache

## Target
{target}

## Directory Summaries
{summaries_text}"""


def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
    """Run the final synthesis pass. Returns (brief, detailed).

    All directory-level cache entries are inlined into the system prompt so
    the model can (and is told to) call submit_report on its first turn.
    Falls back to a report assembled directly from the cache whenever the
    model produced nothing — max turns, API error, or an empty report.
    """
    dir_entries = cache.read_all_entries("dir")

    # Render every cached directory summary as a markdown section.
    summary_lines = []
    for entry in dir_entries:
        rel = entry.get("relative_path", "?")
        summary = entry.get("summary", "(no summary)")
        dominant = entry.get("dominant_category", "?")
        notable = entry.get("notable_files", [])
        summary_lines.append(f"### {rel}/")
        summary_lines.append(f"Category: {dominant}")
        summary_lines.append(f"Summary: {summary}")
        if notable:
            summary_lines.append(f"Notable files: {', '.join(notable)}")
        summary_lines.append("")

    summaries_text = "\n".join(summary_lines) if summary_lines else "(none)"

    system = _SYNTHESIS_SYSTEM_PROMPT.format(
        target=target,
        summaries_text=summaries_text,
    )

    messages = [
        {
            "role": "user",
            "content": (
                "All directory summaries are in the system prompt above. "
                "Synthesize them into a cohesive report and call "
                "submit_report immediately — no other tool calls needed."
            ),
        },
    ]

    brief, detailed = "", ""

    for turn in range(max_turns):
        try:
            content_blocks, _usage = _call_api_streaming(
                client, system, messages, _SYNTHESIS_TOOLS, tracker,
            )
        except anthropic.APIError as e:
            print(f" [AI] API error: {e}", file=sys.stderr)
            break

        tool_uses = [b for b in content_blocks if b.type == "tool_use"]
        for tu in tool_uses:
            arg_summary = ", ".join(
                f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
            ) if tu.input else ""
            print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)

        messages.append({
            "role": "assistant",
            "content": [_block_to_dict(b) for b in content_blocks],
        })

        if not tool_uses:
            messages.append({
                "role": "user",
                "content": "Please call submit_report with your analysis.",
            })
            continue

        tool_results = []
        done = False
        for tu in tool_uses:
            if tu.name == "submit_report":
                brief = tu.input.get("brief", "")
                detailed = tu.input.get("detailed", "")
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Report submitted. Thank you.",
                })
                done = True
            else:
                result_text = _execute_tool(
                    tu.name, tu.input, target, cache, "(synthesis)",
                    turn + 1, verbose=verbose,
                )
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": result_text,
                })

        messages.append({"role": "user", "content": tool_results})

        if done:
            break
    else:
        print(" [AI] Warning: synthesis ran out of turns.", file=sys.stderr)

    # BUG FIX: the original fell back to the cache only on turn exhaustion
    # (the for/else path). An APIError break returned ("", "") and silently
    # discarded everything the investigation had cached. Now any empty
    # outcome — error, exhaustion, or an empty submit_report — degrades to
    # the cache-derived report instead.
    if not brief and not detailed:
        brief, detailed = _synthesize_from_cache(cache)
    return brief, detailed
"Report submitted. Thank you.", + }) + done = True + else: + result_text = _execute_tool( + tu.name, tu.input, target, cache, "(synthesis)", + turn + 1, verbose=verbose, + ) + tool_results.append({ + "type": "tool_result", + "tool_use_id": tu.id, + "content": result_text, + }) + + messages.append({"role": "user", "content": tool_results}) + + if done: + break + else: + print(" [AI] Warning: synthesis ran out of turns.", file=sys.stderr) + brief, detailed = _synthesize_from_cache(cache) return brief, detailed -def analyze_directory(report, target): +def _synthesize_from_cache(cache): + """Build a best-effort report from cached directory summaries.""" + dir_entries = cache.read_all_entries("dir") + if not dir_entries: + return ("(AI analysis incomplete — no data was cached)", "") + + brief_parts = [] + detail_parts = [] + for entry in dir_entries: + rel = entry.get("relative_path", "?") + summary = entry.get("summary", "") + if summary: + detail_parts.append(f"**{rel}/**: {summary}") + brief_parts.append(summary) + + brief = brief_parts[0] if brief_parts else "(AI analysis incomplete)" + detailed = "\n\n".join(detail_parts) if detail_parts else "" + return brief, detailed + + +# --------------------------------------------------------------------------- +# Main orchestrator +# --------------------------------------------------------------------------- + +def _run_investigation(client, target, report, show_hidden=False, + fresh=False, verbose=False): + """Orchestrate the multi-pass investigation. 
Returns (brief, detailed).""" + investigation_id, is_new = _get_investigation_id(target, fresh=fresh) + cache = _CacheManager(investigation_id, target) + tracker = _TokenTracker() + + if is_new: + cache.write_meta(MODEL, _now_iso()) + + print(f" [AI] Investigation ID: {investigation_id}" + f"{'' if is_new else ' (resumed)'}", file=sys.stderr) + print(f" [AI] Cache: {cache.root}/", file=sys.stderr) + + all_dirs = _discover_directories(target, show_hidden=show_hidden) + + to_investigate = [] + cached_count = 0 + for d in all_dirs: + if cache.has_entry("dir", d): + cached_count += 1 + rel = os.path.relpath(d, target) + print(f" [AI] Skipping (cached): {rel}/", file=sys.stderr) + else: + to_investigate.append(d) + + total = len(to_investigate) + if cached_count: + print(f" [AI] Directories cached: {cached_count}", file=sys.stderr) + print(f" [AI] Directories to investigate: {total}", file=sys.stderr) + + for i, dir_path in enumerate(to_investigate, 1): + dir_rel = os.path.relpath(dir_path, target) + if dir_rel == ".": + dir_rel = os.path.basename(target) + print(f" [AI] Investigating: {dir_rel}/ ({i}/{total})", + file=sys.stderr) + + summary = _run_dir_loop( + client, target, cache, tracker, dir_path, verbose=verbose, + ) + + if summary and not cache.has_entry("dir", dir_path): + cache.write_entry("dir", dir_path, { + "path": dir_path, + "relative_path": os.path.relpath(dir_path, target), + "child_count": len([ + n for n in os.listdir(dir_path) + if not n.startswith(".") + ]) if os.path.isdir(dir_path) else 0, + "summary": summary, + "dominant_category": "unknown", + "notable_files": [], + "cached_at": _now_iso(), + }) + + cache.update_meta( + directories_investigated=total + cached_count, + end_time=_now_iso(), + ) + + print(" [AI] Synthesis pass...", file=sys.stderr) + brief, detailed = _run_synthesis( + client, target, cache, tracker, verbose=verbose, + ) + + print(f" [AI] Total tokens used: {tracker.summary()}", file=sys.stderr) + + return brief, detailed + + +# 
--------------------------------------------------------------------------- +# Cache cleanup +# --------------------------------------------------------------------------- + +def clear_cache(): + """Remove all investigation caches under /tmp/luminos/.""" + import shutil + if os.path.isdir(CACHE_ROOT): + shutil.rmtree(CACHE_ROOT) + print(f"Cleared cache: {CACHE_ROOT}", file=sys.stderr) + else: + print(f"No cache to clear ({CACHE_ROOT} does not exist).", + file=sys.stderr) + + +# --------------------------------------------------------------------------- +# Public interface +# --------------------------------------------------------------------------- + +def analyze_directory(report, target, verbose_tools=False, fresh=False): """Run AI analysis on the directory. Returns (brief, detailed) strings. - Returns ("", "") if the API key is missing or the request fails. + Returns ("", "") if the API key is missing or dependencies are not met. """ + if not check_ai_dependencies(): + sys.exit(1) + api_key = _get_api_key() if not api_key: return "", "" - print(" [AI] Analyzing directory with Claude...", file=sys.stderr) - context = _build_context(report, target) - raw = _call_claude(api_key, context) - if not raw: + print(" [AI] Starting multi-pass investigation...", file=sys.stderr) + + client = anthropic.Anthropic(api_key=api_key) + + try: + brief, detailed = _run_investigation( + client, target, report, fresh=fresh, verbose=verbose_tools, + ) + except Exception as e: + print(f"Warning: AI analysis failed: {e}", file=sys.stderr) return "", "" - return _parse_response(raw) + if not brief and not detailed: + print(" [AI] Warning: agent produced no output.", file=sys.stderr) + + print(" [AI] Investigation complete.", file=sys.stderr) + return brief, detailed