Compare commits
10 commits
8aa6c713db
...
d323190866
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d323190866 | ||
|
|
78f9a396dd | ||
|
|
78f80c31ed | ||
|
|
206d2d34f6 | ||
|
|
bbaf387cb7 | ||
|
|
ebc6b852f1 | ||
|
|
33df555a8c | ||
|
|
ea8c07a692 | ||
|
|
5c6124a715 | ||
|
|
0c49da23ab |
9 changed files with 510 additions and 326 deletions
68
luminos.py
68
luminos.py
|
|
@ -3,8 +3,9 @@
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
import sys
|
|
||||||
import os
|
import os
|
||||||
|
import shutil
|
||||||
|
import sys
|
||||||
|
|
||||||
from luminos_lib.tree import build_tree, render_tree
|
from luminos_lib.tree import build_tree, render_tree
|
||||||
from luminos_lib.filetypes import classify_files, summarize_categories
|
from luminos_lib.filetypes import classify_files, summarize_categories
|
||||||
|
|
@ -15,29 +16,67 @@ from luminos_lib.watch import watch_loop
|
||||||
from luminos_lib.report import format_report
|
from luminos_lib.report import format_report
|
||||||
|
|
||||||
|
|
||||||
def scan(target, depth=3, show_hidden=False):
|
def _progress(label):
|
||||||
|
"""Return (on_file, finish) for in-place per-file progress on stderr.
|
||||||
|
|
||||||
|
on_file(path) overwrites the current line with the label and truncated path.
|
||||||
|
finish() finalises the line with a newline.
|
||||||
|
"""
|
||||||
|
cols = shutil.get_terminal_size((80, 20)).columns
|
||||||
|
prefix = f" [scan] {label}... "
|
||||||
|
available = max(cols - len(prefix), 10)
|
||||||
|
|
||||||
|
def on_file(path):
|
||||||
|
rel = os.path.relpath(path)
|
||||||
|
if len(rel) > available:
|
||||||
|
rel = "..." + rel[-(available - 3):]
|
||||||
|
print(f"\r{prefix}{rel}\033[K", end="", file=sys.stderr, flush=True)
|
||||||
|
|
||||||
|
def finish():
|
||||||
|
print(f"\r{prefix}done\033[K", file=sys.stderr, flush=True)
|
||||||
|
|
||||||
|
return on_file, finish
|
||||||
|
|
||||||
|
|
||||||
|
def scan(target, depth=3, show_hidden=False, exclude=None):
|
||||||
"""Run all analyses on the target directory and return a report dict."""
|
"""Run all analyses on the target directory and return a report dict."""
|
||||||
report = {}
|
report = {}
|
||||||
|
|
||||||
tree = build_tree(target, max_depth=depth, show_hidden=show_hidden)
|
exclude = exclude or []
|
||||||
|
|
||||||
|
print(f" [scan] Building directory tree (depth={depth})...", file=sys.stderr)
|
||||||
|
tree = build_tree(target, max_depth=depth, show_hidden=show_hidden,
|
||||||
|
exclude=exclude)
|
||||||
report["tree"] = tree
|
report["tree"] = tree
|
||||||
report["tree_rendered"] = render_tree(tree)
|
report["tree_rendered"] = render_tree(tree)
|
||||||
|
|
||||||
classified = classify_files(target, show_hidden=show_hidden)
|
on_file, finish = _progress("Classifying files")
|
||||||
|
classified = classify_files(target, show_hidden=show_hidden,
|
||||||
|
exclude=exclude, on_file=on_file)
|
||||||
|
finish()
|
||||||
report["file_categories"] = summarize_categories(classified)
|
report["file_categories"] = summarize_categories(classified)
|
||||||
report["classified_files"] = classified
|
report["classified_files"] = classified
|
||||||
|
|
||||||
languages, loc = detect_languages(classified)
|
on_file, finish = _progress("Counting lines")
|
||||||
|
languages, loc = detect_languages(classified, on_file=on_file)
|
||||||
|
finish()
|
||||||
report["languages"] = languages
|
report["languages"] = languages
|
||||||
report["lines_of_code"] = loc
|
report["lines_of_code"] = loc
|
||||||
report["large_files"] = find_large_files(classified)
|
|
||||||
|
|
||||||
report["recent_files"] = find_recent_files(target, show_hidden=show_hidden)
|
on_file, finish = _progress("Checking for large files")
|
||||||
|
report["large_files"] = find_large_files(classified, on_file=on_file)
|
||||||
|
finish()
|
||||||
|
|
||||||
usage = get_disk_usage(target, show_hidden=show_hidden)
|
print(" [scan] Finding recently modified files...", file=sys.stderr)
|
||||||
|
report["recent_files"] = find_recent_files(target, show_hidden=show_hidden,
|
||||||
|
exclude=exclude)
|
||||||
|
|
||||||
|
print(" [scan] Calculating disk usage...", file=sys.stderr)
|
||||||
|
usage = get_disk_usage(target, show_hidden=show_hidden, exclude=exclude)
|
||||||
report["disk_usage"] = usage
|
report["disk_usage"] = usage
|
||||||
report["top_directories"] = top_directories(usage, n=5)
|
report["top_directories"] = top_directories(usage, n=5)
|
||||||
|
|
||||||
|
print(" [scan] Base scan complete.", file=sys.stderr)
|
||||||
return report
|
return report
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -67,6 +106,10 @@ def main():
|
||||||
help="Force a new AI investigation (ignore cached results)")
|
help="Force a new AI investigation (ignore cached results)")
|
||||||
parser.add_argument("--install-extras", action="store_true",
|
parser.add_argument("--install-extras", action="store_true",
|
||||||
help="Show status of optional AI dependencies")
|
help="Show status of optional AI dependencies")
|
||||||
|
parser.add_argument("-x", "--exclude", metavar="DIR", action="append",
|
||||||
|
default=[],
|
||||||
|
help="Exclude a directory name from scan and analysis "
|
||||||
|
"(repeatable, e.g. -x .git -x node_modules)")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
|
@ -92,17 +135,22 @@ def main():
|
||||||
file=sys.stderr)
|
file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
if args.exclude:
|
||||||
|
print(f" [scan] Excluding: {', '.join(args.exclude)}", file=sys.stderr)
|
||||||
|
|
||||||
if args.watch:
|
if args.watch:
|
||||||
watch_loop(target, depth=args.depth, show_hidden=args.all,
|
watch_loop(target, depth=args.depth, show_hidden=args.all,
|
||||||
json_output=args.json_output)
|
json_output=args.json_output)
|
||||||
return
|
return
|
||||||
|
|
||||||
report = scan(target, depth=args.depth, show_hidden=args.all)
|
report = scan(target, depth=args.depth, show_hidden=args.all,
|
||||||
|
exclude=args.exclude)
|
||||||
|
|
||||||
flags = []
|
flags = []
|
||||||
if args.ai:
|
if args.ai:
|
||||||
from luminos_lib.ai import analyze_directory
|
from luminos_lib.ai import analyze_directory
|
||||||
brief, detailed, flags = analyze_directory(report, target, fresh=args.fresh)
|
brief, detailed, flags = analyze_directory(
|
||||||
|
report, target, fresh=args.fresh, exclude=args.exclude)
|
||||||
report["ai_brief"] = brief
|
report["ai_brief"] = brief
|
||||||
report["ai_detailed"] = detailed
|
report["ai_detailed"] = detailed
|
||||||
report["flags"] = flags
|
report["flags"] = flags
|
||||||
|
|
|
||||||
|
|
@ -19,14 +19,10 @@ from datetime import datetime, timezone
|
||||||
|
|
||||||
import anthropic
|
import anthropic
|
||||||
import magic
|
import magic
|
||||||
import tree_sitter
|
from luminos_lib.ast_parser import parse_structure
|
||||||
import tree_sitter_python
|
|
||||||
import tree_sitter_javascript
|
|
||||||
import tree_sitter_rust
|
|
||||||
import tree_sitter_go
|
|
||||||
|
|
||||||
from luminos_lib.cache import _CacheManager, _get_investigation_id
|
from luminos_lib.cache import _CacheManager, _get_investigation_id
|
||||||
from luminos_lib.capabilities import check_ai_dependencies
|
from luminos_lib.capabilities import check_ai_dependencies
|
||||||
|
from luminos_lib.prompts import _DIR_SYSTEM_PROMPT, _SYNTHESIS_SYSTEM_PROMPT
|
||||||
|
|
||||||
MODEL = "claude-sonnet-4-20250514"
|
MODEL = "claude-sonnet-4-20250514"
|
||||||
|
|
||||||
|
|
@ -48,33 +44,6 @@ _SKIP_DIRS = {
|
||||||
# Commands the run_command tool is allowed to execute.
|
# Commands the run_command tool is allowed to execute.
|
||||||
_COMMAND_WHITELIST = {"wc", "file", "grep", "head", "tail", "stat", "du", "find"}
|
_COMMAND_WHITELIST = {"wc", "file", "grep", "head", "tail", "stat", "du", "find"}
|
||||||
|
|
||||||
# tree-sitter language registry: extension → (grammar_module, language_name)
|
|
||||||
_TS_LANGUAGES = {
|
|
||||||
".py": (tree_sitter_python, "python"),
|
|
||||||
".js": (tree_sitter_javascript, "javascript"),
|
|
||||||
".jsx": (tree_sitter_javascript, "javascript"),
|
|
||||||
".mjs": (tree_sitter_javascript, "javascript"),
|
|
||||||
".rs": (tree_sitter_rust, "rust"),
|
|
||||||
".go": (tree_sitter_go, "go"),
|
|
||||||
}
|
|
||||||
|
|
||||||
# Precompute Language objects once.
|
|
||||||
_TS_LANG_CACHE = {}
|
|
||||||
|
|
||||||
|
|
||||||
def _get_ts_parser(ext):
|
|
||||||
"""Return a (Parser, language_name) tuple for a file extension, or None."""
|
|
||||||
entry = _TS_LANGUAGES.get(ext)
|
|
||||||
if entry is None:
|
|
||||||
return None
|
|
||||||
module, lang_name = entry
|
|
||||||
if lang_name not in _TS_LANG_CACHE:
|
|
||||||
_TS_LANG_CACHE[lang_name] = tree_sitter.Language(module.language())
|
|
||||||
lang = _TS_LANG_CACHE[lang_name]
|
|
||||||
parser = tree_sitter.Parser(lang)
|
|
||||||
return parser, lang_name
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Helpers
|
# Helpers
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
@ -533,181 +502,7 @@ def _tool_parse_structure(args, target, _cache):
|
||||||
path = os.path.join(target, path)
|
path = os.path.join(target, path)
|
||||||
if not _path_is_safe(path, target):
|
if not _path_is_safe(path, target):
|
||||||
return f"Error: path '{path}' is outside the target directory."
|
return f"Error: path '{path}' is outside the target directory."
|
||||||
if not os.path.isfile(path):
|
return parse_structure(path)
|
||||||
return f"Error: '{path}' is not a file."
|
|
||||||
|
|
||||||
ext = os.path.splitext(path)[1].lower()
|
|
||||||
ts = _get_ts_parser(ext)
|
|
||||||
if ts is None:
|
|
||||||
return f"Error: no grammar for extension '{ext}'. Supported: {', '.join(sorted(_TS_LANGUAGES.keys()))}"
|
|
||||||
|
|
||||||
parser, lang_name = ts
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(path, "rb") as f:
|
|
||||||
source = f.read()
|
|
||||||
except OSError as e:
|
|
||||||
return f"Error reading file: {e}"
|
|
||||||
|
|
||||||
tree = parser.parse(source)
|
|
||||||
root = tree.root_node
|
|
||||||
source_text = source.decode("utf-8", errors="replace")
|
|
||||||
lines = source_text.split("\n")
|
|
||||||
line_count = len(lines)
|
|
||||||
|
|
||||||
functions = []
|
|
||||||
classes = []
|
|
||||||
imports = []
|
|
||||||
has_docstrings = False
|
|
||||||
comment_lines = 0
|
|
||||||
|
|
||||||
def _walk(node):
|
|
||||||
nonlocal has_docstrings, comment_lines
|
|
||||||
for child in node.children:
|
|
||||||
ntype = child.type
|
|
||||||
|
|
||||||
# Comments
|
|
||||||
if ntype in ("comment", "line_comment", "block_comment"):
|
|
||||||
comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
|
|
||||||
|
|
||||||
# Python
|
|
||||||
if lang_name == "python":
|
|
||||||
if ntype == "function_definition":
|
|
||||||
functions.append(_py_func_sig(child))
|
|
||||||
elif ntype == "class_definition":
|
|
||||||
classes.append(_py_class(child))
|
|
||||||
elif ntype in ("import_statement", "import_from_statement"):
|
|
||||||
imports.append(child.text.decode("utf-8", errors="replace").strip())
|
|
||||||
elif ntype == "expression_statement":
|
|
||||||
first = child.children[0] if child.children else None
|
|
||||||
if first and first.type == "string":
|
|
||||||
has_docstrings = True
|
|
||||||
|
|
||||||
# JavaScript
|
|
||||||
elif lang_name == "javascript":
|
|
||||||
if ntype in ("function_declaration", "arrow_function",
|
|
||||||
"function"):
|
|
||||||
functions.append(_js_func_sig(child))
|
|
||||||
elif ntype == "class_declaration":
|
|
||||||
classes.append(_js_class(child))
|
|
||||||
elif ntype in ("import_statement",):
|
|
||||||
imports.append(child.text.decode("utf-8", errors="replace").strip())
|
|
||||||
|
|
||||||
# Rust
|
|
||||||
elif lang_name == "rust":
|
|
||||||
if ntype == "function_item":
|
|
||||||
functions.append(_rust_func_sig(child))
|
|
||||||
elif ntype in ("struct_item", "enum_item", "impl_item"):
|
|
||||||
classes.append(_rust_struct(child))
|
|
||||||
elif ntype == "use_declaration":
|
|
||||||
imports.append(child.text.decode("utf-8", errors="replace").strip())
|
|
||||||
|
|
||||||
# Go
|
|
||||||
elif lang_name == "go":
|
|
||||||
if ntype == "function_declaration":
|
|
||||||
functions.append(_go_func_sig(child))
|
|
||||||
elif ntype == "type_declaration":
|
|
||||||
classes.append(_go_type(child))
|
|
||||||
elif ntype == "import_declaration":
|
|
||||||
imports.append(child.text.decode("utf-8", errors="replace").strip())
|
|
||||||
|
|
||||||
_walk(child)
|
|
||||||
|
|
||||||
_walk(root)
|
|
||||||
|
|
||||||
code_lines = max(1, line_count - comment_lines)
|
|
||||||
result = {
|
|
||||||
"language": lang_name,
|
|
||||||
"functions": functions[:50],
|
|
||||||
"classes": classes[:30],
|
|
||||||
"imports": imports[:30],
|
|
||||||
"line_count": line_count,
|
|
||||||
"has_docstrings": has_docstrings,
|
|
||||||
"has_comments": comment_lines > 0,
|
|
||||||
"comment_to_code_ratio": round(comment_lines / code_lines, 2),
|
|
||||||
}
|
|
||||||
return json.dumps(result, indent=2)
|
|
||||||
|
|
||||||
|
|
||||||
# --- tree-sitter extraction helpers ---
|
|
||||||
|
|
||||||
def _child_by_type(node, *types):
|
|
||||||
for c in node.children:
|
|
||||||
if c.type in types:
|
|
||||||
return c
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _text(node):
|
|
||||||
return node.text.decode("utf-8", errors="replace") if node else ""
|
|
||||||
|
|
||||||
|
|
||||||
def _py_func_sig(node):
|
|
||||||
name = _text(_child_by_type(node, "identifier"))
|
|
||||||
params = _text(_child_by_type(node, "parameters"))
|
|
||||||
ret = _child_by_type(node, "type")
|
|
||||||
sig = f"{name}{params}"
|
|
||||||
if ret:
|
|
||||||
sig += f" -> {_text(ret)}"
|
|
||||||
return sig
|
|
||||||
|
|
||||||
|
|
||||||
def _py_class(node):
|
|
||||||
name = _text(_child_by_type(node, "identifier"))
|
|
||||||
methods = []
|
|
||||||
body = _child_by_type(node, "block")
|
|
||||||
if body:
|
|
||||||
for child in body.children:
|
|
||||||
if child.type == "function_definition":
|
|
||||||
methods.append(_py_func_sig(child))
|
|
||||||
return {"name": name, "methods": methods[:20]}
|
|
||||||
|
|
||||||
|
|
||||||
def _js_func_sig(node):
|
|
||||||
name = _text(_child_by_type(node, "identifier"))
|
|
||||||
params = _text(_child_by_type(node, "formal_parameters"))
|
|
||||||
return f"{name}{params}" if name else f"(anonymous){params}"
|
|
||||||
|
|
||||||
|
|
||||||
def _js_class(node):
|
|
||||||
name = _text(_child_by_type(node, "identifier"))
|
|
||||||
methods = []
|
|
||||||
body = _child_by_type(node, "class_body")
|
|
||||||
if body:
|
|
||||||
for child in body.children:
|
|
||||||
if child.type == "method_definition":
|
|
||||||
mname = _text(_child_by_type(child, "property_identifier"))
|
|
||||||
mparams = _text(_child_by_type(child, "formal_parameters"))
|
|
||||||
methods.append(f"{mname}{mparams}")
|
|
||||||
return {"name": name, "methods": methods[:20]}
|
|
||||||
|
|
||||||
|
|
||||||
def _rust_func_sig(node):
|
|
||||||
name = _text(_child_by_type(node, "identifier"))
|
|
||||||
params = _text(_child_by_type(node, "parameters"))
|
|
||||||
ret = _child_by_type(node, "type_identifier", "generic_type",
|
|
||||||
"reference_type", "scoped_type_identifier")
|
|
||||||
sig = f"{name}{params}"
|
|
||||||
if ret:
|
|
||||||
sig += f" -> {_text(ret)}"
|
|
||||||
return sig
|
|
||||||
|
|
||||||
|
|
||||||
def _rust_struct(node):
|
|
||||||
name = _text(_child_by_type(node, "type_identifier"))
|
|
||||||
return {"name": name or _text(node)[:60], "methods": []}
|
|
||||||
|
|
||||||
|
|
||||||
def _go_func_sig(node):
|
|
||||||
name = _text(_child_by_type(node, "identifier"))
|
|
||||||
params = _text(_child_by_type(node, "parameter_list"))
|
|
||||||
return f"{name}{params}"
|
|
||||||
|
|
||||||
|
|
||||||
def _go_type(node):
|
|
||||||
spec = _child_by_type(node, "type_spec")
|
|
||||||
name = _text(_child_by_type(spec, "type_identifier")) if spec else ""
|
|
||||||
return {"name": name or _text(node)[:60], "methods": []}
|
|
||||||
|
|
||||||
|
|
||||||
def _tool_write_cache(args, _target, cache):
|
def _tool_write_cache(args, _target, cache):
|
||||||
|
|
@ -848,14 +643,16 @@ def _call_api_streaming(client, system, messages, tools, tracker):
|
||||||
# Directory discovery
|
# Directory discovery
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _discover_directories(target, show_hidden=False):
|
def _discover_directories(target, show_hidden=False, exclude=None):
|
||||||
"""Walk the target and return all directories sorted leaves-first."""
|
"""Walk the target and return all directories sorted leaves-first."""
|
||||||
|
extra = set(exclude or [])
|
||||||
dirs = []
|
dirs = []
|
||||||
target_real = os.path.realpath(target)
|
target_real = os.path.realpath(target)
|
||||||
for root, subdirs, _files in os.walk(target_real, topdown=True):
|
for root, subdirs, _files in os.walk(target_real, topdown=True):
|
||||||
subdirs[:] = [
|
subdirs[:] = [
|
||||||
d for d in subdirs
|
d for d in subdirs
|
||||||
if not _should_skip_dir(d)
|
if not _should_skip_dir(d)
|
||||||
|
and d not in extra
|
||||||
and (show_hidden or not d.startswith("."))
|
and (show_hidden or not d.startswith("."))
|
||||||
]
|
]
|
||||||
dirs.append(root)
|
dirs.append(root)
|
||||||
|
|
@ -867,74 +664,6 @@ def _discover_directories(target, show_hidden=False):
|
||||||
# Per-directory agent loop
|
# Per-directory agent loop
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
_DIR_SYSTEM_PROMPT = """\
|
|
||||||
You are an expert analyst investigating a SINGLE directory on a file system.
|
|
||||||
Do NOT assume the type of content before investigating. Discover what this
|
|
||||||
directory contains from what you find.
|
|
||||||
|
|
||||||
## Your Task
|
|
||||||
Investigate the directory: {dir_path}
|
|
||||||
(relative to target: {dir_rel})
|
|
||||||
|
|
||||||
You must:
|
|
||||||
1. Read the important files in THIS directory (not subdirectories)
|
|
||||||
2. For each file you read, call write_cache to save a summary
|
|
||||||
3. Call write_cache for the directory itself with a synthesis
|
|
||||||
4. Call submit_report with a 1-3 sentence summary
|
|
||||||
|
|
||||||
## Tools
|
|
||||||
parse_structure gives you the skeleton of a file. It does NOT replace \
|
|
||||||
reading the file. Use parse_structure first to understand structure, then \
|
|
||||||
use read_file if you need to verify intent, check for anomalies, or \
|
|
||||||
understand content that structure cannot capture (comments, documentation, \
|
|
||||||
data files, config values). A file where structure and content appear to \
|
|
||||||
contradict each other is always worth reading in full.
|
|
||||||
|
|
||||||
Use the think tool when choosing which file or directory to investigate \
|
|
||||||
next — before starting a new file or switching investigation direction. \
|
|
||||||
Do NOT call think before every individual tool call in a sequence.
|
|
||||||
|
|
||||||
Use the checkpoint tool after completing investigation of a meaningful \
|
|
||||||
cluster of files. Not after every file — once or twice per directory \
|
|
||||||
loop at most.
|
|
||||||
|
|
||||||
Use the flag tool immediately when you find something notable, \
|
|
||||||
surprising, or concerning. Severity guide:
|
|
||||||
info = interesting but not problematic
|
|
||||||
concern = worth addressing
|
|
||||||
critical = likely broken or dangerous
|
|
||||||
|
|
||||||
## Step Numbering
|
|
||||||
Number your investigation steps as you go. Before starting each new \
|
|
||||||
file cluster or phase transition, output:
|
|
||||||
Step N: <what you are doing and why>
|
|
||||||
Output this as plain text before tool calls, not as a tool call itself.
|
|
||||||
|
|
||||||
## Efficiency Rules
|
|
||||||
- Batch multiple tool calls in a single turn whenever possible
|
|
||||||
- Skip binary/compiled/generated files (.pyc, .class, .o, .min.js, etc.)
|
|
||||||
- Skip files >100KB unless uniquely important
|
|
||||||
- Prioritize: README, index, main, config, schema, manifest files
|
|
||||||
- For source files: try parse_structure first, then read_file if needed
|
|
||||||
- If read_file returns truncated content, use a larger max_bytes or
|
|
||||||
run_command('tail ...') — NEVER retry the identical call
|
|
||||||
- You have only {max_turns} turns — be efficient
|
|
||||||
|
|
||||||
## Cache Schemas
|
|
||||||
File: {{path, relative_path, size_bytes, category, summary, notable,
|
|
||||||
notable_reason, cached_at}}
|
|
||||||
Dir: {{path, relative_path, child_count, summary, dominant_category,
|
|
||||||
notable_files, cached_at}}
|
|
||||||
|
|
||||||
category values: source, config, data, document, media, archive, unknown
|
|
||||||
|
|
||||||
## Context
|
|
||||||
{context}
|
|
||||||
|
|
||||||
## Child Directory Summaries (already investigated)
|
|
||||||
{child_summaries}"""
|
|
||||||
|
|
||||||
|
|
||||||
def _build_dir_context(dir_path):
|
def _build_dir_context(dir_path):
|
||||||
lines = []
|
lines = []
|
||||||
try:
|
try:
|
||||||
|
|
@ -1144,32 +873,6 @@ def _block_to_dict(block):
|
||||||
# Synthesis pass
|
# Synthesis pass
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
_SYNTHESIS_SYSTEM_PROMPT = """\
|
|
||||||
You are an expert analyst synthesizing a final report about a directory tree.
|
|
||||||
ALL directory summaries are provided below — you do NOT need to call
|
|
||||||
list_cache or read_cache. Just read the summaries and call submit_report
|
|
||||||
immediately in your first turn.
|
|
||||||
|
|
||||||
Do NOT assume the type of content. Let the summaries speak for themselves.
|
|
||||||
|
|
||||||
## Your Goal
|
|
||||||
Produce two outputs via the submit_report tool:
|
|
||||||
1. **brief**: A 2-4 sentence summary of what this directory tree is.
|
|
||||||
2. **detailed**: A thorough breakdown covering purpose, structure, key
|
|
||||||
components, technologies, notable patterns, and any concerns.
|
|
||||||
|
|
||||||
## Rules
|
|
||||||
- ALL summaries are below — call submit_report directly
|
|
||||||
- Be specific — reference actual directory and file names
|
|
||||||
- Do NOT call list_cache or read_cache
|
|
||||||
|
|
||||||
## Target
|
|
||||||
{target}
|
|
||||||
|
|
||||||
## Directory Summaries
|
|
||||||
{summaries_text}"""
|
|
||||||
|
|
||||||
|
|
||||||
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
|
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
|
||||||
"""Run the final synthesis pass. Returns (brief, detailed)."""
|
"""Run the final synthesis pass. Returns (brief, detailed)."""
|
||||||
dir_entries = cache.read_all_entries("dir")
|
dir_entries = cache.read_all_entries("dir")
|
||||||
|
|
@ -1300,7 +1003,7 @@ def _synthesize_from_cache(cache):
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _run_investigation(client, target, report, show_hidden=False,
|
def _run_investigation(client, target, report, show_hidden=False,
|
||||||
fresh=False, verbose=False):
|
fresh=False, verbose=False, exclude=None):
|
||||||
"""Orchestrate the multi-pass investigation. Returns (brief, detailed, flags)."""
|
"""Orchestrate the multi-pass investigation. Returns (brief, detailed, flags)."""
|
||||||
investigation_id, is_new = _get_investigation_id(target, fresh=fresh)
|
investigation_id, is_new = _get_investigation_id(target, fresh=fresh)
|
||||||
cache = _CacheManager(investigation_id, target)
|
cache = _CacheManager(investigation_id, target)
|
||||||
|
|
@ -1313,7 +1016,8 @@ def _run_investigation(client, target, report, show_hidden=False,
|
||||||
f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
|
f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
|
||||||
print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
|
print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
|
||||||
|
|
||||||
all_dirs = _discover_directories(target, show_hidden=show_hidden)
|
all_dirs = _discover_directories(target, show_hidden=show_hidden,
|
||||||
|
exclude=exclude)
|
||||||
|
|
||||||
to_investigate = []
|
to_investigate = []
|
||||||
cached_count = 0
|
cached_count = 0
|
||||||
|
|
@ -1386,7 +1090,8 @@ def _run_investigation(client, target, report, show_hidden=False,
|
||||||
# Public interface
|
# Public interface
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def analyze_directory(report, target, verbose_tools=False, fresh=False):
|
def analyze_directory(report, target, verbose_tools=False, fresh=False,
|
||||||
|
exclude=None):
|
||||||
"""Run AI analysis on the directory. Returns (brief, detailed, flags).
|
"""Run AI analysis on the directory. Returns (brief, detailed, flags).
|
||||||
|
|
||||||
Returns ("", "", []) if the API key is missing or dependencies are not met.
|
Returns ("", "", []) if the API key is missing or dependencies are not met.
|
||||||
|
|
@ -1405,6 +1110,7 @@ def analyze_directory(report, target, verbose_tools=False, fresh=False):
|
||||||
try:
|
try:
|
||||||
brief, detailed, flags = _run_investigation(
|
brief, detailed, flags = _run_investigation(
|
||||||
client, target, report, fresh=fresh, verbose=verbose_tools,
|
client, target, report, fresh=fresh, verbose=verbose_tools,
|
||||||
|
exclude=exclude,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Warning: AI analysis failed: {e}", file=sys.stderr)
|
print(f"Warning: AI analysis failed: {e}", file=sys.stderr)
|
||||||
|
|
|
||||||
314
luminos_lib/ast_parser.py
Normal file
314
luminos_lib/ast_parser.py
Normal file
|
|
@ -0,0 +1,314 @@
|
||||||
|
"""AST structure extraction for Luminos using tree-sitter."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
import tree_sitter
|
||||||
|
import tree_sitter_python
|
||||||
|
import tree_sitter_javascript
|
||||||
|
import tree_sitter_rust
|
||||||
|
import tree_sitter_go
|
||||||
|
|
||||||
|
# Extension → (grammar_module, language_name)
|
||||||
|
_TS_LANGUAGES = {
|
||||||
|
".py": (tree_sitter_python, "python"),
|
||||||
|
".js": (tree_sitter_javascript, "javascript"),
|
||||||
|
".jsx": (tree_sitter_javascript, "javascript"),
|
||||||
|
".mjs": (tree_sitter_javascript, "javascript"),
|
||||||
|
".rs": (tree_sitter_rust, "rust"),
|
||||||
|
".go": (tree_sitter_go, "go"),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Precomputed Language objects.
|
||||||
|
_TS_LANG_CACHE = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_ts_parser(ext):
|
||||||
|
"""Return a (Parser, language_name) tuple for a file extension, or None."""
|
||||||
|
entry = _TS_LANGUAGES.get(ext)
|
||||||
|
if entry is None:
|
||||||
|
return None
|
||||||
|
module, lang_name = entry
|
||||||
|
if lang_name not in _TS_LANG_CACHE:
|
||||||
|
_TS_LANG_CACHE[lang_name] = tree_sitter.Language(module.language())
|
||||||
|
lang = _TS_LANG_CACHE[lang_name]
|
||||||
|
parser = tree_sitter.Parser(lang)
|
||||||
|
return parser, lang_name
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Tree-sitter node helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _child_by_type(node, *types):
|
||||||
|
for c in node.children:
|
||||||
|
if c.type in types:
|
||||||
|
return c
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _text(node):
|
||||||
|
return node.text.decode("utf-8", errors="replace") if node else ""
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Per-language handlers: (root_node, source_bytes) -> dict
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _parse_python(root, source):
|
||||||
|
functions = []
|
||||||
|
classes = []
|
||||||
|
imports = []
|
||||||
|
has_docstrings = False
|
||||||
|
comment_lines = 0
|
||||||
|
|
||||||
|
def _walk(node):
|
||||||
|
nonlocal has_docstrings, comment_lines
|
||||||
|
for child in node.children:
|
||||||
|
ntype = child.type
|
||||||
|
|
||||||
|
if ntype in ("comment", "line_comment", "block_comment"):
|
||||||
|
comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
|
||||||
|
|
||||||
|
if ntype == "function_definition":
|
||||||
|
name = _text(_child_by_type(child, "identifier"))
|
||||||
|
params = _text(_child_by_type(child, "parameters"))
|
||||||
|
ret = _child_by_type(child, "type")
|
||||||
|
sig = f"{name}{params}"
|
||||||
|
if ret:
|
||||||
|
sig += f" -> {_text(ret)}"
|
||||||
|
functions.append(sig)
|
||||||
|
elif ntype == "class_definition":
|
||||||
|
name = _text(_child_by_type(child, "identifier"))
|
||||||
|
methods = []
|
||||||
|
body = _child_by_type(child, "block")
|
||||||
|
if body:
|
||||||
|
for c in body.children:
|
||||||
|
if c.type == "function_definition":
|
||||||
|
mname = _text(_child_by_type(c, "identifier"))
|
||||||
|
mparams = _text(_child_by_type(c, "parameters"))
|
||||||
|
mret = _child_by_type(c, "type")
|
||||||
|
msig = f"{mname}{mparams}"
|
||||||
|
if mret:
|
||||||
|
msig += f" -> {_text(mret)}"
|
||||||
|
methods.append(msig)
|
||||||
|
classes.append({"name": name, "methods": methods[:20]})
|
||||||
|
elif ntype in ("import_statement", "import_from_statement"):
|
||||||
|
imports.append(child.text.decode("utf-8", errors="replace").strip())
|
||||||
|
elif ntype == "expression_statement":
|
||||||
|
first = child.children[0] if child.children else None
|
||||||
|
if first and first.type == "string":
|
||||||
|
has_docstrings = True
|
||||||
|
|
||||||
|
_walk(child)
|
||||||
|
|
||||||
|
_walk(root)
|
||||||
|
|
||||||
|
source_text = source.decode("utf-8", errors="replace")
|
||||||
|
line_count = len(source_text.split("\n"))
|
||||||
|
code_lines = max(1, line_count - comment_lines)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"language": "python",
|
||||||
|
"functions": functions[:50],
|
||||||
|
"classes": classes[:30],
|
||||||
|
"imports": imports[:30],
|
||||||
|
"line_count": line_count,
|
||||||
|
"has_docstrings": has_docstrings,
|
||||||
|
"has_comments": comment_lines > 0,
|
||||||
|
"comment_to_code_ratio": round(comment_lines / code_lines, 2),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_javascript(root, source):
    """Summarize a JavaScript syntax tree.

    Walks the tree-sitter tree rooted at *root*, collecting function
    signatures, class declarations (with method signatures), import
    statements, and comment-line counts, then returns the same summary
    dict shape as the other language handlers.
    """
    funcs = []
    klasses = []
    import_lines = []
    comment_total = 0

    def visit(node):
        nonlocal comment_total
        for kid in node.children:
            kind = kid.type

            if kind in ("comment", "line_comment", "block_comment"):
                comment_total += kid.text.decode("utf-8", errors="replace").count("\n") + 1

            if kind in ("function_declaration", "arrow_function", "function"):
                fname = _text(_child_by_type(kid, "identifier"))
                fparams = _text(_child_by_type(kid, "formal_parameters"))
                # Arrow functions / function expressions may have no identifier.
                label = fname if fname else "(anonymous)"
                funcs.append(f"{label}{fparams}")
            elif kind == "class_declaration":
                cname = _text(_child_by_type(kid, "identifier"))
                member_sigs = []
                cbody = _child_by_type(kid, "class_body")
                if cbody:
                    member_sigs = [
                        f"{_text(_child_by_type(m, 'property_identifier'))}"
                        f"{_text(_child_by_type(m, 'formal_parameters'))}"
                        for m in cbody.children
                        if m.type == "method_definition"
                    ]
                klasses.append({"name": cname, "methods": member_sigs[:20]})
            elif kind == "import_statement":
                import_lines.append(kid.text.decode("utf-8", errors="replace").strip())

            visit(kid)

    visit(root)

    text = source.decode("utf-8", errors="replace")
    total_lines = len(text.split("\n"))
    # Guard against division by zero for all-comment files.
    code_lines = max(1, total_lines - comment_total)

    return {
        "language": "javascript",
        "functions": funcs[:50],
        "classes": klasses[:30],
        "imports": import_lines[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": comment_total > 0,
        "comment_to_code_ratio": round(comment_total / code_lines, 2),
    }
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_rust(root, source):
    """Summarize a Rust syntax tree.

    Collects function signatures (with return types when present),
    struct/enum/impl items (as "classes" with empty method lists),
    `use` declarations, and comment-line counts, returning the common
    summary dict shape shared by the language handlers.
    """
    fn_sigs = []
    type_items = []
    use_lines = []
    comment_total = 0

    # Node types that can represent a function's return type.
    ret_types = ("type_identifier", "generic_type",
                 "reference_type", "scoped_type_identifier")

    def visit(node):
        nonlocal comment_total
        for kid in node.children:
            kind = kid.type

            if kind in ("comment", "line_comment", "block_comment"):
                comment_total += kid.text.decode("utf-8", errors="replace").count("\n") + 1

            if kind == "function_item":
                fname = _text(_child_by_type(kid, "identifier"))
                fparams = _text(_child_by_type(kid, "parameters"))
                ret_node = _child_by_type(kid, *ret_types)
                sig = f"{fname}{fparams}"
                if ret_node:
                    sig = f"{sig} -> {_text(ret_node)}"
                fn_sigs.append(sig)
            elif kind in ("struct_item", "enum_item", "impl_item"):
                tname = _text(_child_by_type(kid, "type_identifier"))
                # Fall back to a truncated snippet when no type name is found.
                type_items.append({"name": tname or _text(kid)[:60], "methods": []})
            elif kind == "use_declaration":
                use_lines.append(kid.text.decode("utf-8", errors="replace").strip())

            visit(kid)

    visit(root)

    text = source.decode("utf-8", errors="replace")
    total_lines = len(text.split("\n"))
    # Guard against division by zero for all-comment files.
    code_lines = max(1, total_lines - comment_total)

    return {
        "language": "rust",
        "functions": fn_sigs[:50],
        "classes": type_items[:30],
        "imports": use_lines[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": comment_total > 0,
        "comment_to_code_ratio": round(comment_total / code_lines, 2),
    }
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_go(root, source):
    """Summarize a Go syntax tree.

    Collects function signatures, type declarations (as "classes" with
    empty method lists), import declarations, and comment-line counts,
    returning the common summary dict shape shared by the handlers.
    """
    fn_sigs = []
    type_decls = []
    import_lines = []
    comment_total = 0

    def visit(node):
        nonlocal comment_total
        for kid in node.children:
            kind = kid.type

            if kind in ("comment", "line_comment", "block_comment"):
                comment_total += kid.text.decode("utf-8", errors="replace").count("\n") + 1

            if kind == "function_declaration":
                fn_sigs.append(
                    f"{_text(_child_by_type(kid, 'identifier'))}"
                    f"{_text(_child_by_type(kid, 'parameter_list'))}"
                )
            elif kind == "type_declaration":
                spec = _child_by_type(kid, "type_spec")
                tname = _text(_child_by_type(spec, "type_identifier")) if spec else ""
                # Fall back to a truncated snippet when no type name is found.
                type_decls.append({"name": tname or _text(kid)[:60], "methods": []})
            elif kind == "import_declaration":
                import_lines.append(kid.text.decode("utf-8", errors="replace").strip())

            visit(kid)

    visit(root)

    text = source.decode("utf-8", errors="replace")
    total_lines = len(text.split("\n"))
    # Guard against division by zero for all-comment files.
    code_lines = max(1, total_lines - comment_total)

    return {
        "language": "go",
        "functions": fn_sigs[:50],
        "classes": type_decls[:30],
        "imports": import_lines[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": comment_total > 0,
        "comment_to_code_ratio": round(comment_total / code_lines, 2),
    }
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Language handler registry
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Maps a tree-sitter language name (the `lang_name` produced by
# _get_ts_parser and looked up in parse_structure) to the handler that
# turns a parsed tree into the structure-summary dict.
_LANGUAGE_HANDLERS = {
    "python": _parse_python,
    "javascript": _parse_javascript,
    "rust": _parse_rust,
    "go": _parse_go,
}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Public API
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def parse_structure(path):
    """Parse a source file and return its structural skeleton as a JSON string.

    Takes an absolute path. Returns a JSON string of the structure dict
    (functions, classes, imports, line counts), or an "Error: ..." string
    when the path is not a file, no grammar/handler exists for it, the
    file cannot be read, or parsing itself fails.
    """
    if not os.path.isfile(path):
        return f"Error: '{path}' is not a file."

    ext = os.path.splitext(path)[1].lower()
    ts = _get_ts_parser(ext)
    if ts is None:
        return (f"Error: no grammar for extension '{ext}'. "
                f"Supported: {', '.join(sorted(_TS_LANGUAGES.keys()))}")

    parser, lang_name = ts

    handler = _LANGUAGE_HANDLERS.get(lang_name)
    if handler is None:
        return f"Error: no handler for language '{lang_name}'."

    try:
        with open(path, "rb") as f:
            source = f.read()
    except OSError as e:
        return f"Error reading file: {e}"

    # Keep the error-string contract: a failure inside the grammar or the
    # language handler must not escape as a raw traceback, since every
    # other failure mode above reports via an "Error: ..." string.
    try:
        tree = parser.parse(source)
        result = handler(tree.root_node, source)
    except Exception as e:
        return f"Error parsing file: {e}"
    return json.dumps(result, indent=2)
|
||||||
|
|
@ -34,10 +34,11 @@ def _count_lines(filepath):
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def detect_languages(classified_files):
|
def detect_languages(classified_files, on_file=None):
|
||||||
"""Detect languages present and count lines of code per language.
|
"""Detect languages present and count lines of code per language.
|
||||||
|
|
||||||
Returns (languages_set, loc_by_language).
|
Returns (languages_set, loc_by_language).
|
||||||
|
on_file(path) is called per source file, if provided.
|
||||||
"""
|
"""
|
||||||
source_files = [f for f in classified_files if f["category"] == "source"]
|
source_files = [f for f in classified_files if f["category"] == "source"]
|
||||||
languages = set()
|
languages = set()
|
||||||
|
|
@ -49,12 +50,17 @@ def detect_languages(classified_files):
|
||||||
languages.add(lang)
|
languages.add(lang)
|
||||||
lines = _count_lines(f["path"])
|
lines = _count_lines(f["path"])
|
||||||
loc[lang] = loc.get(lang, 0) + lines
|
loc[lang] = loc.get(lang, 0) + lines
|
||||||
|
if on_file:
|
||||||
|
on_file(f["path"])
|
||||||
|
|
||||||
return sorted(languages), loc
|
return sorted(languages), loc
|
||||||
|
|
||||||
|
|
||||||
def find_large_files(classified_files):
|
def find_large_files(classified_files, on_file=None):
|
||||||
"""Find files that are unusually large (>1000 lines or >10MB)."""
|
"""Find files that are unusually large (>1000 lines or >10MB).
|
||||||
|
|
||||||
|
on_file(path) is called per source file checked, if provided.
|
||||||
|
"""
|
||||||
source_files = [f for f in classified_files if f["category"] == "source"]
|
source_files = [f for f in classified_files if f["category"] == "source"]
|
||||||
large = []
|
large = []
|
||||||
|
|
||||||
|
|
@ -68,5 +74,7 @@ def find_large_files(classified_files):
|
||||||
if reasons:
|
if reasons:
|
||||||
large.append({"path": f["path"], "name": f["name"],
|
large.append({"path": f["path"], "name": f["name"],
|
||||||
"reasons": reasons})
|
"reasons": reasons})
|
||||||
|
if on_file:
|
||||||
|
on_file(f["path"])
|
||||||
|
|
||||||
return large
|
return large
|
||||||
|
|
|
||||||
|
|
@ -3,12 +3,15 @@
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
def get_disk_usage(target, show_hidden=False):
|
def get_disk_usage(target, show_hidden=False, exclude=None):
|
||||||
"""Get per-directory disk usage via du.
|
"""Get per-directory disk usage via du.
|
||||||
|
|
||||||
Returns a list of dicts: {path, size_bytes, size_human}.
|
Returns a list of dicts: {path, size_bytes, size_human}.
|
||||||
"""
|
"""
|
||||||
cmd = ["du", "-b", "--max-depth=2", target]
|
cmd = ["du", "-b", "--max-depth=2"]
|
||||||
|
for name in (exclude or []):
|
||||||
|
cmd.append(f"--exclude={name}")
|
||||||
|
cmd.append(target)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = subprocess.run(
|
result = subprocess.run(
|
||||||
|
|
|
||||||
|
|
@ -86,15 +86,19 @@ def _classify_one(filepath):
|
||||||
return "unknown", desc
|
return "unknown", desc
|
||||||
|
|
||||||
|
|
||||||
def classify_files(target, show_hidden=False):
|
def classify_files(target, show_hidden=False, exclude=None, on_file=None):
|
||||||
|
exclude = exclude or []
|
||||||
"""Walk the target directory and classify every file.
|
"""Walk the target directory and classify every file.
|
||||||
|
|
||||||
Returns a list of dicts: {path, name, category, size, description}.
|
Returns a list of dicts: {path, name, category, size, description}.
|
||||||
|
on_file(path) is called after each file is classified, if provided.
|
||||||
"""
|
"""
|
||||||
results = []
|
results = []
|
||||||
for root, dirs, files in os.walk(target):
|
for root, dirs, files in os.walk(target):
|
||||||
|
dirs[:] = [d for d in dirs
|
||||||
|
if d not in exclude
|
||||||
|
and (show_hidden or not d.startswith("."))]
|
||||||
if not show_hidden:
|
if not show_hidden:
|
||||||
dirs[:] = [d for d in dirs if not d.startswith(".")]
|
|
||||||
files = [f for f in files if not f.startswith(".")]
|
files = [f for f in files if not f.startswith(".")]
|
||||||
for fname in files:
|
for fname in files:
|
||||||
full = os.path.join(root, fname)
|
full = os.path.join(root, fname)
|
||||||
|
|
@ -112,6 +116,8 @@ def classify_files(target, show_hidden=False):
|
||||||
"size": size,
|
"size": size,
|
||||||
"description": desc,
|
"description": desc,
|
||||||
})
|
})
|
||||||
|
if on_file:
|
||||||
|
on_file(full)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
93
luminos_lib/prompts.py
Normal file
93
luminos_lib/prompts.py
Normal file
|
|
@ -0,0 +1,93 @@
|
||||||
|
"""System prompt templates for the Luminos agent loops."""
|
||||||
|
|
||||||
|
_DIR_SYSTEM_PROMPT = """\
|
||||||
|
You are an expert analyst investigating a SINGLE directory on a file system.
|
||||||
|
Do NOT assume the type of content before investigating. Discover what this
|
||||||
|
directory contains from what you find.
|
||||||
|
|
||||||
|
## Your Task
|
||||||
|
Investigate the directory: {dir_path}
|
||||||
|
(relative to target: {dir_rel})
|
||||||
|
|
||||||
|
You must:
|
||||||
|
1. Read the important files in THIS directory (not subdirectories)
|
||||||
|
2. For each file you read, call write_cache to save a summary
|
||||||
|
3. Call write_cache for the directory itself with a synthesis
|
||||||
|
4. Call submit_report with a 1-3 sentence summary
|
||||||
|
|
||||||
|
## Tools
|
||||||
|
parse_structure gives you the skeleton of a file. It does NOT replace \
|
||||||
|
reading the file. Use parse_structure first to understand structure, then \
|
||||||
|
use read_file if you need to verify intent, check for anomalies, or \
|
||||||
|
understand content that structure cannot capture (comments, documentation, \
|
||||||
|
data files, config values). A file where structure and content appear to \
|
||||||
|
contradict each other is always worth reading in full.
|
||||||
|
|
||||||
|
Use the think tool when choosing which file or directory to investigate \
|
||||||
|
next — before starting a new file or switching investigation direction. \
|
||||||
|
Do NOT call think before every individual tool call in a sequence.
|
||||||
|
|
||||||
|
Use the checkpoint tool after completing investigation of a meaningful \
|
||||||
|
cluster of files. Not after every file — once or twice per directory \
|
||||||
|
loop at most.
|
||||||
|
|
||||||
|
Use the flag tool immediately when you find something notable, \
|
||||||
|
surprising, or concerning. Severity guide:
|
||||||
|
info = interesting but not problematic
|
||||||
|
concern = worth addressing
|
||||||
|
critical = likely broken or dangerous
|
||||||
|
|
||||||
|
## Step Numbering
|
||||||
|
Number your investigation steps as you go. Before starting each new \
|
||||||
|
file cluster or phase transition, output:
|
||||||
|
Step N: <what you are doing and why>
|
||||||
|
Output this as plain text before tool calls, not as a tool call itself.
|
||||||
|
|
||||||
|
## Efficiency Rules
|
||||||
|
- Batch multiple tool calls in a single turn whenever possible
|
||||||
|
- Skip binary/compiled/generated files (.pyc, .class, .o, .min.js, etc.)
|
||||||
|
- Skip files >100KB unless uniquely important
|
||||||
|
- Prioritize: README, index, main, config, schema, manifest files
|
||||||
|
- For source files: try parse_structure first, then read_file if needed
|
||||||
|
- If read_file returns truncated content, use a larger max_bytes or
|
||||||
|
run_command('tail ...') — NEVER retry the identical call
|
||||||
|
- You have only {max_turns} turns — be efficient
|
||||||
|
|
||||||
|
## Cache Schemas
|
||||||
|
File: {{path, relative_path, size_bytes, category, summary, notable,
|
||||||
|
notable_reason, cached_at}}
|
||||||
|
Dir: {{path, relative_path, child_count, summary, dominant_category,
|
||||||
|
notable_files, cached_at}}
|
||||||
|
|
||||||
|
category values: source, config, data, document, media, archive, unknown
|
||||||
|
|
||||||
|
## Context
|
||||||
|
{context}
|
||||||
|
|
||||||
|
## Child Directory Summaries (already investigated)
|
||||||
|
{child_summaries}"""
|
||||||
|
|
||||||
|
_SYNTHESIS_SYSTEM_PROMPT = """\
|
||||||
|
You are an expert analyst synthesizing a final report about a directory tree.
|
||||||
|
ALL directory summaries are provided below — you do NOT need to call
|
||||||
|
list_cache or read_cache. Just read the summaries and call submit_report
|
||||||
|
immediately in your first turn.
|
||||||
|
|
||||||
|
Do NOT assume the type of content. Let the summaries speak for themselves.
|
||||||
|
|
||||||
|
## Your Goal
|
||||||
|
Produce two outputs via the submit_report tool:
|
||||||
|
1. **brief**: A 2-4 sentence summary of what this directory tree is.
|
||||||
|
2. **detailed**: A thorough breakdown covering purpose, structure, key
|
||||||
|
components, technologies, notable patterns, and any concerns.
|
||||||
|
|
||||||
|
## Rules
|
||||||
|
- ALL summaries are below — call submit_report directly
|
||||||
|
- Be specific — reference actual directory and file names
|
||||||
|
- Do NOT call list_cache or read_cache
|
||||||
|
|
||||||
|
## Target
|
||||||
|
{target}
|
||||||
|
|
||||||
|
## Directory Summaries
|
||||||
|
{summaries_text}"""
|
||||||
|
|
@ -5,7 +5,7 @@ import os
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
def find_recent_files(target, n=10, show_hidden=False):
|
def find_recent_files(target, n=10, show_hidden=False, exclude=None):
|
||||||
"""Find the n most recently modified files using find and stat.
|
"""Find the n most recently modified files using find and stat.
|
||||||
|
|
||||||
Returns a list of dicts: {path, name, modified, modified_human}.
|
Returns a list of dicts: {path, name, modified, modified_human}.
|
||||||
|
|
@ -14,6 +14,9 @@ def find_recent_files(target, n=10, show_hidden=False):
|
||||||
cmd = ["find", target, "-type", "f"]
|
cmd = ["find", target, "-type", "f"]
|
||||||
if not show_hidden:
|
if not show_hidden:
|
||||||
cmd.extend(["-not", "-path", "*/.*"])
|
cmd.extend(["-not", "-path", "*/.*"])
|
||||||
|
for name in (exclude or []):
|
||||||
|
cmd.extend(["-not", "-path", f"*/{name}/*",
|
||||||
|
"-not", "-path", f"*/{name}"])
|
||||||
cmd.extend(["-printf", "%T@\t%p\n"])
|
cmd.extend(["-printf", "%T@\t%p\n"])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,8 @@
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
|
||||||
def build_tree(path, max_depth=3, show_hidden=False, _depth=0):
|
def build_tree(path, max_depth=3, show_hidden=False, exclude=None, _depth=0):
|
||||||
|
exclude = exclude or []
|
||||||
"""Build a nested dict representing the directory tree with file sizes."""
|
"""Build a nested dict representing the directory tree with file sizes."""
|
||||||
name = os.path.basename(path) or path
|
name = os.path.basename(path) or path
|
||||||
node = {"name": name, "path": path, "type": "directory", "children": []}
|
node = {"name": name, "path": path, "type": "directory", "children": []}
|
||||||
|
|
@ -17,10 +18,12 @@ def build_tree(path, max_depth=3, show_hidden=False, _depth=0):
|
||||||
for entry in entries:
|
for entry in entries:
|
||||||
if not show_hidden and entry.startswith("."):
|
if not show_hidden and entry.startswith("."):
|
||||||
continue
|
continue
|
||||||
|
if entry in exclude:
|
||||||
|
continue
|
||||||
full = os.path.join(path, entry)
|
full = os.path.join(path, entry)
|
||||||
if os.path.isdir(full):
|
if os.path.isdir(full):
|
||||||
if _depth < max_depth:
|
if _depth < max_depth:
|
||||||
child = build_tree(full, max_depth, show_hidden, _depth + 1)
|
child = build_tree(full, max_depth, show_hidden, exclude, _depth + 1)
|
||||||
node["children"].append(child)
|
node["children"].append(child)
|
||||||
else:
|
else:
|
||||||
node["children"].append({
|
node["children"].append({
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue