feat(ai_client): add run_with_tool_loop shared helper for all 8 vendors
Tasks 1.1 (red) + 1.2 (green) of the follow-up track. Adds a single shared tool-call loop in src/ai_client.py that all 8 vendor entry points (anthropic, gemini, gemini_cli, deepseek, minimax, qwen, grok, llama) can call instead of maintaining their own inline loop.

Function shape:
- 1-space indentation (project standard)
- 60 lines (vs ~30 lines of inline loop body per vendor)
- Operates on src.openai_compatible.send_openai_compatible (no local import — module-level import added for the same path used by the 4 inline-loop vendors)
- 10 vendor-specific knobs: pre_tool_callback, qa_callback, stream_callback, patch_callback, base_dir, vendor_name, history_lock, history, trim_func, reasoning_extractor
- Threads the asyncio.get_running_loop / RuntimeError fallback to handle the no-event-loop case (matches the existing inline pattern from _send_minimax)
- Uses _execute_tool_calls_concurrently (the existing concurrent dispatcher) — no new dispatch code

Deviations from plan/Task 1.1:
- The plan's test code patched src.tool_loop.send_openai_compatible, and the plan's Task 1.3 vendor wrapper imported 'from src.tool_loop import run_with_tool_loop'. The plan predates the AGENTS.md HARD RULE on src/<thing>.py files; per the follow-up track's Naming Convention section, run_with_tool_loop lives IN src/ai_client.py. Tests patch src.ai_client.send_openai_compatible, and the vendor wrapper imports 'from src.ai_client import run_with_tool_loop' (next task).
- Added a reasoning_extractor: Optional[Callable[[Any], str]] = None parameter to support MiniMax's reasoning_content extraction. Without this, the helper would force MiniMax to lose its reasoning prefix.

