From f324648c106e7bd6ad5485b8a639716f1976fe97 Mon Sep 17 00:00:00 2001 From: Jeff Smith Date: Mon, 30 Mar 2026 13:02:19 -0600 Subject: [PATCH] feat: add chain-of-thought observability tools Adds think, checkpoint, and flag tools for agent reasoning visibility: - think: records observation/hypothesis/next_action before investigation - checkpoint: summarizes learned/unknown/next_phase after file clusters - flag: marks notable findings to flags.jsonl with severity levels Additional changes: - Step numbering in investigation system prompt - Text blocks from agent now printed to stderr (step labels visible) - flag tool available in both investigation and synthesis passes - analyze_directory() returns (brief, detailed, flags) three-tuple - format_flags() in report.py renders flags sorted by severity - Per-directory max_turns increased from 10 to 14 Co-Authored-By: Claude Opus 4.6 (1M context) --- luminos.py | 6 +- luminos_lib/ai.py | 209 ++++++++++++++++++++++++++++++++++++++++-- luminos_lib/report.py | 35 ++++++- 3 files changed, 238 insertions(+), 12 deletions(-) diff --git a/luminos.py b/luminos.py index c8ca1e9..1ed239c 100644 --- a/luminos.py +++ b/luminos.py @@ -99,16 +99,18 @@ def main(): report = scan(target, depth=args.depth, show_hidden=args.all) + flags = [] if args.ai: from luminos_lib.ai import analyze_directory - brief, detailed = analyze_directory(report, target, fresh=args.fresh) + brief, detailed, flags = analyze_directory(report, target, fresh=args.fresh) report["ai_brief"] = brief report["ai_detailed"] = detailed + report["flags"] = flags if args.json_output: output = json.dumps(report, indent=2, default=str) else: - output = format_report(report, target) + output = format_report(report, target, flags=flags) if args.output: try: diff --git a/luminos_lib/ai.py b/luminos_lib/ai.py index 873b195..61f6944 100644 --- a/luminos_lib/ai.py +++ b/luminos_lib/ai.py @@ -435,6 +435,85 @@ _DIR_TOOLS = [ "required": ["cache_type", "path", "data"], }, }, + { + "name": "think", + "description": ( + "Record your reasoning before choosing which file or directory " + "to investigate next. Call this when deciding what to look at " + "— not before every individual tool call." + ), + "input_schema": { + "type": "object", + "properties": { + "observation": { + "type": "string", + "description": "What you have observed so far.", + }, + "hypothesis": { + "type": "string", + "description": "Your hypothesis about the directory.", + }, + "next_action": { + "type": "string", + "description": "What you plan to investigate next and why.", + }, + }, + "required": ["observation", "hypothesis", "next_action"], + }, + }, + { + "name": "checkpoint", + "description": ( + "Summarize what you have learned so far about this directory " + "and what you still need to determine. Call this after completing " + "a significant cluster of files — not after every file." + ), + "input_schema": { + "type": "object", + "properties": { + "learned": { + "type": "string", + "description": "What you have learned so far.", + }, + "still_unknown": { + "type": "string", + "description": "What you still need to determine.", + }, + "next_phase": { + "type": "string", + "description": "What you will investigate next.", + }, + }, + "required": ["learned", "still_unknown", "next_phase"], + }, + }, + { + "name": "flag", + "description": ( + "Mark a file, directory, or finding as notable or anomalous. " + "Call this immediately when you discover something surprising, " + "concerning, or important — do not save it for the report." + ), + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Relative path, or 'general'.", + }, + "finding": { + "type": "string", + "description": "What you found.", + }, + "severity": { + "type": "string", + "enum": ["info", "concern", "critical"], + "description": "info | concern | critical", + }, + }, + "required": ["path", "finding", "severity"], + }, + }, { "name": "submit_report", "description": ( @@ -486,6 +565,33 @@ _SYNTHESIS_TOOLS = [ "required": ["cache_type"], }, }, + { + "name": "flag", + "description": ( + "Mark a file, directory, or finding as notable or anomalous. " + "Call this immediately when you discover something surprising, " + "concerning, or important — do not save it for the report." + ), + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Relative path, or 'general'.", + }, + "finding": { + "type": "string", + "description": "What you found.", + }, + "severity": { + "type": "string", + "enum": ["info", "concern", "critical"], + "description": "info | concern | critical", + }, + }, + "required": ["path", "finding", "severity"], + }, + }, { "name": "submit_report", "description": "Submit the final analysis report.", @@ -806,6 +912,44 @@ def _tool_list_cache(args, _target, cache): return "\n".join(paths) +def _tool_think(args, _target, _cache): + obs = args.get("observation", "") + hyp = args.get("hypothesis", "") + nxt = args.get("next_action", "") + print(f" [AI] THINK", file=sys.stderr) + print(f" observation: {obs}", file=sys.stderr) + print(f" hypothesis: {hyp}", file=sys.stderr) + print(f" next_action: {nxt}", file=sys.stderr) + return "ok" + + +def _tool_checkpoint(args, _target, _cache): + learned = args.get("learned", "") + unknown = args.get("still_unknown", "") + phase = args.get("next_phase", "") + print(f" [AI] CHECKPOINT", file=sys.stderr) + print(f" learned: {learned}", file=sys.stderr) + print(f" still_unknown: {unknown}", file=sys.stderr) + print(f" next_phase: {phase}", file=sys.stderr) + return "ok" + + +def _tool_flag(args, _target, cache): + path = args.get("path", "general") + finding = args.get("finding", "") + severity = args.get("severity", "info") + print(f" [AI] FLAG [{severity.upper()}] {path}", file=sys.stderr) + print(f" {finding}", file=sys.stderr) + flags_path = os.path.join(cache.root, "flags.jsonl") + entry = {"path": path, "finding": finding, "severity": severity} + try: + with open(flags_path, "a") as f: + f.write(json.dumps(entry) + "\n") + except OSError: + pass + return "ok" + + _TOOL_DISPATCH = { "read_file": _tool_read_file, "list_directory": _tool_list_directory, @@ -814,6 +958,9 @@ _TOOL_DISPATCH = { "write_cache": _tool_write_cache, "read_cache": _tool_read_cache, "list_cache": _tool_list_cache, + "think": _tool_think, + "checkpoint": _tool_checkpoint, + "flag": _tool_flag, } @@ -915,6 +1062,26 @@ understand content that structure cannot capture (comments, documentation, \ data files, config values). A file where structure and content appear to \ contradict each other is always worth reading in full. +Use the think tool when choosing which file or directory to investigate \ +next — before starting a new file or switching investigation direction. \ +Do NOT call think before every individual tool call in a sequence. + +Use the checkpoint tool after completing investigation of a meaningful \ +cluster of files. Not after every file — once or twice per directory \ +loop at most. + +Use the flag tool immediately when you find something notable, \ +surprising, or concerning. Severity guide: + info = interesting but not problematic + concern = worth addressing + critical = likely broken or dangerous + +## Step Numbering +Number your investigation steps as you go. Before starting each new \ +file cluster or phase transition, output: +Step N: +Output this as plain text before tool calls, not as a tool call itself. + ## Efficiency Rules - Batch multiple tool calls in a single turn whenever possible - Skip binary/compiled/generated files (.pyc, .class, .o, .min.js, etc.) @@ -979,7 +1146,7 @@ def _get_child_summaries(dir_path, cache): return "\n".join(parts) if parts else "(none — this is a leaf directory)" -def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=10, +def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14, verbose=False): """Run an isolated agent loop for a single directory.""" dir_rel = os.path.relpath(dir_path, target) @@ -1076,6 +1243,12 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=10, print(f" [AI] API error: {e}", file=sys.stderr) break + # Print text blocks (step numbering, reasoning) to stderr + for b in content_blocks: + if b.type == "text" and b.text.strip(): + for line in b.text.strip().split("\n"): + print(f" [AI] {line}", file=sys.stderr) + # Print tool decisions now that we have the full response tool_uses = [b for b in content_blocks if b.type == "tool_use"] for tu in tool_uses: @@ -1215,6 +1388,12 @@ def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False): print(f" [AI] API error: {e}", file=sys.stderr) break + # Print text blocks to stderr + for b in content_blocks: + if b.type == "text" and b.text.strip(): + for line in b.text.strip().split("\n"): + print(f" [AI] {line}", file=sys.stderr) + tool_uses = [b for b in content_blocks if b.type == "tool_use"] for tu in tool_uses: arg_summary = ", ".join( @@ -1294,7 +1473,7 @@ def _synthesize_from_cache(cache): def _run_investigation(client, target, report, show_hidden=False, fresh=False, verbose=False): - """Orchestrate the multi-pass investigation. Returns (brief, detailed).""" + """Orchestrate the multi-pass investigation. Returns (brief, detailed, flags).""" investigation_id, is_new = _get_investigation_id(target, fresh=fresh) cache = _CacheManager(investigation_id, target) tracker = _TokenTracker() @@ -1358,9 +1537,21 @@ def _run_investigation(client, target, report, show_hidden=False, client, target, cache, tracker, verbose=verbose, ) + # Read flags from flags.jsonl + flags = [] + flags_path = os.path.join(cache.root, "flags.jsonl") + try: + with open(flags_path) as f: + for line in f: + line = line.strip() + if line: + flags.append(json.loads(line)) + except (OSError, json.JSONDecodeError): + pass + print(f" [AI] Total tokens used: {tracker.summary()}", file=sys.stderr) - return brief, detailed + return brief, detailed, flags # --------------------------------------------------------------------------- @@ -1383,31 +1574,31 @@ def clear_cache(): # --------------------------------------------------------------------------- def analyze_directory(report, target, verbose_tools=False, fresh=False): - """Run AI analysis on the directory. Returns (brief, detailed) strings. + """Run AI analysis on the directory. Returns (brief, detailed, flags). - Returns ("", "") if the API key is missing or dependencies are not met. + Returns ("", "", []) if the API key is missing or dependencies are not met. """ if not check_ai_dependencies(): sys.exit(1) api_key = _get_api_key() if not api_key: - return "", "" + return "", "", [] print(" [AI] Starting multi-pass investigation...", file=sys.stderr) client = anthropic.Anthropic(api_key=api_key) try: - brief, detailed = _run_investigation( + brief, detailed, flags = _run_investigation( client, target, report, fresh=fresh, verbose=verbose_tools, ) except Exception as e: print(f"Warning: AI analysis failed: {e}", file=sys.stderr) - return "", "" + return "", "", [] if not brief and not detailed: print(" [AI] Warning: agent produced no output.", file=sys.stderr) print(" [AI] Investigation complete.", file=sys.stderr) - return brief, detailed + return brief, detailed, flags diff --git a/luminos_lib/report.py b/luminos_lib/report.py index bcd98df..185ca96 100644 --- a/luminos_lib/report.py +++ b/luminos_lib/report.py @@ -1,7 +1,35 @@ """Report formatting — human-readable terminal output.""" -def format_report(report, target): +_SEVERITY_ORDER = {"critical": 0, "concern": 1, "info": 2} + + +def format_flags(flags): + """Format a list of flag dicts as a human-readable string. + + Returns empty string if flags is empty. + """ + if not flags: + return "" + + sorted_flags = sorted(flags, key=lambda f: _SEVERITY_ORDER.get( + f.get("severity", "info"), 99)) + + lines = [] + lines.append("") + lines.append(">> FLAGS") + lines.append("-" * 40) + for f in sorted_flags: + severity = f.get("severity", "info").upper() + path = f.get("path", "general") + finding = f.get("finding", "") + lines.append(f" [{severity:<8s}] {path}") + lines.append(f" {finding}") + + return "\n".join(lines) + + +def format_report(report, target, flags=None): """Format the full report as a human-readable string.""" sep = "=" * 60 lines = [] @@ -96,6 +124,11 @@ def format_report(report, target): for paragraph in ai_detailed.split("\n"): lines.append(f" {paragraph}") + # Flags + flags_text = format_flags(flags or []) + if flags_text: + lines.append(flags_text) + lines.append("") lines.append(sep) lines.append(" End of report.")