luminos/luminos_lib/cache.py
Jeff Smith bbd04f41a7 refactor: extract cache management into luminos_lib/cache.py
Moves investigation ID persistence and _CacheManager class from ai.py
into a dedicated cache module. No behavior changes.

Moved: _load_investigations, _save_investigations, _get_investigation_id,
_CacheManager (all methods), _sha256_path, CACHE_ROOT, INVESTIGATIONS_PATH.

Also added a local _now_iso() in cache.py to avoid a circular import
(ai.py imports from cache.py).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 13:12:37 -06:00

183 lines
6.3 KiB
Python

"""Cache management for Luminos investigations."""
import hashlib
import json
import os
import uuid
from datetime import datetime, timezone
CACHE_ROOT = "/tmp/luminos"
INVESTIGATIONS_PATH = os.path.join(CACHE_ROOT, "investigations.json")
def _sha256_path(path):
"""Return a hex SHA-256 of a path string, used as cache key."""
return hashlib.sha256(path.encode("utf-8")).hexdigest()
def _now_iso():
return datetime.now(timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# Investigation ID persistence
# ---------------------------------------------------------------------------
def _load_investigations():
    """Return the persisted target -> investigation-id mapping.

    Falls back to an empty dict when the file is missing, unreadable,
    or contains invalid JSON.
    """
    try:
        f = open(INVESTIGATIONS_PATH)
    except OSError:
        return {}
    with f:
        try:
            return json.load(f)
        except json.JSONDecodeError:
            return {}
def _save_investigations(data):
    """Persist the target -> investigation-id mapping atomically.

    Writes to a sibling temp file and renames it over the destination
    (os.replace is atomic on POSIX), so a crash mid-write cannot leave a
    truncated/corrupt JSON file — which _load_investigations would then
    silently discard, losing every recorded investigation id.
    """
    os.makedirs(CACHE_ROOT, exist_ok=True)
    tmp_path = INVESTIGATIONS_PATH + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, INVESTIGATIONS_PATH)
def _get_investigation_id(target, fresh=False):
    """Return ``(investigation_id, is_new)`` for *target*.

    Reuses the persisted id when one exists and its cache directory is
    still present on disk; otherwise mints a new UUID, records it, and
    reports ``is_new=True``. Pass ``fresh=True`` to force a new id.
    """
    resolved = os.path.realpath(target)
    known = _load_investigations()
    existing = None if fresh else known.get(resolved)
    if existing is not None:
        if os.path.isdir(os.path.join(CACHE_ROOT, existing)):
            return existing, False
    new_id = str(uuid.uuid4())
    known[resolved] = new_id
    _save_investigations(known)
    return new_id, True
