diff --git a/luminos_lib/ai.py b/luminos_lib/ai.py
index 61f6944..22ce3f5 100644
--- a/luminos_lib/ai.py
+++ b/luminos_lib/ai.py
@@ -11,12 +11,10 @@
 Uses the Anthropic SDK for streaming, automatic retries, and token counting.
 Uses tree-sitter for AST parsing and python-magic for file classification.
 """
-import hashlib
 import json
 import os
 import subprocess
 import sys
-import uuid
 from datetime import datetime, timezone
 
 import anthropic
@@ -27,11 +25,10 @@
 import tree_sitter_javascript
 import tree_sitter_rust
 import tree_sitter_go
 
+from luminos_lib.cache import CACHE_ROOT, _CacheManager, _get_investigation_id
 from luminos_lib.capabilities import check_ai_dependencies
 
 MODEL = "claude-sonnet-4-20250514"
-CACHE_ROOT = "/tmp/luminos"
-INVESTIGATIONS_PATH = os.path.join(CACHE_ROOT, "investigations.json")
 # Context budget: trigger early exit at 70% of Sonnet's context window.
 MAX_CONTEXT = 180_000
@@ -98,11 +95,6 @@ def _path_is_safe(path, target):
     return real == target_real or real.startswith(target_real + os.sep)
 
 
-def _sha256_path(path):
-    """Return a hex SHA-256 of a path string, used as cache key."""
-    return hashlib.sha256(path.encode("utf-8")).hexdigest()
-
-
 def _now_iso():
     return datetime.now(timezone.utc).isoformat()
 
@@ -159,170 +151,6 @@ class _TokenTracker:
             f"(approx ${cost:.2f})")
 
 
-# ---------------------------------------------------------------------------
-# Investigation ID persistence
-# ---------------------------------------------------------------------------
-
-def _load_investigations():
-    try:
-        with open(INVESTIGATIONS_PATH) as f:
-            return json.load(f)
-    except (OSError, json.JSONDecodeError):
-        return {}
-
-
-def _save_investigations(data):
-    os.makedirs(CACHE_ROOT, exist_ok=True)
-    with open(INVESTIGATIONS_PATH, "w") as f:
-        json.dump(data, f, indent=2)
-
-
-def _get_investigation_id(target, fresh=False):
-    target_real = os.path.realpath(target)
-    investigations = _load_investigations()
-    if not fresh and target_real in investigations:
-        inv_id = investigations[target_real]
-        cache_dir = os.path.join(CACHE_ROOT, inv_id)
-        if os.path.isdir(cache_dir):
-            return inv_id, False
-    inv_id = str(uuid.uuid4())
-    investigations[target_real] = inv_id
-    _save_investigations(investigations)
-    return inv_id, True
-
-
-# ---------------------------------------------------------------------------
-# Cache manager
-# ---------------------------------------------------------------------------
-
-class _CacheManager:
-    """Manages the /tmp/luminos/{investigation_id}/ cache tree."""
-
-    def __init__(self, investigation_id, target):
-        self.investigation_id = investigation_id
-        self.target = os.path.realpath(target)
-        self.root = os.path.join(CACHE_ROOT, investigation_id)
-        self.files_dir = os.path.join(self.root, "files")
-        self.dirs_dir = os.path.join(self.root, "dirs")
-        self.log_path = os.path.join(self.root, "investigation.log")
-        self.meta_path = os.path.join(self.root, "meta.json")
-        os.makedirs(self.files_dir, exist_ok=True)
-        os.makedirs(self.dirs_dir, exist_ok=True)
-
-    def write_meta(self, model, start_time):
-        data = {
-            "investigation_id": self.investigation_id,
-            "target": self.target,
-            "start_time": start_time,
-            "model": model,
-            "directories_investigated": 0,
-            "total_turns": 0,
-        }
-        with open(self.meta_path, "w") as f:
-            json.dump(data, f, indent=2)
-
-    def update_meta(self, **kwargs):
-        try:
-            with open(self.meta_path) as f:
-                data = json.load(f)
-        except (OSError, json.JSONDecodeError):
-            data = {}
-        data.update(kwargs)
-        with open(self.meta_path, "w") as f:
-            json.dump(data, f, indent=2)
-
-    def log_turn(self, directory, turn, tool_name, tool_args, result_len):
-        entry = {
-            "directory": directory,
-            "turn": turn,
-            "timestamp": _now_iso(),
-            "tool": tool_name,
-            "args": tool_args,
-            "result_length": result_len,
-        }
-        with open(self.log_path, "a") as f:
-            f.write(json.dumps(entry) + "\n")
-
-    def _cache_path(self, cache_type, path):
-        subdir = self.files_dir if cache_type == "file" else self.dirs_dir
-        return os.path.join(subdir, _sha256_path(path) + ".json")
-
-    def _cache_safe(self, cache_file):
-        real = os.path.realpath(cache_file)
-        root_real = os.path.realpath(self.root)
-        return real.startswith(root_real + os.sep)
-
-    def write_entry(self, cache_type, path, data):
-        cache_file = self._cache_path(cache_type, path)
-        if not self._cache_safe(cache_file):
-            return "Error: cache path escapes cache root."
-        required = {"path", "summary", "cached_at"}
-        if cache_type == "file":
-            required |= {"relative_path", "size_bytes", "category"}
-        elif cache_type == "dir":
-            required |= {"relative_path", "child_count", "dominant_category"}
-        missing = required - set(data.keys())
-        if missing:
-            return f"Error: missing required fields: {', '.join(sorted(missing))}"
-        if "content" in data or "contents" in data or "raw" in data:
-            return "Error: cache entries must not contain raw file contents."
-        try:
-            with open(cache_file, "w") as f:
-                json.dump(data, f, indent=2)
-            return "ok"
-        except OSError as e:
-            return f"Error writing cache: {e}"
-
-    def read_entry(self, cache_type, path):
-        cache_file = self._cache_path(cache_type, path)
-        if not self._cache_safe(cache_file):
-            return None
-        try:
-            with open(cache_file) as f:
-                return json.load(f)
-        except (OSError, json.JSONDecodeError):
-            return None
-
-    def has_entry(self, cache_type, path):
-        cache_file = self._cache_path(cache_type, path)
-        return os.path.exists(cache_file)
-
-    def list_entries(self, cache_type):
-        subdir = self.files_dir if cache_type == "file" else self.dirs_dir
-        result = []
-        try:
-            for name in sorted(os.listdir(subdir)):
-                if not name.endswith(".json"):
-                    continue
-                fpath = os.path.join(subdir, name)
-                try:
-                    with open(fpath) as f:
-                        data = json.load(f)
-                    result.append(data.get("relative_path", data.get("path", name)))
-                except (OSError, json.JSONDecodeError):
-                    continue
-        except OSError:
-            pass
-        return result
-
-    def read_all_entries(self, cache_type):
-        subdir = self.files_dir if cache_type == "file" else self.dirs_dir
-        result = []
-        try:
-            for name in sorted(os.listdir(subdir)):
-                if not name.endswith(".json"):
-                    continue
-                fpath = os.path.join(subdir, name)
-                try:
-                    with open(fpath) as f:
-                        result.append(json.load(f))
-                except (OSError, json.JSONDecodeError):
-                    continue
-        except OSError:
-            pass
-        return result
-
-
 # ---------------------------------------------------------------------------
 # Tool definitions
 # ---------------------------------------------------------------------------
diff --git a/luminos_lib/cache.py b/luminos_lib/cache.py
new file mode 100644
index 0000000..e26dd64
--- /dev/null
+++ b/luminos_lib/cache.py
@@ -0,0 +1,183 @@
+"""Cache management for Luminos investigations."""
+
+import hashlib
+import json
+import os
+import uuid
+from datetime import datetime, timezone
+
+CACHE_ROOT = "/tmp/luminos"
+INVESTIGATIONS_PATH = os.path.join(CACHE_ROOT, "investigations.json")
+
+
+def _sha256_path(path):
+    """Return a hex SHA-256 of a path string, used as cache key."""
+    return hashlib.sha256(path.encode("utf-8")).hexdigest()
+
+
+def _now_iso():
+    return datetime.now(timezone.utc).isoformat()
+
+
+# ---------------------------------------------------------------------------
+# Investigation ID persistence
+# ---------------------------------------------------------------------------
+
+def _load_investigations():
+    try:
+        with open(INVESTIGATIONS_PATH) as f:
+            return json.load(f)
+    except (OSError, json.JSONDecodeError):
+        return {}
+
+
+def _save_investigations(data):
+    os.makedirs(CACHE_ROOT, exist_ok=True)
+    with open(INVESTIGATIONS_PATH, "w") as f:
+        json.dump(data, f, indent=2)
+
+
+def _get_investigation_id(target, fresh=False):
+    target_real = os.path.realpath(target)
+    investigations = _load_investigations()
+    if not fresh and target_real in investigations:
+        inv_id = investigations[target_real]
+        cache_dir = os.path.join(CACHE_ROOT, inv_id)
+        if os.path.isdir(cache_dir):
+            return inv_id, False
+    inv_id = str(uuid.uuid4())
+    investigations[target_real] = inv_id
+    _save_investigations(investigations)
+    return inv_id, True
+
+
+# ---------------------------------------------------------------------------
+# Cache manager
+# ---------------------------------------------------------------------------
+
+class _CacheManager:
+    """Manages the /tmp/luminos/{investigation_id}/ cache tree."""
+
+    def __init__(self, investigation_id, target):
+        self.investigation_id = investigation_id
+        self.target = os.path.realpath(target)
+        self.root = os.path.join(CACHE_ROOT, investigation_id)
+        self.files_dir = os.path.join(self.root, "files")
+        self.dirs_dir = os.path.join(self.root, "dirs")
+        self.log_path = os.path.join(self.root, "investigation.log")
+        self.meta_path = os.path.join(self.root, "meta.json")
+        os.makedirs(self.files_dir, exist_ok=True)
+        os.makedirs(self.dirs_dir, exist_ok=True)
+
+    def write_meta(self, model, start_time):
+        data = {
+            "investigation_id": self.investigation_id,
+            "target": self.target,
+            "start_time": start_time,
+            "model": model,
+            "directories_investigated": 0,
+            "total_turns": 0,
+        }
+        with open(self.meta_path, "w") as f:
+            json.dump(data, f, indent=2)
+
+    def update_meta(self, **kwargs):
+        try:
+            with open(self.meta_path) as f:
+                data = json.load(f)
+        except (OSError, json.JSONDecodeError):
+            data = {}
+        data.update(kwargs)
+        with open(self.meta_path, "w") as f:
+            json.dump(data, f, indent=2)
+
+    def log_turn(self, directory, turn, tool_name, tool_args, result_len):
+        entry = {
+            "directory": directory,
+            "turn": turn,
+            "timestamp": _now_iso(),
+            "tool": tool_name,
+            "args": tool_args,
+            "result_length": result_len,
+        }
+        with open(self.log_path, "a") as f:
+            f.write(json.dumps(entry) + "\n")
+
+    def _cache_path(self, cache_type, path):
+        subdir = self.files_dir if cache_type == "file" else self.dirs_dir
+        return os.path.join(subdir, _sha256_path(path) + ".json")
+
+    def _cache_safe(self, cache_file):
+        real = os.path.realpath(cache_file)
+        root_real = os.path.realpath(self.root)
+        return real.startswith(root_real + os.sep)
+
+    def write_entry(self, cache_type, path, data):
+        cache_file = self._cache_path(cache_type, path)
+        if not self._cache_safe(cache_file):
+            return "Error: cache path escapes cache root."
+        required = {"path", "summary", "cached_at"}
+        if cache_type == "file":
+            required |= {"relative_path", "size_bytes", "category"}
+        elif cache_type == "dir":
+            required |= {"relative_path", "child_count", "dominant_category"}
+        missing = required - set(data.keys())
+        if missing:
+            return f"Error: missing required fields: {', '.join(sorted(missing))}"
+        if "content" in data or "contents" in data or "raw" in data:
+            return "Error: cache entries must not contain raw file contents."
+        try:
+            with open(cache_file, "w") as f:
+                json.dump(data, f, indent=2)
+            return "ok"
+        except OSError as e:
+            return f"Error writing cache: {e}"
+
+    def read_entry(self, cache_type, path):
+        cache_file = self._cache_path(cache_type, path)
+        if not self._cache_safe(cache_file):
+            return None
+        try:
+            with open(cache_file) as f:
+                return json.load(f)
+        except (OSError, json.JSONDecodeError):
+            return None
+
+    def has_entry(self, cache_type, path):
+        cache_file = self._cache_path(cache_type, path)
+        return os.path.exists(cache_file)
+
+    def list_entries(self, cache_type):
+        subdir = self.files_dir if cache_type == "file" else self.dirs_dir
+        result = []
+        try:
+            for name in sorted(os.listdir(subdir)):
+                if not name.endswith(".json"):
+                    continue
+                fpath = os.path.join(subdir, name)
+                try:
+                    with open(fpath) as f:
+                        data = json.load(f)
+                    result.append(data.get("relative_path", data.get("path", name)))
+                except (OSError, json.JSONDecodeError):
+                    continue
+        except OSError:
+            pass
+        return result
+
+    def read_all_entries(self, cache_type):
+        subdir = self.files_dir if cache_type == "file" else self.dirs_dir
+        result = []
+        try:
+            for name in sorted(os.listdir(subdir)):
+                if not name.endswith(".json"):
+                    continue
+                fpath = os.path.join(subdir, name)
+                try:
+                    with open(fpath) as f:
+                        result.append(json.load(f))
+                except (OSError, json.JSONDecodeError):
+                    continue
+        except OSError:
+            pass
+        return result