feat(ai): Phase 3 investigation planning #75

Merged
claude-code merged 1 commit from feat/phase-3-investigation-planning into main 2026-04-12 20:26:22 -06:00
3 changed files with 944 additions and 21 deletions

View file

@ -24,6 +24,7 @@ from luminos_lib.ast_parser import parse_structure
from luminos_lib.cache import _CacheManager, _get_investigation_id from luminos_lib.cache import _CacheManager, _get_investigation_id
from luminos_lib.prompts import ( from luminos_lib.prompts import (
_DIR_SYSTEM_PROMPT, _DIR_SYSTEM_PROMPT,
_PLANNING_SYSTEM_PROMPT,
_SURVEY_SYSTEM_PROMPT, _SURVEY_SYSTEM_PROMPT,
_SYNTHESIS_SYSTEM_PROMPT, _SYNTHESIS_SYSTEM_PROMPT,
) )
@ -111,6 +112,7 @@ class _TokenTracker:
self.loop_input = 0 self.loop_input = 0
self.loop_output = 0 self.loop_output = 0
self.last_input = 0 self.last_input = 0
self._loop_turns = 0
def record(self, usage): def record(self, usage):
"""Record usage from a single API call.""" """Record usage from a single API call."""
@ -121,12 +123,14 @@ class _TokenTracker:
self.loop_input += inp self.loop_input += inp
self.loop_output += out self.loop_output += out
self.last_input = inp self.last_input = inp
self._loop_turns += 1
def reset_loop(self): def reset_loop(self):
"""Reset per-loop counters (called between directory loops).""" """Reset per-loop counters (called between directory loops)."""
self.loop_input = 0 self.loop_input = 0
self.loop_output = 0 self.loop_output = 0
self.last_input = 0 self.last_input = 0
self._loop_turns = 0
@property @property
def loop_total(self): def loop_total(self):
@ -163,12 +167,14 @@ class _TokenTracker:
_DIR_TOOLS = [] _DIR_TOOLS = []
_SYNTHESIS_TOOLS = [] _SYNTHESIS_TOOLS = []
_SURVEY_TOOLS = [] _SURVEY_TOOLS = []
_PLANNING_TOOLS = []
_TOOL_DISPATCH = {} _TOOL_DISPATCH = {}
_TOOL_REGISTRIES = { _TOOL_REGISTRIES = {
"dir": _DIR_TOOLS, "dir": _DIR_TOOLS,
"synthesis": _SYNTHESIS_TOOLS, "synthesis": _SYNTHESIS_TOOLS,
"survey": _SURVEY_TOOLS, "survey": _SURVEY_TOOLS,
"planning": _PLANNING_TOOLS,
} }
@ -595,8 +601,17 @@ register_tool(
"type": "string", "type": "string",
"description": "1-3 sentence summary of the directory.", "description": "1-3 sentence summary of the directory.",
}, },
"completeness": {
"type": "number",
"description": (
"Self-rated investigation completeness (0.0-1.0). "
"1.0 = examined every relevant file thoroughly. "
"0.5 = examined about half, or skimmed most. "
"< 0.3 = barely scratched the surface."
),
},
}, },
"required": ["summary"], "required": ["summary", "completeness"],
}, },
scopes=["dir"], scopes=["dir"],
) )
@ -715,6 +730,92 @@ register_tool(
scopes=["survey"], scopes=["survey"],
) )
# --- Planning tools ---