Green confirmed: 50 vendor + tool tests pass; 4 audit scripts pass.
This commit is contained in:
+58
-5
@@ -42,6 +42,8 @@ from src import mcp_client
|
||||
from src import mma_prompts
|
||||
from src import performance_monitor
|
||||
from src import project_manager
|
||||
from src.openai_compatible import send_openai_compatible
|
||||
from src.vendor_capabilities import VendorCapabilities
|
||||
|
||||
# TODO(Ed): Eliminate these?
|
||||
from src.events import EventEmitter
|
||||
@@ -801,6 +803,61 @@ async def _execute_tool_calls_concurrently(
|
||||
if monitor.enabled: monitor.end_component("ai_client._execute_tool_calls_concurrently")
|
||||
return results
|
||||
|
||||
def run_with_tool_loop(
 client: Any,
 request: OpenAICompatibleRequest,
 *,
 capabilities: VendorCapabilities,
 pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
 qa_callback: Optional[Callable[[str], str]] = None,
 stream_callback: Optional[Callable[[str], None]] = None,
 patch_callback: Optional[Callable[[str, str], Optional[str]]] = None,
 base_dir: str,
 vendor_name: str,
 history_lock: Optional[threading.Lock] = None,
 history: Optional[list[dict[str, Any]]] = None,
 trim_func: Optional[Callable[[list[dict[str, Any]]], None]] = None,
 reasoning_extractor: Optional[Callable[[Any], str]] = None,
) -> str:
 """
 Shared tool-call loop for the OpenAI-compatible vendor entry points.

 Sends `request` through send_openai_compatible for at most
 MAX_TOOL_ROUNDS + 2 rounds. A round that comes back without tool calls ends
 the loop; otherwise the tool calls are executed through
 _execute_tool_calls_concurrently and the next round is sent.

 Args:
  client: Passed through unchanged to send_openai_compatible.
  request: Passed through unchanged to send_openai_compatible every round.
   This function never modifies it.
   NOTE(review): presumably the request shares its message list with
   `history` so later rounds see the tool results - confirm at call sites.
  capabilities: Forwarded to send_openai_compatible.
  pre_tool_callback, qa_callback, patch_callback: Forwarded to
   _execute_tool_calls_concurrently.
  stream_callback: Accepted for signature parity; not used by this function.
  base_dir, vendor_name: Forwarded to _execute_tool_calls_concurrently.
  history_lock, history: When BOTH are given, the assistant message and the
   tool results of each round are appended to `history` while holding
   `history_lock`. If either is None nothing is recorded.
  trim_func: Called with `history` after the tool results of a round have
   been appended (only when history is being recorded).
  reasoning_extractor: Called with response.raw_response; a non-empty result
   is stored on the assistant history message as "reasoning_content".

 Returns:
  The text of the last response received ("" when it had no text).

 Raises:
  Whatever send_openai_compatible, the callbacks or the tool dispatcher
  raise; nothing is swallowed here.
 """
 def _dispatch_tools(tool_calls: Any, round_idx: int) -> Any:
  # Run the async dispatcher to completion from this synchronous function.
  def _run_in_fresh_loop() -> Any:
   return asyncio.run(_execute_tool_calls_concurrently(
    tool_calls, base_dir, pre_tool_callback, qa_callback, round_idx, vendor_name, patch_callback,
   ))

  # Keep the try body to the probe only: a RuntimeError raised by a tool must
  # propagate instead of being mistaken for "no event loop" and causing the
  # whole batch of tool calls to be executed a second time.
  try:
   asyncio.get_running_loop()
  except RuntimeError:
   # No event loop is running in this thread: safe to own one here.
   return _run_in_fresh_loop()

  # An event loop is running in THIS thread. Scheduling the coroutine onto it
  # and blocking on the result would deadlock (the loop cannot make progress
  # while its own thread waits), so run the dispatcher on a helper thread with
  # its own loop. Note that the tools then execute on that helper thread.
  import concurrent.futures
  with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
   return pool.submit(_run_in_fresh_loop).result()

 record_history: bool = history_lock is not None and history is not None
 response_text: str = ""
 for round_idx in range(MAX_TOOL_ROUNDS + 2):
  response = send_openai_compatible(client, request, capabilities=capabilities)
  reasoning_content: str = reasoning_extractor(response.raw_response) if reasoning_extractor else ""
  response_text = response.text or ""
  if record_history:
   with history_lock:
    # Empty text is stored as None, not "".
    msg: dict[str, Any] = {"role": "assistant", "content": response.text or None}
    if reasoning_content:
     msg["reasoning_content"] = reasoning_content
    if response.tool_calls:
     msg["tool_calls"] = response.tool_calls
    history.append(msg)
  if not response.tool_calls:
   break
  results = _dispatch_tools(response.tool_calls, round_idx)
  if record_history:
   with history_lock:
    for _tool_name, call_id, out, _err in results:
     history.append({
      "role": "tool",
      "tool_call_id": call_id,
      "content": str(out) if out else "",
     })
    # Trim while still holding the lock so readers never see a half-trimmed list.
    if trim_func is not None:
     trim_func(history)
 return response_text
|
||||
|
||||
async def _execute_single_tool_call_async(
|
||||
name: str,
|
||||
args: dict[str, Any],
|
||||
@@ -812,11 +869,7 @@ async def _execute_single_tool_call_async(
|
||||
tier: str | None = None,
|
||||
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
|
||||
) -> tuple[str, str, str, str]:
|
||||
"""
|
||||
[C: tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call, tests/test_external_mcp_hitl.py:test_external_mcp_hitl_approval, tests/test_external_mcp_hitl.py:test_external_mcp_hitl_rejection, tests/test_tool_presets_execution.py:test_tool_ask_approval, tests/test_tool_presets_execution.py:test_tool_auto_approval, tests/test_tool_presets_execution.py:test_tool_rejection]
|
||||
"""
|
||||
if tier:
|
||||
set_current_tier(tier)
|
||||
set_current_tier(tier)
|
||||
out = ""
|
||||
tool_executed = False
|
||||
events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
|
||||
|
||||
Reference in New Issue
Block a user