Compare commits

..

No commits in common. "ffd9d9e929c9d352f3afd5dfb419ea1ca3634dc4" and "05fcaac755adb334340bbc72a14c7a71ca930a82" have entirely different histories.

2 changed files with 1 addition and 196 deletions

View file

@ -22,12 +22,7 @@ import magic
from luminos_lib.ast_parser import parse_structure from luminos_lib.ast_parser import parse_structure
from luminos_lib.cache import _CacheManager, _get_investigation_id from luminos_lib.cache import _CacheManager, _get_investigation_id
from luminos_lib.capabilities import check_ai_dependencies from luminos_lib.capabilities import check_ai_dependencies
from luminos_lib.prompts import ( from luminos_lib.prompts import _DIR_SYSTEM_PROMPT, _SYNTHESIS_SYSTEM_PROMPT
_DIR_SYSTEM_PROMPT,
_SURVEY_SYSTEM_PROMPT,
_SYNTHESIS_SYSTEM_PROMPT,
)
from luminos_lib.tree import build_tree, render_tree
MODEL = "claude-sonnet-4-20250514" MODEL = "claude-sonnet-4-20250514"
@ -334,51 +329,6 @@ _DIR_TOOLS = [
}, },
] ]
_SURVEY_TOOLS = [
{
"name": "submit_survey",
"description": (
"Submit the reconnaissance survey. Call exactly once."
),
"input_schema": {
"type": "object",
"properties": {
"description": {
"type": "string",
"description": "Plain-language description of the target.",
},
"approach": {
"type": "string",
"description": "Recommended analytical approach.",
},
"relevant_tools": {
"type": "array",
"items": {"type": "string"},
"description": "Tool names the dir loop should lean on.",
},
"skip_tools": {
"type": "array",
"items": {"type": "string"},
"description": "Tool names whose use would be wrong here.",
},
"domain_notes": {
"type": "string",
"description": "Short actionable hint, or empty string.",
},
"confidence": {
"type": "number",
"description": "0.01.0 confidence in this survey.",
},
},
"required": [
"description", "approach", "relevant_tools",
"skip_tools", "domain_notes", "confidence",
],
},
},
]
_SYNTHESIS_TOOLS = [ _SYNTHESIS_TOOLS = [
{ {
"name": "read_cache", "name": "read_cache",
@ -923,119 +873,6 @@ def _block_to_dict(block):
# Synthesis pass # Synthesis pass
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _run_survey(client, target, report, tracker, max_turns=3, verbose=False):
    """Run the reconnaissance survey pass.

    Presents the target's file-type histogram and a shallow tree preview to
    the model and asks it to call ``submit_survey`` once.

    Args:
        client: API client passed through to ``_call_api_streaming``.
        report: Dict; the ``"file_categories"`` entry (if any) is read here.
        tracker: Usage tracker forwarded to ``_call_api_streaming``.
        max_turns: Maximum model turns before giving up.
        verbose: Accepted but not read in this function — presumably kept
            for signature parity with sibling passes; confirm before removing.

    Returns a survey dict on success, or None on failure / out-of-turns.
    Survey is advisory — callers must treat None as "no survey context".
    """
    # Render the category histogram, most frequent category first.
    categories = report.get("file_categories", {}) or {}
    if categories:
        ftd_lines = [
            f" {cat}: {n}"
            for cat, n in sorted(categories.items(), key=lambda kv: -kv[1])
        ]
        file_type_distribution = "\n".join(ftd_lines)
    else:
        file_type_distribution = " (no files classified)"
    # Shallow tree preview is best-effort; any failure degrades to a marker.
    try:
        tree_node = build_tree(target, max_depth=2)
        tree_preview = render_tree(tree_node)
    except Exception:
        tree_preview = "(tree unavailable)"
    # Advertise the dir-loop tools, excluding submit_report, so the survey
    # can recommend among the tools the dir loop will actually use.
    tool_names = [t["name"] for t in _DIR_TOOLS if t["name"] != "submit_report"]
    available_tools = ", ".join(tool_names)
    system = _SURVEY_SYSTEM_PROMPT.format(
        target=target,
        file_type_distribution=file_type_distribution,
        tree_preview=tree_preview,
        available_tools=available_tools,
    )
    messages = [
        {
            "role": "user",
            "content": (
                "All inputs are in the system prompt above. Call "
                "submit_survey now — no other tool calls needed."
            ),
        },
    ]
    survey = None
    for turn in range(max_turns):
        try:
            content_blocks, _usage = _call_api_streaming(
                client, system, messages, _SURVEY_TOOLS, tracker,
            )
        except anthropic.APIError as e:
            # Survey is advisory: on API failure, report and bail out.
            print(f" [AI] API error: {e}", file=sys.stderr)
            return None
        # Echo model text and tool calls to stderr for operator visibility.
        for b in content_blocks:
            if b.type == "text" and b.text.strip():
                for line in b.text.strip().split("\n"):
                    print(f" [AI] {line}", file=sys.stderr)
        tool_uses = [b for b in content_blocks if b.type == "tool_use"]
        for tu in tool_uses:
            arg_summary = ", ".join(
                f"{k}={v!r}" for k, v in tu.input.items()
            ) if tu.input else ""
            print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
        messages.append({
            "role": "assistant",
            "content": [_block_to_dict(b) for b in content_blocks],
        })
        if not tool_uses:
            # Text-only reply: nudge the model toward the tool call and
            # spend another turn.
            messages.append({
                "role": "user",
                "content": "Please call submit_survey.",
            })
            continue
        tool_results = []
        done = False
        for tu in tool_uses:
            if tu.name == "submit_survey":
                # Normalize the payload: missing or None fields collapse to
                # empty values; confidence is coerced to float.
                survey = {
                    "description": tu.input.get("description", ""),
                    "approach": tu.input.get("approach", ""),
                    "relevant_tools": tu.input.get("relevant_tools", []) or [],
                    "skip_tools": tu.input.get("skip_tools", []) or [],
                    "domain_notes": tu.input.get("domain_notes", ""),
                    "confidence": float(tu.input.get("confidence", 0.0) or 0.0),
                }
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Survey received. Thank you.",
                })
                done = True
            else:
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tu.id,
                    "content": "Unknown tool. Call submit_survey.",
                    "is_error": True,
                })
        messages.append({"role": "user", "content": tool_results})
        if done:
            break
    else:
        # for/else: only reached when max_turns elapsed without a
        # submit_survey call; survey is still None here.
        print(" [AI] Warning: survey ran out of turns.", file=sys.stderr)
    return survey
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False): def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
"""Run the final synthesis pass. Returns (brief, detailed).""" """Run the final synthesis pass. Returns (brief, detailed)."""
dir_entries = cache.read_all_entries("dir") dir_entries = cache.read_all_entries("dir")
@ -1179,29 +1016,6 @@ def _run_investigation(client, target, report, show_hidden=False,
f"{'' if is_new else ' (resumed)'}", file=sys.stderr) f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
print(f" [AI] Cache: {cache.root}/", file=sys.stderr) print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
print(" [AI] Survey pass...", file=sys.stderr)
survey = _run_survey(client, target, report, tracker, verbose=verbose)
if survey:
print(
f" [AI] Survey: {survey['description']} "
f"(confidence {survey['confidence']:.2f})",
file=sys.stderr,
)
if survey.get("domain_notes"):
print(f" [AI] Survey notes: {survey['domain_notes']}", file=sys.stderr)
if survey.get("relevant_tools"):
print(
f" [AI] Survey relevant_tools: {', '.join(survey['relevant_tools'])}",
file=sys.stderr,
)
if survey.get("skip_tools"):
print(
f" [AI] Survey skip_tools: {', '.join(survey['skip_tools'])}",
file=sys.stderr,
)
else:
print(" [AI] Survey unavailable — proceeding without it.", file=sys.stderr)
all_dirs = _discover_directories(target, show_hidden=show_hidden, all_dirs = _discover_directories(target, show_hidden=show_hidden,
exclude=exclude) exclude=exclude)

View file

@ -131,15 +131,6 @@ You have exactly two signals. Do not ask for more.
File type distribution (counts by category): File type distribution (counts by category):
{file_type_distribution} {file_type_distribution}
IMPORTANT: the file type distribution is produced by a classifier
that is biased toward source code. Its categories are: source,
config, data, document, media, archive, unknown. It has NO concept
of mail, notebooks, calendars, contacts, ledgers, photo libraries,
or other personal-data domains — anything text-shaped tends to be
labeled `source` even when it is not code. If the tree preview
suggests a non-code target, trust the tree over the histogram and
say so in `domain_notes`.
Top-level tree (2 levels deep): Top-level tree (2 levels deep):
{tree_preview} {tree_preview}