# ---------------------------------------------------------------------------
# Cache manager
# ---------------------------------------------------------------------------
class _CacheManager:
    """Manages the /tmp/luminos/{investigation_id}/ cache tree.

    Layout under ``self.root``:
        files/{sha256(path)}.json   per-file cache entries
        dirs/{sha256(path)}.json    per-directory cache entries
        investigation.log           JSONL log of tool calls
        meta.json                   investigation-level metadata
    """

    def __init__(self, investigation_id, target):
        self.investigation_id = investigation_id
        self.target = os.path.realpath(target)
        self.root = os.path.join(CACHE_ROOT, investigation_id)
        self.files_dir = os.path.join(self.root, "files")
        self.dirs_dir = os.path.join(self.root, "dirs")
        self.log_path = os.path.join(self.root, "investigation.log")
        self.meta_path = os.path.join(self.root, "meta.json")
        # Creating the leaf dirs implicitly creates self.root as well.
        os.makedirs(self.files_dir, exist_ok=True)
        os.makedirs(self.dirs_dir, exist_ok=True)

    def write_meta(self, model, start_time):
        """Initialize meta.json with identity fields and zeroed counters."""
        data = {
            "investigation_id": self.investigation_id,
            "target": self.target,
            "start_time": start_time,
            "model": model,
            "directories_investigated": 0,
            "total_turns": 0,
        }
        with open(self.meta_path, "w") as f:
            json.dump(data, f, indent=2)

    def update_meta(self, **kwargs):
        """Merge *kwargs* into meta.json.

        Starts from an empty dict when the file is missing or corrupt,
        so an update never fails just because write_meta hasn't run.
        """
        try:
            with open(self.meta_path) as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError):
            data = {}
        data.update(kwargs)
        with open(self.meta_path, "w") as f:
            json.dump(data, f, indent=2)

    def log_turn(self, directory, turn, tool_name, tool_args, result_len):
        """Append one JSONL record describing a single tool call."""
        entry = {
            "directory": directory,
            "turn": turn,
            "timestamp": _now_iso(),
            "tool": tool_name,
            "args": tool_args,
            "result_length": result_len,
        }
        with open(self.log_path, "a") as f:
            f.write(json.dumps(entry) + "\n")

    def _cache_path(self, cache_type, path):
        """Cache file for *path*: files/ for "file" entries, dirs/ otherwise."""
        subdir = self.files_dir if cache_type == "file" else self.dirs_dir
        return os.path.join(subdir, _sha256_path(path) + ".json")

    def _cache_safe(self, cache_file):
        """True if *cache_file* resolves to a location inside the cache root.

        Defense-in-depth: _cache_path output is a hex digest and cannot
        escape, but this guards against any future caller passing raw paths.
        """
        real = os.path.realpath(cache_file)
        root_real = os.path.realpath(self.root)
        return real.startswith(root_real + os.sep)

    def write_entry(self, cache_type, path, data):
        """Validate and write one cache entry; return "ok" or an error string.

        Entries must carry the required summary fields for their type and
        must NOT embed raw file contents — the cache stores summaries only.
        """
        cache_file = self._cache_path(cache_type, path)
        if not self._cache_safe(cache_file):
            return "Error: cache path escapes cache root."
        required = {"path", "summary", "cached_at"}
        if cache_type == "file":
            required |= {"relative_path", "size_bytes", "category"}
        elif cache_type == "dir":
            required |= {"relative_path", "child_count", "dominant_category"}
        missing = required - set(data.keys())
        if missing:
            return f"Error: missing required fields: {', '.join(sorted(missing))}"
        if "content" in data or "contents" in data or "raw" in data:
            return "Error: cache entries must not contain raw file contents."
        try:
            with open(cache_file, "w") as f:
                json.dump(data, f, indent=2)
            return "ok"
        except OSError as e:
            return f"Error writing cache: {e}"

    def read_entry(self, cache_type, path):
        """Return the parsed entry for *path*, or None if absent/unreadable."""
        cache_file = self._cache_path(cache_type, path)
        if not self._cache_safe(cache_file):
            return None
        try:
            with open(cache_file) as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            return None

    def has_entry(self, cache_type, path):
        """True if a cache file exists for *path* (contents not validated)."""
        cache_file = self._cache_path(cache_type, path)
        return os.path.exists(cache_file)

    def _iter_entries(self, cache_type):
        """Yield (filename, parsed_json) for each readable entry, sorted by name.

        Shared scan loop for list_entries/read_all_entries: non-.json files,
        unreadable files, and invalid JSON are skipped; a missing subdir
        yields nothing.
        """
        subdir = self.files_dir if cache_type == "file" else self.dirs_dir
        try:
            names = sorted(os.listdir(subdir))
        except OSError:
            return
        for name in names:
            if not name.endswith(".json"):
                continue
            try:
                with open(os.path.join(subdir, name)) as f:
                    yield name, json.load(f)
            except (OSError, json.JSONDecodeError):
                continue

    def list_entries(self, cache_type):
        """Return a best-effort path label for every cached entry.

        Prefers "relative_path", then "path", then the cache filename.
        """
        return [
            data.get("relative_path", data.get("path", name))
            for name, data in self._iter_entries(cache_type)
        ]

    def read_all_entries(self, cache_type):
        """Return the full parsed JSON of every readable cached entry."""
        return [data for _, data in self._iter_entries(cache_type)]