# Registers the single tool offered to the planning pass (scope "planning").
# The model answers with three tier lists (priority / shallow / skip), an
# ordering strategy and free-form notes; all five fields are required.
# NOTE(review): "suggested_turns" is described as 15-20, but the schema itself
# declares no minimum/maximum, so any bounds must be enforced by the consumer
# of the plan.
register_tool(
    name="submit_plan",
    description=(
        "Submit the investigation plan. Call exactly once."
    ),
    schema={
        "type": "object",
        "properties": {
            # Directories that get a custom (larger) turn allocation.
            "priority_dirs": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "path": {
                            "type": "string",
                            "description": "Relative directory path.",
                        },
                        "reason": {
                            "type": "string",
                            "description": "Why this dir deserves deep investigation.",
                        },
                        "suggested_turns": {
                            "type": "integer",
                            "description": "Suggested turns (15-20).",
                        },
                    },
                    "required": ["path", "reason", "suggested_turns"],
                },
                "description": "Directories to investigate deeply.",
            },
            # Directories that only need a quick pass; no per-entry turn count.
            "shallow_dirs": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "path": {
                            "type": "string",
                            "description": "Relative directory path.",
                        },
                        "reason": {
                            "type": "string",
                            "description": "Why a shallow pass is sufficient.",
                        },
                    },
                    "required": ["path", "reason"],
                },
                "description": "Directories needing only a quick pass.",
            },
            # Directories to drop from the investigation entirely.
            "skip_dirs": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "path": {
                            "type": "string",
                            "description": "Relative directory path.",
                        },
                        "reason": {
                            "type": "string",
                            "description": "Why this dir should be skipped.",
                        },
                    },
                    "required": ["path", "reason"],
                },
                "description": "Directories to skip entirely.",
            },
            "investigation_order": {
                "type": "string",
                "enum": ["leaf-first", "priority-first"],
                "description": "leaf-first or priority-first (leaf-first within bands).",
            },
            "notes": {
                "type": "string",
                "description": "Cross-cutting notes for per-directory agents, or empty.",
            },
        },
        "required": [
            "priority_dirs", "shallow_dirs", "skip_dirs",
            "investigation_order", "notes",
        ],
    },
    scopes=["planning"],
)
def _execute_tool(name, args, target, cache, dir_rel, turn, verbose=False): def _execute_tool(name, args, target, cache, dir_rel, turn, verbose=False):
"""Execute a tool by name and return the result string.""" """Execute a tool by name and return the result string."""
@ -829,7 +930,23 @@ def _get_child_summaries(dir_path, cache):
parts.append(f"- {rel}/: {summary}") parts.append(f"- {rel}/: {summary}")
except OSError: except OSError:
pass pass
return "\n".join(parts) if parts else "(none — this is a leaf directory)" if parts:
return "\n".join(parts)
# Distinguish actual leaves from parents whose children haven't been
# investigated yet. The old placeholder claimed "leaf directory" even
# when children existed but were not yet cached, which silently
# degraded parent context.
try:
has_subdirs = any(
os.path.isdir(os.path.join(dir_path, name))
for name in os.listdir(dir_path)
if not name.startswith(".")
)
except OSError:
has_subdirs = False
if has_subdirs:
return "(child directories exist but have not been investigated yet)"
return "(none: this is a leaf directory)"
_SURVEY_CONFIDENCE_THRESHOLD = 0.5 _SURVEY_CONFIDENCE_THRESHOLD = 0.5
@ -1040,14 +1157,19 @@ def _handle_turn_response(content_blocks, messages, target, cache, dir_rel,
"role": "user", "role": "user",
"content": "Please call submit_report with your summary.", "content": "Please call submit_report with your summary.",
}) })
return False, None return False, None, None
tool_results = [] tool_results = []
done = False done = False
summary = None summary = None
completeness = None
for tu in tool_uses: for tu in tool_uses:
if tu.name == "submit_report": if tu.name == "submit_report":
summary = tu.input.get("summary", "") summary = tu.input.get("summary", "")
try:
completeness = float(tu.input.get("completeness", 0) or 0)
except (TypeError, ValueError):
completeness = None
tool_results.append({ tool_results.append({
"type": "tool_result", "type": "tool_result",
"tool_use_id": tu.id, "tool_use_id": tu.id,
@ -1066,17 +1188,21 @@ def _handle_turn_response(content_blocks, messages, target, cache, dir_rel,
}) })
messages.append({"role": "user", "content": tool_results}) messages.append({"role": "user", "content": tool_results})
return done, summary return done, summary, completeness
def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14, def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
verbose=False, survey=None): verbose=False, survey=None):
"""Run an isolated agent loop for a single directory.""" """Run an isolated agent loop for a single directory.
Returns (summary, completeness) where completeness is the agent's
self-rated investigation thoroughness (0.0-1.0), or None if not reported.
"""
ctx = _build_dir_loop_context( ctx = _build_dir_loop_context(
dir_path, target, cache, survey, max_turns, dir_path, target, cache, survey, max_turns,
) )
tracker.reset_loop()
summary = None summary = None
completeness = None
for turn in range(max_turns): for turn in range(max_turns):
if tracker.budget_exceeded(): if tracker.budget_exceeded():
@ -1098,19 +1224,21 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
print(f" [AI] API error: {e}", file=sys.stderr) print(f" [AI] API error: {e}", file=sys.stderr)
break break
done, turn_summary = _handle_turn_response( done, turn_summary, turn_completeness = _handle_turn_response(
content_blocks, ctx.messages, target, cache, content_blocks, ctx.messages, target, cache,
ctx.dir_rel, turn, verbose, ctx.dir_rel, turn, verbose,
) )
if turn_summary is not None: if turn_summary is not None:
summary = turn_summary summary = turn_summary
if turn_completeness is not None:
completeness = turn_completeness
if done: if done:
break break
else: else:
print(f" [AI] Warning: max turns reached for {ctx.dir_rel}", print(f" [AI] Warning: max turns reached for {ctx.dir_rel}",
file=sys.stderr) file=sys.stderr)
return summary return summary, completeness
def _block_to_dict(block): def _block_to_dict(block):
@ -1263,6 +1391,300 @@ def _run_survey(client, target, report, tracker, max_turns=3, verbose=False):
return survey return survey
# ---------------------------------------------------------------------------
# Planning pass
# ---------------------------------------------------------------------------
# Turn allocation defaults.
# Turns for a directory the plan does not mention (the turn-map fallback);
# also reported to the planner as the default tier size.
_DEFAULT_TURNS = 10
# Turns assigned to every directory listed in the plan's shallow_dirs.
_SHALLOW_TURNS = 5
# Upper bound applied to a priority directory's suggested_turns.
_MAX_TURNS_CEILING = 25
# Multiplied by the directory count to derive the global budget quoted to
# the planner.
_BASE_TURNS_PER_DIR = 10
def _default_plan():
"""Fallback plan when planning is skipped or fails.
All directories get default turns, leaf-first order, no overrides.
"""
return {
"priority_dirs": [],
"shallow_dirs": [],
"skip_dirs": [],
"investigation_order": "leaf-first",
"notes": "",
}
def _run_planning(client, target, survey, report, all_dirs, tracker,
                  cached_dirs=None, max_turns=3, verbose=False):
    """Run the planning pass. Returns a plan dict or None on failure.

    The planning pass decides where to invest investigation depth.
    It runs after the survey and before the per-directory loops.

    Args:
        client: API client handed straight to ``_call_api_streaming``.
        target: root directory; cached paths are made relative to it and
            it is rendered into the system prompt.
        survey: survey result, or a falsy value when none is available.
        report: mapping read for its ``"survey_signals"`` key.
        all_dirs: every discovered directory; only its length is used here
            (directory count and global turn budget).
        tracker: token tracker forwarded to ``_call_api_streaming``.
        cached_dirs: directories already cached, listed in the prompt.
        max_turns: maximum number of API round-trips.
        verbose: accepted but not referenced in this function.

    Returns:
        A dict with keys ``priority_dirs``, ``shallow_dirs``, ``skip_dirs``,
        ``investigation_order`` and ``notes`` taken from the model's
        ``submit_plan`` call, or None when the API raises or the model never
        calls ``submit_plan`` within ``max_turns``.
    """
    cached_dirs = cached_dirs or []
    dir_count = len(all_dirs)
    global_budget = _BASE_TURNS_PER_DIR * dir_count

    survey_context = _format_survey_block(survey) if survey else "(no survey available)"

    # The tree is prompt context only: any failure while building or
    # rendering it degrades to a placeholder instead of aborting planning.
    try:
        tree_node = build_tree(target, max_depth=6)
        tree_text = render_tree(tree_node)
    except Exception:
        tree_text = "(tree unavailable)"

    signals = report.get("survey_signals") or {}
    file_signals = _format_survey_signals(signals)

    # Show cached directories relative to the target, as in the rest of
    # the prompt.
    cached_rel = []
    for d in cached_dirs:
        cached_rel.append(os.path.relpath(d, target))
    cached_text = ", ".join(cached_rel) if cached_rel else "(none)"

    system = _PLANNING_SYSTEM_PROMPT.format(
        target=target,
        survey_context=survey_context,
        tree_text=tree_text,
        file_signals=file_signals,
        dir_count=dir_count,
        cached_dirs=cached_text,
        default_turns=_DEFAULT_TURNS,
        global_budget=global_budget,
    )

    messages = [
        {
            "role": "user",
            "content": (
                "All inputs are in the system prompt above. Call "
                "submit_plan now."
            ),
        },
    ]

    plan = None
    for turn in range(max_turns):
        try:
            content_blocks, _usage = _call_api_streaming(
                client, system, messages, _PLANNING_TOOLS, tracker,
            )
        except anthropic.APIError as e:
            # An API failure aborts planning outright; any plan captured on
            # an earlier turn is not returned.
            print(f" [AI] API error: {e}", file=sys.stderr)
            return None

        # Echo the model's free text and tool calls to stderr.
        for b in content_blocks:
            if b.type == "text" and b.text.strip():
                for line in b.text.strip().split("\n"):
                    print(f" [AI] {line}", file=sys.stderr)

        tool_uses = [b for b in content_blocks if b.type == "tool_use"]
        for tu in tool_uses:
            arg_summary = ", ".join(
                f"{k}={v!r}" for k, v in tu.input.items()
            ) if tu.input else ""
            print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)

        messages.append({
            "role": "assistant",
            "content": [_block_to_dict(b) for b in content_blocks],
        })

        # No tool call this turn: nudge the model and spend another turn.
        if not tool_uses:
            messages.append({
                "role": "user",
                "content": "Please call submit_plan.",
            })
            continue

        # Every tool_use gets a matching tool_result. If submit_plan is
        # called more than once in a turn, the last call's input wins.
        tool_results = []
        done = False
        for tu in tool_uses:
            if tu.name == "submit_plan":
                plan = {
                    "priority_dirs": tu.input.get("priority_dirs", []) or [],
                    "shallow_dirs": tu.input.get("shallow_dirs", []) or [],
                    "skip_dirs": tu.input.get("skip_dirs", []) or [],
                    "investigation_order": tu.input.get(
                        "investigation_order", "leaf-first"
                    ),
                    "notes": tu.input.get("notes", ""),
                }
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Plan received. Thank you.",
                })
                done = True
            else:
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Unknown tool. Call submit_plan.",
                    "is_error": True,
                })

        messages.append({"role": "user", "content": tool_results})

        if done:
            break
    else:
        # for/else: reached only when the loop used every turn without a
        # break, i.e. no submit_plan call was received.
        print(" [AI] Warning: planning ran out of turns.", file=sys.stderr)

    return plan
