Private
Public Access
0
0
Files
manual_slop/tests/test_headless_verification.py
T
ed ada9617308 test(ai_client): rename send_result to send in 22 remaining test files
Batch rename of 22 test files. 62 references renamed total.

The full test suite is now GREEN again, matching the pre-rename baseline
from Task 1.1. Pure mechanical rename. No behavior change.

Files affected: test_ai_cache_tracking, test_ai_client_cli,
test_ai_client_result, test_api_events, test_context_pruner,
test_deepseek_provider, test_gemini_cli_* (3 files), test_gui2_mcp,
test_headless_* (2 files), test_live_gui_integration_v2,
test_orchestration_logic, test_phase6_engine, test_rag_integration,
test_run_worker_lifecycle_abort, test_spawn_interception_v2,
test_symbol_parsing, test_tier4_interceptor, test_tiered_aggregation,
test_token_usage.

Note: spec estimated 24 files; actual is 22 (test_deprecation_warnings
no longer exists, and 1 fewer file than spec's list).

Refs: conductor/tracks/send_result_to_send_20260616/
2026-06-17 00:38:29 -04:00

148 lines
6.9 KiB
Python

from typing import Any
import pytest
from unittest.mock import MagicMock, patch
from src.models import Ticket, Track
from src import multi_agent_conductor
from src.multi_agent_conductor import ConductorEngine
from src import ai_client
from src.result_types import Result
@pytest.mark.asyncio
async def test_headless_verification_full_run(vlogger) -> None:
"""
1. Initialize a ConductorEngine with a Track containing multiple dependent Tickets.
2. Simulate a full execution run using engine.run().
3. Mock ai_client.send to simulate successful tool calls and final responses.
4. Specifically verify that 'Context Amnesia' is maintained.
"""
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker1", depends_on=["T1"])
track = Track(id="track_verify", description="Verification Track", tickets=[t1, t2])
from src.events import SyncEventQueue
queue = SyncEventQueue()
engine = ConductorEngine(track=track, event_queue=queue, auto_queue=True)
vlogger.log_state("T1 Status Initial", "todo", t1.status)
vlogger.log_state("T2 Status Initial", "todo", t2.status)
# We must patch where it is USED: multi_agent_conductor
with patch("src.multi_agent_conductor.ai_client.send") as mock_send, \
patch("src.multi_agent_conductor.ai_client.reset_session") as mock_reset, \
patch("src.multi_agent_conductor.confirm_spawn", return_value=(True, "mock_prompt", "mock_ctx")):
# We need mock_send to return something that doesn't contain "BLOCKED"
mock_send.return_value = Result(data="Task completed successfully.")
engine.run()
vlogger.log_state("T1 Status Final", "todo", t1.status)
vlogger.log_state("T2 Status Final", "todo", t2.status)
# Verify both tickets are completed
assert t1.status == "completed"
assert t2.status == "completed"
# Verify that ai_client.send was called twice (once for each ticket)
assert mock_send.call_count == 2
# Verify Context Amnesia: reset_session should be called for each ticket
assert mock_reset.call_count == 2
vlogger.finalize("Headless full run with Context Amnesia", "PASS", "Tickets completed and session reset twice.")
@pytest.mark.asyncio
async def test_headless_verification_error_and_qa_interceptor(vlogger) -> None:
"""
5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered
and its summary is injected into the worker's history for the next retry.
"""
t1 = Ticket(id="T1", description="Task with error", status="todo", assigned_to="worker1")
track = Track(id="track_error", description="Error Track", tickets=[t1])
from src.events import SyncEventQueue
queue = SyncEventQueue()
engine = ConductorEngine(track=track, event_queue=queue, auto_queue=True)
# We need to simulate the tool loop inside ai_client._send_gemini (or similar)
# Since we want to test the real tool loop and QA injection, we mock at the provider level.
with patch("src.ai_client._provider", "gemini"), \
patch("src.ai_client._gemini_client") as mock_genai_client, \
patch("src.ai_client.confirm_and_run_callback") as mock_run, \
patch("src.ai_client.run_tier4_analysis", return_value="FIX: Check if path exists.") as mock_qa, \
patch("src.ai_client._ensure_gemini_client") as mock_ensure, \
patch("src.ai_client._gemini_tool_declaration", return_value=None), \
patch("src.multi_agent_conductor.confirm_spawn", return_value=(True, "mock_prompt", "mock_ctx")):
# Ensure _gemini_client is restored by the mock ensure function
def restore_client() -> None:
ai_client._gemini_client = mock_genai_client
mock_ensure.side_effect = restore_client
ai_client._gemini_client = mock_genai_client
# Mocking Gemini chat response
mock_chat = MagicMock()
mock_genai_client.chats.create.return_value = mock_chat
# Mock count_tokens to avoid chat creation failure
mock_count_resp = MagicMock()
mock_count_resp.total_tokens = 100
mock_genai_client.models.count_tokens.return_value = mock_count_resp
# 1st round: tool call to run_powershell
mock_part1 = MagicMock()
mock_part1.text = "I will run a command."
mock_part1.function_call = MagicMock()
mock_part1.function_call.name = "run_powershell"
mock_part1.function_call.args = {"script": "dir"}
mock_resp1 = MagicMock()
mock_resp1.candidates = [MagicMock(content=MagicMock(parts=[mock_part1]), finish_reason=MagicMock(name="STOP"))]
mock_resp1.usage_metadata.prompt_token_count = 10
mock_resp1.usage_metadata.candidates_token_count = 5
mock_resp1.text = mock_part1.text
# 2nd round: Final text after tool result
mock_part2 = MagicMock()
mock_part2.text = "The command failed but I understand why. Task done."
mock_part2.function_call = None
mock_resp2 = MagicMock()
mock_resp2.candidates = [MagicMock(content=MagicMock(parts=[mock_part2]), finish_reason=MagicMock(name="STOP"))]
mock_resp2.usage_metadata.prompt_token_count = 20
mock_resp2.usage_metadata.candidates_token_count = 10
mock_resp2.text = mock_part2.text
mock_chat.send_message.side_effect = [mock_resp1, mock_resp2]
# Handle streaming calls
def make_stream_mock(resp):
m = MagicMock()
m.__iter__.return_value = [resp]
m.candidates = resp.candidates
m.usage_metadata = resp.usage_metadata
return m
mock_chat.send_message_stream.side_effect = [make_stream_mock(mock_resp1), make_stream_mock(mock_resp2)]
# Mock run_powershell behavior: it should call the qa_callback on error
def run_side_effect(script: Any, base_dir: Any, qa_callback: Any, patch_callback: Any = None) -> Any:
if qa_callback:
analysis = qa_callback("Error: file not found")
return f"STDERR: Error: file not found\n\nQA ANALYSIS:\n{analysis}"
return "Error: file not found"
mock_run.side_effect = run_side_effect
vlogger.log_state("T1 Initial Status", "todo", t1.status)
# Patch engine used in test
with patch("src.multi_agent_conductor.run_worker_lifecycle", wraps=multi_agent_conductor.run_worker_lifecycle):
engine.run(max_ticks=1)
vlogger.log_state("T1 Final Status", "todo", t1.status)
# Verify QA analysis was triggered
mock_qa.assert_called_once_with("Error: file not found")
# Verify the 2nd send_message call includes the QA ANALYSIS in its payload (f_resps)
# The first call is the user message, the second is the tool response.
assert (mock_chat.send_message.call_count + mock_chat.send_message_stream.call_count) == 2
# Get the second call's payload (either from send_message or send_message_stream)
calls = mock_chat.send_message.call_args_list + mock_chat.send_message_stream.call_args_list
args, kwargs = calls[1]
f_resps = args[0]
found_qa = False
for part in f_resps:
part_str = str(part)
if "QA ANALYSIS:" in part_str and "FIX: Check if path exists." in part_str:
found_qa = True
assert found_qa, "QA Analysis was not injected into the next round"
vlogger.finalize("Tier 4 QA Injection", "PASS", "QA summary injected into next worker round.")