merge: extract AST parser module

This commit is contained in:
Jeff Smith 2026-03-30 14:34:06 -06:00
commit 5c6124a715
2 changed files with 316 additions and 208 deletions

View file

@@ -19,12 +19,7 @@ from datetime import datetime, timezone
import anthropic
import magic
import tree_sitter
import tree_sitter_python
import tree_sitter_javascript
import tree_sitter_rust
import tree_sitter_go
from luminos_lib.ast_parser import parse_structure
from luminos_lib.cache import _CacheManager, _get_investigation_id
from luminos_lib.capabilities import check_ai_dependencies
@@ -48,33 +43,6 @@ _SKIP_DIRS = {
# Commands the run_command tool is allowed to execute.
_COMMAND_WHITELIST = {"wc", "file", "grep", "head", "tail", "stat", "du", "find"}
# tree-sitter language registry: extension → (grammar_module, language_name)
_TS_LANGUAGES = {
".py": (tree_sitter_python, "python"),
".js": (tree_sitter_javascript, "javascript"),
".jsx": (tree_sitter_javascript, "javascript"),
".mjs": (tree_sitter_javascript, "javascript"),
".rs": (tree_sitter_rust, "rust"),
".go": (tree_sitter_go, "go"),
}
# Precompute Language objects once.
_TS_LANG_CACHE = {}
def _get_ts_parser(ext):
"""Return a (Parser, language_name) tuple for a file extension, or None."""
entry = _TS_LANGUAGES.get(ext)
if entry is None:
return None
module, lang_name = entry
if lang_name not in _TS_LANG_CACHE:
_TS_LANG_CACHE[lang_name] = tree_sitter.Language(module.language())
lang = _TS_LANG_CACHE[lang_name]
parser = tree_sitter.Parser(lang)
return parser, lang_name
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@@ -533,181 +501,7 @@ def _tool_parse_structure(args, target, _cache):
path = os.path.join(target, path)
if not _path_is_safe(path, target):
return f"Error: path '{path}' is outside the target directory."
if not os.path.isfile(path):
return f"Error: '{path}' is not a file."
ext = os.path.splitext(path)[1].lower()
ts = _get_ts_parser(ext)
if ts is None:
return f"Error: no grammar for extension '{ext}'. Supported: {', '.join(sorted(_TS_LANGUAGES.keys()))}"
parser, lang_name = ts
try:
with open(path, "rb") as f:
source = f.read()
except OSError as e:
return f"Error reading file: {e}"
tree = parser.parse(source)
root = tree.root_node
source_text = source.decode("utf-8", errors="replace")
lines = source_text.split("\n")
line_count = len(lines)
functions = []
classes = []
imports = []
has_docstrings = False
comment_lines = 0
def _walk(node):
nonlocal has_docstrings, comment_lines
for child in node.children:
ntype = child.type
# Comments
if ntype in ("comment", "line_comment", "block_comment"):
comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
# Python
if lang_name == "python":
if ntype == "function_definition":
functions.append(_py_func_sig(child))
elif ntype == "class_definition":
classes.append(_py_class(child))
elif ntype in ("import_statement", "import_from_statement"):
imports.append(child.text.decode("utf-8", errors="replace").strip())
elif ntype == "expression_statement":
first = child.children[0] if child.children else None
if first and first.type == "string":
has_docstrings = True
# JavaScript
elif lang_name == "javascript":
if ntype in ("function_declaration", "arrow_function",
"function"):
functions.append(_js_func_sig(child))
elif ntype == "class_declaration":
classes.append(_js_class(child))
elif ntype in ("import_statement",):
imports.append(child.text.decode("utf-8", errors="replace").strip())
# Rust
elif lang_name == "rust":
if ntype == "function_item":
functions.append(_rust_func_sig(child))
elif ntype in ("struct_item", "enum_item", "impl_item"):
classes.append(_rust_struct(child))
elif ntype == "use_declaration":
imports.append(child.text.decode("utf-8", errors="replace").strip())
# Go
elif lang_name == "go":
if ntype == "function_declaration":
functions.append(_go_func_sig(child))
elif ntype == "type_declaration":
classes.append(_go_type(child))
elif ntype == "import_declaration":
imports.append(child.text.decode("utf-8", errors="replace").strip())
_walk(child)
_walk(root)
code_lines = max(1, line_count - comment_lines)
result = {
"language": lang_name,
"functions": functions[:50],
"classes": classes[:30],
"imports": imports[:30],
"line_count": line_count,
"has_docstrings": has_docstrings,
"has_comments": comment_lines > 0,
"comment_to_code_ratio": round(comment_lines / code_lines, 2),
}
return json.dumps(result, indent=2)
# --- tree-sitter extraction helpers ---
def _child_by_type(node, *types):
for c in node.children:
if c.type in types:
return c
return None
def _text(node):
return node.text.decode("utf-8", errors="replace") if node else ""
def _py_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "parameters"))
ret = _child_by_type(node, "type")
sig = f"{name}{params}"
if ret:
sig += f" -> {_text(ret)}"
return sig
def _py_class(node):
name = _text(_child_by_type(node, "identifier"))
methods = []
body = _child_by_type(node, "block")
if body:
for child in body.children:
if child.type == "function_definition":
methods.append(_py_func_sig(child))
return {"name": name, "methods": methods[:20]}
def _js_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "formal_parameters"))
return f"{name}{params}" if name else f"(anonymous){params}"
def _js_class(node):
name = _text(_child_by_type(node, "identifier"))
methods = []
body = _child_by_type(node, "class_body")
if body:
for child in body.children:
if child.type == "method_definition":
mname = _text(_child_by_type(child, "property_identifier"))
mparams = _text(_child_by_type(child, "formal_parameters"))
methods.append(f"{mname}{mparams}")
return {"name": name, "methods": methods[:20]}
def _rust_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "parameters"))
ret = _child_by_type(node, "type_identifier", "generic_type",
"reference_type", "scoped_type_identifier")
sig = f"{name}{params}"
if ret:
sig += f" -> {_text(ret)}"
return sig
def _rust_struct(node):
name = _text(_child_by_type(node, "type_identifier"))
return {"name": name or _text(node)[:60], "methods": []}
def _go_func_sig(node):
name = _text(_child_by_type(node, "identifier"))
params = _text(_child_by_type(node, "parameter_list"))
return f"{name}{params}"
def _go_type(node):
spec = _child_by_type(node, "type_spec")
name = _text(_child_by_type(spec, "type_identifier")) if spec else ""
return {"name": name or _text(node)[:60], "methods": []}
return parse_structure(path)
def _tool_write_cache(args, _target, cache):