def _apply_plan(all_dirs, to_investigate, plan, target):
"""Apply the plan to produce an ordered dir list and turn map.
Returns (ordered_dirs, turn_map) where:
- ordered_dirs: list of absolute dir paths in investigation order
- turn_map: dict of {abs_dir_path: max_turns}
Pure function: no I/O, no cache, no API calls.
"""
if plan is None:
return list(to_investigate), {}
# Build lookup from relative path to absolute path.
rel_to_abs = {}
for d in all_dirs:
rel = os.path.relpath(d, target)
rel_to_abs[rel] = d
# Classify directories by tier.
skip_set = set()
priority_set = set()
shallow_set = set()
turn_map = {}
for entry in plan.get("skip_dirs", []):
rel = entry.get("path", "")
if rel in rel_to_abs:
skip_set.add(rel_to_abs[rel])
for entry in plan.get("priority_dirs", []):
rel = entry.get("path", "")
suggested = entry.get("suggested_turns", 15)
capped = min(suggested, _MAX_TURNS_CEILING)
if rel in rel_to_abs:
abs_path = rel_to_abs[rel]
priority_set.add(abs_path)
turn_map[abs_path] = capped
for entry in plan.get("shallow_dirs", []):
rel = entry.get("path", "")
if rel in rel_to_abs:
abs_path = rel_to_abs[rel]
shallow_set.add(abs_path)
turn_map[abs_path] = _SHALLOW_TURNS
# Remove skipped dirs from the investigation list.
remaining = [d for d in to_investigate if d not in skip_set]
# Order by bands. Both strategies preserve leaf-first within bands.
order = plan.get("investigation_order", "leaf-first")
if order == "priority-first":
priority_band = [d for d in remaining if d in priority_set]
shallow_band = [d for d in remaining if d in shallow_set]
default_band = [
d for d in remaining
if d not in priority_set and d not in shallow_set
]
ordered = priority_band + default_band + shallow_band
else:
# leaf-first: keep the original order (already leaf-first from
# _discover_directories), just remove skipped dirs.
ordered = remaining
return ordered, turn_map
def _write_plan_evaluation(cache, plan, turn_utilization):
    """Write plan_evaluation.json comparing plan predictions to actual results.

    This is the planning pass's report card: did we allocate turns well?

    Args:
        cache: cache manager; this function uses ``cache.root`` (directory
            the JSON file is written to), ``cache.target`` and
            ``cache.read_entry``.
        plan: plan dict from the planning pass, or None.
        turn_utilization: list of per-directory records with keys ``dir``,
            ``turns_allocated``, ``turns_used`` and optionally
            ``completeness``.

    Plan entries come from a model or a cached JSON file, so entries that
    are not dicts or have no string ``path`` are ignored rather than
    raising. Writing is best-effort: an OSError is reported on stderr and
    otherwise ignored.
    """
    plan = plan or {}

    # (plan key, tier label, turns the plan implied for one entry)
    tiers = (
        ("priority_dirs", "priority",
         lambda entry: entry.get("suggested_turns", 15)),
        ("shallow_dirs", "shallow", lambda entry: _SHALLOW_TURNS),
        ("skip_dirs", "skip", lambda entry: 0),
    )

    # Build a lookup of what the plan predicted per dir. When a path is
    # listed in more than one tier, the later tier wins.
    predicted = {}
    for key, tier, turns_for in tiers:
        for plan_entry in plan.get(key) or []:
            if not isinstance(plan_entry, dict):
                continue
            path = plan_entry.get("path")
            if not isinstance(path, str):
                continue
            predicted[path] = {
                "tier": tier,
                "suggested_turns": turns_for(plan_entry),
            }

    # Compare predictions to actual turn utilization.
    per_dir = []
    total_allocated = 0
    total_used = 0

    for record in turn_utilization:
        dir_rel = record["dir"]
        allocated = record["turns_allocated"]
        used = record["turns_used"]
        total_allocated += allocated
        total_used += used

        pred = predicted.get(dir_rel, {})
        entry = {
            "dir": dir_rel,
            "planned_tier": pred.get("tier", "default"),
            "turns_allocated": allocated,
            "turns_used": used,
            # Guard against a zero allocation rather than dividing by it.
            "utilization": round(used / allocated, 2) if allocated else 0,
        }

        # Include completeness from turn utilization record (#74).
        record_completeness = record.get("completeness")
        if record_completeness is not None:
            entry["completeness"] = record_completeness

        # Read confidence from the cached dir entry if available.
        # NOTE(review): assumes record["dir"] is relative to cache.target;
        # confirm this holds for the root directory's record.
        dir_entry = cache.read_entry("dir", os.path.join(
            cache.target, dir_rel,
        ))
        if dir_entry:
            entry["confidence"] = dir_entry.get("confidence")

        per_dir.append(entry)

    evaluation = {
        "plan_order": plan.get("investigation_order", "leaf-first"),
        "total_dirs_investigated": len(turn_utilization),
        "total_turns_allocated": total_allocated,
        "total_turns_used": total_used,
        "overall_utilization": (
            round(total_used / total_allocated, 2) if total_allocated else 0
        ),
        "per_directory": per_dir,
        "evaluated_at": _now_iso(),
    }

    try:
        eval_path = os.path.join(cache.root, "plan_evaluation.json")
        with open(eval_path, "w") as f:
            json.dump(evaluation, f, indent=2)
        print(
            f" [AI] Plan evaluation: {total_used}/{total_allocated} turns used "
            f"({evaluation['overall_utilization']:.0%} utilization)",
            file=sys.stderr,
        )
    except OSError as e:
        # The evaluation is diagnostic only: report the failure and carry
        # on rather than failing the investigation.
        print(
            f" [AI] Warning: could not write plan evaluation: {e}",
            file=sys.stderr,
        )
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False): def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
"""Run the final synthesis pass. Returns (brief, detailed).""" """Run the final synthesis pass. Returns (brief, detailed)."""
dir_entries = cache.read_all_entries("dir") dir_entries = cache.read_all_entries("dir")
@ -1443,34 +1865,103 @@ def _run_investigation(client, target, report, show_hidden=False,
print(" [AI] Survey unavailable — proceeding without it.", file=sys.stderr) print(" [AI] Survey unavailable — proceeding without it.", file=sys.stderr)
to_investigate = [] to_investigate = []
cached_count = 0 cached_dirs = []
for d in all_dirs: for d in all_dirs:
if cache.has_entry("dir", d): if cache.has_entry("dir", d):
cached_count += 1 cached_dirs.append(d)
rel = os.path.relpath(d, target) rel = os.path.relpath(d, target)
print(f" [AI] Skipping (cached): {rel}/", file=sys.stderr) print(f" [AI] Skipping (cached): {rel}/", file=sys.stderr)
else: else:
to_investigate.append(d) to_investigate.append(d)
total = len(to_investigate) cached_count = len(cached_dirs)
if cached_count: if cached_count:
print(f" [AI] Directories cached: {cached_count}", file=sys.stderr) print(f" [AI] Directories cached: {cached_count}", file=sys.stderr)
print(f" [AI] Directories to investigate: {total}", file=sys.stderr) print(f" [AI] Directories to investigate: {len(to_investigate)}",
file=sys.stderr)
for i, dir_path in enumerate(to_investigate, 1): # Planning pass: decide where to invest depth.
if total_files < _SURVEY_MIN_FILES and total_dirs < _SURVEY_MIN_DIRS:
print(" [AI] Planning skipped (small target).", file=sys.stderr)
plan = _default_plan()
else:
plan_path = os.path.join(cache.root, "plan.json")
if not fresh and os.path.exists(plan_path):
try:
with open(plan_path) as f:
plan = json.load(f)
print(" [AI] Plan loaded from cache.", file=sys.stderr)
except (OSError, json.JSONDecodeError):
plan = None
else:
plan = None
if plan is None:
print(" [AI] Planning pass...", file=sys.stderr)
plan = _run_planning(
client, target, survey, report, all_dirs, tracker,
cached_dirs=cached_dirs, verbose=verbose,
)
if plan is None:
print(" [AI] Planning failed, using defaults.",
file=sys.stderr)
plan = _default_plan()
else:
# Save plan to cache (#11).
try:
with open(os.path.join(cache.root, "plan.json"), "w") as f:
json.dump(plan, f, indent=2)
except OSError:
pass
ordered, turn_map = _apply_plan(all_dirs, to_investigate, plan, target)
# Log plan summary.
skip_count = len(to_investigate) - len(ordered)
priority_count = sum(
1 for d in ordered if turn_map.get(d, _DEFAULT_TURNS) > _DEFAULT_TURNS
)
if skip_count or priority_count:
print(
f" [AI] Plan: {priority_count} priority, "
f"{skip_count} skipped, "
f"{len(ordered) - priority_count} default/shallow",
file=sys.stderr,
)
if plan.get("notes"):
print(f" [AI] Plan notes: {plan['notes']}", file=sys.stderr)
total = len(ordered)
turn_utilization = []
for i, dir_path in enumerate(ordered, 1):
dir_rel = os.path.relpath(dir_path, target) dir_rel = os.path.relpath(dir_path, target)
if dir_rel == ".": if dir_rel == ".":
dir_rel = os.path.basename(target) dir_rel = os.path.basename(target)
print(f" [AI] Investigating: {dir_rel}/ ({i}/{total})", max_turns = turn_map.get(dir_path, _DEFAULT_TURNS)
file=sys.stderr) print(
f" [AI] Investigating: {dir_rel}/ ({i}/{total}, "
summary = _run_dir_loop( f"{max_turns} turns)",
client, target, cache, tracker, dir_path, verbose=verbose, file=sys.stderr,
survey=survey,
) )
tracker.reset_loop()
summary, completeness = _run_dir_loop(
client, target, cache, tracker, dir_path,
max_turns=max_turns, verbose=verbose, survey=survey,
)
# Track turn utilization for quality metrics (#74).
turns_used = tracker._loop_turns
turn_utilization.append({
"dir": dir_rel,
"turns_allocated": max_turns,
"turns_used": turns_used,
"completeness": completeness,
})
if summary and not cache.has_entry("dir", dir_path): if summary and not cache.has_entry("dir", dir_path):
cache.write_entry("dir", dir_path, { entry = {
"path": dir_path, "path": dir_path,
"relative_path": os.path.relpath(dir_path, target), "relative_path": os.path.relpath(dir_path, target),
"child_count": len([ "child_count": len([
@ -1481,13 +1972,19 @@ def _run_investigation(client, target, report, show_hidden=False,
"dominant_category": "unknown", "dominant_category": "unknown",
"notable_files": [], "notable_files": [],
"cached_at": _now_iso(), "cached_at": _now_iso(),
}) }
if completeness is not None:
entry["completeness"] = completeness
cache.write_entry("dir", dir_path, entry)
cache.update_meta( cache.update_meta(
directories_investigated=total + cached_count, directories_investigated=total + cached_count,
end_time=_now_iso(), end_time=_now_iso(),
) )
# Emit plan evaluation (#74).
_write_plan_evaluation(cache, plan, turn_utilization)
print(" [AI] Synthesis pass...", file=sys.stderr) print(" [AI] Synthesis pass...", file=sys.stderr)
brief, detailed = _run_synthesis( brief, detailed = _run_synthesis(
client, target, cache, tracker, verbose=verbose, client, target, cache, tracker, verbose=verbose,

View file

@ -209,3 +209,84 @@ Call `submit_survey` exactly once with:
You have at most 3 turns. In almost all cases you should call You have at most 3 turns. In almost all cases you should call
`submit_survey` on your first turn. Use a second turn only if you `submit_survey` on your first turn. Use a second turn only if you
genuinely need to think before committing.""" genuinely need to think before committing."""
# System prompt for the planning pass. It is a str.format template with the
# placeholders: target, survey_context, tree_text, file_signals, dir_count,
# cached_dirs, default_turns and global_budget. Any literal brace added to
# this text must be doubled, otherwise .format() will raise.
_PLANNING_SYSTEM_PROMPT = """\
You are an investigation planner. Your job is to decide where to invest
investigative depth across a directory tree, BEFORE the per-directory
investigation begins. You allocate turns (agent reasoning steps) to
directories based on their likely complexity and importance.
## Your Task
Create an investigation plan for the target: {target}
## Inputs
Survey assessment (from a prior reconnaissance pass):
{survey_context}
Full directory tree:
{tree_text}
File signals:
{file_signals}
Total directories to investigate: {dir_count}
Directories already cached (will be skipped): {cached_dirs}
## How to Allocate
Classify each directory into one of three tiers:
**priority** (15-20 turns): directories that are likely complex, central,
or important. Signs: many source files, core application logic, complex
configuration, entry points, schemas, migrations. These deserve deep
investigation with multiple tool calls per file.
**shallow** (5 turns): directories that are simple, peripheral, or
predictable. Signs: few files, generated/vendored content, test fixtures,
static assets, documentation-only dirs. A quick pass is sufficient.
**skip** (0 turns): directories that should be skipped entirely. Signs:
build output, dependency caches, vendored code, generated artifacts. The
investigation would waste turns and produce noise.
Directories you do not mention go into a default tier ({default_turns}
turns). You do NOT need to list every directory. Focus on the ones where
the default allocation would clearly be wrong (too many turns for a
trivial dir, or too few for a complex one).
## Investigation Order
Choose one of these ordering strategies:
- **leaf-first**: deepest directories first, parents last. This is the
default and ensures parent directories always have child summaries
available. Best for most codebases.
- **priority-first**: priority directories before shallow ones, but
still leaf-first within each tier. Good when certain subtrees are
clearly more important and you want findings from them to inform
the rest of the investigation.
Both strategies preserve the leaf-first invariant (children before
parents) to ensure child summaries are available when investigating
parent directories.
## Budget
The global turn budget is {global_budget} turns across all directories.
Your allocations should roughly respect this budget, though small
overages are fine. If you allocate significantly more than the budget,
the orchestrator will cap individual directories.
## Notes Field
Use `notes` to communicate anything the per-directory agents should
know that the survey did not capture. Cross-cutting concerns, suspected
relationships between directories, or investigation priorities. Leave
empty if you have nothing to add beyond the tier assignments.
## Output
Call `submit_plan` exactly once. You have at most 3 turns, but you
should almost always submit on your first turn. Use additional turns
only if you genuinely need to reason through a complex target layout."""

View file

@ -14,20 +14,28 @@ from types import SimpleNamespace
from luminos_lib.ai import ( from luminos_lib.ai import (
CONTEXT_BUDGET, CONTEXT_BUDGET,
_DEFAULT_TURNS,
_DIR_TOOLS, _DIR_TOOLS,
_MAX_TURNS_CEILING,
_PLANNING_TOOLS,
_PROTECTED_DIR_TOOLS, _PROTECTED_DIR_TOOLS,
_SHALLOW_TURNS,
_SURVEY_CONFIDENCE_THRESHOLD, _SURVEY_CONFIDENCE_THRESHOLD,
_TokenTracker, _TokenTracker,
_apply_plan,
_block_to_dict, _block_to_dict,
_default_plan,
_default_survey, _default_survey,
_discover_directories, _discover_directories,
_filter_dir_tools, _filter_dir_tools,
_flush_partial_dir_entry, _flush_partial_dir_entry,
_format_survey_block, _format_survey_block,
_format_survey_signals, _format_survey_signals,
_get_child_summaries,
_path_is_safe, _path_is_safe,
_should_skip_dir, _should_skip_dir,
_synthesize_from_cache, _synthesize_from_cache,
_write_plan_evaluation,
) )
from luminos_lib.cache import _CacheManager from luminos_lib.cache import _CacheManager
@ -717,5 +725,342 @@ class TestDiscoverDirectories(unittest.TestCase):
self.assertNotIn(".git", rels) self.assertNotIn(".git", rels)
# ---------------------------------------------------------------------------
# _default_plan
# ---------------------------------------------------------------------------
class TestDefaultPlan(unittest.TestCase):
    # The fallback plan must be empty, leaf-first, and never shared
    # between callers.

    def test_returns_empty_plan(self):
        result = _default_plan()
        for tier_key in ("priority_dirs", "shallow_dirs", "skip_dirs"):
            self.assertEqual(result[tier_key], [])
        self.assertEqual(result["investigation_order"], "leaf-first")
        self.assertEqual(result["notes"], "")

    def test_returns_fresh_dict_each_call(self):
        first = _default_plan()
        second = _default_plan()
        self.assertIsNot(first, second)
        # Mutating one result must not leak into the other.
        first["notes"] = "mutated"
        self.assertEqual(second["notes"], "")
# ---------------------------------------------------------------------------
# _apply_plan
# ---------------------------------------------------------------------------
class TestApplyPlan(unittest.TestCase):
    # Fixture tree under a temp dir: a/x, a/y, a, b, c and the root itself.

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.target = self.tmp
        # Creating the leaves also creates their parents.
        for rel in ("a/x", "a/y", "b", "c"):
            os.makedirs(os.path.join(self.tmp, rel), exist_ok=True)
        # Leaf-first listing (deepest first, then alphabetical), root last.
        self.all_dirs = [
            self._abs("a", "x"),
            self._abs("a", "y"),
            self._abs("a"),
            self._abs("b"),
            self._abs("c"),
            self.tmp,
        ]

    def tearDown(self):
        shutil.rmtree(self.tmp, ignore_errors=True)

    def _abs(self, *parts):
        # Absolute path of a fixture directory.
        return os.path.join(self.tmp, *parts)

    def _apply(self, plan, to_investigate=None):
        # Run _apply_plan over the fixture; by default every dir is pending.
        pending = list(self.all_dirs) if to_investigate is None else to_investigate
        return _apply_plan(self.all_dirs, pending, plan, self.target)

    def test_none_plan_returns_original_order(self):
        ordered, turn_map = self._apply(None)
        self.assertEqual(ordered, self.all_dirs)
        self.assertEqual(turn_map, {})

    def test_default_plan_returns_original_order(self):
        ordered, turn_map = self._apply(_default_plan())
        self.assertEqual(ordered, self.all_dirs)
        self.assertEqual(turn_map, {})

    def test_skip_dirs_removed(self):
        plan = _default_plan()
        plan["skip_dirs"] = [{"path": "b", "reason": "vendored"}]
        ordered, _ = self._apply(plan)
        self.assertNotIn(self._abs("b"), ordered)

    def test_priority_dirs_get_custom_turns(self):
        plan = _default_plan()
        plan["priority_dirs"] = [
            {"path": "a", "reason": "core", "suggested_turns": 18},
        ]
        _, turn_map = self._apply(plan)
        self.assertEqual(turn_map[self._abs("a")], 18)

    def test_priority_turns_capped_at_ceiling(self):
        plan = _default_plan()
        plan["priority_dirs"] = [
            {"path": "a", "reason": "core", "suggested_turns": 50},
        ]
        _, turn_map = self._apply(plan)
        self.assertEqual(turn_map[self._abs("a")], _MAX_TURNS_CEILING)

    def test_shallow_dirs_get_shallow_turns(self):
        plan = _default_plan()
        plan["shallow_dirs"] = [{"path": "c", "reason": "docs only"}]
        _, turn_map = self._apply(plan)
        self.assertEqual(turn_map[self._abs("c")], _SHALLOW_TURNS)

    def test_priority_first_reorders_bands(self):
        plan = _default_plan()
        plan["investigation_order"] = "priority-first"
        plan["priority_dirs"] = [
            {"path": "c", "reason": "entry point", "suggested_turns": 15},
        ]
        plan["shallow_dirs"] = [{"path": "b", "reason": "tests"}]
        ordered, _ = self._apply(plan)
        # Priority dirs come before shallow dirs.
        self.assertLess(
            ordered.index(self._abs("c")), ordered.index(self._abs("b")),
        )

    def test_leaf_first_preserved_within_priority_band(self):
        plan = _default_plan()
        plan["investigation_order"] = "priority-first"
        plan["priority_dirs"] = [
            {"path": os.path.join("a", "x"), "reason": "deep",
             "suggested_turns": 15},
            {"path": "a", "reason": "parent", "suggested_turns": 15},
        ]
        ordered, _ = self._apply(plan)
        # a/x (leaf) comes before a (parent), preserving leaf-first.
        self.assertLess(
            ordered.index(self._abs("a", "x")), ordered.index(self._abs("a")),
        )

    def test_unknown_paths_in_plan_ignored(self):
        plan = _default_plan()
        plan["skip_dirs"] = [{"path": "nonexistent", "reason": "gone"}]
        plan["priority_dirs"] = [
            {"path": "also_missing", "reason": "?", "suggested_turns": 20},
        ]
        ordered, turn_map = self._apply(plan)
        # All original dirs still present, no crash.
        self.assertEqual(len(ordered), len(self.all_dirs))
        self.assertEqual(turn_map, {})

    def test_to_investigate_subset_respected(self):
        """Only dirs in to_investigate appear in output, even if plan mentions all."""
        subset = self.all_dirs[:3]
        ordered, _ = self._apply(_default_plan(), subset)
        self.assertEqual(len(ordered), len(subset))
# ---------------------------------------------------------------------------
# _get_child_summaries (updated placeholder behavior)
# ---------------------------------------------------------------------------
class TestGetChildSummaries(unittest.TestCase):
    # Placeholder text must tell real leaves apart from parents whose
    # children simply have not been cached yet.

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.cache = _make_manager(self.tmp)

    def tearDown(self):
        shutil.rmtree(self.tmp, ignore_errors=True)

    def _make_parent_with_child(self):
        # Create parent/child on disk and return both paths.
        parent_dir = os.path.join(self.tmp, "parent")
        child_dir = os.path.join(parent_dir, "child")
        os.makedirs(child_dir)
        return parent_dir, child_dir

    def test_leaf_directory_no_subdirs(self):
        leaf_dir = os.path.join(self.tmp, "leaf")
        os.makedirs(leaf_dir)
        text = _get_child_summaries(leaf_dir, self.cache)
        self.assertIn("leaf directory", text)
        self.assertNotIn("not been investigated", text)

    def test_parent_with_uninvestigated_children(self):
        parent_dir, _ = self._make_parent_with_child()
        text = _get_child_summaries(parent_dir, self.cache)
        self.assertIn("not been investigated", text)
        self.assertNotIn("leaf directory", text)

    def test_parent_with_cached_children(self):
        parent_dir, child_dir = self._make_parent_with_child()
        cached_child = {
            "path": child_dir,
            "relative_path": "parent/child",
            "child_count": 0,
            "summary": "A child directory with stuff.",
            "dominant_category": "source",
            "notable_files": [],
            "cached_at": "2026-01-01T00:00:00+00:00",
        }
        self.cache.write_entry("dir", child_dir, cached_child)
        text = _get_child_summaries(parent_dir, self.cache)
        self.assertIn("parent/child/", text)
        self.assertIn("A child directory with stuff.", text)

    def test_hidden_dirs_ignored(self):
        parent_dir = os.path.join(self.tmp, "parent")
        os.makedirs(os.path.join(parent_dir, ".hidden"))
        text = _get_child_summaries(parent_dir, self.cache)
        # .hidden is ignored, so this looks like a leaf.
        self.assertIn("leaf directory", text)
# ---------------------------------------------------------------------------
# _TokenTracker._loop_turns
# ---------------------------------------------------------------------------
class TestTokenTrackerLoopTurns(unittest.TestCase):
    """Tests for the _loop_turns counter maintained by _TokenTracker."""

    @staticmethod
    def _usage(input_tokens, output_tokens):
        """Build a stand-in usage object with the two token attributes."""
        return SimpleNamespace(
            input_tokens=input_tokens,
            output_tokens=output_tokens,
        )

    def test_loop_turns_increments_on_record(self):
        """Each record() call bumps _loop_turns by one, starting from 0."""
        tracker = _TokenTracker()
        self.assertEqual(tracker._loop_turns, 0)
        calls = ((100, 50), (200, 75))
        for expected_turns, (inp, out) in enumerate(calls, start=1):
            tracker.record(self._usage(inp, out))
            self.assertEqual(tracker._loop_turns, expected_turns)

    def test_loop_turns_reset(self):
        """reset_loop() puts _loop_turns back to 0."""
        tracker = _TokenTracker()
        for inp, out in ((100, 50), (200, 75)):
            tracker.record(self._usage(inp, out))
        self.assertEqual(tracker._loop_turns, 2)
        tracker.reset_loop()
        self.assertEqual(tracker._loop_turns, 0)

    def test_loop_turns_independent_of_totals(self):
        """Resetting the loop clears the turn count but not total_input."""
        tracker = _TokenTracker()
        tracker.record(self._usage(100, 50))
        tracker.reset_loop()
        tracker.record(self._usage(300, 100))
        # One turn since the reset; the total still spans both records.
        self.assertEqual(tracker._loop_turns, 1)
        self.assertEqual(tracker.total_input, 400)
# ---------------------------------------------------------------------------
# _write_plan_evaluation
# ---------------------------------------------------------------------------
class TestWritePlanEvaluation(unittest.TestCase):
    """Tests for _write_plan_evaluation and the JSON file it produces.

    Every test runs against a fresh temporary directory and the cache
    object returned by _make_manager for that directory; the evaluation
    file is looked up as plan_evaluation.json under self.cache.root.
    """

    def setUp(self):
        """Create the temp directory and cache used by each test."""
        self.tmp = tempfile.mkdtemp()
        # Register removal immediately: unittest does not call tearDown()
        # when setUp() raises, so a failure in _make_manager would
        # otherwise leave the temp directory behind.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.cache = _make_manager(self.tmp)

    def _eval_path(self):
        """Return the path where the evaluation file is expected."""
        return os.path.join(self.cache.root, "plan_evaluation.json")

    def _load_evaluation(self):
        """Parse and return the evaluation file written by the call under test."""
        # Imported locally, as the individual tests did before this helper
        # existed, so the helper does not rely on a module-level import.
        import json

        with open(self._eval_path(), encoding="utf-8") as f:
            return json.load(f)

    def test_writes_evaluation_file(self):
        """Totals, per-directory rows, and tier labels are written out."""
        plan = {
            "priority_dirs": [
                {"path": "src", "reason": "core", "suggested_turns": 18},
            ],
            "shallow_dirs": [
                {"path": "docs", "reason": "docs"},
            ],
            "skip_dirs": [],
            "investigation_order": "leaf-first",
            "notes": "",
        }
        utilization = [
            {"dir": "src", "turns_allocated": 18, "turns_used": 12,
             "completeness": 0.85},
            {"dir": "docs", "turns_allocated": 5, "turns_used": 3,
             "completeness": 0.7},
        ]
        _write_plan_evaluation(self.cache, plan, utilization)

        self.assertTrue(os.path.exists(self._eval_path()))
        data = self._load_evaluation()
        self.assertEqual(data["total_dirs_investigated"], 2)
        self.assertEqual(data["total_turns_allocated"], 23)
        self.assertEqual(data["total_turns_used"], 15)
        self.assertEqual(len(data["per_directory"]), 2)

        # Check that tier classification came through.
        src_entries = [
            d for d in data["per_directory"] if d["dir"] == "src"
        ]
        # Fail with a readable message instead of a bare IndexError.
        self.assertTrue(src_entries, "no per_directory entry for 'src'")
        src_entry = src_entries[0]
        self.assertEqual(src_entry["planned_tier"], "priority")
        self.assertEqual(src_entry["completeness"], 0.85)

    def test_handles_none_plan(self):
        """A None plan still results in an evaluation file on disk."""
        utilization = [
            {"dir": "a", "turns_allocated": 10, "turns_used": 8},
        ]
        _write_plan_evaluation(self.cache, None, utilization)
        self.assertTrue(os.path.exists(self._eval_path()))

    def test_handles_empty_utilization(self):
        """With no utilization rows the totals are reported as zero."""
        plan = _default_plan()
        _write_plan_evaluation(self.cache, plan, [])
        data = self._load_evaluation()
        self.assertEqual(data["total_dirs_investigated"], 0)
        self.assertEqual(data["overall_utilization"], 0)

    def test_zero_allocated_turns_no_division_error(self):
        """A row with zero allocated turns gets utilization 0, not an error."""
        plan = _default_plan()
        utilization = [
            {"dir": "x", "turns_allocated": 0, "turns_used": 0},
        ]
        _write_plan_evaluation(self.cache, plan, utilization)
        data = self._load_evaluation()
        self.assertEqual(data["per_directory"][0]["utilization"], 0)
# ---------------------------------------------------------------------------
# Planning tool registry
# ---------------------------------------------------------------------------
class TestPlanningToolRegistry(unittest.TestCase):
    """Tests for the submit_plan entry in the _PLANNING_TOOLS registry."""

    @staticmethod
    def _submit_plan_tool():
        """Return the first registry entry named submit_plan."""
        matches = [
            tool for tool in _PLANNING_TOOLS
            if tool["name"] == "submit_plan"
        ]
        return matches[0]

    def test_submit_plan_registered(self):
        """submit_plan is among the registered planning tool names."""
        registered = [tool["name"] for tool in _PLANNING_TOOLS]
        self.assertIn("submit_plan", registered)

    def test_submit_plan_has_required_fields(self):
        """The input schema lists all five plan fields as required."""
        required = self._submit_plan_tool()["input_schema"]["required"]
        expected_fields = (
            "priority_dirs",
            "shallow_dirs",
            "skip_dirs",
            "investigation_order",
            "notes",
        )
        for field in expected_fields:
            self.assertIn(field, required)

    def test_submit_plan_order_enum(self):
        """investigation_order is restricted to the two known orderings."""
        schema = self._submit_plan_tool()["input_schema"]
        order_prop = schema["properties"]["investigation_order"]
        self.assertEqual(
            order_prop["enum"],
            ["leaf-first", "priority-first"],
        )
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()