feat(ai): add _run_survey() and submit_survey tool (#5)

Adds the reconnaissance survey pass: a fast, ≤3-turn LLM call that
characterizes the target before any directory investigation begins.
The survey receives the file-type distribution (from the base scan),
a top-2-level tree preview, and the list of available dir-loop tools,
and returns description / approach / relevant_tools / skip_tools /
domain_notes / confidence via a single submit_survey tool call.

Wired into _run_investigation() before the directory loop. Output is
logged but not yet consumed — that wiring is #6. Survey failure is
non-fatal: if the call errors or runs out of turns, the investigation
proceeds without survey context.

Also adds a Band-Aid to _SURVEY_SYSTEM_PROMPT warning the LLM that
the file-type histogram is biased toward source code (the underlying
classifier has no concept of mail, notebooks, ledgers, etc.) and to
trust the tree preview when they conflict. The proper fix is #42.
This commit is contained in:
Jeff Smith 2026-04-06 21:49:59 -06:00
parent 05fcaac755
commit fecb24d6e1
2 changed files with 196 additions and 1 deletions

View file

@ -22,7 +22,12 @@ import magic
from luminos_lib.ast_parser import parse_structure
from luminos_lib.cache import _CacheManager, _get_investigation_id
from luminos_lib.capabilities import check_ai_dependencies
from luminos_lib.prompts import _DIR_SYSTEM_PROMPT, _SYNTHESIS_SYSTEM_PROMPT
from luminos_lib.prompts import (
_DIR_SYSTEM_PROMPT,
_SURVEY_SYSTEM_PROMPT,
_SYNTHESIS_SYSTEM_PROMPT,
)
from luminos_lib.tree import build_tree, render_tree
# Pinned Anthropic model ID (dated snapshot, so behavior is reproducible).
MODEL = "claude-sonnet-4-20250514"
@ -329,6 +334,51 @@ _DIR_TOOLS = [
},
]
# Tool schema for the reconnaissance survey pass. The model is expected to
# call submit_survey exactly once; all six fields are required.
_SURVEY_TOOLS = [
    {
        "name": "submit_survey",
        "description": (
            "Submit the reconnaissance survey. Call exactly once."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "description": {
                    "type": "string",
                    "description": "Plain-language description of the target.",
                },
                "approach": {
                    "type": "string",
                    "description": "Recommended analytical approach.",
                },
                "relevant_tools": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Tool names the dir loop should lean on.",
                },
                "skip_tools": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Tool names whose use would be wrong here.",
                },
                "domain_notes": {
                    "type": "string",
                    "description": "Short actionable hint, or empty string.",
                },
                "confidence": {
                    "type": "number",
                    # Fixed mangled range text ("0.01.0" -> "0.0-1.0") and
                    # enforce the range in-schema so the API rejects
                    # out-of-range values instead of us clamping later.
                    "minimum": 0.0,
                    "maximum": 1.0,
                    "description": "0.0-1.0 confidence in this survey.",
                },
            },
            "required": [
                "description", "approach", "relevant_tools",
                "skip_tools", "domain_notes", "confidence",
            ],
        },
    },
]
_SYNTHESIS_TOOLS = [
{
"name": "read_cache",
@ -873,6 +923,119 @@ def _block_to_dict(block):
# Synthesis pass
# ---------------------------------------------------------------------------
def _run_survey(client, target, report, tracker, max_turns=3, verbose=False):
    """Run the reconnaissance survey pass.

    A short (at most *max_turns* API calls) LLM conversation that
    characterizes *target* before the per-directory loop starts. The model
    sees the file-type histogram from *report*, a 2-level tree preview, and
    the dir-loop tool names, and replies via a single submit_survey call.

    Returns a survey dict on success, or None on failure / out-of-turns.
    Survey is advisory -- callers must treat None as "no survey context".
    """
    # File-type histogram from the base scan, largest category first.
    categories = report.get("file_categories", {}) or {}
    if categories:
        ftd_lines = [
            f" {cat}: {n}"
            for cat, n in sorted(categories.items(), key=lambda kv: -kv[1])
        ]
        file_type_distribution = "\n".join(ftd_lines)
    else:
        file_type_distribution = " (no files classified)"
    # Tree preview is best-effort: any failure degrades to a placeholder
    # string rather than aborting the survey.
    try:
        tree_node = build_tree(target, max_depth=2)
        tree_preview = render_tree(tree_node)
    except Exception:
        tree_preview = "(tree unavailable)"
    # Advertise the dir-loop tools, minus submit_report (it is the dir
    # loop's terminator, not something the survey should recommend).
    tool_names = [t["name"] for t in _DIR_TOOLS if t["name"] != "submit_report"]
    available_tools = ", ".join(tool_names)
    system = _SURVEY_SYSTEM_PROMPT.format(
        target=target,
        file_type_distribution=file_type_distribution,
        tree_preview=tree_preview,
        available_tools=available_tools,
    )
    messages = [
        {
            "role": "user",
            "content": (
                "All inputs are in the system prompt above. Call "
                "submit_survey now — no other tool calls needed."
            ),
        },
    ]
    survey = None
    for turn in range(max_turns):
        try:
            content_blocks, _usage = _call_api_streaming(
                client, system, messages, _SURVEY_TOOLS, tracker,
            )
        except anthropic.APIError as e:
            # Non-fatal by contract: caller proceeds without survey context.
            print(f" [AI] API error: {e}", file=sys.stderr)
            return None
        # Echo any assistant prose to stderr, line by line, for visibility.
        for b in content_blocks:
            if b.type == "text" and b.text.strip():
                for line in b.text.strip().split("\n"):
                    print(f" [AI] {line}", file=sys.stderr)
        tool_uses = [b for b in content_blocks if b.type == "tool_use"]
        for tu in tool_uses:
            arg_summary = ", ".join(
                f"{k}={v!r}" for k, v in tu.input.items()
            ) if tu.input else ""
            print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
        # The assistant turn must be appended before any tool_result turn,
        # per the Messages API conversation protocol.
        messages.append({
            "role": "assistant",
            "content": [_block_to_dict(b) for b in content_blocks],
        })
        if not tool_uses:
            # Text-only reply: nudge the model and spend another turn.
            messages.append({
                "role": "user",
                "content": "Please call submit_survey.",
            })
            continue
        tool_results = []
        done = False
        for tu in tool_uses:
            if tu.name == "submit_survey":
                # Normalize missing/None fields to "", [], or 0.0 so
                # consumers never see None inside the survey dict.
                survey = {
                    "description": tu.input.get("description", ""),
                    "approach": tu.input.get("approach", ""),
                    "relevant_tools": tu.input.get("relevant_tools", []) or [],
                    "skip_tools": tu.input.get("skip_tools", []) or [],
                    "domain_notes": tu.input.get("domain_notes", ""),
                    "confidence": float(tu.input.get("confidence", 0.0) or 0.0),
                }
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Survey received. Thank you.",
                })
                done = True
            else:
                # Every tool_use must get a tool_result, even bad ones.
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Unknown tool. Call submit_survey.",
                    "is_error": True,
                })
        messages.append({"role": "user", "content": tool_results})
        if done:
            break
    else:
        # for-else: all max_turns consumed without a submit_survey call;
        # survey is still None and the caller falls back gracefully.
        print(" [AI] Warning: survey ran out of turns.", file=sys.stderr)
    return survey
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
"""Run the final synthesis pass. Returns (brief, detailed)."""
dir_entries = cache.read_all_entries("dir")
@ -1016,6 +1179,29 @@ def _run_investigation(client, target, report, show_hidden=False,
f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
print(" [AI] Survey pass...", file=sys.stderr)
survey = _run_survey(client, target, report, tracker, verbose=verbose)
if survey:
print(
f" [AI] Survey: {survey['description']} "
f"(confidence {survey['confidence']:.2f})",
file=sys.stderr,
)
if survey.get("domain_notes"):
print(f" [AI] Survey notes: {survey['domain_notes']}", file=sys.stderr)
if survey.get("relevant_tools"):
print(
f" [AI] Survey relevant_tools: {', '.join(survey['relevant_tools'])}",
file=sys.stderr,
)
if survey.get("skip_tools"):
print(
f" [AI] Survey skip_tools: {', '.join(survey['skip_tools'])}",
file=sys.stderr,
)
else:
print(" [AI] Survey unavailable — proceeding without it.", file=sys.stderr)
all_dirs = _discover_directories(target, show_hidden=show_hidden,
exclude=exclude)

View file

@ -131,6 +131,15 @@ You have exactly two signals. Do not ask for more.
File type distribution (counts by category):
{file_type_distribution}
IMPORTANT: the file type distribution is produced by a classifier
that is biased toward source code. Its categories are: source,
config, data, document, media, archive, unknown. It has NO concept
of mail, notebooks, calendars, contacts, ledgers, photo libraries,
or other personal-data domains — anything text-shaped tends to be
labeled `source` even when it is not code. If the tree preview
suggests a non-code target, trust the tree over the histogram and
say so in `domain_notes`.
Top-level tree (2 levels deep):
{tree_preview}