Compare commits

...

10 commits

Author SHA1 Message Date
Jeff Smith
d323190866 merge: add -x/--exclude flag for directory exclusion 2026-04-06 14:32:17 -06:00
Jeff Smith
78f9a396dd feat: add -x/--exclude flag to exclude directories from scan and AI analysis 2026-04-06 14:32:12 -06:00
Jeff Smith
78f80c31ed merge: in-place per-file progress for scan steps 2026-04-06 14:26:40 -06:00
Jeff Smith
206d2d34f6 feat: in-place per-file progress for classify, count, and large-file steps 2026-04-06 14:26:37 -06:00
Jeff Smith
bbaf387cb7 merge: add progress output to base scan steps 2026-04-06 14:21:19 -06:00
Jeff Smith
ebc6b852f1 feat: add progress output to base scan steps 2026-04-06 14:21:17 -06:00
Jeff Smith
33df555a8c merge: extract system prompts module 2026-03-30 14:44:57 -06:00
Jeff Smith
ea8c07a692 refactor: extract system prompts into luminos_lib/prompts.py
Moves _DIR_SYSTEM_PROMPT and _SYNTHESIS_SYSTEM_PROMPT from ai.py into
a dedicated prompts module. Both are pure template strings with .format()
placeholders — no runtime imports needed in prompts.py. Prompt content
is byte-for-byte identical to the original.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:44:45 -06:00
Jeff Smith
5c6124a715 merge: extract AST parser module 2026-03-30 14:34:06 -06:00
Jeff Smith
0c49da23ab refactor: extract AST parsing into luminos_lib/ast_parser.py
Moves all tree-sitter parsing logic from ai.py into a dedicated module.
Replaces the if/elif language chain with a _LANGUAGE_HANDLERS registry
mapping language names to handler functions.

Extracted: _tool_parse_structure body, _get_ts_parser, _child_by_type,
_text, and all per-language helpers (_py_func_sig, _py_class, etc.).
ai.py retains a thin wrapper for path validation.

Public API: parse_structure(path) -> JSON string

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:34:02 -06:00
9 changed files with 510 additions and 326 deletions

View file

@ -3,8 +3,9 @@
import argparse
import json
import sys
import os
import shutil
import sys
from luminos_lib.tree import build_tree, render_tree
from luminos_lib.filetypes import classify_files, summarize_categories
@ -15,29 +16,67 @@ from luminos_lib.watch import watch_loop
from luminos_lib.report import format_report
def scan(target, depth=3, show_hidden=False):
def _progress(label):
"""Return (on_file, finish) for in-place per-file progress on stderr.
on_file(path) overwrites the current line with the label and truncated path.
finish() finalises the line with a newline.
"""
cols = shutil.get_terminal_size((80, 20)).columns
prefix = f" [scan] {label}... "
available = max(cols - len(prefix), 10)
def on_file(path):
rel = os.path.relpath(path)
if len(rel) > available:
rel = "..." + rel[-(available - 3):]
print(f"\r{prefix}{rel}\033[K", end="", file=sys.stderr, flush=True)
def finish():
print(f"\r{prefix}done\033[K", file=sys.stderr, flush=True)
return on_file, finish
def scan(target, depth=3, show_hidden=False, exclude=None):
"""Run all analyses on the target directory and return a report dict."""
report = {}
tree = build_tree(target, max_depth=depth, show_hidden=show_hidden)
exclude = exclude or []
print(f" [scan] Building directory tree (depth={depth})...", file=sys.stderr)
tree = build_tree(target, max_depth=depth, show_hidden=show_hidden,
exclude=exclude)
report["tree"] = tree
report["tree_rendered"] = render_tree(tree)
classified = classify_files(target, show_hidden=show_hidden)
on_file, finish = _progress("Classifying files")
classified = classify_files(target, show_hidden=show_hidden,
exclude=exclude, on_file=on_file)
finish()
report["file_categories"] = summarize_categories(classified)
report["classified_files"] = classified
languages, loc = detect_languages(classified)
on_file, finish = _progress("Counting lines")
languages, loc = detect_languages(classified, on_file=on_file)
finish()
report["languages"] = languages
report["lines_of_code"] = loc
report["large_files"] = find_large_files(classified)
report["recent_files"] = find_recent_files(target, show_hidden=show_hidden)
on_file, finish = _progress("Checking for large files")
report["large_files"] = find_large_files(classified, on_file=on_file)
finish()
usage = get_disk_usage(target, show_hidden=show_hidden)
print(" [scan] Finding recently modified files...", file=sys.stderr)
report["recent_files"] = find_recent_files(target, show_hidden=show_hidden,
exclude=exclude)
print(" [scan] Calculating disk usage...", file=sys.stderr)
usage = get_disk_usage(target, show_hidden=show_hidden, exclude=exclude)
report["disk_usage"] = usage
report["top_directories"] = top_directories(usage, n=5)
print(" [scan] Base scan complete.", file=sys.stderr)
return report
@ -67,6 +106,10 @@ def main():
help="Force a new AI investigation (ignore cached results)")
parser.add_argument("--install-extras", action="store_true",
help="Show status of optional AI dependencies")
parser.add_argument("-x", "--exclude", metavar="DIR", action="append",
default=[],
help="Exclude a directory name from scan and analysis "
"(repeatable, e.g. -x .git -x node_modules)")
args = parser.parse_args()
@ -92,17 +135,22 @@ def main():
file=sys.stderr)
sys.exit(1)
if args.exclude:
print(f" [scan] Excluding: {', '.join(args.exclude)}", file=sys.stderr)
if args.watch:
watch_loop(target, depth=args.depth, show_hidden=args.all,
json_output=args.json_output)
return
report = scan(target, depth=args.depth, show_hidden=args.all)
report = scan(target, depth=args.depth, show_hidden=args.all,
exclude=args.exclude)
flags = []
if args.ai:
from luminos_lib.ai import analyze_directory
brief, detailed, flags = analyze_directory(report, target, fresh=args.fresh)
brief, detailed, flags = analyze_directory(
report, target, fresh=args.fresh, exclude=args.exclude)
report["ai_brief"] = brief
report["ai_detailed"] = detailed
report["flags"] = flags

