From 2adbed9d28c1dd5de45c8a97ceecb27131484db3 Mon Sep 17 00:00:00 2001 From: Jeff Smith Date: Sun, 12 Apr 2026 20:21:49 -0600 Subject: [PATCH] feat(ai): implement Phase 3 investigation planning (#8, #9, #10, #11, #74) Add a planning pass that runs after survey and before dir loops. The planner classifies directories into priority/shallow/skip tiers and allocates turns accordingly, replacing the fixed max_turns=14 per directory with dynamic allocation from a global budget. Planning pass: - _PLANNING_SYSTEM_PROMPT in prompts.py with submit_plan tool - _run_planning() follows the same single-turn pattern as _run_survey() - submit_plan tool registered in new "planning" scope - _apply_plan() pure function: band-sorted ordering (leaf-first within bands), turn map, skip-dir removal - _default_plan() fallback when planning is skipped or fails - Plan cached as plan.json for resumed runs Dynamic turn allocation: - Priority dirs: 15-20 turns (capped at 25) - Shallow dirs: 5 turns - Default: 10 turns - Skip dirs: excluded entirely - Orchestrator passes per-dir max_turns to _run_dir_loop() Quality instrumentation: - _TokenTracker._loop_turns counts API calls per dir loop - completeness field (0.0-1.0) added to dir-scope submit_report - plan_evaluation.json emitted after dir loops comparing plan predictions to actual turn utilization, completeness, and confidence - Turn utilization logged per directory during investigation Also fixes _get_child_summaries() to distinguish actual leaf directories from parents whose children have not been investigated yet, replacing the misleading "this is a leaf directory" placeholder. 26 new tests (260 total, all passing). Co-Authored-By: Claude Opus 4.6 (1M context) --- luminos_lib/ai.py | 539 +++++++++++++++++++++++++++++++++++++++-- luminos_lib/prompts.py | 81 +++++++ tests/test_ai_pure.py | 345 ++++++++++++++++++++++++++ 3 files changed, 944 insertions(+), 21 deletions(-) diff --git a/luminos_lib/ai.py b/luminos_lib/ai.py index e8a1ed3..845b960 100644 --- a/luminos_lib/ai.py +++ b/luminos_lib/ai.py @@ -24,6 +24,7 @@ from luminos_lib.ast_parser import parse_structure from luminos_lib.cache import _CacheManager, _get_investigation_id from luminos_lib.prompts import ( _DIR_SYSTEM_PROMPT, + _PLANNING_SYSTEM_PROMPT, _SURVEY_SYSTEM_PROMPT, _SYNTHESIS_SYSTEM_PROMPT, ) @@ -111,6 +112,7 @@ class _TokenTracker: self.loop_input = 0 self.loop_output = 0 self.last_input = 0 + self._loop_turns = 0 def record(self, usage): """Record usage from a single API call.""" @@ -121,12 +123,14 @@ class _TokenTracker: self.loop_input += inp self.loop_output += out self.last_input = inp + self._loop_turns += 1 def reset_loop(self): """Reset per-loop counters (called between directory loops).""" self.loop_input = 0 self.loop_output = 0 self.last_input = 0 + self._loop_turns = 0 @property def loop_total(self): @@ -163,12 +167,14 @@ class _TokenTracker: _DIR_TOOLS = [] _SYNTHESIS_TOOLS = [] _SURVEY_TOOLS = [] +_PLANNING_TOOLS = [] _TOOL_DISPATCH = {} _TOOL_REGISTRIES = { "dir": _DIR_TOOLS, "synthesis": _SYNTHESIS_TOOLS, "survey": _SURVEY_TOOLS, + "planning": _PLANNING_TOOLS, } @@ -595,8 +601,17 @@ register_tool( "type": "string", "description": "1-3 sentence summary of the directory.", }, + "completeness": { + "type": "number", + "description": ( + "Self-rated investigation completeness (0.0-1.0). " + "1.0 = examined every relevant file thoroughly. " + "0.5 = examined about half, or skimmed most. " + "< 0.3 = barely scratched the surface." + ), + }, }, - "required": ["summary"], + "required": ["summary", "completeness"], }, scopes=["dir"], ) @@ -715,6 +730,92 @@ register_tool( scopes=["survey"], ) +# --- Planning tools --- + +register_tool( + name="submit_plan", + description=( + "Submit the investigation plan. Call exactly once." + ), + schema={ + "type": "object", + "properties": { + "priority_dirs": { + "type": "array", + "items": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Relative directory path.", + }, + "reason": { + "type": "string", + "description": "Why this dir deserves deep investigation.", + }, + "suggested_turns": { + "type": "integer", + "description": "Suggested turns (15-20).", + }, + }, + "required": ["path", "reason", "suggested_turns"], + }, + "description": "Directories to investigate deeply.", + }, + "shallow_dirs": { + "type": "array", + "items": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Relative directory path.", + }, + "reason": { + "type": "string", + "description": "Why a shallow pass is sufficient.", + }, + }, + "required": ["path", "reason"], + }, + "description": "Directories needing only a quick pass.", + }, + "skip_dirs": { + "type": "array", + "items": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Relative directory path.", + }, + "reason": { + "type": "string", + "description": "Why this dir should be skipped.", + }, + }, + "required": ["path", "reason"], + }, + "description": "Directories to skip entirely.", + }, + "investigation_order": { + "type": "string", + "enum": ["leaf-first", "priority-first"], + "description": "leaf-first or priority-first (leaf-first within bands).", + }, + "notes": { + "type": "string", + "description": "Cross-cutting notes for per-directory agents, or empty.", + }, + }, + "required": [ + "priority_dirs", "shallow_dirs", "skip_dirs", + "investigation_order", "notes", + ], + }, + scopes=["planning"], +) + def _execute_tool(name, args, target, cache, dir_rel, turn, verbose=False): """Execute a tool by name and return the result string.""" @@ -829,7 +930,23 @@ def _get_child_summaries(dir_path, cache): parts.append(f"- {rel}/: {summary}") except OSError: pass - return "\n".join(parts) if parts else "(none — this is a leaf directory)" + if parts: + return "\n".join(parts) + # Distinguish actual leaves from parents whose children haven't been + # investigated yet. The old placeholder claimed "leaf directory" even + # when children existed but were not yet cached, which silently + # degraded parent context. + try: + has_subdirs = any( + os.path.isdir(os.path.join(dir_path, name)) + for name in os.listdir(dir_path) + if not name.startswith(".") + ) + except OSError: + has_subdirs = False + if has_subdirs: + return "(child directories exist but have not been investigated yet)" + return "(none: this is a leaf directory)" _SURVEY_CONFIDENCE_THRESHOLD = 0.5 @@ -1040,14 +1157,19 @@ def _handle_turn_response(content_blocks, messages, target, cache, dir_rel, "role": "user", "content": "Please call submit_report with your summary.", }) - return False, None + return False, None, None tool_results = [] done = False summary = None + completeness = None for tu in tool_uses: if tu.name == "submit_report": summary = tu.input.get("summary", "") + try: + completeness = float(tu.input.get("completeness", 0) or 0) + except (TypeError, ValueError): + completeness = None tool_results.append({ "type": "tool_result", "tool_use_id": tu.id, @@ -1066,17 +1188,21 @@ def _handle_turn_response(content_blocks, messages, target, cache, dir_rel, }) messages.append({"role": "user", "content": tool_results}) - return done, summary + return done, summary, completeness def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14, verbose=False, survey=None): - """Run an isolated agent loop for a single directory.""" + """Run an isolated agent loop for a single directory. + + Returns (summary, completeness) where completeness is the agent's + self-rated investigation thoroughness (0.0-1.0), or None if not reported. + """ ctx = _build_dir_loop_context( dir_path, target, cache, survey, max_turns, ) - tracker.reset_loop() summary = None + completeness = None for turn in range(max_turns): if tracker.budget_exceeded(): @@ -1098,19 +1224,21 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14, print(f" [AI] API error: {e}", file=sys.stderr) break - done, turn_summary = _handle_turn_response( + done, turn_summary, turn_completeness = _handle_turn_response( content_blocks, ctx.messages, target, cache, ctx.dir_rel, turn, verbose, ) if turn_summary is not None: summary = turn_summary + if turn_completeness is not None: + completeness = turn_completeness if done: break else: print(f" [AI] Warning: max turns reached for {ctx.dir_rel}", file=sys.stderr) - return summary + return summary, completeness def _block_to_dict(block): @@ -1263,6 +1391,300 @@ def _run_survey(client, target, report, tracker, max_turns=3, verbose=False): return survey +# --------------------------------------------------------------------------- +# Planning pass +# --------------------------------------------------------------------------- + +# Turn allocation defaults. +_DEFAULT_TURNS = 10 +_SHALLOW_TURNS = 5 +_MAX_TURNS_CEILING = 25 +_BASE_TURNS_PER_DIR = 10 + + +def _default_plan(): + """Fallback plan when planning is skipped or fails. + + All directories get default turns, leaf-first order, no overrides. + """ + return { + "priority_dirs": [], + "shallow_dirs": [], + "skip_dirs": [], + "investigation_order": "leaf-first", + "notes": "", + } + + +def _run_planning(client, target, survey, report, all_dirs, tracker, + cached_dirs=None, max_turns=3, verbose=False): + """Run the planning pass. Returns a plan dict or None on failure. + + The planning pass decides where to invest investigation depth. + It runs after the survey and before the per-directory loops. + """ + cached_dirs = cached_dirs or [] + dir_count = len(all_dirs) + global_budget = _BASE_TURNS_PER_DIR * dir_count + + survey_context = _format_survey_block(survey) if survey else "(no survey available)" + + try: + tree_node = build_tree(target, max_depth=6) + tree_text = render_tree(tree_node) + except Exception: + tree_text = "(tree unavailable)" + + signals = report.get("survey_signals") or {} + file_signals = _format_survey_signals(signals) + + cached_rel = [] + for d in cached_dirs: + cached_rel.append(os.path.relpath(d, target)) + cached_text = ", ".join(cached_rel) if cached_rel else "(none)" + + system = _PLANNING_SYSTEM_PROMPT.format( + target=target, + survey_context=survey_context, + tree_text=tree_text, + file_signals=file_signals, + dir_count=dir_count, + cached_dirs=cached_text, + default_turns=_DEFAULT_TURNS, + global_budget=global_budget, + ) + + messages = [ + { + "role": "user", + "content": ( + "All inputs are in the system prompt above. Call " + "submit_plan now." + ), + }, + ] + + plan = None + + for turn in range(max_turns): + try: + content_blocks, _usage = _call_api_streaming( + client, system, messages, _PLANNING_TOOLS, tracker, + ) + except anthropic.APIError as e: + print(f" [AI] API error: {e}", file=sys.stderr) + return None + + for b in content_blocks: + if b.type == "text" and b.text.strip(): + for line in b.text.strip().split("\n"): + print(f" [AI] {line}", file=sys.stderr) + + tool_uses = [b for b in content_blocks if b.type == "tool_use"] + for tu in tool_uses: + arg_summary = ", ".join( + f"{k}={v!r}" for k, v in tu.input.items() + ) if tu.input else "" + print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr) + + messages.append({ + "role": "assistant", + "content": [_block_to_dict(b) for b in content_blocks], + }) + + if not tool_uses: + messages.append({ + "role": "user", + "content": "Please call submit_plan.", + }) + continue + + tool_results = [] + done = False + for tu in tool_uses: + if tu.name == "submit_plan": + plan = { + "priority_dirs": tu.input.get("priority_dirs", []) or [], + "shallow_dirs": tu.input.get("shallow_dirs", []) or [], + "skip_dirs": tu.input.get("skip_dirs", []) or [], + "investigation_order": tu.input.get( + "investigation_order", "leaf-first" + ), + "notes": tu.input.get("notes", ""), + } + tool_results.append({ + "type": "tool_result", + "tool_use_id": tu.id, + "content": "Plan received. Thank you.", + }) + done = True + else: + tool_results.append({ + "type": "tool_result", + "tool_use_id": tu.id, + "content": "Unknown tool. Call submit_plan.", + "is_error": True, + }) + + messages.append({"role": "user", "content": tool_results}) + + if done: + break + else: + print(" [AI] Warning: planning ran out of turns.", file=sys.stderr) + + return plan + + +def _apply_plan(all_dirs, to_investigate, plan, target): + """Apply the plan to produce an ordered dir list and turn map. + + Returns (ordered_dirs, turn_map) where: + - ordered_dirs: list of absolute dir paths in investigation order + - turn_map: dict of {abs_dir_path: max_turns} + + Pure function: no I/O, no cache, no API calls. + """ + if plan is None: + return list(to_investigate), {} + + # Build lookup from relative path to absolute path. + rel_to_abs = {} + for d in all_dirs: + rel = os.path.relpath(d, target) + rel_to_abs[rel] = d + + # Classify directories by tier. + skip_set = set() + priority_set = set() + shallow_set = set() + turn_map = {} + + for entry in plan.get("skip_dirs", []): + rel = entry.get("path", "") + if rel in rel_to_abs: + skip_set.add(rel_to_abs[rel]) + + for entry in plan.get("priority_dirs", []): + rel = entry.get("path", "") + suggested = entry.get("suggested_turns", 15) + capped = min(suggested, _MAX_TURNS_CEILING) + if rel in rel_to_abs: + abs_path = rel_to_abs[rel] + priority_set.add(abs_path) + turn_map[abs_path] = capped + + for entry in plan.get("shallow_dirs", []): + rel = entry.get("path", "") + if rel in rel_to_abs: + abs_path = rel_to_abs[rel] + shallow_set.add(abs_path) + turn_map[abs_path] = _SHALLOW_TURNS + + # Remove skipped dirs from the investigation list. + remaining = [d for d in to_investigate if d not in skip_set] + + # Order by bands. Both strategies preserve leaf-first within bands. + order = plan.get("investigation_order", "leaf-first") + + if order == "priority-first": + priority_band = [d for d in remaining if d in priority_set] + shallow_band = [d for d in remaining if d in shallow_set] + default_band = [ + d for d in remaining + if d not in priority_set and d not in shallow_set + ] + ordered = priority_band + default_band + shallow_band + else: + # leaf-first: keep the original order (already leaf-first from + # _discover_directories), just remove skipped dirs. + ordered = remaining + + return ordered, turn_map + + +def _write_plan_evaluation(cache, plan, turn_utilization): + """Write plan_evaluation.json comparing plan predictions to actual results. + + This is the planning pass's report card: did we allocate turns well? + """ + # Build a lookup of what the plan predicted per dir. + predicted = {} + for entry in (plan or {}).get("priority_dirs", []): + predicted[entry["path"]] = { + "tier": "priority", + "suggested_turns": entry.get("suggested_turns", 15), + } + for entry in (plan or {}).get("shallow_dirs", []): + predicted[entry["path"]] = { + "tier": "shallow", + "suggested_turns": _SHALLOW_TURNS, + } + for entry in (plan or {}).get("skip_dirs", []): + predicted[entry["path"]] = { + "tier": "skip", + "suggested_turns": 0, + } + + # Compare predictions to actual turn utilization. + per_dir = [] + total_allocated = 0 + total_used = 0 + for record in turn_utilization: + dir_rel = record["dir"] + allocated = record["turns_allocated"] + used = record["turns_used"] + total_allocated += allocated + total_used += used + + pred = predicted.get(dir_rel, {}) + entry = { + "dir": dir_rel, + "planned_tier": pred.get("tier", "default"), + "turns_allocated": allocated, + "turns_used": used, + "utilization": round(used / allocated, 2) if allocated else 0, + } + + # Include completeness from turn utilization record (#74). + record_completeness = record.get("completeness") + if record_completeness is not None: + entry["completeness"] = record_completeness + + # Read confidence from the cached dir entry if available. + dir_entry = cache.read_entry("dir", os.path.join( + cache.target, dir_rel, + )) + if dir_entry: + entry["confidence"] = dir_entry.get("confidence") + + per_dir.append(entry) + + evaluation = { + "plan_order": (plan or {}).get("investigation_order", "leaf-first"), + "total_dirs_investigated": len(turn_utilization), + "total_turns_allocated": total_allocated, + "total_turns_used": total_used, + "overall_utilization": ( + round(total_used / total_allocated, 2) if total_allocated else 0 + ), + "per_directory": per_dir, + "evaluated_at": _now_iso(), + } + + try: + eval_path = os.path.join(cache.root, "plan_evaluation.json") + with open(eval_path, "w") as f: + json.dump(evaluation, f, indent=2) + print( + f" [AI] Plan evaluation: {total_used}/{total_allocated} turns used " + f"({evaluation['overall_utilization']:.0%} utilization)", + file=sys.stderr, + ) + except OSError: + pass + + def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False): """Run the final synthesis pass. Returns (brief, detailed).""" dir_entries = cache.read_all_entries("dir") @@ -1443,34 +1865,103 @@ def _run_investigation(client, target, report, show_hidden=False, print(" [AI] Survey unavailable — proceeding without it.", file=sys.stderr) to_investigate = [] - cached_count = 0 + cached_dirs = [] for d in all_dirs: if cache.has_entry("dir", d): - cached_count += 1 + cached_dirs.append(d) rel = os.path.relpath(d, target) print(f" [AI] Skipping (cached): {rel}/", file=sys.stderr) else: to_investigate.append(d) - total = len(to_investigate) + cached_count = len(cached_dirs) if cached_count: print(f" [AI] Directories cached: {cached_count}", file=sys.stderr) - print(f" [AI] Directories to investigate: {total}", file=sys.stderr) + print(f" [AI] Directories to investigate: {len(to_investigate)}", + file=sys.stderr) - for i, dir_path in enumerate(to_investigate, 1): + # Planning pass: decide where to invest depth. + if total_files < _SURVEY_MIN_FILES and total_dirs < _SURVEY_MIN_DIRS: + print(" [AI] Planning skipped (small target).", file=sys.stderr) + plan = _default_plan() + else: + plan_path = os.path.join(cache.root, "plan.json") + if not fresh and os.path.exists(plan_path): + try: + with open(plan_path) as f: + plan = json.load(f) + print(" [AI] Plan loaded from cache.", file=sys.stderr) + except (OSError, json.JSONDecodeError): + plan = None + else: + plan = None + + if plan is None: + print(" [AI] Planning pass...", file=sys.stderr) + plan = _run_planning( + client, target, survey, report, all_dirs, tracker, + cached_dirs=cached_dirs, verbose=verbose, + ) + if plan is None: + print(" [AI] Planning failed, using defaults.", + file=sys.stderr) + plan = _default_plan() + else: + # Save plan to cache (#11). + try: + with open(os.path.join(cache.root, "plan.json"), "w") as f: + json.dump(plan, f, indent=2) + except OSError: + pass + + ordered, turn_map = _apply_plan(all_dirs, to_investigate, plan, target) + + # Log plan summary. + skip_count = len(to_investigate) - len(ordered) + priority_count = sum( + 1 for d in ordered if turn_map.get(d, _DEFAULT_TURNS) > _DEFAULT_TURNS + ) + if skip_count or priority_count: + print( + f" [AI] Plan: {priority_count} priority, " + f"{skip_count} skipped, " + f"{len(ordered) - priority_count} default/shallow", + file=sys.stderr, + ) + if plan.get("notes"): + print(f" [AI] Plan notes: {plan['notes']}", file=sys.stderr) + + total = len(ordered) + turn_utilization = [] + + for i, dir_path in enumerate(ordered, 1): dir_rel = os.path.relpath(dir_path, target) if dir_rel == ".": dir_rel = os.path.basename(target) - print(f" [AI] Investigating: {dir_rel}/ ({i}/{total})", - file=sys.stderr) - - summary = _run_dir_loop( - client, target, cache, tracker, dir_path, verbose=verbose, - survey=survey, + max_turns = turn_map.get(dir_path, _DEFAULT_TURNS) + print( + f" [AI] Investigating: {dir_rel}/ ({i}/{total}, " + f"{max_turns} turns)", + file=sys.stderr, ) + tracker.reset_loop() + summary, completeness = _run_dir_loop( + client, target, cache, tracker, dir_path, + max_turns=max_turns, verbose=verbose, survey=survey, + ) + + # Track turn utilization for quality metrics (#74). + turns_used = tracker._loop_turns + turn_utilization.append({ + "dir": dir_rel, + "turns_allocated": max_turns, + "turns_used": turns_used, + "completeness": completeness, + }) + if summary and not cache.has_entry("dir", dir_path): - cache.write_entry("dir", dir_path, { + entry = { "path": dir_path, "relative_path": os.path.relpath(dir_path, target), "child_count": len([ @@ -1481,13 +1972,19 @@ def _run_investigation(client, target, report, show_hidden=False, "dominant_category": "unknown", "notable_files": [], "cached_at": _now_iso(), - }) + } + if completeness is not None: + entry["completeness"] = completeness + cache.write_entry("dir", dir_path, entry) cache.update_meta( directories_investigated=total + cached_count, end_time=_now_iso(), ) + # Emit plan evaluation (#74). + _write_plan_evaluation(cache, plan, turn_utilization) + print(" [AI] Synthesis pass...", file=sys.stderr) brief, detailed = _run_synthesis( client, target, cache, tracker, verbose=verbose, diff --git a/luminos_lib/prompts.py b/luminos_lib/prompts.py index 6cba386..1db7659 100644 --- a/luminos_lib/prompts.py +++ b/luminos_lib/prompts.py @@ -209,3 +209,84 @@ Call `submit_survey` exactly once with: You have at most 3 turns. In almost all cases you should call `submit_survey` on your first turn. Use a second turn only if you genuinely need to think before committing.""" + +_PLANNING_SYSTEM_PROMPT = """\ +You are an investigation planner. Your job is to decide where to invest +investigative depth across a directory tree, BEFORE the per-directory +investigation begins. You allocate turns (agent reasoning steps) to +directories based on their likely complexity and importance. + +## Your Task +Create an investigation plan for the target: {target} + +## Inputs + +Survey assessment (from a prior reconnaissance pass): +{survey_context} + +Full directory tree: +{tree_text} + +File signals: +{file_signals} + +Total directories to investigate: {dir_count} +Directories already cached (will be skipped): {cached_dirs} + +## How to Allocate + +Classify each directory into one of three tiers: + +**priority** (15-20 turns): directories that are likely complex, central, +or important. Signs: many source files, core application logic, complex +configuration, entry points, schemas, migrations. These deserve deep +investigation with multiple tool calls per file. + +**shallow** (5 turns): directories that are simple, peripheral, or +predictable. Signs: few files, generated/vendored content, test fixtures, +static assets, documentation-only dirs. A quick pass is sufficient. + +**skip** (0 turns): directories that should be skipped entirely. Signs: +build output, dependency caches, vendored code, generated artifacts. The +investigation would waste turns and produce noise. + +Directories you do not mention go into a default tier ({default_turns} +turns). You do NOT need to list every directory. Focus on the ones where +the default allocation would clearly be wrong (too many turns for a +trivial dir, or too few for a complex one). + +## Investigation Order + +Choose one of these ordering strategies: + +- **leaf-first**: deepest directories first, parents last. This is the + default and ensures parent directories always have child summaries + available. Best for most codebases. + +- **priority-first**: priority directories before shallow ones, but + still leaf-first within each tier. Good when certain subtrees are + clearly more important and you want findings from them to inform + the rest of the investigation. + +Both strategies preserve the leaf-first invariant (children before +parents) to ensure child summaries are available when investigating +parent directories. + +## Budget + +The global turn budget is {global_budget} turns across all directories. +Your allocations should roughly respect this budget, though small +overages are fine. If you allocate significantly more than the budget, +the orchestrator will cap individual directories. + +## Notes Field + +Use `notes` to communicate anything the per-directory agents should +know that the survey did not capture. Cross-cutting concerns, suspected +relationships between directories, or investigation priorities. Leave +empty if you have nothing to add beyond the tier assignments. + +## Output +Call `submit_plan` exactly once. You have at most 3 turns, but you +should almost always submit on your first turn. Use additional turns +only if you genuinely need to reason through a complex target layout.""" diff --git a/tests/test_ai_pure.py b/tests/test_ai_pure.py index fbc64b8..7562afd 100644 --- a/tests/test_ai_pure.py +++ b/tests/test_ai_pure.py @@ -14,20 +14,28 @@ from types import SimpleNamespace from luminos_lib.ai import ( CONTEXT_BUDGET, + _DEFAULT_TURNS, _DIR_TOOLS, + _MAX_TURNS_CEILING, + _PLANNING_TOOLS, _PROTECTED_DIR_TOOLS, + _SHALLOW_TURNS, _SURVEY_CONFIDENCE_THRESHOLD, _TokenTracker, + _apply_plan, _block_to_dict, + _default_plan, _default_survey, _discover_directories, _filter_dir_tools, _flush_partial_dir_entry, _format_survey_block, _format_survey_signals, + _get_child_summaries, _path_is_safe, _should_skip_dir, _synthesize_from_cache, + _write_plan_evaluation, ) from luminos_lib.cache import _CacheManager @@ -717,5 +725,342 @@ class TestDiscoverDirectories(unittest.TestCase): self.assertNotIn(".git", rels) +# --------------------------------------------------------------------------- +# _default_plan +# --------------------------------------------------------------------------- + +class TestDefaultPlan(unittest.TestCase): + def test_returns_empty_plan(self): + plan = _default_plan() + self.assertEqual(plan["priority_dirs"], []) + self.assertEqual(plan["shallow_dirs"], []) + self.assertEqual(plan["skip_dirs"], []) + self.assertEqual(plan["investigation_order"], "leaf-first") + self.assertEqual(plan["notes"], "") + + def test_returns_fresh_dict_each_call(self): + a = _default_plan() + b = _default_plan() + self.assertIsNot(a, b) + a["notes"] = "mutated" + self.assertEqual(b["notes"], "") + + +# --------------------------------------------------------------------------- +# _apply_plan +# --------------------------------------------------------------------------- + +class TestApplyPlan(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.target = self.tmp + # Create directories: a/x, a/y, b, c (leaves first in sorted order) + for p in ["a/x", "a/y", "b", "c"]: + os.makedirs(os.path.join(self.tmp, p), exist_ok=True) + # all_dirs sorted leaf-first (deepest first, then alphabetical) + self.all_dirs = [ + os.path.join(self.tmp, "a", "x"), + os.path.join(self.tmp, "a", "y"), + os.path.join(self.tmp, "a"), + os.path.join(self.tmp, "b"), + os.path.join(self.tmp, "c"), + self.tmp, + ] + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_none_plan_returns_original_order(self): + ordered, turn_map = _apply_plan( + self.all_dirs, list(self.all_dirs), None, self.target, + ) + self.assertEqual(ordered, self.all_dirs) + self.assertEqual(turn_map, {}) + + def test_default_plan_returns_original_order(self): + plan = _default_plan() + ordered, turn_map = _apply_plan( + self.all_dirs, list(self.all_dirs), plan, self.target, + ) + self.assertEqual(ordered, self.all_dirs) + self.assertEqual(turn_map, {}) + + def test_skip_dirs_removed(self): + plan = _default_plan() + plan["skip_dirs"] = [{"path": "b", "reason": "vendored"}] + ordered, turn_map = _apply_plan( + self.all_dirs, list(self.all_dirs), plan, self.target, + ) + b_path = os.path.join(self.tmp, "b") + self.assertNotIn(b_path, ordered) + + def test_priority_dirs_get_custom_turns(self): + plan = _default_plan() + plan["priority_dirs"] = [ + {"path": "a", "reason": "core", "suggested_turns": 18}, + ] + ordered, turn_map = _apply_plan( + self.all_dirs, list(self.all_dirs), plan, self.target, + ) + a_path = os.path.join(self.tmp, "a") + self.assertEqual(turn_map[a_path], 18) + + def test_priority_turns_capped_at_ceiling(self): + plan = _default_plan() + plan["priority_dirs"] = [ + {"path": "a", "reason": "core", "suggested_turns": 50}, + ] + _, turn_map = _apply_plan( + self.all_dirs, list(self.all_dirs), plan, self.target, + ) + a_path = os.path.join(self.tmp, "a") + self.assertEqual(turn_map[a_path], _MAX_TURNS_CEILING) + + def test_shallow_dirs_get_shallow_turns(self): + plan = _default_plan() + plan["shallow_dirs"] = [{"path": "c", "reason": "docs only"}] + _, turn_map = _apply_plan( + self.all_dirs, list(self.all_dirs), plan, self.target, + ) + c_path = os.path.join(self.tmp, "c") + self.assertEqual(turn_map[c_path], _SHALLOW_TURNS) + + def test_priority_first_reorders_bands(self): + plan = _default_plan() + plan["investigation_order"] = "priority-first" + plan["priority_dirs"] = [ + {"path": "c", "reason": "entry point", "suggested_turns": 15}, + ] + plan["shallow_dirs"] = [{"path": "b", "reason": "tests"}] + ordered, _ = _apply_plan( + self.all_dirs, list(self.all_dirs), plan, self.target, + ) + c_path = os.path.join(self.tmp, "c") + b_path = os.path.join(self.tmp, "b") + # Priority dirs come before shallow dirs. + self.assertLess(ordered.index(c_path), ordered.index(b_path)) + + def test_leaf_first_preserved_within_priority_band(self): + plan = _default_plan() + plan["investigation_order"] = "priority-first" + plan["priority_dirs"] = [ + {"path": os.path.join("a", "x"), "reason": "deep", + "suggested_turns": 15}, + {"path": "a", "reason": "parent", "suggested_turns": 15}, + ] + ordered, _ = _apply_plan( + self.all_dirs, list(self.all_dirs), plan, self.target, + ) + ax_path = os.path.join(self.tmp, "a", "x") + a_path = os.path.join(self.tmp, "a") + # a/x (leaf) comes before a (parent), preserving leaf-first. + self.assertLess(ordered.index(ax_path), ordered.index(a_path)) + + def test_unknown_paths_in_plan_ignored(self): + plan = _default_plan() + plan["skip_dirs"] = [{"path": "nonexistent", "reason": "gone"}] + plan["priority_dirs"] = [ + {"path": "also_missing", "reason": "?", "suggested_turns": 20}, + ] + ordered, turn_map = _apply_plan( + self.all_dirs, list(self.all_dirs), plan, self.target, + ) + # All original dirs still present, no crash. + self.assertEqual(len(ordered), len(self.all_dirs)) + self.assertEqual(turn_map, {}) + + def test_to_investigate_subset_respected(self): + """Only dirs in to_investigate appear in output, even if plan mentions all.""" + plan = _default_plan() + subset = self.all_dirs[:3] + ordered, _ = _apply_plan( + self.all_dirs, subset, plan, self.target, + ) + self.assertEqual(len(ordered), len(subset)) + + +# --------------------------------------------------------------------------- +# _get_child_summaries (updated placeholder behavior) +# --------------------------------------------------------------------------- + +class TestGetChildSummaries(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.cache = _make_manager(self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_leaf_directory_no_subdirs(self): + leaf = os.path.join(self.tmp, "leaf") + os.makedirs(leaf) + result = _get_child_summaries(leaf, self.cache) + self.assertIn("leaf directory", result) + self.assertNotIn("not been investigated", result) + + def test_parent_with_uninvestigated_children(self): + parent = os.path.join(self.tmp, "parent") + child = os.path.join(parent, "child") + os.makedirs(child) + result = _get_child_summaries(parent, self.cache) + self.assertIn("not been investigated", result) + self.assertNotIn("leaf directory", result) + + def test_parent_with_cached_children(self): + parent = os.path.join(self.tmp, "parent") + child = os.path.join(parent, "child") + os.makedirs(child) + self.cache.write_entry("dir", child, { + "path": child, + "relative_path": "parent/child", + "child_count": 0, + "summary": "A child directory with stuff.", + "dominant_category": "source", + "notable_files": [], + "cached_at": "2026-01-01T00:00:00+00:00", + }) + result = _get_child_summaries(parent, self.cache) + self.assertIn("parent/child/", result) + self.assertIn("A child directory with stuff.", result) + + def test_hidden_dirs_ignored(self): + parent = os.path.join(self.tmp, "parent") + os.makedirs(os.path.join(parent, ".hidden")) + result = _get_child_summaries(parent, self.cache) + # .hidden is ignored, so this looks like a leaf. + self.assertIn("leaf directory", result) + + +# --------------------------------------------------------------------------- +# _TokenTracker._loop_turns +# --------------------------------------------------------------------------- + +class TestTokenTrackerLoopTurns(unittest.TestCase): + def test_loop_turns_increments_on_record(self): + t = _TokenTracker() + self.assertEqual(t._loop_turns, 0) + t.record(SimpleNamespace(input_tokens=100, output_tokens=50)) + self.assertEqual(t._loop_turns, 1) + t.record(SimpleNamespace(input_tokens=200, output_tokens=75)) + self.assertEqual(t._loop_turns, 2) + + def test_loop_turns_reset(self): + t = _TokenTracker() + t.record(SimpleNamespace(input_tokens=100, output_tokens=50)) + t.record(SimpleNamespace(input_tokens=200, output_tokens=75)) + self.assertEqual(t._loop_turns, 2) + t.reset_loop() + self.assertEqual(t._loop_turns, 0) + + def test_loop_turns_independent_of_totals(self): + t = _TokenTracker() + t.record(SimpleNamespace(input_tokens=100, output_tokens=50)) + t.reset_loop() + t.record(SimpleNamespace(input_tokens=300, output_tokens=100)) + self.assertEqual(t._loop_turns, 1) + self.assertEqual(t.total_input, 400) + + +# --------------------------------------------------------------------------- +# _write_plan_evaluation +# --------------------------------------------------------------------------- + +class TestWritePlanEvaluation(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.cache = _make_manager(self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_writes_evaluation_file(self): + plan = { + "priority_dirs": [ + {"path": "src", "reason": "core", "suggested_turns": 18}, + ], + "shallow_dirs": [ + {"path": "docs", "reason": "docs"}, + ], + "skip_dirs": [], + "investigation_order": "leaf-first", + "notes": "", + } + utilization = [ + {"dir": "src", "turns_allocated": 18, "turns_used": 12, + "completeness": 0.85}, + {"dir": "docs", "turns_allocated": 5, "turns_used": 3, + "completeness": 0.7}, + ] + _write_plan_evaluation(self.cache, plan, utilization) + + import json + eval_path = os.path.join(self.cache.root, "plan_evaluation.json") + self.assertTrue(os.path.exists(eval_path)) + with open(eval_path) as f: + data = json.load(f) + self.assertEqual(data["total_dirs_investigated"], 2) + self.assertEqual(data["total_turns_allocated"], 23) + self.assertEqual(data["total_turns_used"], 15) + self.assertEqual(len(data["per_directory"]), 2) + # Check that tier classification came through. + src_entry = [d for d in data["per_directory"] if d["dir"] == "src"][0] + self.assertEqual(src_entry["planned_tier"], "priority") + self.assertEqual(src_entry["completeness"], 0.85) + + def test_handles_none_plan(self): + utilization = [ + {"dir": "a", "turns_allocated": 10, "turns_used": 8}, + ] + _write_plan_evaluation(self.cache, None, utilization) + eval_path = os.path.join(self.cache.root, "plan_evaluation.json") + self.assertTrue(os.path.exists(eval_path)) + + def test_handles_empty_utilization(self): + plan = _default_plan() + _write_plan_evaluation(self.cache, plan, []) + import json + eval_path = os.path.join(self.cache.root, "plan_evaluation.json") + with open(eval_path) as f: + data = json.load(f) + self.assertEqual(data["total_dirs_investigated"], 0) + self.assertEqual(data["overall_utilization"], 0) + + def test_zero_allocated_turns_no_division_error(self): + plan = _default_plan() + utilization = [ + {"dir": "x", "turns_allocated": 0, "turns_used": 0}, + ] + _write_plan_evaluation(self.cache, plan, utilization) + import json + eval_path = os.path.join(self.cache.root, "plan_evaluation.json") + with open(eval_path) as f: + data = json.load(f) + self.assertEqual(data["per_directory"][0]["utilization"], 0) + + +# --------------------------------------------------------------------------- +# Planning tool registry +# --------------------------------------------------------------------------- + +class TestPlanningToolRegistry(unittest.TestCase): + def test_submit_plan_registered(self): + names = [t["name"] for t in _PLANNING_TOOLS] + self.assertIn("submit_plan", names) + + def test_submit_plan_has_required_fields(self): + tool = [t for t in _PLANNING_TOOLS if t["name"] == "submit_plan"][0] + required = tool["input_schema"]["required"] + self.assertIn("priority_dirs", required) + self.assertIn("shallow_dirs", required) + self.assertIn("skip_dirs", required) + self.assertIn("investigation_order", required) + self.assertIn("notes", required) + + def test_submit_plan_order_enum(self): + tool = [t for t in _PLANNING_TOOLS if t["name"] == "submit_plan"][0] + order_prop = tool["input_schema"]["properties"]["investigation_order"] + self.assertEqual(order_prop["enum"], ["leaf-first", "priority-first"]) + + if __name__ == "__main__": unittest.main() -- 2.45.2