feat(ai_client): add run_with_tool_loop shared helper for all 8 vendors
Tasks 1.1 (red) + 1.2 (green) of the follow-up track. Adds a single shared tool-call loop in src/ai_client.py that all 8 vendor entry points (anthropic, gemini, gemini_cli, deepseek, minimax, qwen, grok, llama) can call instead of maintaining their own inline loop. Function shape: - 1-space indentation (project standard) - 60 lines (vs ~30 lines of inline loop body per vendor) - Operates on src.openai_compatible.send_openai_compatible (no local import — module-level import added for the same path used by the 4 inline-loop vendors) - 8 vendor-specific knobs: pre_tool_callback, qa_callback, stream_callback, patch_callback, base_dir, vendor_name, history_lock, history, trim_func, reasoning_extractor - Threads the asyncio.get_running_loop / RuntimeError fallback to handle the no-event-loop case (matches the existing inline pattern from _send_minimax) - Uses _execute_tool_calls_concurrently (the existing concurrent dispatcher) — no new dispatch code Deviations from plan/Task 1.1: - The plan's test code patched src.tool_loop.send_openai_compatible and the plan's Task 1.3 vendor wrapper imported 'from src.tool_loop import run_with_tool_loop'. The plan predates the AGENTS.md HARD RULE on src/<thing>.py files; per the follow-up track's Naming Convention section, run_with_tool_loop lives IN src/ai_client.py. Tests patch src.ai_client.send_openai_compatible and the vendor wrapper imports 'from src.ai_client import run_with_tool_loop' (next task). - Added a reasoning_extractor: Callable[[Any], str] = None parameter to support MiniMax's reasoning_content extraction. Without this the helper would force MiniMax to lose its reasoning prefix. Green confirmed: 50 vendor + tool tests pass; 4 audit scripts pass.
This commit is contained in:
+58
-5
@@ -42,6 +42,8 @@ from src import mcp_client
|
|||||||
from src import mma_prompts
|
from src import mma_prompts
|
||||||
from src import performance_monitor
|
from src import performance_monitor
|
||||||
from src import project_manager
|
from src import project_manager
|
||||||
|
from src.openai_compatible import send_openai_compatible
|
||||||
|
from src.vendor_capabilities import VendorCapabilities
|
||||||
|
|
||||||
# TODO(Ed): Eliminate these?
|
# TODO(Ed): Eliminate these?
|
||||||
from src.events import EventEmitter
|
from src.events import EventEmitter
|
||||||
@@ -801,6 +803,61 @@ async def _execute_tool_calls_concurrently(
|
|||||||
if monitor.enabled: monitor.end_component("ai_client._execute_tool_calls_concurrently")
|
if monitor.enabled: monitor.end_component("ai_client._execute_tool_calls_concurrently")
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def run_with_tool_loop(
|
||||||
|
client: Any,
|
||||||
|
request: OpenAICompatibleRequest,
|
||||||
|
*,
|
||||||
|
capabilities: VendorCapabilities,
|
||||||
|
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
|
||||||
|
qa_callback: Optional[Callable[[str], str]] = None,
|
||||||
|
stream_callback: Optional[Callable[[str], None]] = None,
|
||||||
|
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None,
|
||||||
|
base_dir: str,
|
||||||
|
vendor_name: str,
|
||||||
|
history_lock: Optional[threading.Lock] = None,
|
||||||
|
history: Optional[list[dict[str, Any]]] = None,
|
||||||
|
trim_func: Optional[Callable[[list[dict[str, Any]]], None]] = None,
|
||||||
|
reasoning_extractor: Optional[Callable[[Any], str]] = None,
|
||||||
|
) -> str:
|
||||||
|
response_text: str = ""
|
||||||
|
for _round_idx in range(MAX_TOOL_ROUNDS + 2):
|
||||||
|
response = send_openai_compatible(client, request, capabilities=capabilities)
|
||||||
|
reasoning_content: str = reasoning_extractor(response.raw_response) if reasoning_extractor else ""
|
||||||
|
response_text = response.text or ""
|
||||||
|
if history_lock is not None and history is not None:
|
||||||
|
with history_lock:
|
||||||
|
msg: dict[str, Any] = {"role": "assistant", "content": response.text or None}
|
||||||
|
if reasoning_content:
|
||||||
|
msg["reasoning_content"] = reasoning_content
|
||||||
|
if response.tool_calls:
|
||||||
|
msg["tool_calls"] = response.tool_calls
|
||||||
|
history.append(msg)
|
||||||
|
if not response.tool_calls:
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
results = asyncio.run_coroutine_threadsafe(
|
||||||
|
_execute_tool_calls_concurrently(
|
||||||
|
response.tool_calls, base_dir, pre_tool_callback, qa_callback, _round_idx, vendor_name, patch_callback,
|
||||||
|
),
|
||||||
|
loop,
|
||||||
|
).result()
|
||||||
|
except RuntimeError:
|
||||||
|
results = asyncio.run(_execute_tool_calls_concurrently(
|
||||||
|
response.tool_calls, base_dir, pre_tool_callback, qa_callback, _round_idx, vendor_name, patch_callback,
|
||||||
|
))
|
||||||
|
if history_lock is not None and history is not None:
|
||||||
|
with history_lock:
|
||||||
|
for _i, (tool_name, call_id, out, _err) in enumerate(results):
|
||||||
|
history.append({
|
||||||
|
"role": "tool",
|
||||||
|
"tool_call_id": call_id,
|
||||||
|
"content": str(out) if out else "",
|
||||||
|
})
|
||||||
|
if trim_func is not None:
|
||||||
|
trim_func(history)
|
||||||
|
return response_text
|
||||||
|
|
||||||
async def _execute_single_tool_call_async(
|
async def _execute_single_tool_call_async(
|
||||||
name: str,
|
name: str,
|
||||||
args: dict[str, Any],
|
args: dict[str, Any],
|
||||||
@@ -812,11 +869,7 @@ async def _execute_single_tool_call_async(
|
|||||||
tier: str | None = None,
|
tier: str | None = None,
|
||||||
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
|
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
|
||||||
) -> tuple[str, str, str, str]:
|
) -> tuple[str, str, str, str]:
|
||||||
"""
|
set_current_tier(tier)
|
||||||
[C: tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call, tests/test_external_mcp_hitl.py:test_external_mcp_hitl_approval, tests/test_external_mcp_hitl.py:test_external_mcp_hitl_rejection, tests/test_tool_presets_execution.py:test_tool_ask_approval, tests/test_tool_presets_execution.py:test_tool_auto_approval, tests/test_tool_presets_execution.py:test_tool_rejection]
|
|
||||||
"""
|
|
||||||
if tier:
|
|
||||||
set_current_tier(tier)
|
|
||||||
out = ""
|
out = ""
|
||||||
tool_executed = False
|
tool_executed = False
|
||||||
events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
|
events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
|
||||||
|
|||||||
Reference in New Issue
Block a user