Compare commits
10 commits
8aa6c713db
...
d323190866
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d323190866 | ||
|
|
78f9a396dd | ||
|
|
78f80c31ed | ||
|
|
206d2d34f6 | ||
|
|
bbaf387cb7 | ||
|
|
ebc6b852f1 | ||
|
|
33df555a8c | ||
|
|
ea8c07a692 | ||
|
|
5c6124a715 | ||
|
|
0c49da23ab |
9 changed files with 510 additions and 326 deletions
68
luminos.py
68
luminos.py
|
|
@ -3,8 +3,9 @@
|
|||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
from luminos_lib.tree import build_tree, render_tree
|
||||
from luminos_lib.filetypes import classify_files, summarize_categories
|
||||
|
|
@ -15,29 +16,67 @@ from luminos_lib.watch import watch_loop
|
|||
from luminos_lib.report import format_report
|
||||
|
||||
|
||||
def scan(target, depth=3, show_hidden=False):
|
||||
def _progress(label):
|
||||
"""Return (on_file, finish) for in-place per-file progress on stderr.
|
||||
|
||||
on_file(path) overwrites the current line with the label and truncated path.
|
||||
finish() finalises the line with a newline.
|
||||
"""
|
||||
cols = shutil.get_terminal_size((80, 20)).columns
|
||||
prefix = f" [scan] {label}... "
|
||||
available = max(cols - len(prefix), 10)
|
||||
|
||||
def on_file(path):
|
||||
rel = os.path.relpath(path)
|
||||
if len(rel) > available:
|
||||
rel = "..." + rel[-(available - 3):]
|
||||
print(f"\r{prefix}{rel}\033[K", end="", file=sys.stderr, flush=True)
|
||||
|
||||
def finish():
|
||||
print(f"\r{prefix}done\033[K", file=sys.stderr, flush=True)
|
||||
|
||||
return on_file, finish
|
||||
|
||||
|
||||
def scan(target, depth=3, show_hidden=False, exclude=None):
|
||||
"""Run all analyses on the target directory and return a report dict."""
|
||||
report = {}
|
||||
|
||||
tree = build_tree(target, max_depth=depth, show_hidden=show_hidden)
|
||||
exclude = exclude or []
|
||||
|
||||
print(f" [scan] Building directory tree (depth={depth})...", file=sys.stderr)
|
||||
tree = build_tree(target, max_depth=depth, show_hidden=show_hidden,
|
||||
exclude=exclude)
|
||||
report["tree"] = tree
|
||||
report["tree_rendered"] = render_tree(tree)
|
||||
|
||||
classified = classify_files(target, show_hidden=show_hidden)
|
||||
on_file, finish = _progress("Classifying files")
|
||||
classified = classify_files(target, show_hidden=show_hidden,
|
||||
exclude=exclude, on_file=on_file)
|
||||
finish()
|
||||
report["file_categories"] = summarize_categories(classified)
|
||||
report["classified_files"] = classified
|
||||
|
||||
languages, loc = detect_languages(classified)
|
||||
on_file, finish = _progress("Counting lines")
|
||||
languages, loc = detect_languages(classified, on_file=on_file)
|
||||
finish()
|
||||
report["languages"] = languages
|
||||
report["lines_of_code"] = loc
|
||||
report["large_files"] = find_large_files(classified)
|
||||
|
||||
report["recent_files"] = find_recent_files(target, show_hidden=show_hidden)
|
||||
on_file, finish = _progress("Checking for large files")
|
||||
report["large_files"] = find_large_files(classified, on_file=on_file)
|
||||
finish()
|
||||
|
||||
usage = get_disk_usage(target, show_hidden=show_hidden)
|
||||
print(" [scan] Finding recently modified files...", file=sys.stderr)
|
||||
report["recent_files"] = find_recent_files(target, show_hidden=show_hidden,
|
||||
exclude=exclude)
|
||||
|
||||
print(" [scan] Calculating disk usage...", file=sys.stderr)
|
||||
usage = get_disk_usage(target, show_hidden=show_hidden, exclude=exclude)
|
||||
report["disk_usage"] = usage
|
||||
report["top_directories"] = top_directories(usage, n=5)
|
||||
|
||||
print(" [scan] Base scan complete.", file=sys.stderr)
|
||||
return report
|
||||
|
||||
|
||||
|
|
@ -67,6 +106,10 @@ def main():
|
|||
help="Force a new AI investigation (ignore cached results)")
|
||||
parser.add_argument("--install-extras", action="store_true",
|
||||
help="Show status of optional AI dependencies")
|
||||
parser.add_argument("-x", "--exclude", metavar="DIR", action="append",
|
||||
default=[],
|
||||
help="Exclude a directory name from scan and analysis "
|
||||
"(repeatable, e.g. -x .git -x node_modules)")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
|
@ -92,17 +135,22 @@ def main():
|
|||
file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if args.exclude:
|
||||
print(f" [scan] Excluding: {', '.join(args.exclude)}", file=sys.stderr)
|
||||
|
||||
if args.watch:
|
||||
watch_loop(target, depth=args.depth, show_hidden=args.all,
|
||||
json_output=args.json_output)
|
||||
return
|
||||
|
||||
report = scan(target, depth=args.depth, show_hidden=args.all)
|
||||
report = scan(target, depth=args.depth, show_hidden=args.all,
|
||||
exclude=args.exclude)
|
||||
|
||||
flags = []
|
||||
if args.ai:
|
||||
from luminos_lib.ai import analyze_directory
|
||||
brief, detailed, flags = analyze_directory(report, target, fresh=args.fresh)
|
||||
brief, detailed, flags = analyze_directory(
|
||||
report, target, fresh=args.fresh, exclude=args.exclude)
|
||||
report["ai_brief"] = brief
|
||||
report["ai_detailed"] = detailed
|
||||
report["flags"] = flags
|
||||
|
|
|
|||
|
|
@ -19,14 +19,10 @@ from datetime import datetime, timezone
|
|||
|
||||
import anthropic
|
||||
import magic
|
||||
import tree_sitter
|
||||
import tree_sitter_python
|
||||
import tree_sitter_javascript
|
||||
import tree_sitter_rust
|
||||
import tree_sitter_go
|
||||
|
||||
from luminos_lib.ast_parser import parse_structure
|
||||
from luminos_lib.cache import _CacheManager, _get_investigation_id
|
||||
from luminos_lib.capabilities import check_ai_dependencies
|
||||
from luminos_lib.prompts import _DIR_SYSTEM_PROMPT, _SYNTHESIS_SYSTEM_PROMPT
|
||||
|
||||
MODEL = "claude-sonnet-4-20250514"
|
||||
|
||||
|
|
@ -48,33 +44,6 @@ _SKIP_DIRS = {
|
|||
# Commands the run_command tool is allowed to execute.
|
||||
_COMMAND_WHITELIST = {"wc", "file", "grep", "head", "tail", "stat", "du", "find"}
|
||||
|
||||
# tree-sitter language registry: extension → (grammar_module, language_name)
|
||||
_TS_LANGUAGES = {
|
||||
".py": (tree_sitter_python, "python"),
|
||||
".js": (tree_sitter_javascript, "javascript"),
|
||||
".jsx": (tree_sitter_javascript, "javascript"),
|
||||
".mjs": (tree_sitter_javascript, "javascript"),
|
||||
".rs": (tree_sitter_rust, "rust"),
|
||||
".go": (tree_sitter_go, "go"),
|
||||
}
|
||||
|
||||
# Precompute Language objects once.
|
||||
_TS_LANG_CACHE = {}
|
||||
|
||||
|
||||
def _get_ts_parser(ext):
|
||||
"""Return a (Parser, language_name) tuple for a file extension, or None."""
|
||||
entry = _TS_LANGUAGES.get(ext)
|
||||
if entry is None:
|
||||
return None
|
||||
module, lang_name = entry
|
||||
if lang_name not in _TS_LANG_CACHE:
|
||||
_TS_LANG_CACHE[lang_name] = tree_sitter.Language(module.language())
|
||||
lang = _TS_LANG_CACHE[lang_name]
|
||||
parser = tree_sitter.Parser(lang)
|
||||
return parser, lang_name
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -533,181 +502,7 @@ def _tool_parse_structure(args, target, _cache):
|
|||
path = os.path.join(target, path)
|
||||
if not _path_is_safe(path, target):
|
||||
return f"Error: path '{path}' is outside the target directory."
|
||||
if not os.path.isfile(path):
|
||||
return f"Error: '{path}' is not a file."
|
||||
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
ts = _get_ts_parser(ext)
|
||||
if ts is None:
|
||||
return f"Error: no grammar for extension '{ext}'. Supported: {', '.join(sorted(_TS_LANGUAGES.keys()))}"
|
||||
|
||||
parser, lang_name = ts
|
||||
|
||||
try:
|
||||
with open(path, "rb") as f:
|
||||
source = f.read()
|
||||
except OSError as e:
|
||||
return f"Error reading file: {e}"
|
||||
|
||||
tree = parser.parse(source)
|
||||
root = tree.root_node
|
||||
source_text = source.decode("utf-8", errors="replace")
|
||||
lines = source_text.split("\n")
|
||||
line_count = len(lines)
|
||||
|
||||
functions = []
|
||||
classes = []
|
||||
imports = []
|
||||
has_docstrings = False
|
||||
comment_lines = 0
|
||||
|
||||
def _walk(node):
|
||||
nonlocal has_docstrings, comment_lines
|
||||
for child in node.children:
|
||||
ntype = child.type
|
||||
|
||||
# Comments
|
||||
if ntype in ("comment", "line_comment", "block_comment"):
|
||||
comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
|
||||
|
||||
# Python
|
||||
if lang_name == "python":
|
||||
if ntype == "function_definition":
|
||||
functions.append(_py_func_sig(child))
|
||||
elif ntype == "class_definition":
|
||||
classes.append(_py_class(child))
|
||||
elif ntype in ("import_statement", "import_from_statement"):
|
||||
imports.append(child.text.decode("utf-8", errors="replace").strip())
|
||||
elif ntype == "expression_statement":
|
||||
first = child.children[0] if child.children else None
|
||||
if first and first.type == "string":
|
||||
has_docstrings = True
|
||||
|
||||
# JavaScript
|
||||
elif lang_name == "javascript":
|
||||
if ntype in ("function_declaration", "arrow_function",
|
||||
"function"):
|
||||
functions.append(_js_func_sig(child))
|
||||
elif ntype == "class_declaration":
|
||||
classes.append(_js_class(child))
|
||||
elif ntype in ("import_statement",):
|
||||
imports.append(child.text.decode("utf-8", errors="replace").strip())
|
||||
|
||||
# Rust
|
||||
elif lang_name == "rust":
|
||||
if ntype == "function_item":
|
||||
functions.append(_rust_func_sig(child))
|
||||
elif ntype in ("struct_item", "enum_item", "impl_item"):
|
||||
classes.append(_rust_struct(child))
|
||||
elif ntype == "use_declaration":
|
||||
imports.append(child.text.decode("utf-8", errors="replace").strip())
|
||||
|
||||
# Go
|
||||
elif lang_name == "go":
|
||||
if ntype == "function_declaration":
|
||||
functions.append(_go_func_sig(child))
|
||||
elif ntype == "type_declaration":
|
||||
classes.append(_go_type(child))
|
||||
elif ntype == "import_declaration":
|
||||
imports.append(child.text.decode("utf-8", errors="replace").strip())
|
||||
|
||||
_walk(child)
|
||||
|
||||
_walk(root)
|
||||
|
||||
code_lines = max(1, line_count - comment_lines)
|
||||
result = {
|
||||
"language": lang_name,
|
||||
"functions": functions[:50],
|
||||
"classes": classes[:30],
|
||||
"imports": imports[:30],
|
||||
"line_count": line_count,
|
||||
"has_docstrings": has_docstrings,
|
||||
"has_comments": comment_lines > 0,
|
||||
"comment_to_code_ratio": round(comment_lines / code_lines, 2),
|
||||
}
|
||||
return json.dumps(result, indent=2)
|
||||
|
||||
|
||||
# --- tree-sitter extraction helpers ---
|
||||
|
||||
def _child_by_type(node, *types):
|
||||
for c in node.children:
|
||||
if c.type in types:
|
||||
return c
|
||||
return None
|
||||
|
||||
|
||||
def _text(node):
|
||||
return node.text.decode("utf-8", errors="replace") if node else ""
|
||||
|
||||
|
||||
def _py_func_sig(node):
|
||||
name = _text(_child_by_type(node, "identifier"))
|
||||
params = _text(_child_by_type(node, "parameters"))
|
||||
ret = _child_by_type(node, "type")
|
||||
sig = f"{name}{params}"
|
||||
if ret:
|
||||
sig += f" -> {_text(ret)}"
|
||||
return sig
|
||||
|
||||
|
||||
def _py_class(node):
|
||||
name = _text(_child_by_type(node, "identifier"))
|
||||
methods = []
|
||||
body = _child_by_type(node, "block")
|
||||
if body:
|
||||
for child in body.children:
|
||||
if child.type == "function_definition":
|
||||
methods.append(_py_func_sig(child))
|
||||
return {"name": name, "methods": methods[:20]}
|
||||
|
||||
|
||||
def _js_func_sig(node):
|
||||
name = _text(_child_by_type(node, "identifier"))
|
||||
params = _text(_child_by_type(node, "formal_parameters"))
|
||||
return f"{name}{params}" if name else f"(anonymous){params}"
|
||||
|
||||
|
||||
def _js_class(node):
|
||||
name = _text(_child_by_type(node, "identifier"))
|
||||
methods = []
|
||||
body = _child_by_type(node, "class_body")
|
||||
if body:
|
||||
for child in body.children:
|
||||
if child.type == "method_definition":
|
||||
mname = _text(_child_by_type(child, "property_identifier"))
|
||||
mparams = _text(_child_by_type(child, "formal_parameters"))
|
||||
methods.append(f"{mname}{mparams}")
|
||||
return {"name": name, "methods": methods[:20]}
|
||||
|
||||
|
||||
def _rust_func_sig(node):
|
||||
name = _text(_child_by_type(node, "identifier"))
|
||||
params = _text(_child_by_type(node, "parameters"))
|
||||
ret = _child_by_type(node, "type_identifier", "generic_type",
|
||||
"reference_type", "scoped_type_identifier")
|
||||
sig = f"{name}{params}"
|
||||
if ret:
|
||||
sig += f" -> {_text(ret)}"
|
||||
return sig
|
||||
|
||||
|
||||
def _rust_struct(node):
|
||||
name = _text(_child_by_type(node, "type_identifier"))
|
||||
return {"name": name or _text(node)[:60], "methods": []}
|
||||
|
||||
|
||||
def _go_func_sig(node):
|
||||
name = _text(_child_by_type(node, "identifier"))
|
||||
params = _text(_child_by_type(node, "parameter_list"))
|
||||
return f"{name}{params}"
|
||||
|
||||
|
||||
def _go_type(node):
|
||||
spec = _child_by_type(node, "type_spec")
|
||||
name = _text(_child_by_type(spec, "type_identifier")) if spec else ""
|
||||
return {"name": name or _text(node)[:60], "methods": []}
|
||||
return parse_structure(path)
|
||||
|
||||
|
||||
def _tool_write_cache(args, _target, cache):
|
||||
|
|
@ -848,14 +643,16 @@ def _call_api_streaming(client, system, messages, tools, tracker):
|
|||
# Directory discovery
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _discover_directories(target, show_hidden=False):
|
||||
def _discover_directories(target, show_hidden=False, exclude=None):
|
||||
"""Walk the target and return all directories sorted leaves-first."""
|
||||
extra = set(exclude or [])
|
||||
dirs = []
|
||||
target_real = os.path.realpath(target)
|
||||
for root, subdirs, _files in os.walk(target_real, topdown=True):
|
||||
subdirs[:] = [
|
||||
d for d in subdirs
|
||||
if not _should_skip_dir(d)
|
||||
and d not in extra
|
||||
and (show_hidden or not d.startswith("."))
|
||||
]
|
||||
dirs.append(root)
|
||||
|
|
@ -867,74 +664,6 @@ def _discover_directories(target, show_hidden=False):
|
|||
# Per-directory agent loop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_DIR_SYSTEM_PROMPT = """\
|
||||
You are an expert analyst investigating a SINGLE directory on a file system.
|
||||
Do NOT assume the type of content before investigating. Discover what this
|
||||
directory contains from what you find.
|
||||
|
||||
## Your Task
|
||||
Investigate the directory: {dir_path}
|
||||
(relative to target: {dir_rel})
|
||||
|
||||
You must:
|
||||
1. Read the important files in THIS directory (not subdirectories)
|
||||
2. For each file you read, call write_cache to save a summary
|
||||
3. Call write_cache for the directory itself with a synthesis
|
||||
4. Call submit_report with a 1-3 sentence summary
|
||||
|
||||
## Tools
|
||||
parse_structure gives you the skeleton of a file. It does NOT replace \
|
||||
reading the file. Use parse_structure first to understand structure, then \
|
||||
use read_file if you need to verify intent, check for anomalies, or \
|
||||
understand content that structure cannot capture (comments, documentation, \
|
||||
data files, config values). A file where structure and content appear to \
|
||||
contradict each other is always worth reading in full.
|
||||
|
||||
Use the think tool when choosing which file or directory to investigate \
|
||||
next — before starting a new file or switching investigation direction. \
|
||||
Do NOT call think before every individual tool call in a sequence.
|
||||
|
||||
Use the checkpoint tool after completing investigation of a meaningful \
|
||||
cluster of files. Not after every file — once or twice per directory \
|
||||
loop at most.
|
||||
|
||||
Use the flag tool immediately when you find something notable, \
|
||||
surprising, or concerning. Severity guide:
|
||||
info = interesting but not problematic
|
||||
concern = worth addressing
|
||||
critical = likely broken or dangerous
|
||||
|
||||
## Step Numbering
|
||||
Number your investigation steps as you go. Before starting each new \
|
||||
file cluster or phase transition, output:
|
||||
Step N: <what you are doing and why>
|
||||
Output this as plain text before tool calls, not as a tool call itself.
|
||||
|
||||
## Efficiency Rules
|
||||
- Batch multiple tool calls in a single turn whenever possible
|
||||
- Skip binary/compiled/generated files (.pyc, .class, .o, .min.js, etc.)
|
||||
- Skip files >100KB unless uniquely important
|
||||
- Prioritize: README, index, main, config, schema, manifest files
|
||||
- For source files: try parse_structure first, then read_file if needed
|
||||
- If read_file returns truncated content, use a larger max_bytes or
|
||||
run_command('tail ...') — NEVER retry the identical call
|
||||
- You have only {max_turns} turns — be efficient
|
||||
|
||||
## Cache Schemas
|
||||
File: {{path, relative_path, size_bytes, category, summary, notable,
|
||||
notable_reason, cached_at}}
|
||||
Dir: {{path, relative_path, child_count, summary, dominant_category,
|
||||
notable_files, cached_at}}
|
||||
|
||||
category values: source, config, data, document, media, archive, unknown
|
||||
|
||||
## Context
|
||||
{context}
|
||||
|
||||
## Child Directory Summaries (already investigated)
|
||||
{child_summaries}"""
|
||||
|
||||
|
||||
def _build_dir_context(dir_path):
|
||||
lines = []
|
||||
try:
|
||||
|
|
@ -1144,32 +873,6 @@ def _block_to_dict(block):
|
|||
# Synthesis pass
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_SYNTHESIS_SYSTEM_PROMPT = """\
|
||||
You are an expert analyst synthesizing a final report about a directory tree.
|
||||
ALL directory summaries are provided below — you do NOT need to call
|
||||
list_cache or read_cache. Just read the summaries and call submit_report
|
||||
immediately in your first turn.
|
||||
|
||||
Do NOT assume the type of content. Let the summaries speak for themselves.
|
||||
|
||||
## Your Goal
|
||||
Produce two outputs via the submit_report tool:
|
||||
1. **brief**: A 2-4 sentence summary of what this directory tree is.
|
||||
2. **detailed**: A thorough breakdown covering purpose, structure, key
|
||||
components, technologies, notable patterns, and any concerns.
|
||||
|
||||
## Rules
|
||||
- ALL summaries are below — call submit_report directly
|
||||
- Be specific — reference actual directory and file names
|
||||
- Do NOT call list_cache or read_cache
|
||||
|
||||
## Target
|
||||
{target}
|
||||
|
||||
## Directory Summaries
|
||||
{summaries_text}"""
|
||||
|
||||
|
||||
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
|
||||
"""Run the final synthesis pass. Returns (brief, detailed)."""
|
||||
dir_entries = cache.read_all_entries("dir")
|
||||
|
|
@ -1300,7 +1003,7 @@ def _synthesize_from_cache(cache):
|
|||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _run_investigation(client, target, report, show_hidden=False,
|
||||
fresh=False, verbose=False):
|
||||
fresh=False, verbose=False, exclude=None):
|
||||
"""Orchestrate the multi-pass investigation. Returns (brief, detailed, flags)."""
|
||||
investigation_id, is_new = _get_investigation_id(target, fresh=fresh)
|
||||
cache = _CacheManager(investigation_id, target)
|
||||
|
|
@ -1313,7 +1016,8 @@ def _run_investigation(client, target, report, show_hidden=False,
|
|||
f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
|
||||
print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
|
||||
|
||||
all_dirs = _discover_directories(target, show_hidden=show_hidden)
|
||||
all_dirs = _discover_directories(target, show_hidden=show_hidden,
|
||||
exclude=exclude)
|
||||
|
||||
to_investigate = []
|
||||
cached_count = 0
|
||||
|
|
@ -1386,7 +1090,8 @@ def _run_investigation(client, target, report, show_hidden=False,
|
|||
# Public interface
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def analyze_directory(report, target, verbose_tools=False, fresh=False):
|
||||
def analyze_directory(report, target, verbose_tools=False, fresh=False,
|
||||
exclude=None):
|
||||
"""Run AI analysis on the directory. Returns (brief, detailed, flags).
|
||||
|
||||
Returns ("", "", []) if the API key is missing or dependencies are not met.
|
||||
|
|
@ -1405,6 +1110,7 @@ def analyze_directory(report, target, verbose_tools=False, fresh=False):
|
|||
try:
|
||||
brief, detailed, flags = _run_investigation(
|
||||
client, target, report, fresh=fresh, verbose=verbose_tools,
|
||||
exclude=exclude,
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Warning: AI analysis failed: {e}", file=sys.stderr)
|
||||
|
|
|
|||
314
luminos_lib/ast_parser.py
Normal file
314
luminos_lib/ast_parser.py
Normal file
|
|
@ -0,0 +1,314 @@
|
|||
"""AST structure extraction for Luminos using tree-sitter."""
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
import tree_sitter
|
||||
import tree_sitter_python
|
||||
import tree_sitter_javascript
|
||||
import tree_sitter_rust
|
||||
import tree_sitter_go
|
||||
|
||||
# Extension → (grammar_module, language_name): maps a file suffix to the
# tree-sitter grammar module used to parse it and the canonical language
# name looked up in _LANGUAGE_HANDLERS.
_TS_LANGUAGES = {
    ".py": (tree_sitter_python, "python"),
    ".js": (tree_sitter_javascript, "javascript"),
    ".jsx": (tree_sitter_javascript, "javascript"),
    ".mjs": (tree_sitter_javascript, "javascript"),
    ".rs": (tree_sitter_rust, "rust"),
    ".go": (tree_sitter_go, "go"),
}

# Precomputed Language objects. Populated lazily by _get_ts_parser, keyed
# by language name, so each grammar is instantiated at most once.
_TS_LANG_CACHE = {}
|
||||
|
||||
|
||||
def _get_ts_parser(ext):
    """Look up a tree-sitter parser for a file extension.

    Returns a (Parser, language_name) tuple, or None when no grammar is
    registered for *ext*. Language objects are built lazily and memoised
    in _TS_LANG_CACHE; a fresh Parser is created on every call.
    """
    registered = _TS_LANGUAGES.get(ext)
    if registered is None:
        return None
    grammar_module, lang_name = registered
    try:
        language = _TS_LANG_CACHE[lang_name]
    except KeyError:
        language = tree_sitter.Language(grammar_module.language())
        _TS_LANG_CACHE[lang_name] = language
    return tree_sitter.Parser(language), lang_name
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tree-sitter node helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _child_by_type(node, *types):
|
||||
for c in node.children:
|
||||
if c.type in types:
|
||||
return c
|
||||
return None
|
||||
|
||||
|
||||
def _text(node):
|
||||
return node.text.decode("utf-8", errors="replace") if node else ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-language handlers: (root_node, source_bytes) -> dict
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _parse_python(root, source):
    """Extract the structural skeleton of a parsed Python file.

    root is the tree-sitter root node; source is the raw file bytes.
    Returns the structure dict shared by all language handlers:
    functions, classes (with methods), imports, line/comment statistics.
    """
    found_functions = []
    found_classes = []
    found_imports = []
    saw_docstring = False
    comment_total = 0

    def signature(fn_node):
        # "name(params)" plus " -> type" when a return annotation exists.
        sig = _text(_child_by_type(fn_node, "identifier"))
        sig += _text(_child_by_type(fn_node, "parameters"))
        annotation = _child_by_type(fn_node, "type")
        if annotation:
            sig += f" -> {_text(annotation)}"
        return sig

    def visit(node):
        nonlocal saw_docstring, comment_total
        for child in node.children:
            kind = child.type

            if kind in ("comment", "line_comment", "block_comment"):
                raw = child.text.decode("utf-8", errors="replace")
                comment_total += 1 + raw.count("\n")

            if kind == "function_definition":
                found_functions.append(signature(child))
            elif kind == "class_definition":
                methods = []
                body = _child_by_type(child, "block")
                if body:
                    methods = [signature(member) for member in body.children
                               if member.type == "function_definition"]
                found_classes.append({
                    "name": _text(_child_by_type(child, "identifier")),
                    "methods": methods[:20],
                })
            elif kind in ("import_statement", "import_from_statement"):
                found_imports.append(
                    child.text.decode("utf-8", errors="replace").strip())
            elif kind == "expression_statement":
                kids = child.children
                if kids and kids[0].type == "string":
                    # A bare string expression statement is a (doc)string.
                    saw_docstring = True

            visit(child)

    visit(root)

    total_lines = len(source.decode("utf-8", errors="replace").split("\n"))
    # Avoid division by zero for all-comment files.
    non_comment = max(1, total_lines - comment_total)

    return {
        "language": "python",
        "functions": found_functions[:50],
        "classes": found_classes[:30],
        "imports": found_imports[:30],
        "line_count": total_lines,
        "has_docstrings": saw_docstring,
        "has_comments": comment_total > 0,
        "comment_to_code_ratio": round(comment_total / non_comment, 2),
    }
|
||||
|
||||
|
||||
def _parse_javascript(root, source):
    """Extract the structural skeleton of a parsed JavaScript file.

    root is the tree-sitter root node; source is the raw file bytes.
    Returns the structure dict shared by all language handlers
    (has_docstrings is always False for JavaScript).
    """
    found_functions = []
    found_classes = []
    found_imports = []
    comment_total = 0

    def visit(node):
        nonlocal comment_total
        for child in node.children:
            kind = child.type

            if kind in ("comment", "line_comment", "block_comment"):
                raw = child.text.decode("utf-8", errors="replace")
                comment_total += 1 + raw.count("\n")

            if kind in ("function_declaration", "arrow_function", "function"):
                name = _text(_child_by_type(child, "identifier"))
                params = _text(_child_by_type(child, "formal_parameters"))
                if not name:
                    # Arrow functions / function expressions have no name.
                    name = "(anonymous)"
                found_functions.append(name + params)
            elif kind == "class_declaration":
                methods = []
                body = _child_by_type(child, "class_body")
                if body:
                    for member in body.children:
                        if member.type != "method_definition":
                            continue
                        mname = _text(_child_by_type(member, "property_identifier"))
                        mparams = _text(_child_by_type(member, "formal_parameters"))
                        methods.append(mname + mparams)
                found_classes.append({
                    "name": _text(_child_by_type(child, "identifier")),
                    "methods": methods[:20],
                })
            elif kind == "import_statement":
                found_imports.append(
                    child.text.decode("utf-8", errors="replace").strip())

            visit(child)

    visit(root)

    total_lines = len(source.decode("utf-8", errors="replace").split("\n"))
    # Avoid division by zero for all-comment files.
    non_comment = max(1, total_lines - comment_total)

    return {
        "language": "javascript",
        "functions": found_functions[:50],
        "classes": found_classes[:30],
        "imports": found_imports[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": comment_total > 0,
        "comment_to_code_ratio": round(comment_total / non_comment, 2),
    }
|
||||
|
||||
|
||||
def _parse_rust(root, source):
    """Extract the structural skeleton of a parsed Rust file.

    root is the tree-sitter root node; source is the raw file bytes.
    Returns the structure dict shared by all language handlers
    (structs, enums and impl blocks are reported under "classes").
    """
    found_functions = []
    found_types = []
    found_imports = []
    comment_total = 0

    # Node types that can carry a function's return type annotation.
    return_types = ("type_identifier", "generic_type",
                    "reference_type", "scoped_type_identifier")

    def visit(node):
        nonlocal comment_total
        for child in node.children:
            kind = child.type

            if kind in ("comment", "line_comment", "block_comment"):
                raw = child.text.decode("utf-8", errors="replace")
                comment_total += 1 + raw.count("\n")

            if kind == "function_item":
                sig = _text(_child_by_type(child, "identifier"))
                sig += _text(_child_by_type(child, "parameters"))
                ret = _child_by_type(child, *return_types)
                if ret:
                    sig += f" -> {_text(ret)}"
                found_functions.append(sig)
            elif kind in ("struct_item", "enum_item", "impl_item"):
                name = _text(_child_by_type(child, "type_identifier"))
                if not name:
                    # Fall back to a truncated snippet of the item itself.
                    name = _text(child)[:60]
                found_types.append({"name": name, "methods": []})
            elif kind == "use_declaration":
                found_imports.append(
                    child.text.decode("utf-8", errors="replace").strip())

            visit(child)

    visit(root)

    total_lines = len(source.decode("utf-8", errors="replace").split("\n"))
    # Avoid division by zero for all-comment files.
    non_comment = max(1, total_lines - comment_total)

    return {
        "language": "rust",
        "functions": found_functions[:50],
        "classes": found_types[:30],
        "imports": found_imports[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": comment_total > 0,
        "comment_to_code_ratio": round(comment_total / non_comment, 2),
    }
|
||||
|
||||
|
||||
def _parse_go(root, source):
    """Extract the structural skeleton of a parsed Go file.

    root is the tree-sitter root node; source is the raw file bytes.
    Returns the structure dict shared by all language handlers
    (type declarations are reported under "classes").
    """
    found_functions = []
    found_types = []
    found_imports = []
    comment_total = 0

    def visit(node):
        nonlocal comment_total
        for child in node.children:
            kind = child.type

            if kind in ("comment", "line_comment", "block_comment"):
                raw = child.text.decode("utf-8", errors="replace")
                comment_total += 1 + raw.count("\n")

            if kind == "function_declaration":
                name = _text(_child_by_type(child, "identifier"))
                params = _text(_child_by_type(child, "parameter_list"))
                found_functions.append(name + params)
            elif kind == "type_declaration":
                spec = _child_by_type(child, "type_spec")
                name = _text(_child_by_type(spec, "type_identifier")) if spec else ""
                if not name:
                    # Fall back to a truncated snippet of the declaration.
                    name = _text(child)[:60]
                found_types.append({"name": name, "methods": []})
            elif kind == "import_declaration":
                found_imports.append(
                    child.text.decode("utf-8", errors="replace").strip())

            visit(child)

    visit(root)

    total_lines = len(source.decode("utf-8", errors="replace").split("\n"))
    # Avoid division by zero for all-comment files.
    non_comment = max(1, total_lines - comment_total)

    return {
        "language": "go",
        "functions": found_functions[:50],
        "classes": found_types[:30],
        "imports": found_imports[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": comment_total > 0,
        "comment_to_code_ratio": round(comment_total / non_comment, 2),
    }
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Language handler registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Maps tree-sitter language name → handler(root_node, source_bytes) -> dict.
# Keys must match the language names registered in _TS_LANGUAGES.
_LANGUAGE_HANDLERS = {
    "python": _parse_python,
    "javascript": _parse_javascript,
    "rust": _parse_rust,
    "go": _parse_go,
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def parse_structure(path):
    """Parse a source file and return its structural skeleton as a JSON string.

    Takes an absolute path. Returns a JSON string of the structure dict,
    or an error string if parsing fails or the language is unsupported.
    """
    if not os.path.isfile(path):
        return f"Error: '{path}' is not a file."

    suffix = os.path.splitext(path)[1].lower()
    parser_entry = _get_ts_parser(suffix)
    if parser_entry is None:
        supported = ", ".join(sorted(_TS_LANGUAGES.keys()))
        return f"Error: no grammar for extension '{suffix}'. Supported: {supported}"

    parser, lang_name = parser_entry
    handler = _LANGUAGE_HANDLERS.get(lang_name)
    if handler is None:
        return f"Error: no handler for language '{lang_name}'."

    try:
        with open(path, "rb") as fh:
            raw = fh.read()
    except OSError as exc:
        return f"Error reading file: {exc}"

    syntax_tree = parser.parse(raw)
    return json.dumps(handler(syntax_tree.root_node, raw), indent=2)
|
||||
|
|
@ -34,10 +34,11 @@ def _count_lines(filepath):
|
|||
return 0
|
||||
|
||||
|
||||
def detect_languages(classified_files):
|
||||
def detect_languages(classified_files, on_file=None):
|
||||
"""Detect languages present and count lines of code per language.
|
||||
|
||||
Returns (languages_set, loc_by_language).
|
||||
on_file(path) is called per source file, if provided.
|
||||
"""
|
||||
source_files = [f for f in classified_files if f["category"] == "source"]
|
||||
languages = set()
|
||||
|
|
@ -49,12 +50,17 @@ def detect_languages(classified_files):
|
|||
languages.add(lang)
|
||||
lines = _count_lines(f["path"])
|
||||
loc[lang] = loc.get(lang, 0) + lines
|
||||
if on_file:
|
||||
on_file(f["path"])
|
||||
|
||||
return sorted(languages), loc
|
||||
|
||||
|
||||
def find_large_files(classified_files):
|
||||
"""Find files that are unusually large (>1000 lines or >10MB)."""
|
||||
def find_large_files(classified_files, on_file=None):
|
||||
"""Find files that are unusually large (>1000 lines or >10MB).
|
||||
|
||||
on_file(path) is called per source file checked, if provided.
|
||||
"""
|
||||
source_files = [f for f in classified_files if f["category"] == "source"]
|
||||
large = []
|
||||
|
||||
|
|
@ -68,5 +74,7 @@ def find_large_files(classified_files):
|
|||
if reasons:
|
||||
large.append({"path": f["path"], "name": f["name"],
|
||||
"reasons": reasons})
|
||||
if on_file:
|
||||
on_file(f["path"])
|
||||
|
||||
return large
|
||||
|
|
|
|||
|
|
@ -3,12 +3,15 @@
|
|||
import subprocess
|
||||
|
||||
|
||||
def get_disk_usage(target, show_hidden=False):
|
||||
def get_disk_usage(target, show_hidden=False, exclude=None):
|
||||
"""Get per-directory disk usage via du.
|
||||
|
||||
Returns a list of dicts: {path, size_bytes, size_human}.
|
||||
"""
|
||||
cmd = ["du", "-b", "--max-depth=2", target]
|
||||
cmd = ["du", "-b", "--max-depth=2"]
|
||||
for name in (exclude or []):
|
||||
cmd.append(f"--exclude={name}")
|
||||
cmd.append(target)
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
|
|
|
|||
|
|
@ -86,15 +86,19 @@ def _classify_one(filepath):
|
|||
return "unknown", desc
|
||||
|
||||
|
||||
def classify_files(target, show_hidden=False):
|
||||
def classify_files(target, show_hidden=False, exclude=None, on_file=None):
|
||||
exclude = exclude or []
|
||||
"""Walk the target directory and classify every file.
|
||||
|
||||
Returns a list of dicts: {path, name, category, size, description}.
|
||||
on_file(path) is called after each file is classified, if provided.
|
||||
"""
|
||||
results = []
|
||||
for root, dirs, files in os.walk(target):
|
||||
dirs[:] = [d for d in dirs
|
||||
if d not in exclude
|
||||
and (show_hidden or not d.startswith("."))]
|
||||
if not show_hidden:
|
||||
dirs[:] = [d for d in dirs if not d.startswith(".")]
|
||||
files = [f for f in files if not f.startswith(".")]
|
||||
for fname in files:
|
||||
full = os.path.join(root, fname)
|
||||
|
|
@ -112,6 +116,8 @@ def classify_files(target, show_hidden=False):
|
|||
"size": size,
|
||||
"description": desc,
|
||||
})
|
||||
if on_file:
|
||||
on_file(full)
|
||||
return results
|
||||
|
||||
|
||||
|
|
|
|||
93
luminos_lib/prompts.py
Normal file
93
luminos_lib/prompts.py
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
"""System prompt templates for the Luminos agent loops."""
|
||||
|
||||
_DIR_SYSTEM_PROMPT = """\
|
||||
You are an expert analyst investigating a SINGLE directory on a file system.
|
||||
Do NOT assume the type of content before investigating. Discover what this
|
||||
directory contains from what you find.
|
||||
|
||||
## Your Task
|
||||
Investigate the directory: {dir_path}
|
||||
(relative to target: {dir_rel})
|
||||
|
||||
You must:
|
||||
1. Read the important files in THIS directory (not subdirectories)
|
||||
2. For each file you read, call write_cache to save a summary
|
||||
3. Call write_cache for the directory itself with a synthesis
|
||||
4. Call submit_report with a 1-3 sentence summary
|
||||
|
||||
## Tools
|
||||
parse_structure gives you the skeleton of a file. It does NOT replace \
|
||||
reading the file. Use parse_structure first to understand structure, then \
|
||||
use read_file if you need to verify intent, check for anomalies, or \
|
||||
understand content that structure cannot capture (comments, documentation, \
|
||||
data files, config values). A file where structure and content appear to \
|
||||
contradict each other is always worth reading in full.
|
||||
|
||||
Use the think tool when choosing which file or directory to investigate \
|
||||
next — before starting a new file or switching investigation direction. \
|
||||
Do NOT call think before every individual tool call in a sequence.
|
||||
|
||||
Use the checkpoint tool after completing investigation of a meaningful \
|
||||
cluster of files. Not after every file — once or twice per directory \
|
||||
loop at most.
|
||||
|
||||
Use the flag tool immediately when you find something notable, \
|
||||
surprising, or concerning. Severity guide:
|
||||
info = interesting but not problematic
|
||||
concern = worth addressing
|
||||
critical = likely broken or dangerous
|
||||
|
||||
## Step Numbering
|
||||
Number your investigation steps as you go. Before starting each new \
|
||||
file cluster or phase transition, output:
|
||||
Step N: <what you are doing and why>
|
||||
Output this as plain text before tool calls, not as a tool call itself.
|
||||
|
||||
## Efficiency Rules
|
||||
- Batch multiple tool calls in a single turn whenever possible
|
||||
- Skip binary/compiled/generated files (.pyc, .class, .o, .min.js, etc.)
|
||||
- Skip files >100KB unless uniquely important
|
||||
- Prioritize: README, index, main, config, schema, manifest files
|
||||
- For source files: try parse_structure first, then read_file if needed
|
||||
- If read_file returns truncated content, use a larger max_bytes or
|
||||
run_command('tail ...') — NEVER retry the identical call
|
||||
- You have only {max_turns} turns — be efficient
|
||||
|
||||
## Cache Schemas
|
||||
File: {{path, relative_path, size_bytes, category, summary, notable,
|
||||
notable_reason, cached_at}}
|
||||
Dir: {{path, relative_path, child_count, summary, dominant_category,
|
||||
notable_files, cached_at}}
|
||||
|
||||
category values: source, config, data, document, media, archive, unknown
|
||||
|
||||
## Context
|
||||
{context}
|
||||
|
||||
## Child Directory Summaries (already investigated)
|
||||
{child_summaries}"""
|
||||
|
||||
_SYNTHESIS_SYSTEM_PROMPT = """\
|
||||
You are an expert analyst synthesizing a final report about a directory tree.
|
||||
ALL directory summaries are provided below — you do NOT need to call
|
||||
list_cache or read_cache. Just read the summaries and call submit_report
|
||||
immediately in your first turn.
|
||||
|
||||
Do NOT assume the type of content. Let the summaries speak for themselves.
|
||||
|
||||
## Your Goal
|
||||
Produce two outputs via the submit_report tool:
|
||||
1. **brief**: A 2-4 sentence summary of what this directory tree is.
|
||||
2. **detailed**: A thorough breakdown covering purpose, structure, key
|
||||
components, technologies, notable patterns, and any concerns.
|
||||
|
||||
## Rules
|
||||
- ALL summaries are below — call submit_report directly
|
||||
- Be specific — reference actual directory and file names
|
||||
- Do NOT call list_cache or read_cache
|
||||
|
||||
## Target
|
||||
{target}
|
||||
|
||||
## Directory Summaries
|
||||
{summaries_text}"""
|
||||
|
|
@ -5,7 +5,7 @@ import os
|
|||
from datetime import datetime
|
||||
|
||||
|
||||
def find_recent_files(target, n=10, show_hidden=False):
|
||||
def find_recent_files(target, n=10, show_hidden=False, exclude=None):
|
||||
"""Find the n most recently modified files using find and stat.
|
||||
|
||||
Returns a list of dicts: {path, name, modified, modified_human}.
|
||||
|
|
@ -14,6 +14,9 @@ def find_recent_files(target, n=10, show_hidden=False):
|
|||
cmd = ["find", target, "-type", "f"]
|
||||
if not show_hidden:
|
||||
cmd.extend(["-not", "-path", "*/.*"])
|
||||
for name in (exclude or []):
|
||||
cmd.extend(["-not", "-path", f"*/{name}/*",
|
||||
"-not", "-path", f"*/{name}"])
|
||||
cmd.extend(["-printf", "%T@\t%p\n"])
|
||||
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -3,7 +3,8 @@
|
|||
import os
|
||||
|
||||
|
||||
def build_tree(path, max_depth=3, show_hidden=False, _depth=0):
|
||||
def build_tree(path, max_depth=3, show_hidden=False, exclude=None, _depth=0):
|
||||
exclude = exclude or []
|
||||
"""Build a nested dict representing the directory tree with file sizes."""
|
||||
name = os.path.basename(path) or path
|
||||
node = {"name": name, "path": path, "type": "directory", "children": []}
|
||||
|
|
@ -17,10 +18,12 @@ def build_tree(path, max_depth=3, show_hidden=False, _depth=0):
|
|||
for entry in entries:
|
||||
if not show_hidden and entry.startswith("."):
|
||||
continue
|
||||
if entry in exclude:
|
||||
continue
|
||||
full = os.path.join(path, entry)
|
||||
if os.path.isdir(full):
|
||||
if _depth < max_depth:
|
||||
child = build_tree(full, max_depth, show_hidden, _depth + 1)
|
||||
child = build_tree(full, max_depth, show_hidden, exclude, _depth + 1)
|
||||
node["children"].append(child)
|
||||
else:
|
||||
node["children"].append({
|
||||
|
|
|
|||
Loading…
Reference in a new issue