From f3abbce7d4565eb9d3279c357078165b68de81aa Mon Sep 17 00:00:00 2001 From: Jeff Smith Date: Mon, 6 Apr 2026 22:36:14 -0600 Subject: [PATCH] feat(filetypes): expose raw signals to survey, remove classifier bias (#42) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The survey pass no longer receives the bucketed file_categories histogram, which was biased toward source-code targets and would mislabel mail, notebooks, ledgers, and other non-code domains as "source" via the file --brief "text" pattern fallback. Adds filetypes.survey_signals(), which assembles raw signals from the same `classified` data the bucketer already processes — no new walks, no new dependencies: total_files — total count extension_histogram — top 20 extensions, raw, no taxonomy file_descriptions — top 20 `file --brief` outputs, by count filename_samples — 20 names, evenly drawn (not first-20) `file --brief` descriptions are truncated at 80 chars before counting so prefixes group correctly without exploding key cardinality. The Band-Aid in _SURVEY_SYSTEM_PROMPT (warning the LLM that the histogram was biased toward source code) is removed and replaced with neutral guidance on how to read the raw signals together. The {file_type_distribution} placeholder is renamed to {survey_signals} to reflect the broader content. luminos.py base scan computes survey_signals once and stores it on report["survey_signals"]; AI consumers read from there. summarize_categories() and report["file_categories"] are unchanged — the terminal report still uses the bucketed view (#49 tracks fixing that as a follow-up). Smoke tested on two targets: - luminos_lib: identical-quality survey ("Python library package", confidence 0.85), unchanged behavior on code targets. 
- A synthetic Maildir of 8 messages with `:2,S` flag suffixes: survey now correctly identifies it as "A Maildir-format mailbox containing 8 email messages" with confidence 0.90, names the Maildir naming convention in domain_notes, and correctly marks parse_structure as a skip tool. Before #42 this would have been "8 source files." Adds 8 unit tests for survey_signals covering empty input, extension histogram, description aggregation/truncation, top-N cap, and even-stride filename sampling. #48 tracks the unit-of-analysis limitation (file is the wrong unit for mbox, SQLite, archives, notebooks) — explicitly out of scope for #42 and documented in survey_signals' docstring. --- luminos.py | 7 ++++- luminos_lib/ai.py | 43 +++++++++++++++++++------- luminos_lib/filetypes.py | 61 +++++++++++++++++++++++++++++++++++++ luminos_lib/prompts.py | 25 +++++++++------- tests/test_filetypes.py | 65 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+), 21 deletions(-) diff --git a/luminos.py b/luminos.py index e1dc110..497519e 100644 --- a/luminos.py +++ b/luminos.py @@ -8,7 +8,11 @@ import shutil import sys from luminos_lib.tree import build_tree, render_tree -from luminos_lib.filetypes import classify_files, summarize_categories +from luminos_lib.filetypes import ( + classify_files, + summarize_categories, + survey_signals, +) from luminos_lib.code import detect_languages, find_large_files from luminos_lib.recency import find_recent_files from luminos_lib.disk import get_disk_usage, top_directories @@ -56,6 +60,7 @@ def scan(target, depth=3, show_hidden=False, exclude=None): finish() report["file_categories"] = summarize_categories(classified) report["classified_files"] = classified + report["survey_signals"] = survey_signals(classified) on_file, finish = _progress("Counting lines") languages, loc = detect_languages(classified, on_file=on_file) diff --git a/luminos_lib/ai.py b/luminos_lib/ai.py index 4115957..a240ec1 100644 --- a/luminos_lib/ai.py +++ 
b/luminos_lib/ai.py @@ -996,21 +996,44 @@ def _block_to_dict(block): # Synthesis pass # --------------------------------------------------------------------------- +def _format_survey_signals(signals): + """Render the survey_signals dict as a labeled text block.""" + if not signals or not signals.get("total_files"): + return "(no files classified)" + + lines = [f"Total files: {signals.get('total_files', 0)}", ""] + + ext_hist = signals.get("extension_histogram") or {} + if ext_hist: + lines.append("Extensions (top, by count):") + for ext, n in ext_hist.items(): + lines.append(f" {ext}: {n}") + lines.append("") + + descs = signals.get("file_descriptions") or {} + if descs: + lines.append("file --brief output (top, by count):") + for desc, n in descs.items(): + lines.append(f" {desc}: {n}") + lines.append("") + + samples = signals.get("filename_samples") or [] + if samples: + lines.append("Filename samples (evenly drawn):") + for name in samples: + lines.append(f" {name}") + + return "\n".join(lines).rstrip() + + def _run_survey(client, target, report, tracker, max_turns=3, verbose=False): """Run the reconnaissance survey pass. Returns a survey dict on success, or None on failure / out-of-turns. Survey is advisory — callers must treat None as "no survey context". 
""" - categories = report.get("file_categories", {}) or {} - if categories: - ftd_lines = [ - f" {cat}: {n}" - for cat, n in sorted(categories.items(), key=lambda kv: -kv[1]) - ] - file_type_distribution = "\n".join(ftd_lines) - else: - file_type_distribution = " (no files classified)" + signals = report.get("survey_signals") or {} + survey_signals_text = _format_survey_signals(signals) try: tree_node = build_tree(target, max_depth=2) @@ -1023,7 +1046,7 @@ def _run_survey(client, target, report, tracker, max_turns=3, verbose=False): system = _SURVEY_SYSTEM_PROMPT.format( target=target, - file_type_distribution=file_type_distribution, + survey_signals=survey_signals_text, tree_preview=tree_preview, available_tools=available_tools, ) diff --git a/luminos_lib/filetypes.py b/luminos_lib/filetypes.py index 0bed193..5fdf76a 100644 --- a/luminos_lib/filetypes.py +++ b/luminos_lib/filetypes.py @@ -128,3 +128,64 @@ def summarize_categories(classified): cat = f["category"] summary[cat] = summary.get(cat, 0) + 1 return summary + + +_SURVEY_TOP_N = 20 +_SURVEY_DESC_TRUNCATE = 80 + + +def survey_signals(classified, max_samples=20): + """Return raw, unbucketed signals for the AI survey pass. + + Unlike `summarize_categories`, which collapses files into a small + biased taxonomy, this exposes the primary signals so the survey + LLM can characterize the target without being misled by the + classifier's source-code bias. + + See #42 for the rationale and #48 for the unit-of-analysis + limitation: the unit here is still "file" — containers like mbox, + SQLite, and zip will under-count, while dense file collections like + Maildir will over-count. 
+ + Returns a dict with: + total_files — total count + extension_histogram — {ext: count}, top _SURVEY_TOP_N by count + file_descriptions — {description: count}, top _SURVEY_TOP_N by count + filename_samples — up to max_samples filenames, evenly drawn + """ + total = len(classified) + + ext_counts = {} + desc_counts = {} + for f in classified: + ext = os.path.splitext(f.get("name", ""))[1].lower() or "(none)" + ext_counts[ext] = ext_counts.get(ext, 0) + 1 + + desc = (f.get("description") or "").strip() + if desc: + if len(desc) > _SURVEY_DESC_TRUNCATE: + desc = desc[:_SURVEY_DESC_TRUNCATE] + "..." + desc_counts[desc] = desc_counts.get(desc, 0) + 1 + + def _top(d): + items = sorted(d.items(), key=lambda kv: (-kv[1], kv[0])) + return dict(items[:_SURVEY_TOP_N]) + + if total > 0 and max_samples > 0: + if total <= max_samples: + samples = [f.get("name", "") for f in classified] + else: + stride = total / max_samples + samples = [ + classified[int(i * stride)].get("name", "") + for i in range(max_samples) + ] + else: + samples = [] + + return { + "total_files": total, + "extension_histogram": _top(ext_counts), + "file_descriptions": _top(desc_counts), + "filename_samples": samples, + } diff --git a/luminos_lib/prompts.py b/luminos_lib/prompts.py index eb49100..6cba386 100644 --- a/luminos_lib/prompts.py +++ b/luminos_lib/prompts.py @@ -131,17 +131,22 @@ Answer three questions about the target: {target} ## Inputs You have exactly two signals. Do not ask for more. -File type distribution (counts by category): -{file_type_distribution} +File-level signals (raw, unbucketed): +{survey_signals} -IMPORTANT: the file type distribution is produced by a classifier -that is biased toward source code. Its categories are: source, -config, data, document, media, archive, unknown. It has NO concept -of mail, notebooks, calendars, contacts, ledgers, photo libraries, -or other personal-data domains — anything text-shaped tends to be -labeled `source` even when it is not code. 
If the tree preview -suggests a non-code target, trust the tree over the histogram and -say so in `domain_notes`. +These signals are intentionally raw. The extension histogram and +the `file --brief` descriptions reflect what is actually on disk, +without any taxonomy collapsing distinct content into one bucket. +Use them together: an extension alone can mislead (`.txt` could be +notes, logs, or message bodies); the `file` command output and +filename samples disambiguate. + +Note on units: each signal counts filesystem files. Some targets +have a different natural unit — a Maildir is one logical mailbox +with thousands of message files; an mbox is one file containing +many messages; an archive is one file containing many entries. If +the signals point at a container shape, name it in `description` +and `domain_notes` even though the count is in files. Top-level tree (2 levels deep): {tree_preview} diff --git a/tests/test_filetypes.py b/tests/test_filetypes.py index 40d34e1..490b567 100644 --- a/tests/test_filetypes.py +++ b/tests/test_filetypes.py @@ -10,6 +10,7 @@ from luminos_lib.filetypes import ( _classify_one, classify_files, summarize_categories, + survey_signals, ) @@ -140,5 +141,69 @@ class TestClassifyFiles(unittest.TestCase): self.assertGreater(item["size"], 0) +class TestSurveySignals(unittest.TestCase): + def _f(self, name, description="", category="source"): + return {"name": name, "path": f"/x/{name}", "category": category, + "size": 10, "description": description} + + def test_empty_input(self): + s = survey_signals([]) + self.assertEqual(s["total_files"], 0) + self.assertEqual(s["extension_histogram"], {}) + self.assertEqual(s["file_descriptions"], {}) + self.assertEqual(s["filename_samples"], []) + + def test_extension_histogram_uses_lowercase_and_keeps_none(self): + files = [ + self._f("a.PY"), self._f("b.py"), self._f("c.py"), + self._f("README"), self._f("Makefile"), + ] + s = survey_signals(files) + 
self.assertEqual(s["extension_histogram"][".py"], 3) + self.assertEqual(s["extension_histogram"]["(none)"], 2) + + def test_file_descriptions_aggregated_and_truncated(self): + long_desc = "x" * 200 + files = [ + self._f("a.py", "Python script, ASCII text"), + self._f("b.py", "Python script, ASCII text"), + self._f("c.bin", long_desc), + ] + s = survey_signals(files) + self.assertEqual(s["file_descriptions"]["Python script, ASCII text"], 2) + # The long description was truncated and still counted once + truncated_keys = [k for k in s["file_descriptions"] if k.startswith("xxx") and k.endswith("...")] + self.assertEqual(len(truncated_keys), 1) + self.assertLessEqual(len(truncated_keys[0]), 84) # 80 + "..." + + def test_descriptions_skipped_when_empty(self): + files = [self._f("a.py", ""), self._f("b.py", None)] + s = survey_signals(files) + self.assertEqual(s["file_descriptions"], {}) + + def test_top_n_caps_at_20(self): + files = [self._f(f"f{i}.ext{i}") for i in range(50)] + s = survey_signals(files) + self.assertEqual(len(s["extension_histogram"]), 20) + + def test_filename_samples_evenly_drawn(self): + files = [self._f(f"file_{i:04d}.txt") for i in range(100)] + s = survey_signals(files, max_samples=10) + self.assertEqual(len(s["filename_samples"]), 10) + # First sample is the first file (stride 10, index 0) + self.assertEqual(s["filename_samples"][0], "file_0000.txt") + # Last sample is around index 90, not 99 + self.assertTrue(s["filename_samples"][-1].startswith("file_009")) + + def test_filename_samples_returns_all_when_under_cap(self): + files = [self._f(f"f{i}.txt") for i in range(5)] + s = survey_signals(files, max_samples=20) + self.assertEqual(len(s["filename_samples"]), 5) + + def test_total_files_matches_input(self): + files = [self._f(f"f{i}.py") for i in range(7)] + self.assertEqual(survey_signals(files)["total_files"], 7) + + if __name__ == "__main__": unittest.main()