diff --git a/luminos_lib/ai.py b/luminos_lib/ai.py
index 1c9a4d2..38c707c 100644
--- a/luminos_lib/ai.py
+++ b/luminos_lib/ai.py
@@ -22,7 +22,12 @@ import magic
 from luminos_lib.ast_parser import parse_structure
 from luminos_lib.cache import _CacheManager, _get_investigation_id
 from luminos_lib.capabilities import check_ai_dependencies
-from luminos_lib.prompts import _DIR_SYSTEM_PROMPT, _SYNTHESIS_SYSTEM_PROMPT
+from luminos_lib.prompts import (
+    _DIR_SYSTEM_PROMPT,
+    _SURVEY_SYSTEM_PROMPT,
+    _SYNTHESIS_SYSTEM_PROMPT,
+)
+from luminos_lib.tree import build_tree, render_tree
 
 MODEL = "claude-sonnet-4-20250514"
 
@@ -329,6 +334,51 @@ _DIR_TOOLS = [
     },
 ]
 
+_SURVEY_TOOLS = [
+    {
+        "name": "submit_survey",
+        "description": (
+            "Submit the reconnaissance survey. Call exactly once."
+        ),
+        "input_schema": {
+            "type": "object",
+            "properties": {
+                "description": {
+                    "type": "string",
+                    "description": "Plain-language description of the target.",
+                },
+                "approach": {
+                    "type": "string",
+                    "description": "Recommended analytical approach.",
+                },
+                "relevant_tools": {
+                    "type": "array",
+                    "items": {"type": "string"},
+                    "description": "Tool names the dir loop should lean on.",
+                },
+                "skip_tools": {
+                    "type": "array",
+                    "items": {"type": "string"},
+                    "description": "Tool names whose use would be wrong here.",
+                },
+                "domain_notes": {
+                    "type": "string",
+                    "description": "Short actionable hint, or empty string.",
+                },
+                "confidence": {
+                    "type": "number",
+                    "description": "0.0–1.0 confidence in this survey.",
+                },
+            },
+            "required": [
+                "description", "approach", "relevant_tools",
+                "skip_tools", "domain_notes", "confidence",
+            ],
+        },
+    },
+]
+
+
 _SYNTHESIS_TOOLS = [
     {
         "name": "read_cache",
@@ -873,6 +923,165 @@ def _block_to_dict(block):
 # Synthesis pass
 # ---------------------------------------------------------------------------
 
+def _normalize_survey(tool_input):
+    """Coerce raw submit_survey arguments into a well-typed survey dict.
+
+    The arguments come straight from the model, so they are not guaranteed
+    to match the tool schema. Missing or null text fields become "", the
+    tool lists are forced to lists of strings, and confidence is parsed
+    defensively and clamped to the 0.0-1.0 range the schema describes.
+    """
+    if not isinstance(tool_input, dict):
+        tool_input = {}
+
+    def _text(key):
+        value = tool_input.get(key)
+        return "" if value is None else str(value)
+
+    def _names(key):
+        value = tool_input.get(key) or []
+        if not isinstance(value, (list, tuple)):
+            # A bare string would otherwise be joined character by
+            # character when the caller prints the list.
+            value = [value]
+        return [str(item) for item in value]
+
+    try:
+        confidence = float(tool_input.get("confidence") or 0.0)
+    except (TypeError, ValueError, OverflowError):
+        # The survey is advisory; a malformed number must not abort the run.
+        confidence = 0.0
+
+    return {
+        "description": _text("description"),
+        "approach": _text("approach"),
+        "relevant_tools": _names("relevant_tools"),
+        "skip_tools": _names("skip_tools"),
+        "domain_notes": _text("domain_notes"),
+        # max() keeps 0.0 when compared against NaN, so NaN clamps to 0.0.
+        "confidence": min(1.0, max(0.0, confidence)),
+    }
+
+
+def _run_survey(client, target, report, tracker, max_turns=3, verbose=False):
+    """Run the reconnaissance survey pass.
+
+    Builds the survey system prompt from the file-category histogram in
+    ``report``, a two-level tree preview of ``target`` and the names of the
+    directory-loop tools, then asks the model to call ``submit_survey``.
+
+    Args:
+        client: API client handed to ``_call_api_streaming``.
+        target: Investigation target; passed to ``build_tree`` and the prompt.
+        report: Scan report; only ``report["file_categories"]`` is read.
+        tracker: Usage tracker handed to ``_call_api_streaming``.
+        max_turns: Maximum number of API round-trips before giving up.
+        verbose: Accepted for parity with the other passes; currently unused.
+
+    Returns a survey dict on success, or None on failure / out-of-turns.
+    Survey is advisory — callers must treat None as "no survey context".
+    """
+    categories = report.get("file_categories", {}) or {}
+    if categories:
+        ftd_lines = [
+            f" {cat}: {n}"
+            for cat, n in sorted(categories.items(), key=lambda kv: -kv[1])
+        ]
+        file_type_distribution = "\n".join(ftd_lines)
+    else:
+        file_type_distribution = " (no files classified)"
+
+    try:
+        tree_node = build_tree(target, max_depth=2)
+        tree_preview = render_tree(tree_node)
+    except Exception:
+        # Best effort: the survey can still run on the histogram alone.
+        tree_preview = "(tree unavailable)"
+
+    tool_names = [t["name"] for t in _DIR_TOOLS if t["name"] != "submit_report"]
+    available_tools = ", ".join(tool_names)
+
+    system = _SURVEY_SYSTEM_PROMPT.format(
+        target=target,
+        file_type_distribution=file_type_distribution,
+        tree_preview=tree_preview,
+        available_tools=available_tools,
+    )
+
+    messages = [
+        {
+            "role": "user",
+            "content": (
+                "All inputs are in the system prompt above. Call "
+                "submit_survey now — no other tool calls needed."
+            ),
+        },
+    ]
+
+    survey = None
+
+    for _turn in range(max_turns):
+        try:
+            content_blocks, _usage = _call_api_streaming(
+                client, system, messages, _SURVEY_TOOLS, tracker,
+            )
+        except anthropic.APIError as e:
+            print(f" [AI] API error: {e}", file=sys.stderr)
+            return None
+
+        for b in content_blocks:
+            if b.type == "text" and b.text.strip():
+                for line in b.text.strip().split("\n"):
+                    print(f" [AI] {line}", file=sys.stderr)
+
+        tool_uses = [b for b in content_blocks if b.type == "tool_use"]
+        for tu in tool_uses:
+            arg_summary = ", ".join(
+                f"{k}={v!r}" for k, v in tu.input.items()
+            ) if tu.input else ""
+            print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
+
+        messages.append({
+            "role": "assistant",
+            "content": [_block_to_dict(b) for b in content_blocks],
+        })
+
+        if not tool_uses:
+            messages.append({
+                "role": "user",
+                "content": "Please call submit_survey.",
+            })
+            continue
+
+        tool_results = []
+        done = False
+        for tu in tool_uses:
+            if tu.name == "submit_survey":
+                survey = _normalize_survey(tu.input)
+                tool_results.append({
+                    "type": "tool_result",
+                    "tool_use_id": tu.id,
+                    "content": "Survey received. Thank you.",
+                })
+                done = True
+            else:
+                tool_results.append({
+                    "type": "tool_result",
+                    "tool_use_id": tu.id,
+                    "content": "Unknown tool. Call submit_survey.",
+                    "is_error": True,
+                })
+
+        messages.append({"role": "user", "content": tool_results})
+
+        if done:
+            break
+    else:
+        print(" [AI] Warning: survey ran out of turns.", file=sys.stderr)
+
+    return survey
+
+
 def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
     """Run the final synthesis pass. Returns (brief, detailed)."""
     dir_entries = cache.read_all_entries("dir")
@@ -1016,6 +1225,29 @@ def _run_investigation(client, target, report, show_hidden=False,
           f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
     print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
 
+    print(" [AI] Survey pass...", file=sys.stderr)
+    survey = _run_survey(client, target, report, tracker, verbose=verbose)
+    if survey:
+        print(
+            f" [AI] Survey: {survey['description']} "
+            f"(confidence {survey['confidence']:.2f})",
+            file=sys.stderr,
+        )
+        if survey.get("domain_notes"):
+            print(f" [AI] Survey notes: {survey['domain_notes']}", file=sys.stderr)
+        if survey.get("relevant_tools"):
+            print(
+                f" [AI] Survey relevant_tools: {', '.join(survey['relevant_tools'])}",
+                file=sys.stderr,
+            )
+        if survey.get("skip_tools"):
+            print(
+                f" [AI] Survey skip_tools: {', '.join(survey['skip_tools'])}",
+                file=sys.stderr,
+            )
+    else:
+        print(" [AI] Survey unavailable — proceeding without it.", file=sys.stderr)
+
     all_dirs = _discover_directories(target, show_hidden=show_hidden,
                                      exclude=exclude)
 
diff --git a/luminos_lib/prompts.py b/luminos_lib/prompts.py
index 9268b1f..c695d3f 100644
--- a/luminos_lib/prompts.py
+++ b/luminos_lib/prompts.py
@@ -131,6 +131,15 @@ You have exactly two signals. Do not ask for more.
 File type distribution (counts by category):
 {file_type_distribution}
 
+IMPORTANT: the file type distribution is produced by a classifier
+that is biased toward source code. Its categories are: source,
+config, data, document, media, archive, unknown. It has NO concept
+of mail, notebooks, calendars, contacts, ledgers, photo libraries,
+or other personal-data domains — anything text-shaped tends to be
+labeled `source` even when it is not code. If the tree preview
+suggests a non-code target, trust the tree over the histogram and
+say so in `domain_notes`.
+
 Top-level tree (2 levels deep):
 {tree_preview}
 