From 427f66b48848c37c5f495c817bc0114a0e08a213 Mon Sep 17 00:00:00 2001 From: Jeff Smith Date: Sat, 11 Apr 2026 10:02:21 -0600 Subject: [PATCH] refactor(ai): extract _run_dir_loop into three focused helpers (#57) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _run_dir_loop was ~160 lines holding four conceptual layers in one function: pre-loop setup, budget check + partial-flush, API call + response printing, and tool dispatch + done detection. Phase 3 dynamic turn allocation will inject more state into the same code path, so this debt is paid before that lands. Three new helpers above _run_dir_loop: - _build_dir_loop_context(): pure setup. Builds the dir context, child summaries, survey block, filtered tool list, system prompt, and seed user message. Returns a _DirLoopContext namedtuple. - _flush_partial_dir_entry(): idempotent partial-cache writer for the budget-exceeded path. Returns the partial summary string. Idempotent via cache.has_entry() guard, so callers can call it without checking. - _handle_turn_response(): per-turn response processing. Prints text blocks and tool decisions, appends the assistant message, dispatches tools (or nudges the agent to call submit_report), appends tool_results. Returns (done, summary). _run_dir_loop is now a ~25-line coordinator: build context, then for-loop calls budget check, API, and turn handler in sequence. No behavior change. 164 tests pass. Internals.md ยง4 updated for the new structure and the file:line refs that drifted. --- luminos_lib/ai.py | 264 +++++++++++++++++++++++++++------------------- 1 file changed, 157 insertions(+), 107 deletions(-) diff --git a/luminos_lib/ai.py b/luminos_lib/ai.py index 67cd8b9..1efcefe 100644 --- a/luminos_lib/ai.py +++ b/luminos_lib/ai.py @@ -15,6 +15,7 @@ import json import os import subprocess import sys +from collections import namedtuple from datetime import datetime, timezone import anthropic @@ -846,9 +847,18 @@ def _filter_dir_tools(survey): return [t for t in _DIR_TOOLS if t["name"] not in skip] -def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14, - verbose=False, survey=None): - """Run an isolated agent loop for a single directory.""" +_DirLoopContext = namedtuple( + "_DirLoopContext", ["dir_rel", "system", "dir_tools", "messages"], +) + + +def _build_dir_loop_context(dir_path, target, cache, survey, max_turns): + """Assemble the static inputs the dir loop needs before its first turn. + + Pure data assembly: reads the cache for child summaries, builds the + formatted system prompt, filters the tool list, and returns the seed + user message. No writes. + """ dir_rel = os.path.relpath(dir_path, target) if dir_rel == ".": dir_rel = os.path.basename(target) @@ -878,131 +888,171 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14, }, ] + return _DirLoopContext( + dir_rel=dir_rel, system=system, dir_tools=dir_tools, messages=messages, + ) + + +def _flush_partial_dir_entry(dir_path, target, cache): + """Write a partial dir cache entry from any already-cached file entries. + + Called when the per-loop context budget is exceeded before the agent + reaches submit_report. Idempotent: returns "" without writing if a dir + entry already exists. Returns the partial summary string (empty if no + file entries were available to synthesize from). + """ + if cache.has_entry("dir", dir_path): + return "" + + dir_real = os.path.realpath(dir_path) + file_entries = [ + e for e in cache.read_all_entries("file") + if os.path.realpath(e.get("path", "")).startswith(dir_real + os.sep) + or os.path.dirname( + os.path.join(target, e.get("relative_path", "")) + ) == dir_real + ] + + if file_entries: + file_summaries = [ + e["summary"] for e in file_entries if e.get("summary") + ] + notable = [ + e.get("relative_path", e.get("path", "")) + for e in file_entries if e.get("notable") + ] + partial_summary = " ".join(file_summaries) + cache.write_entry("dir", dir_path, { + "path": dir_path, + "relative_path": os.path.relpath(dir_path, target), + "child_count": len([ + n for n in os.listdir(dir_path) + if not n.startswith(".") + ]) if os.path.isdir(dir_path) else 0, + "summary": partial_summary, + "dominant_category": "unknown", + "notable_files": notable, + "partial": True, + "partial_reason": "context budget reached", + "cached_at": _now_iso(), + }) + return partial_summary + + cache.write_entry("dir", dir_path, { + "path": dir_path, + "relative_path": os.path.relpath(dir_path, target), + "child_count": 0, + "summary": ("Investigation incomplete โ€” context budget " + "reached before any files were processed."), + "dominant_category": "unknown", + "notable_files": [], + "partial": True, + "partial_reason": ( + "context budget reached before files processed"), + "cached_at": _now_iso(), + }) + return "" + + +def _handle_turn_response(content_blocks, messages, target, cache, dir_rel, + turn, verbose): + """Process one turn's response: print, append, dispatch tools. + + Mutates `messages` in place: appends the assistant message, then either + a "please call submit_report" nudge (no tool_uses) or the tool_results + user message. Recognizes submit_report as the loop's done signal and + extracts its summary. Returns (done, summary). + """ + for b in content_blocks: + if b.type == "text" and b.text.strip(): + for line in b.text.strip().split("\n"): + print(f" [AI] {line}", file=sys.stderr) + + tool_uses = [b for b in content_blocks if b.type == "tool_use"] + for tu in tool_uses: + arg_summary = ", ".join( + f"{k}={v!r}" for k, v in tu.input.items() if k != "data" + ) if tu.input else "" + print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr) + + messages.append({ + "role": "assistant", + "content": [_block_to_dict(b) for b in content_blocks], + }) + + if not tool_uses: + messages.append({ + "role": "user", + "content": "Please call submit_report with your summary.", + }) + return False, None + + tool_results = [] + done = False + summary = None + for tu in tool_uses: + if tu.name == "submit_report": + summary = tu.input.get("summary", "") + tool_results.append({ + "type": "tool_result", + "tool_use_id": tu.id, + "content": "Summary submitted.", + }) + done = True + else: + result_text = _execute_tool( + tu.name, tu.input, target, cache, dir_rel, + turn + 1, verbose=verbose, + ) + tool_results.append({ + "type": "tool_result", + "tool_use_id": tu.id, + "content": result_text, + }) + + messages.append({"role": "user", "content": tool_results}) + return done, summary + + +def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14, + verbose=False, survey=None): + """Run an isolated agent loop for a single directory.""" + ctx = _build_dir_loop_context( + dir_path, target, cache, survey, max_turns, + ) tracker.reset_loop() summary = None for turn in range(max_turns): - # Check context budget if tracker.budget_exceeded(): print(f" [AI] Context budget reached โ€” exiting early " f"(context size {tracker.last_input:,} > " f"{CONTEXT_BUDGET:,} budget; " f"loop spend {tracker.loop_total:,} tokens)", file=sys.stderr) - # Flush a partial directory summary from cached file entries - if not cache.has_entry("dir", dir_path): - dir_real = os.path.realpath(dir_path) - file_entries = [ - e for e in cache.read_all_entries("file") - if os.path.realpath(e.get("path", "")).startswith( - dir_real + os.sep) - or os.path.dirname( - os.path.join(target, e.get("relative_path", "")) - ) == dir_real - ] - if file_entries: - file_summaries = [ - e["summary"] for e in file_entries if e.get("summary") - ] - notable = [ - e.get("relative_path", e.get("path", "")) - for e in file_entries if e.get("notable") - ] - partial_summary = " ".join(file_summaries) - cache.write_entry("dir", dir_path, { - "path": dir_path, - "relative_path": os.path.relpath(dir_path, target), - "child_count": len([ - n for n in os.listdir(dir_path) - if not n.startswith(".") - ]) if os.path.isdir(dir_path) else 0, - "summary": partial_summary, - "dominant_category": "unknown", - "notable_files": notable, - "partial": True, - "partial_reason": "context budget reached", - "cached_at": _now_iso(), - }) - if not summary: - summary = partial_summary - else: - cache.write_entry("dir", dir_path, { - "path": dir_path, - "relative_path": os.path.relpath(dir_path, target), - "child_count": 0, - "summary": ("Investigation incomplete โ€” context budget " - "reached before any files were processed."), - "dominant_category": "unknown", - "notable_files": [], - "partial": True, - "partial_reason": ( - "context budget reached before files processed"), - "cached_at": _now_iso(), - }) + partial = _flush_partial_dir_entry(dir_path, target, cache) + if partial and not summary: + summary = partial break try: - content_blocks, usage = _call_api_streaming( - client, system, messages, dir_tools, tracker, + content_blocks, _usage = _call_api_streaming( + client, ctx.system, ctx.messages, ctx.dir_tools, tracker, ) except anthropic.APIError as e: print(f" [AI] API error: {e}", file=sys.stderr) break - # Print text blocks (step numbering, reasoning) to stderr - for b in content_blocks: - if b.type == "text" and b.text.strip(): - for line in b.text.strip().split("\n"): - print(f" [AI] {line}", file=sys.stderr) - - # Print tool decisions now that we have the full response - tool_uses = [b for b in content_blocks if b.type == "tool_use"] - for tu in tool_uses: - arg_summary = ", ".join( - f"{k}={v!r}" for k, v in tu.input.items() if k != "data" - ) if tu.input else "" - print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr) - - messages.append({ - "role": "assistant", - "content": [_block_to_dict(b) for b in content_blocks], - }) - - if not tool_uses: - messages.append({ - "role": "user", - "content": "Please call submit_report with your summary.", - }) - continue - - tool_results = [] - done = False - for tu in tool_uses: - if tu.name == "submit_report": - summary = tu.input.get("summary", "") - tool_results.append({ - "type": "tool_result", - "tool_use_id": tu.id, - "content": "Summary submitted.", - }) - done = True - else: - result_text = _execute_tool( - tu.name, tu.input, target, cache, dir_rel, - turn + 1, verbose=verbose, - ) - tool_results.append({ - "type": "tool_result", - "tool_use_id": tu.id, - "content": result_text, - }) - - messages.append({"role": "user", "content": tool_results}) - + done, turn_summary = _handle_turn_response( + content_blocks, ctx.messages, target, cache, + ctx.dir_rel, turn, verbose, + ) + if turn_summary is not None: + summary = turn_summary if done: break else: - print(f" [AI] Warning: max turns reached for {dir_rel}", + print(f" [AI] Warning: max turns reached for {ctx.dir_rel}", file=sys.stderr) return summary