314
luminos_lib/ast_parser.py Normal file
View file

@@ -0,0 +1,314 @@
"""AST structure extraction for Luminos using tree-sitter."""
import json
import os
import tree_sitter
import tree_sitter_python
import tree_sitter_javascript
import tree_sitter_rust
import tree_sitter_go
# Extension → (grammar_module, language_name)
# Maps a lowercase file extension to the tree-sitter grammar module that can
# parse it, plus the canonical language name used to pick a handler in
# _LANGUAGE_HANDLERS. JS variants (.js/.jsx/.mjs) share one grammar.
_TS_LANGUAGES = {
    ".py": (tree_sitter_python, "python"),
    ".js": (tree_sitter_javascript, "javascript"),
    ".jsx": (tree_sitter_javascript, "javascript"),
    ".mjs": (tree_sitter_javascript, "javascript"),
    ".rs": (tree_sitter_rust, "rust"),
    ".go": (tree_sitter_go, "go"),
}
# Precomputed Language objects.
# Lazily filled by _get_ts_parser, keyed by language name, so the
# tree_sitter.Language construction happens at most once per language.
_TS_LANG_CACHE = {}
def _get_ts_parser(ext):
    """Return a (Parser, language_name) tuple for a file extension, or None."""
    try:
        grammar_module, lang_name = _TS_LANGUAGES[ext]
    except KeyError:
        # Unknown extension → caller reports "no grammar" to the user.
        return None
    language = _TS_LANG_CACHE.get(lang_name)
    if language is None:
        # Build the Language object once and memoize it; Parser objects
        # are cheap and created fresh on every call.
        language = tree_sitter.Language(grammar_module.language())
        _TS_LANG_CACHE[lang_name] = language
    return tree_sitter.Parser(language), lang_name
