refactor(ai): extract _run_dir_loop into three focused helpers (#57)

_run_dir_loop was ~160 lines holding four conceptual layers in one
function: pre-loop setup, budget check + partial-flush, API call +
response printing, and tool dispatch + done detection. Phase 3 dynamic
turn allocation will inject more state into the same code path, so
this debt is paid before that lands.

Three new helpers above _run_dir_loop:

- _build_dir_loop_context(): pure setup. Builds the dir context, child
  summaries, survey block, filtered tool list, system prompt, and seed
  user message. Returns a _DirLoopContext namedtuple.
- _flush_partial_dir_entry(): partial-cache writer for the
  budget-exceeded path. Returns the partial summary string, or "" when
  nothing was synthesized. Idempotent via a cache.has_entry() guard, so
  callers can call it without checking first.
- _handle_turn_response(): per-turn response processing. Prints text
  blocks and tool decisions, appends the assistant message, dispatches
  tools (or nudges the agent to call submit_report), appends
  tool_results. Returns (done, summary).

_run_dir_loop is now a ~25-line coordinator: it builds the context, then
loops over turns, running the budget check, the API call, and the turn
handler in sequence.

No behavior change. 164 tests pass. Internals.md §4 updated for the
new structure and the file:line refs that drifted.
This commit is contained in:
Jeff Smith 2026-04-11 10:02:21 -06:00
parent 68f327243c
commit 427f66b488

View file

@ -15,6 +15,7 @@ import json
import os import os
import subprocess import subprocess
import sys import sys
from collections import namedtuple
from datetime import datetime, timezone from datetime import datetime, timezone
import anthropic import anthropic
@ -846,9 +847,18 @@ def _filter_dir_tools(survey):
return [t for t in _DIR_TOOLS if t["name"] not in skip] return [t for t in _DIR_TOOLS if t["name"] not in skip]
def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14, _DirLoopContext = namedtuple(
verbose=False, survey=None): "_DirLoopContext", ["dir_rel", "system", "dir_tools", "messages"],
"""Run an isolated agent loop for a single directory.""" )
def _build_dir_loop_context(dir_path, target, cache, survey, max_turns):
"""Assemble the static inputs the dir loop needs before its first turn.
Pure data assembly: reads the cache for child summaries, builds the
formatted system prompt, filters the tool list, and returns the seed
user message. No writes.
"""
dir_rel = os.path.relpath(dir_path, target) dir_rel = os.path.relpath(dir_path, target)
if dir_rel == ".": if dir_rel == ".":
dir_rel = os.path.basename(target) dir_rel = os.path.basename(target)
@ -878,28 +888,31 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
}, },
] ]
tracker.reset_loop() return _DirLoopContext(
summary = None dir_rel=dir_rel, system=system, dir_tools=dir_tools, messages=messages,
)
def _flush_partial_dir_entry(dir_path, target, cache):
"""Write a partial dir cache entry from any already-cached file entries.
Called when the per-loop context budget is exceeded before the agent
reaches submit_report. Idempotent: returns "" without writing if a dir
entry already exists. Returns the partial summary string (empty if no
file entries were available to synthesize from).
"""
if cache.has_entry("dir", dir_path):
return ""
for turn in range(max_turns):
# Check context budget
if tracker.budget_exceeded():
print(f" [AI] Context budget reached — exiting early "
f"(context size {tracker.last_input:,} > "
f"{CONTEXT_BUDGET:,} budget; "
f"loop spend {tracker.loop_total:,} tokens)",
file=sys.stderr)
# Flush a partial directory summary from cached file entries
if not cache.has_entry("dir", dir_path):
dir_real = os.path.realpath(dir_path) dir_real = os.path.realpath(dir_path)
file_entries = [ file_entries = [
e for e in cache.read_all_entries("file") e for e in cache.read_all_entries("file")
if os.path.realpath(e.get("path", "")).startswith( if os.path.realpath(e.get("path", "")).startswith(dir_real + os.sep)
dir_real + os.sep)
or os.path.dirname( or os.path.dirname(
os.path.join(target, e.get("relative_path", "")) os.path.join(target, e.get("relative_path", ""))
) == dir_real ) == dir_real
] ]
if file_entries: if file_entries:
file_summaries = [ file_summaries = [
e["summary"] for e in file_entries if e.get("summary") e["summary"] for e in file_entries if e.get("summary")
@ -923,9 +936,8 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
"partial_reason": "context budget reached", "partial_reason": "context budget reached",
"cached_at": _now_iso(), "cached_at": _now_iso(),
}) })
if not summary: return partial_summary
summary = partial_summary
else:
cache.write_entry("dir", dir_path, { cache.write_entry("dir", dir_path, {
"path": dir_path, "path": dir_path,
"relative_path": os.path.relpath(dir_path, target), "relative_path": os.path.relpath(dir_path, target),
@ -939,23 +951,23 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
"context budget reached before files processed"), "context budget reached before files processed"),
"cached_at": _now_iso(), "cached_at": _now_iso(),
}) })
break return ""
try:
content_blocks, usage = _call_api_streaming(
client, system, messages, dir_tools, tracker,
)
except anthropic.APIError as e:
print(f" [AI] API error: {e}", file=sys.stderr)
break
# Print text blocks (step numbering, reasoning) to stderr def _handle_turn_response(content_blocks, messages, target, cache, dir_rel,
turn, verbose):
"""Process one turn's response: print, append, dispatch tools.
Mutates `messages` in place: appends the assistant message, then either
a "please call submit_report" nudge (no tool_uses) or the tool_results
user message. Recognizes submit_report as the loop's done signal and
extracts its summary. Returns (done, summary).
"""
for b in content_blocks: for b in content_blocks:
if b.type == "text" and b.text.strip(): if b.type == "text" and b.text.strip():
for line in b.text.strip().split("\n"): for line in b.text.strip().split("\n"):
print(f" [AI] {line}", file=sys.stderr) print(f" [AI] {line}", file=sys.stderr)
# Print tool decisions now that we have the full response
tool_uses = [b for b in content_blocks if b.type == "tool_use"] tool_uses = [b for b in content_blocks if b.type == "tool_use"]
for tu in tool_uses: for tu in tool_uses:
arg_summary = ", ".join( arg_summary = ", ".join(
@ -973,10 +985,11 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
"role": "user", "role": "user",
"content": "Please call submit_report with your summary.", "content": "Please call submit_report with your summary.",
}) })
continue return False, None
tool_results = [] tool_results = []
done = False done = False
summary = None
for tu in tool_uses: for tu in tool_uses:
if tu.name == "submit_report": if tu.name == "submit_report":
summary = tu.input.get("summary", "") summary = tu.input.get("summary", "")
@ -998,11 +1011,48 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
}) })
messages.append({"role": "user", "content": tool_results}) messages.append({"role": "user", "content": tool_results})
return done, summary
def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
verbose=False, survey=None):
"""Run an isolated agent loop for a single directory."""
ctx = _build_dir_loop_context(
dir_path, target, cache, survey, max_turns,
)
tracker.reset_loop()
summary = None
for turn in range(max_turns):
if tracker.budget_exceeded():
print(f" [AI] Context budget reached — exiting early "
f"(context size {tracker.last_input:,} > "
f"{CONTEXT_BUDGET:,} budget; "
f"loop spend {tracker.loop_total:,} tokens)",
file=sys.stderr)
partial = _flush_partial_dir_entry(dir_path, target, cache)
if partial and not summary:
summary = partial
break
try:
content_blocks, _usage = _call_api_streaming(
client, ctx.system, ctx.messages, ctx.dir_tools, tracker,
)
except anthropic.APIError as e:
print(f" [AI] API error: {e}", file=sys.stderr)
break
done, turn_summary = _handle_turn_response(
content_blocks, ctx.messages, target, cache,
ctx.dir_rel, turn, verbose,
)
if turn_summary is not None:
summary = turn_summary
if done: if done:
break break
else: else:
print(f" [AI] Warning: max turns reached for {dir_rel}", print(f" [AI] Warning: max turns reached for {ctx.dir_rel}",
file=sys.stderr) file=sys.stderr)
return summary return summary