refactor(ai): extract _run_dir_loop into three focused helpers (#57) #67

Merged
claude-code merged 1 commit from refactor/issue-57-dir-loop-helpers into main 2026-04-11 10:02:46 -06:00

View file

@ -15,6 +15,7 @@ import json
import os import os
import subprocess import subprocess
import sys import sys
from collections import namedtuple
from datetime import datetime, timezone from datetime import datetime, timezone
import anthropic import anthropic
@ -846,9 +847,18 @@ def _filter_dir_tools(survey):
return [t for t in _DIR_TOOLS if t["name"] not in skip] return [t for t in _DIR_TOOLS if t["name"] not in skip]
def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14, _DirLoopContext = namedtuple(
verbose=False, survey=None): "_DirLoopContext", ["dir_rel", "system", "dir_tools", "messages"],
"""Run an isolated agent loop for a single directory.""" )
def _build_dir_loop_context(dir_path, target, cache, survey, max_turns):
"""Assemble the static inputs the dir loop needs before its first turn.
Pure data assembly: reads the cache for child summaries, builds the
formatted system prompt, filters the tool list, and returns the seed
user message. No writes.
"""
dir_rel = os.path.relpath(dir_path, target) dir_rel = os.path.relpath(dir_path, target)
if dir_rel == ".": if dir_rel == ".":
dir_rel = os.path.basename(target) dir_rel = os.path.basename(target)
@ -878,131 +888,171 @@ def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
}, },
] ]
return _DirLoopContext(
dir_rel=dir_rel, system=system, dir_tools=dir_tools, messages=messages,
)
def _flush_partial_dir_entry(dir_path, target, cache):
"""Write a partial dir cache entry from any already-cached file entries.
Called when the per-loop context budget is exceeded before the agent
reaches submit_report. Idempotent: returns "" without writing if a dir
entry already exists. Returns the partial summary string (empty if no
file entries were available to synthesize from).
"""
if cache.has_entry("dir", dir_path):
return ""
dir_real = os.path.realpath(dir_path)
file_entries = [
e for e in cache.read_all_entries("file")
if os.path.realpath(e.get("path", "")).startswith(dir_real + os.sep)
or os.path.dirname(
os.path.join(target, e.get("relative_path", ""))
) == dir_real
]
if file_entries:
file_summaries = [
e["summary"] for e in file_entries if e.get("summary")
]
notable = [
e.get("relative_path", e.get("path", ""))
for e in file_entries if e.get("notable")
]
partial_summary = " ".join(file_summaries)
cache.write_entry("dir", dir_path, {
"path": dir_path,
"relative_path": os.path.relpath(dir_path, target),
"child_count": len([
n for n in os.listdir(dir_path)
if not n.startswith(".")
]) if os.path.isdir(dir_path) else 0,
"summary": partial_summary,
"dominant_category": "unknown",
"notable_files": notable,
"partial": True,
"partial_reason": "context budget reached",
"cached_at": _now_iso(),
})
return partial_summary
cache.write_entry("dir", dir_path, {
"path": dir_path,
"relative_path": os.path.relpath(dir_path, target),
"child_count": 0,
"summary": ("Investigation incomplete — context budget "
"reached before any files were processed."),
"dominant_category": "unknown",
"notable_files": [],
"partial": True,
"partial_reason": (
"context budget reached before files processed"),
"cached_at": _now_iso(),
})
return ""
def _handle_turn_response(content_blocks, messages, target, cache, dir_rel,
turn, verbose):
"""Process one turn's response: print, append, dispatch tools.
Mutates `messages` in place: appends the assistant message, then either
a "please call submit_report" nudge (no tool_uses) or the tool_results
user message. Recognizes submit_report as the loop's done signal and
extracts its summary. Returns (done, summary).
"""
for b in content_blocks:
if b.type == "text" and b.text.strip():
for line in b.text.strip().split("\n"):
print(f" [AI] {line}", file=sys.stderr)
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
for tu in tool_uses:
arg_summary = ", ".join(
f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
) if tu.input else ""
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
messages.append({
"role": "assistant",
"content": [_block_to_dict(b) for b in content_blocks],
})
if not tool_uses:
messages.append({
"role": "user",
"content": "Please call submit_report with your summary.",
})
return False, None
tool_results = []
done = False
summary = None
for tu in tool_uses:
if tu.name == "submit_report":
summary = tu.input.get("summary", "")
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": "Summary submitted.",
})
done = True
else:
result_text = _execute_tool(
tu.name, tu.input, target, cache, dir_rel,
turn + 1, verbose=verbose,
)
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": result_text,
})
messages.append({"role": "user", "content": tool_results})
return done, summary
def _run_dir_loop(client, target, cache, tracker, dir_path, max_turns=14,
verbose=False, survey=None):
"""Run an isolated agent loop for a single directory."""
ctx = _build_dir_loop_context(
dir_path, target, cache, survey, max_turns,
)
tracker.reset_loop() tracker.reset_loop()
summary = None summary = None
for turn in range(max_turns): for turn in range(max_turns):
# Check context budget
if tracker.budget_exceeded(): if tracker.budget_exceeded():
print(f" [AI] Context budget reached — exiting early " print(f" [AI] Context budget reached — exiting early "
f"(context size {tracker.last_input:,} > " f"(context size {tracker.last_input:,} > "
f"{CONTEXT_BUDGET:,} budget; " f"{CONTEXT_BUDGET:,} budget; "
f"loop spend {tracker.loop_total:,} tokens)", f"loop spend {tracker.loop_total:,} tokens)",
file=sys.stderr) file=sys.stderr)
# Flush a partial directory summary from cached file entries partial = _flush_partial_dir_entry(dir_path, target, cache)
if not cache.has_entry("dir", dir_path): if partial and not summary:
dir_real = os.path.realpath(dir_path) summary = partial
file_entries = [
e for e in cache.read_all_entries("file")
if os.path.realpath(e.get("path", "")).startswith(
dir_real + os.sep)
or os.path.dirname(
os.path.join(target, e.get("relative_path", ""))
) == dir_real
]
if file_entries:
file_summaries = [
e["summary"] for e in file_entries if e.get("summary")
]
notable = [
e.get("relative_path", e.get("path", ""))
for e in file_entries if e.get("notable")
]
partial_summary = " ".join(file_summaries)
cache.write_entry("dir", dir_path, {
"path": dir_path,
"relative_path": os.path.relpath(dir_path, target),
"child_count": len([
n for n in os.listdir(dir_path)
if not n.startswith(".")
]) if os.path.isdir(dir_path) else 0,
"summary": partial_summary,
"dominant_category": "unknown",
"notable_files": notable,
"partial": True,
"partial_reason": "context budget reached",
"cached_at": _now_iso(),
})
if not summary:
summary = partial_summary
else:
cache.write_entry("dir", dir_path, {
"path": dir_path,
"relative_path": os.path.relpath(dir_path, target),
"child_count": 0,
"summary": ("Investigation incomplete — context budget "
"reached before any files were processed."),
"dominant_category": "unknown",
"notable_files": [],
"partial": True,
"partial_reason": (
"context budget reached before files processed"),
"cached_at": _now_iso(),
})
break break
try: try:
content_blocks, usage = _call_api_streaming( content_blocks, _usage = _call_api_streaming(
client, system, messages, dir_tools, tracker, client, ctx.system, ctx.messages, ctx.dir_tools, tracker,
) )
except anthropic.APIError as e: except anthropic.APIError as e:
print(f" [AI] API error: {e}", file=sys.stderr) print(f" [AI] API error: {e}", file=sys.stderr)
break break
# Print text blocks (step numbering, reasoning) to stderr done, turn_summary = _handle_turn_response(
for b in content_blocks: content_blocks, ctx.messages, target, cache,
if b.type == "text" and b.text.strip(): ctx.dir_rel, turn, verbose,
for line in b.text.strip().split("\n"): )
print(f" [AI] {line}", file=sys.stderr) if turn_summary is not None:
summary = turn_summary
# Print tool decisions now that we have the full response
tool_uses = [b for b in content_blocks if b.type == "tool_use"]
for tu in tool_uses:
arg_summary = ", ".join(
f"{k}={v!r}" for k, v in tu.input.items() if k != "data"
) if tu.input else ""
print(f" [AI] -> {tu.name}({arg_summary})", file=sys.stderr)
messages.append({
"role": "assistant",
"content": [_block_to_dict(b) for b in content_blocks],
})
if not tool_uses:
messages.append({
"role": "user",
"content": "Please call submit_report with your summary.",
})
continue
tool_results = []
done = False
for tu in tool_uses:
if tu.name == "submit_report":
summary = tu.input.get("summary", "")
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": "Summary submitted.",
})
done = True
else:
result_text = _execute_tool(
tu.name, tu.input, target, cache, dir_rel,
turn + 1, verbose=verbose,
)
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": result_text,
})
messages.append({"role": "user", "content": tool_results})
if done: if done:
break break
else: else:
print(f" [AI] Warning: max turns reached for {dir_rel}", print(f" [AI] Warning: max turns reached for {ctx.dir_rel}",
file=sys.stderr) file=sys.stderr)
return summary return summary