# ---------------------------------------------------------------------------
# Tree-sitter node helpers
# ---------------------------------------------------------------------------
def _child_by_type(node, *types):
for c in node.children:
if c.type in types:
return c
return None
def _text(node):
return node.text.decode("utf-8", errors="replace") if node else ""
# ---------------------------------------------------------------------------
# Per-language handlers: (root_node, source_bytes) -> dict
# ---------------------------------------------------------------------------
def _parse_python(root, source):
    """Extract a structural summary from a parsed Python file.

    root is the tree-sitter root node; source is the raw file bytes.
    Returns a dict with function signatures, classes (with method
    signatures), import statements, line counts and comment metrics.
    """
    functions = []
    classes = []
    imports = []
    has_docstrings = False
    comment_lines = 0
    def _walk(node):
        # Depth-first, source-order traversal that mutates the enclosing
        # accumulators via `nonlocal`.
        nonlocal has_docstrings, comment_lines
        for child in node.children:
            ntype = child.type
            if ntype in ("comment", "line_comment", "block_comment"):
                # Count newlines + 1 so a multi-line comment node adds
                # one count per line it spans.
                comment_lines += child.text.decode("utf-8", errors="replace").count("\n") + 1
            if ntype == "function_definition":
                # Build "name(params)" plus " -> ret" when a return
                # annotation node is present.
                name = _text(_child_by_type(child, "identifier"))
                params = _text(_child_by_type(child, "parameters"))
                ret = _child_by_type(child, "type")
                sig = f"{name}{params}"
                if ret:
                    sig += f" -> {_text(ret)}"
                functions.append(sig)
            elif ntype == "class_definition":
                name = _text(_child_by_type(child, "identifier"))
                methods = []
                body = _child_by_type(child, "block")
                if body:
                    for c in body.children:
                        if c.type == "function_definition":
                            mname = _text(_child_by_type(c, "identifier"))
                            mparams = _text(_child_by_type(c, "parameters"))
                            mret = _child_by_type(c, "type")
                            msig = f"{mname}{mparams}"
                            if mret:
                                msig += f" -> {_text(mret)}"
                            methods.append(msig)
                # NOTE(review): _walk(child) below also recurses into this
                # class body, so each method is additionally appended to
                # `functions` — confirm the duplication is intended.
                classes.append({"name": name, "methods": methods[:20]})
            elif ntype in ("import_statement", "import_from_statement"):
                imports.append(child.text.decode("utf-8", errors="replace").strip())
            elif ntype == "expression_statement":
                # Any bare string expression (the tree-sitter shape of a
                # docstring) flags the file as having docstrings.
                first = child.children[0] if child.children else None
                if first and first.type == "string":
                    has_docstrings = True
            _walk(child)
    _walk(root)
    source_text = source.decode("utf-8", errors="replace")
    # split("\n") counts a trailing newline as one extra (empty) line.
    line_count = len(source_text.split("\n"))
    # Clamp to 1 so the ratio below never divides by zero.
    code_lines = max(1, line_count - comment_lines)
    return {
        "language": "python",
        "functions": functions[:50],
        "classes": classes[:30],
        "imports": imports[:30],
        "line_count": line_count,
        "has_docstrings": has_docstrings,
        "has_comments": comment_lines > 0,
        "comment_to_code_ratio": round(comment_lines / code_lines, 2),
    }