View file

@ -19,14 +19,10 @@ from datetime import datetime, timezone
import anthropic
import magic
import tree_sitter
import tree_sitter_python
import tree_sitter_javascript
import tree_sitter_rust
import tree_sitter_go
from luminos_lib.ast_parser import parse_structure
from luminos_lib.cache import _CacheManager, _get_investigation_id
from luminos_lib.capabilities import check_ai_dependencies
from luminos_lib.prompts import _DIR_SYSTEM_PROMPT, _SYNTHESIS_SYSTEM_PROMPT
MODEL = "claude-sonnet-4-20250514"
@ -48,33 +44,6 @@ _SKIP_DIRS = {
# Commands the run_command tool is allowed to execute.
_COMMAND_WHITELIST = {"wc", "file", "grep", "head", "tail", "stat", "du", "find"}
# tree-sitter language registry: extension → (grammar_module, language_name)
_TS_LANGUAGES = {
".py": (tree_sitter_python, "python"),
".js": (tree_sitter_javascript, "javascript"),
".jsx": (tree_sitter_javascript, "javascript"),
".mjs": (tree_sitter_javascript, "javascript"),
".rs": (tree_sitter_rust, "rust"),
".go": (tree_sitter_go, "go"),
}
# Precompute Language objects once.
_TS_LANG_CACHE = {}
def _get_ts_parser(ext):
"""Return a (Parser, language_name) tuple for a file extension, or None."""
entry = _TS_LANGUAGES.get(ext)
if entry is None:
return None
module, lang_name = entry
if lang_name not in _TS_LANG_CACHE:
_TS_LANG_CACHE[lang_name] = tree_sitter.Language(module.language())
lang = _TS_LANG_CACHE[lang_name]
parser = tree_sitter.Parser(lang)
return parser, lang_name
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@ -533,181 +502,7 @@ def _tool_parse_structure(args, target, _cache):
path = os.path.join(target, path)
if not _path_is_safe(path, target):
return f"Error: path '{path}' is outside the target directory."
if not os.path.isfile(path):
return f"Error: '{path}' is not a file."
ext = os.path.splitext(path)[1].lower()
ts = _get_ts_parser(ext)
if ts is None:
return f"Error: no grammar for extension '{ext}'. Supported: {', '.join(sorted(_TS_LANGUAGES.keys()))}"
parser, lang_name = ts
try:
with open(path, "rb") as f:
source = f.read()
except OSError as e:
return f"Error reading file: {e}"
tree = parser.parse(source)
root = tree.root_node
source_text = source.decode("utf-8", errors="replace")
lines = source_text.split("\n")
line_count = len(lines)
functions = []
classes = []
imports = []
has_docstrings = False
comment_lines = 0
def _walk(node):
nonlocal has_docstrings, comment_lines
for child in node.children:
ntype = child.type
# Comments
if ntype in ("comment", "line_comment", "block_comment"):
comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
# Python
if lang_name == "python":
if ntype == "function_definition":
functions.append(_py_func_sig(child))
elif ntype == "class_definition":
classes.append(_py_class(child))
elif ntype in ("import_statement", "import_from_statement"):
imports.append(child.text.decode("utf-8", errors="replace").strip())
elif ntype == "expression_statement":
first = child.children[0] if child.children else None
if first and first.type == "string":
has_docstrings = True
# JavaScript
elif lang_name == "javascript":
if ntype in ("function_declaration", "arrow_function",
"function"):
functions.append(_js_func_sig(child))
elif ntype == "class_declaration":
classes.append(_js_class(child))
elif ntype in ("import_statement",):
imports.append(child.text.decode("utf-8", errors="replace").strip())
# Rust
elif lang_name == "rust":
if ntype == "function_item":
functions.append(_rust_func_sig(child))
elif ntype in ("struct_item", "enum_item", "impl_item"):
classes.append(_rust_struct(child))
elif ntype == "use_declaration":
imports.append(child.text.decode("utf-8", errors="replace").strip())
# Go
elif lang_name == "go":
if ntype == "function_declaration":
functions.append(_go_func_sig(child))
elif ntype == "type_declaration":
classes.append(_go_type(child))
elif ntype == "import_declaration":
imports.append(child.text.decode("utf-8", errors="replace").strip())
_walk(child)
_walk(root)
code_lines = max(1, line_count - comment_lines)
result = {
"language": lang_name,
"functions": functions[:50],
"classes": classes[:30],
"imports": imports[:30],
"line_count": line_count,
"has_docstrings": has_docstrings,
"has_comments": comment_lines > 0,
"comment_to_code_ratio": round(comment_lines / code_lines, 2),
}
return json.dumps(result, indent=2)
# --- tree-sitter extraction helpers ---
def _child_by_type(node, *types):
for c in node.children:
if c.type in types:
return c
return None
def _text(node):
return node.text.decode("utf-8", errors="replace") if node else ""
def _py_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "parameters"))
ret = _child_by_type(node, "type")
sig = f"{name}{params}"
if ret:
sig += f" -> {_text(ret)}"
return sig
def _py_class(node):
name = _text(_child_by_type(node, "identifier"))
methods = []
body = _child_by_type(node, "block")
if body:
for child in body.children:
if child.type == "function_definition":
methods.append(_py_func_sig(child))
return {"name": name, "methods": methods[:20]}
def _js_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "formal_parameters"))
return f"{name}{params}" if name else f"(anonymous){params}"
def _js_class(node):
name = _text(_child_by_type(node, "identifier"))
methods = []
body = _child_by_type(node, "class_body")
if body:
for child in body.children:
if child.type == "method_definition":
mname = _text(_child_by_type(child, "property_identifier"))
mparams = _text(_child_by_type(child, "formal_parameters"))
methods.append(f"{mname}{mparams}")
return {"name": name, "methods": methods[:20]}
def _rust_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "parameters"))
ret = _child_by_type(node, "type_identifier", "generic_type",
"reference_type", "scoped_type_identifier")
sig = f"{name}{params}"
if ret:
sig += f" -> {_text(ret)}"
return sig
def _rust_struct(node):
name = _text(_child_by_type(node, "type_identifier"))
return {"name": name or _text(node)[:60], "methods": []}
def _go_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "parameter_list"))
return f"{name}{params}"
def _go_type(node):
spec = _child_by_type(node, "type_spec")
name = _text(_child_by_type(spec, "type_identifier")) if spec else ""
return {"name": name or _text(node)[:60], "methods": []}
return parse_structure(path)
def _tool_write_cache(args, _target, cache):
@ -848,14 +643,16 @@ def _call_api_streaming(client, system, messages, tools, tracker):
# Directory discovery
# ---------------------------------------------------------------------------
def _discover_directories(target, show_hidden=False):
def _discover_directories(target, show_hidden=False, exclude=None):
"""Walk the target and return all directories sorted leaves-first."""
extra = set(exclude or [])
dirs = []
target_real = os.path.realpath(target)
for root, subdirs, _files in os.walk(target_real, topdown=True):
subdirs[:] = [
d for d in subdirs
if not _should_skip_dir(d)
and d not in extra
and (show_hidden or not d.startswith("."))
]
dirs.append(root)
@ -867,74 +664,6 @@ def _discover_directories(target, show_hidden=False):
# Per-directory agent loop
# ---------------------------------------------------------------------------
_DIR_SYSTEM_PROMPT = """\
You are an expert analyst investigating a SINGLE directory on a file system.
Do NOT assume the type of content before investigating. Discover what this
directory contains from what you find.
## Your Task
Investigate the directory: {dir_path}
(relative to target: {dir_rel})
You must:
1. Read the important files in THIS directory (not subdirectories)
2. For each file you read, call write_cache to save a summary
3. Call write_cache for the directory itself with a synthesis
4. Call submit_report with a 1-3 sentence summary
## Tools
parse_structure gives you the skeleton of a file. It does NOT replace \
reading the file. Use parse_structure first to understand structure, then \
use read_file if you need to verify intent, check for anomalies, or \
understand content that structure cannot capture (comments, documentation, \
data files, config values). A file where structure and content appear to \
contradict each other is always worth reading in full.
Use the think tool when choosing which file or directory to investigate \
next before starting a new file or switching investigation direction. \
Do NOT call think before every individual tool call in a sequence.
Use the checkpoint tool after completing investigation of a meaningful \
cluster of files. Not after every file once or twice per directory \
loop at most.
Use the flag tool immediately when you find something notable, \
surprising, or concerning. Severity guide:
info = interesting but not problematic
concern = worth addressing
critical = likely broken or dangerous
## Step Numbering
Number your investigation steps as you go. Before starting each new \
file cluster or phase transition, output:
Step N: <what you are doing and why>
Output this as plain text before tool calls, not as a tool call itself.
## Efficiency Rules
- Batch multiple tool calls in a single turn whenever possible
- Skip binary/compiled/generated files (.pyc, .class, .o, .min.js, etc.)
- Skip files >100KB unless uniquely important
- Prioritize: README, index, main, config, schema, manifest files
- For source files: try parse_structure first, then read_file if needed
- If read_file returns truncated content, use a larger max_bytes or
run_command('tail ...') NEVER retry the identical call
- You have only {max_turns} turns be efficient
## Cache Schemas
File: {{path, relative_path, size_bytes, category, summary, notable,
notable_reason, cached_at}}
Dir: {{path, relative_path, child_count, summary, dominant_category,
notable_files, cached_at}}
category values: source, config, data, document, media, archive, unknown
## Context
{context}
## Child Directory Summaries (already investigated)
{child_summaries}"""
def _build_dir_context(dir_path):
lines = []
try:
@ -1144,32 +873,6 @@ def _block_to_dict(block):
# Synthesis pass
# ---------------------------------------------------------------------------
_SYNTHESIS_SYSTEM_PROMPT = """\
You are an expert analyst synthesizing a final report about a directory tree.
ALL directory summaries are provided below you do NOT need to call
list_cache or read_cache. Just read the summaries and call submit_report
immediately in your first turn.
Do NOT assume the type of content. Let the summaries speak for themselves.
## Your Goal
Produce two outputs via the submit_report tool:
1. **brief**: A 2-4 sentence summary of what this directory tree is.
2. **detailed**: A thorough breakdown covering purpose, structure, key
components, technologies, notable patterns, and any concerns.
## Rules
- ALL summaries are below call submit_report directly
- Be specific reference actual directory and file names
- Do NOT call list_cache or read_cache
## Target
{target}
## Directory Summaries
{summaries_text}"""
def _run_synthesis(client, target, cache, tracker, max_turns=5, verbose=False):
"""Run the final synthesis pass. Returns (brief, detailed)."""
dir_entries = cache.read_all_entries("dir")
@ -1300,7 +1003,7 @@ def _synthesize_from_cache(cache):
# ---------------------------------------------------------------------------
def _run_investigation(client, target, report, show_hidden=False,
fresh=False, verbose=False):
fresh=False, verbose=False, exclude=None):
"""Orchestrate the multi-pass investigation. Returns (brief, detailed, flags)."""
investigation_id, is_new = _get_investigation_id(target, fresh=fresh)
cache = _CacheManager(investigation_id, target)
@ -1313,7 +1016,8 @@ def _run_investigation(client, target, report, show_hidden=False,
f"{'' if is_new else ' (resumed)'}", file=sys.stderr)
print(f" [AI] Cache: {cache.root}/", file=sys.stderr)
all_dirs = _discover_directories(target, show_hidden=show_hidden)
all_dirs = _discover_directories(target, show_hidden=show_hidden,
exclude=exclude)
to_investigate = []
cached_count = 0
@ -1386,7 +1090,8 @@ def _run_investigation(client, target, report, show_hidden=False,
# Public interface
# ---------------------------------------------------------------------------
def analyze_directory(report, target, verbose_tools=False, fresh=False):
def analyze_directory(report, target, verbose_tools=False, fresh=False,
exclude=None):
"""Run AI analysis on the directory. Returns (brief, detailed, flags).
Returns ("", "", []) if the API key is missing or dependencies are not met.
@ -1405,6 +1110,7 @@ def analyze_directory(report, target, verbose_tools=False, fresh=False):
try:
brief, detailed, flags = _run_investigation(
client, target, report, fresh=fresh, verbose=verbose_tools,
exclude=exclude,
)
except Exception as e:
print(f"Warning: AI analysis failed: {e}", file=sys.stderr)

314
luminos_lib/ast_parser.py Normal file
View file

@ -0,0 +1,314 @@
"""AST structure extraction for Luminos using tree-sitter."""
import json
import os
import tree_sitter
import tree_sitter_python
import tree_sitter_javascript
import tree_sitter_rust
import tree_sitter_go
# Maps a lowercase file extension to its (grammar_module, language_name) pair.
# language_name is the key used by _LANGUAGE_HANDLERS and _TS_LANG_CACHE.
_TS_LANGUAGES = {
    ".py": (tree_sitter_python, "python"),
    ".js": (tree_sitter_javascript, "javascript"),
    ".jsx": (tree_sitter_javascript, "javascript"),
    ".mjs": (tree_sitter_javascript, "javascript"),
    ".rs": (tree_sitter_rust, "rust"),
    ".go": (tree_sitter_go, "go"),
}
# Precomputed Language objects, keyed by language name so extensions that
# share a grammar (.js/.jsx/.mjs) share one Language. Populated lazily by
# _get_ts_parser.
_TS_LANG_CACHE = {}
def _get_ts_parser(ext):
    """Look up a tree-sitter parser for a file extension.

    Returns a (Parser, language_name) tuple, or None when no grammar is
    registered for *ext*. Language objects are built lazily and memoised
    in _TS_LANG_CACHE; a fresh Parser is created per call.
    """
    try:
        module, lang_name = _TS_LANGUAGES[ext]
    except KeyError:
        return None
    lang = _TS_LANG_CACHE.get(lang_name)
    if lang is None:
        lang = tree_sitter.Language(module.language())
        _TS_LANG_CACHE[lang_name] = lang
    return tree_sitter.Parser(lang), lang_name
# ---------------------------------------------------------------------------
# Tree-sitter node helpers
# ---------------------------------------------------------------------------
def _child_by_type(node, *types):
for c in node.children:
if c.type in types:
return c
return None
def _text(node):
return node.text.decode("utf-8", errors="replace") if node else ""
# ---------------------------------------------------------------------------
# Per-language handlers: (root_node, source_bytes) -> dict
# ---------------------------------------------------------------------------
def _parse_python(root, source):
    """Extract structure stats from a parsed Python file.

    root is the tree-sitter root node; source is the raw file bytes.
    Returns the result dict (functions, classes, imports, line/comment
    counts) that parse_structure serialises to JSON.
    """
    functions = []
    classes = []
    imports = []
    has_docstrings = False
    comment_lines = 0

    def _walk(node):
        nonlocal has_docstrings, comment_lines
        for child in node.children:
            ntype = child.type
            # Comment counting is a separate `if` so a node is still
            # eligible for the structural branches below.
            if ntype in ("comment", "line_comment", "block_comment"):
                comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
            if ntype == "function_definition":
                # Signature: name + parameter list, plus return annotation
                # when a `type` child is present.
                name = _text(_child_by_type(child, "identifier"))
                params = _text(_child_by_type(child, "parameters"))
                ret = _child_by_type(child, "type")
                sig = f"{name}{params}"
                if ret:
                    sig += f" -> {_text(ret)}"
                functions.append(sig)
            elif ntype == "class_definition":
                name = _text(_child_by_type(child, "identifier"))
                methods = []
                body = _child_by_type(child, "block")
                if body:
                    for c in body.children:
                        if c.type == "function_definition":
                            mname = _text(_child_by_type(c, "identifier"))
                            mparams = _text(_child_by_type(c, "parameters"))
                            mret = _child_by_type(c, "type")
                            msig = f"{mname}{mparams}"
                            if mret:
                                msig += f" -> {_text(mret)}"
                            methods.append(msig)
                classes.append({"name": name, "methods": methods[:20]})
            elif ntype in ("import_statement", "import_from_statement"):
                imports.append(child.text.decode("utf-8", errors="replace").strip())
            elif ntype == "expression_statement":
                # A bare string expression anywhere in the tree counts as a
                # docstring (module, class, or function level).
                first = child.children[0] if child.children else None
                if first and first.type == "string":
                    has_docstrings = True
            # NOTE: recursion is unconditional, so methods collected above
            # also appear in `functions` when the walk descends into the
            # class body — preserved from the pre-refactor behaviour.
            _walk(child)

    _walk(root)
    source_text = source.decode("utf-8", errors="replace")
    line_count = len(source_text.split("\n"))
    # Guard against division by zero when every line is a comment.
    code_lines = max(1, line_count - comment_lines)
    return {
        "language": "python",
        "functions": functions[:50],
        "classes": classes[:30],
        "imports": imports[:30],
        "line_count": line_count,
        "has_docstrings": has_docstrings,
        "has_comments": comment_lines > 0,
        "comment_to_code_ratio": round(comment_lines / code_lines, 2),
    }
def _parse_javascript(root, source):
    """Extract structure stats from a parsed JavaScript file.

    root is the tree-sitter root node; source is the raw file bytes.
    Returns the result dict that parse_structure serialises to JSON.
    has_docstrings is always False for JavaScript.
    """
    functions = []
    classes = []
    imports = []
    comment_lines = 0

    def _walk(node):
        nonlocal comment_lines
        for child in node.children:
            ntype = child.type
            # Separate `if` so a node stays eligible for the branches below.
            if ntype in ("comment", "line_comment", "block_comment"):
                comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
            if ntype in ("function_declaration", "arrow_function", "function"):
                # Arrow functions and function expressions have no
                # identifier child; label them "(anonymous)".
                name = _text(_child_by_type(child, "identifier"))
                params = _text(_child_by_type(child, "formal_parameters"))
                functions.append(f"{name}{params}" if name else f"(anonymous){params}")
            elif ntype == "class_declaration":
                name = _text(_child_by_type(child, "identifier"))
                methods = []
                body = _child_by_type(child, "class_body")
                if body:
                    for c in body.children:
                        if c.type == "method_definition":
                            mname = _text(_child_by_type(c, "property_identifier"))
                            mparams = _text(_child_by_type(c, "formal_parameters"))
                            methods.append(f"{mname}{mparams}")
                classes.append({"name": name, "methods": methods[:20]})
            elif ntype == "import_statement":
                imports.append(child.text.decode("utf-8", errors="replace").strip())
            # Unconditional recursion: nested functions/methods are also
            # collected at every level they appear.
            _walk(child)

    _walk(root)
    source_text = source.decode("utf-8", errors="replace")
    line_count = len(source_text.split("\n"))
    # Guard against division by zero when every line is a comment.
    code_lines = max(1, line_count - comment_lines)
    return {
        "language": "javascript",
        "functions": functions[:50],
        "classes": classes[:30],
        "imports": imports[:30],
        "line_count": line_count,
        "has_docstrings": False,
        "has_comments": comment_lines > 0,
        "comment_to_code_ratio": round(comment_lines / code_lines, 2),
    }
def _parse_rust(root, source):
    """Extract structure stats from a parsed Rust file.

    root is the tree-sitter root node; source is the raw file bytes.
    structs, enums, and impl blocks are all reported under "classes".
    Returns the result dict that parse_structure serialises to JSON.
    """
    functions = []
    classes = []
    imports = []
    comment_lines = 0

    def _walk(node):
        nonlocal comment_lines
        for child in node.children:
            ntype = child.type
            # Separate `if` so a node stays eligible for the branches below.
            if ntype in ("comment", "line_comment", "block_comment"):
                comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
            if ntype == "function_item":
                name = _text(_child_by_type(child, "identifier"))
                params = _text(_child_by_type(child, "parameters"))
                # Return type may be any of several node kinds; try the
                # common ones in order.
                ret = _child_by_type(child, "type_identifier", "generic_type",
                                     "reference_type", "scoped_type_identifier")
                sig = f"{name}{params}"
                if ret:
                    sig += f" -> {_text(ret)}"
                functions.append(sig)
            elif ntype in ("struct_item", "enum_item", "impl_item"):
                name = _text(_child_by_type(child, "type_identifier"))
                # Fall back to a truncated slice of the node's own source
                # when no type_identifier child is found.
                classes.append({"name": name or _text(child)[:60], "methods": []})
            elif ntype == "use_declaration":
                imports.append(child.text.decode("utf-8", errors="replace").strip())
            # Unconditional recursion: fns inside impl blocks are also
            # collected into `functions`.
            _walk(child)

    _walk(root)
    source_text = source.decode("utf-8", errors="replace")
    line_count = len(source_text.split("\n"))
    # Guard against division by zero when every line is a comment.
    code_lines = max(1, line_count - comment_lines)
    return {
        "language": "rust",
        "functions": functions[:50],
        "classes": classes[:30],
        "imports": imports[:30],
        "line_count": line_count,
        "has_docstrings": False,
        "has_comments": comment_lines > 0,
        "comment_to_code_ratio": round(comment_lines / code_lines, 2),
    }
def _parse_go(root, source):
    """Extract structure stats from a parsed Go file.

    root is the tree-sitter root node; source is the raw file bytes.
    Type declarations are reported under "classes" with empty method
    lists. Returns the result dict that parse_structure serialises.
    """
    functions = []
    classes = []
    imports = []
    comment_lines = 0

    def _visit(node):
        nonlocal comment_lines
        for child in node.children:
            kind = child.type
            if kind in ("comment", "line_comment", "block_comment"):
                decoded = child.text.decode("utf-8", errors="replace")
                comment_lines += decoded.count("\n") + 1
            if kind == "function_declaration":
                fname = _text(_child_by_type(child, "identifier"))
                fparams = _text(_child_by_type(child, "parameter_list"))
                functions.append(f"{fname}{fparams}")
            elif kind == "type_declaration":
                spec = _child_by_type(child, "type_spec")
                tname = _text(_child_by_type(spec, "type_identifier")) if spec else ""
                # Fall back to a truncated slice of the declaration's own
                # source when no named type_spec is found.
                classes.append({"name": tname or _text(child)[:60], "methods": []})
            elif kind == "import_declaration":
                imports.append(child.text.decode("utf-8", errors="replace").strip())
            _visit(child)

    _visit(root)
    total_lines = len(source.decode("utf-8", errors="replace").split("\n"))
    # Guard against division by zero when every line is a comment.
    code_lines = max(1, total_lines - comment_lines)
    return {
        "language": "go",
        "functions": functions[:50],
        "classes": classes[:30],
        "imports": imports[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": comment_lines > 0,
        "comment_to_code_ratio": round(comment_lines / code_lines, 2),
    }
# ---------------------------------------------------------------------------
# Language handler registry
# ---------------------------------------------------------------------------
# Dispatch table: tree-sitter language name -> structure-extraction handler.
# Each handler takes (root_node, source_bytes) and returns the result dict.
_LANGUAGE_HANDLERS = {
    "python": _parse_python,
    "javascript": _parse_javascript,
    "rust": _parse_rust,
    "go": _parse_go,
}
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def parse_structure(path):
    """Parse a source file and return its structural skeleton as a JSON string.

    Takes an absolute path. Returns a JSON string of the structure dict,
    or an error string if parsing fails or the language is unsupported.
    """
    if not os.path.isfile(path):
        return f"Error: '{path}' is not a file."

    ext = os.path.splitext(path)[1].lower()
    parser_entry = _get_ts_parser(ext)
    if parser_entry is None:
        supported = ', '.join(sorted(_TS_LANGUAGES.keys()))
        return f"Error: no grammar for extension '{ext}'. Supported: {supported}"
    parser, lang_name = parser_entry

    handler = _LANGUAGE_HANDLERS.get(lang_name)
    if handler is None:
        # Registry drift guard: a grammar exists but no handler was added.
        return f"Error: no handler for language '{lang_name}'."

    try:
        with open(path, "rb") as fh:
            raw = fh.read()
    except OSError as exc:
        return f"Error reading file: {exc}"

    structure = handler(parser.parse(raw).root_node, raw)
    return json.dumps(structure, indent=2)

View file

@ -34,10 +34,11 @@ def _count_lines(filepath):
return 0
def detect_languages(classified_files):
def detect_languages(classified_files, on_file=None):
"""Detect languages present and count lines of code per language.
Returns (languages_set, loc_by_language).
on_file(path) is called per source file, if provided.
"""
source_files = [f for f in classified_files if f["category"] == "source"]
languages = set()
@ -49,12 +50,17 @@ def detect_languages(classified_files):
languages.add(lang)
lines = _count_lines(f["path"])
loc[lang] = loc.get(lang, 0) + lines
if on_file:
on_file(f["path"])
return sorted(languages), loc
def find_large_files(classified_files):
"""Find files that are unusually large (>1000 lines or >10MB)."""
def find_large_files(classified_files, on_file=None):
"""Find files that are unusually large (>1000 lines or >10MB).
on_file(path) is called per source file checked, if provided.
"""
source_files = [f for f in classified_files if f["category"] == "source"]
large = []
@ -68,5 +74,7 @@ def find_large_files(classified_files):
if reasons:
large.append({"path": f["path"], "name": f["name"],
"reasons": reasons})
if on_file:
on_file(f["path"])
return large

View file

@ -3,12 +3,15 @@
import subprocess
def get_disk_usage(target, show_hidden=False):
def get_disk_usage(target, show_hidden=False, exclude=None):
"""Get per-directory disk usage via du.
Returns a list of dicts: {path, size_bytes, size_human}.
"""
cmd = ["du", "-b", "--max-depth=2", target]
cmd = ["du", "-b", "--max-depth=2"]
for name in (exclude or []):
cmd.append(f"--exclude={name}")
cmd.append(target)
try:
result = subprocess.run(

View file

@ -86,15 +86,19 @@ def _classify_one(filepath):
return "unknown", desc
def classify_files(target, show_hidden=False):
def classify_files(target, show_hidden=False, exclude=None, on_file=None):
exclude = exclude or []
"""Walk the target directory and classify every file.
Returns a list of dicts: {path, name, category, size, description}.
on_file(path) is called after each file is classified, if provided.
"""
results = []
for root, dirs, files in os.walk(target):
dirs[:] = [d for d in dirs
if d not in exclude
and (show_hidden or not d.startswith("."))]
if not show_hidden:
dirs[:] = [d for d in dirs if not d.startswith(".")]
files = [f for f in files if not f.startswith(".")]
for fname in files:
full = os.path.join(root, fname)
@ -112,6 +116,8 @@ def classify_files(target, show_hidden=False):
"size": size,
"description": desc,
})
if on_file:
on_file(full)
return results

93
luminos_lib/prompts.py Normal file
View file

@ -0,0 +1,93 @@
"""System prompt templates for the Luminos agent loops."""
_DIR_SYSTEM_PROMPT = """\
You are an expert analyst investigating a SINGLE directory on a file system.
Do NOT assume the type of content before investigating. Discover what this
directory contains from what you find.
## Your Task
Investigate the directory: {dir_path}
(relative to target: {dir_rel})
You must:
1. Read the important files in THIS directory (not subdirectories)
2. For each file you read, call write_cache to save a summary
3. Call write_cache for the directory itself with a synthesis
4. Call submit_report with a 1-3 sentence summary
## Tools
parse_structure gives you the skeleton of a file. It does NOT replace \
reading the file. Use parse_structure first to understand structure, then \
use read_file if you need to verify intent, check for anomalies, or \
understand content that structure cannot capture (comments, documentation, \
data files, config values). A file where structure and content appear to \
contradict each other is always worth reading in full.
Use the think tool when choosing which file or directory to investigate \
next before starting a new file or switching investigation direction. \
Do NOT call think before every individual tool call in a sequence.
Use the checkpoint tool after completing investigation of a meaningful \
cluster of files. Not after every file — once or twice per directory \
loop at most.
Use the flag tool immediately when you find something notable, \
surprising, or concerning. Severity guide:
info = interesting but not problematic
concern = worth addressing
critical = likely broken or dangerous
## Step Numbering
Number your investigation steps as you go. Before starting each new \
file cluster or phase transition, output:
Step N: <what you are doing and why>
Output this as plain text before tool calls, not as a tool call itself.
## Efficiency Rules
- Batch multiple tool calls in a single turn whenever possible
- Skip binary/compiled/generated files (.pyc, .class, .o, .min.js, etc.)
- Skip files >100KB unless uniquely important
- Prioritize: README, index, main, config, schema, manifest files
- For source files: try parse_structure first, then read_file if needed
- If read_file returns truncated content, use a larger max_bytes or
run_command('tail ...') — NEVER retry the identical call
- You have only {max_turns} turns — be efficient
## Cache Schemas
File: {{path, relative_path, size_bytes, category, summary, notable,
notable_reason, cached_at}}
Dir: {{path, relative_path, child_count, summary, dominant_category,
notable_files, cached_at}}
category values: source, config, data, document, media, archive, unknown
## Context
{context}
## Child Directory Summaries (already investigated)
{child_summaries}"""
_SYNTHESIS_SYSTEM_PROMPT = """\
You are an expert analyst synthesizing a final report about a directory tree.
ALL directory summaries are provided below — you do NOT need to call
list_cache or read_cache. Just read the summaries and call submit_report
immediately in your first turn.
Do NOT assume the type of content. Let the summaries speak for themselves.
## Your Goal
Produce two outputs via the submit_report tool:
1. **brief**: A 2-4 sentence summary of what this directory tree is.
2. **detailed**: A thorough breakdown covering purpose, structure, key
components, technologies, notable patterns, and any concerns.
## Rules
- ALL summaries are below — call submit_report directly
- Be specific — reference actual directory and file names
- Do NOT call list_cache or read_cache
## Target
{target}
## Directory Summaries
{summaries_text}"""

View file

@ -5,7 +5,7 @@ import os
from datetime import datetime
def find_recent_files(target, n=10, show_hidden=False):
def find_recent_files(target, n=10, show_hidden=False, exclude=None):
"""Find the n most recently modified files using find and stat.
Returns a list of dicts: {path, name, modified, modified_human}.
@ -14,6 +14,9 @@ def find_recent_files(target, n=10, show_hidden=False):
cmd = ["find", target, "-type", "f"]
if not show_hidden:
cmd.extend(["-not", "-path", "*/.*"])
for name in (exclude or []):
cmd.extend(["-not", "-path", f"*/{name}/*",
"-not", "-path", f"*/{name}"])
cmd.extend(["-printf", "%T@\t%p\n"])
try:

View file

@ -3,7 +3,8 @@
import os
def build_tree(path, max_depth=3, show_hidden=False, _depth=0):
def build_tree(path, max_depth=3, show_hidden=False, exclude=None, _depth=0):
exclude = exclude or []
"""Build a nested dict representing the directory tree with file sizes."""
name = os.path.basename(path) or path
node = {"name": name, "path": path, "type": "directory", "children": []}
@ -17,10 +18,12 @@ def build_tree(path, max_depth=3, show_hidden=False, _depth=0):
for entry in entries:
if not show_hidden and entry.startswith("."):
continue
if entry in exclude:
continue
full = os.path.join(path, entry)
if os.path.isdir(full):
if _depth < max_depth:
child = build_tree(full, max_depth, show_hidden, _depth + 1)
child = build_tree(full, max_depth, show_hidden, exclude, _depth + 1)
node["children"].append(child)
else:
node["children"].append({