def _parse_javascript(root, source):
    """Extract a structural summary from a parsed JavaScript file."""
    funcs, klasses, deps = [], [], []
    n_comment_lines = 0
    # Iterative pre-order walk over every node below the root; visit order
    # matches source order, so signatures come out in file order.
    stack = list(reversed(root.children))
    while stack:
        node = stack.pop()
        kind = node.type
        if kind in ("comment", "line_comment", "block_comment"):
            n_comment_lines += node.text.decode("utf-8", errors="replace").count("\n") + 1
        if kind in ("function_declaration", "arrow_function", "function"):
            fname = _text(_child_by_type(node, "identifier"))
            fparams = _text(_child_by_type(node, "formal_parameters"))
            funcs.append(f"{fname}{fparams}" if fname else f"(anonymous){fparams}")
        elif kind == "class_declaration":
            cname = _text(_child_by_type(node, "identifier"))
            members = []
            body = _child_by_type(node, "class_body")
            if body:
                members = [
                    _text(_child_by_type(m, "property_identifier"))
                    + _text(_child_by_type(m, "formal_parameters"))
                    for m in body.children
                    if m.type == "method_definition"
                ]
            klasses.append({"name": cname, "methods": members[:20]})
        elif kind == "import_statement":
            deps.append(node.text.decode("utf-8", errors="replace").strip())
        stack.extend(reversed(node.children))
    total_lines = source.decode("utf-8", errors="replace").count("\n") + 1
    code_lines = max(1, total_lines - n_comment_lines)  # never divide by zero
    return {
        "language": "javascript",
        "functions": funcs[:50],
        "classes": klasses[:30],
        "imports": deps[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": n_comment_lines > 0,
        "comment_to_code_ratio": round(n_comment_lines / code_lines, 2),
    }
def _parse_rust(root, source):
    """Extract a structural summary from a parsed Rust file."""
    funcs, type_items, uses = [], [], []
    n_comment_lines = 0
    # Iterative pre-order walk; visit order matches source order.
    stack = list(reversed(root.children))
    while stack:
        node = stack.pop()
        kind = node.type
        if kind in ("comment", "line_comment", "block_comment"):
            n_comment_lines += node.text.decode("utf-8", errors="replace").count("\n") + 1
        if kind == "function_item":
            sig = _text(_child_by_type(node, "identifier")) + _text(
                _child_by_type(node, "parameters"))
            ret_node = _child_by_type(node, "type_identifier", "generic_type",
                                      "reference_type", "scoped_type_identifier")
            if ret_node:
                sig = f"{sig} -> {_text(ret_node)}"
            funcs.append(sig)
        elif kind in ("struct_item", "enum_item", "impl_item"):
            tname = _text(_child_by_type(node, "type_identifier"))
            # Fall back to a truncated snippet of the node when no
            # type_identifier child exists (e.g. impl blocks for paths).
            type_items.append({"name": tname or _text(node)[:60], "methods": []})
        elif kind == "use_declaration":
            uses.append(node.text.decode("utf-8", errors="replace").strip())
        stack.extend(reversed(node.children))
    total_lines = source.decode("utf-8", errors="replace").count("\n") + 1
    code_lines = max(1, total_lines - n_comment_lines)  # never divide by zero
    return {
        "language": "rust",
        "functions": funcs[:50],
        "classes": type_items[:30],
        "imports": uses[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": n_comment_lines > 0,
        "comment_to_code_ratio": round(n_comment_lines / code_lines, 2),
    }
def _parse_go(root, source):
    """Extract a structural summary from a parsed Go file."""
    funcs, type_decls, deps = [], [], []
    n_comment_lines = 0
    # Iterative pre-order walk; visit order matches source order.
    stack = list(reversed(root.children))
    while stack:
        node = stack.pop()
        kind = node.type
        if kind in ("comment", "line_comment", "block_comment"):
            n_comment_lines += node.text.decode("utf-8", errors="replace").count("\n") + 1
        if kind == "function_declaration":
            funcs.append(_text(_child_by_type(node, "identifier"))
                         + _text(_child_by_type(node, "parameter_list")))
        elif kind == "type_declaration":
            # The declared name lives on the nested type_spec node.
            spec = _child_by_type(node, "type_spec")
            tname = _text(_child_by_type(spec, "type_identifier")) if spec else ""
            type_decls.append({"name": tname or _text(node)[:60], "methods": []})
        elif kind == "import_declaration":
            deps.append(node.text.decode("utf-8", errors="replace").strip())
        stack.extend(reversed(node.children))
    total_lines = source.decode("utf-8", errors="replace").count("\n") + 1
    code_lines = max(1, total_lines - n_comment_lines)  # never divide by zero
    return {
        "language": "go",
        "functions": funcs[:50],
        "classes": type_decls[:30],
        "imports": deps[:30],
        "line_count": total_lines,
        "has_docstrings": False,
        "has_comments": n_comment_lines > 0,
        "comment_to_code_ratio": round(n_comment_lines / code_lines, 2),
    }
# ---------------------------------------------------------------------------
# Language handler registry
# ---------------------------------------------------------------------------
# Maps the language name produced by _get_ts_parser to the per-language
# extraction function above; parse_structure dispatches through this table.
_LANGUAGE_HANDLERS = {
    "python": _parse_python,
    "javascript": _parse_javascript,
    "rust": _parse_rust,
    "go": _parse_go,
}
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def parse_structure(path):
    """Parse a source file and return its structural skeleton as a JSON string.

    Takes an absolute path. Returns a JSON string of the structure dict,
    or an error string if parsing fails or the language is unsupported.
    """
    if not os.path.isfile(path):
        return f"Error: '{path}' is not a file."
    extension = os.path.splitext(path)[1].lower()
    parser_info = _get_ts_parser(extension)
    if parser_info is None:
        supported = ", ".join(sorted(_TS_LANGUAGES.keys()))
        return (f"Error: no grammar for extension '{extension}'. "
                f"Supported: {supported}")
    parser, lang_name = parser_info
    handler = _LANGUAGE_HANDLERS.get(lang_name)
    if handler is None:
        # Registry drift: a grammar exists but no extraction function does.
        return f"Error: no handler for language '{lang_name}'."
    try:
        with open(path, "rb") as src:
            raw = src.read()
    except OSError as exc:
        return f"Error reading file: {exc}"
    summary = handler(parser.parse(raw).root_node, raw)
    return json.dumps(summary, indent=2)