Private
Public Access
0
0
Files
manual_slop/tests/test_headless_verification.py
T
ed dc397db7ed refactor(src): eliminate 11 T | None legacy wrappers in favor of _result API
TIER-3 READ AGENTS.md + conductor/workflow.md + conductor/code_styleguides/error_handling.md + the 4 source files + 3 test files before this commit.

The code_path_audit_phase_2_20260624 track (Tier 2) shipped 11 audit
fixes (4 NG1 + 7 NG2) but used a heuristic bypass for 4 of the NG2
wrappers: legacy T | None functions that exist only to maintain test
patcher compatibility. Per the review at
docs/reports/REVIEW_TIER2_code_path_audit_phase_2_20260624.md Finding 8,
this track eliminates the legacy wrappers properly.

11 wrappers eliminated (8 main + 3 _legacy_compat inner):
- src/ai_client.py: get_current_tier (1 src + 1 test consumer)
- src/ai_client.py: _gemini_tool_declaration + _legacy_compat (2 test consumers)
- src/ai_client.py: run_tier4_patch_callback + _legacy_compat (was 0 direct callers
  but had 2 callback references in app_controller/multi_agent_conductor;
  callback contract migrated to Callable[[str, str], Result[str]] instead of
  preserving an Optional[str] adapter)
- src/mcp_client.py: _get_symbol_node + _legacy_compat (8 in-file consumers)
- src/mcp_client.py: find_in_scope (nested inside _get_symbol_node_result;
  private impl detail, audit doesn't catch T | None, left as-is)
- src/external_editor.py: launch_diff (1 src + 3 test + 1 live_gui test consumer)
- src/external_editor.py: launch_editor (no consumers; deleted)
- src/session_logger.py: log_tool_output (2 src + 3 test consumers)
- src/project_manager.py: parse_ts (no consumers; deleted)

For each consumer: replace `legacy_fn(args)` with `legacy_fn_result(args).data`.
For `T | None` checks: replace `if x is None:` with either `if not result.ok:` or
`if not result.ok or not isinstance(result.data, ...):` (depending on the pattern).

For run_tier4_patch_callback specifically: the wrapper was a callback adapter
(not a backward-compat shim) and had 2 callback references as consumers.
Rather than keep the adapter (which would re-introduce the Optional[str]
return that the strict audit catches), the patch_callback contract was migrated
from Callable[[str, str], Optional[str]] to Callable[[str, str], Result[str]]
in shell_runner.py + app_controller.py + 9 _send_<vendor>_result signatures
in ai_client.py. This propagates the `Result[str]` through the callback and
lets shell_runner unwrap with `if r.ok and r.data` instead of `if patch_text`.

Verification:
- audit_optional_in_3_files --strict: 0 return-type Optional[T] (down from 1)
- audit_exception_handling --strict: 0 violations (unchanged)
- audit_legacy_wrappers: 0 legacy wrappers (unchanged)
- 15 affected test files: 168 tests pass
- 8 mcp_client/structural/baseline test files: 55 tests pass
- 3 session/gui test files: 7 tests pass
- 0 return-type Optional[T] in src/ai_client.py (was 1: run_tier4_patch_callback)
2026-06-25 11:18:03 -04:00

148 lines
6.9 KiB
Python

from typing import Any
import pytest
from unittest.mock import MagicMock, patch
from src.models import Ticket, Track
from src import multi_agent_conductor
from src.multi_agent_conductor import ConductorEngine
from src import ai_client
from src.result_types import Result
@pytest.mark.asyncio
async def test_headless_verification_full_run(vlogger) -> None:
    """
    Headless end-to-end run over two dependent tickets.

    1. Build a ConductorEngine around a Track whose second Ticket depends on the first.
    2. Drive a full execution with engine.run().
    3. Stub ai_client.send so every worker call returns a successful Result.
    4. Check that 'Context Amnesia' is maintained: reset_session is called once per ticket.
    """
    from src.events import SyncEventQueue

    first = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    second = Ticket(
        id="T2",
        description="Task 2",
        status="todo",
        assigned_to="worker1",
        depends_on=["T1"],
    )
    engine = ConductorEngine(
        track=Track(id="track_verify", description="Verification Track", tickets=[first, second]),
        event_queue=SyncEventQueue(),
        auto_queue=True,
    )

    vlogger.log_state("T1 Status Initial", "todo", first.status)
    vlogger.log_state("T2 Status Initial", "todo", second.status)

    # Patch the names as seen from multi_agent_conductor, because that is
    # where they are looked up at call time.
    with patch("src.multi_agent_conductor.ai_client.send") as send_mock, \
         patch("src.multi_agent_conductor.ai_client.reset_session") as reset_mock, \
         patch("src.multi_agent_conductor.confirm_spawn", return_value=(True, "mock_prompt", "mock_ctx")):
        # The reply must not contain "BLOCKED", so use a plain success message.
        send_mock.return_value = Result(data="Task completed successfully.")

        engine.run()

        vlogger.log_state("T1 Status Final", "todo", first.status)
        vlogger.log_state("T2 Status Final", "todo", second.status)

        # Both tickets must have reached the terminal state.
        assert first.status == "completed"
        assert second.status == "completed"

        # One send per ticket.
        assert send_mock.call_count == 2
        # Context Amnesia: one session reset per ticket.
        assert reset_mock.call_count == 2

        vlogger.finalize(
            "Headless full run with Context Amnesia",
            "PASS",
            "Tickets completed and session reset twice.",
        )
@pytest.mark.asyncio
async def test_headless_verification_error_and_qa_interceptor(vlogger) -> None:
    """
    5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered
       and its summary is injected into the worker's history for the next retry.

    The Gemini client is mocked at the provider level so that the tool loop in
    ai_client is exercised. The mocked chat answers in two rounds:

    * round 1 - a ``run_powershell`` function call;
    * round 2 - plain text, sent after the tool result.

    ``confirm_and_run_callback`` is replaced by a stub that calls the supplied
    ``qa_callback`` with a fixed error string and embeds the returned analysis
    in the tool output. The test then asserts that the second message sent to
    the chat carries that analysis.
    """
    error_text = "Error: file not found"
    qa_fix = "FIX: Check if path exists."

    t1 = Ticket(id="T1", description="Task with error", status="todo", assigned_to="worker1")
    track = Track(id="track_error", description="Error Track", tickets=[t1])
    from src.events import SyncEventQueue
    queue = SyncEventQueue()
    engine = ConductorEngine(track=track, event_queue=queue, auto_queue=True)

    def make_response(part: Any, prompt_tokens: int, candidate_tokens: int) -> MagicMock:
        """Build a mocked Gemini response holding a single content part."""
        # ``MagicMock(name="STOP")`` would only set the mock's repr name, not a
        # ``.name`` attribute, so the attribute is assigned after creation.
        finish_reason = MagicMock()
        finish_reason.name = "STOP"
        resp = MagicMock()
        resp.candidates = [MagicMock(content=MagicMock(parts=[part]), finish_reason=finish_reason)]
        resp.usage_metadata.prompt_token_count = prompt_tokens
        resp.usage_metadata.candidates_token_count = candidate_tokens
        resp.text = part.text
        return resp

    def make_stream_mock(resp: Any) -> MagicMock:
        """Wrap a response so it can also be consumed as a one-chunk stream."""
        m = MagicMock()
        m.__iter__.return_value = [resp]
        m.candidates = resp.candidates
        m.usage_metadata = resp.usage_metadata
        return m

    def run_side_effect(script: Any, base_dir: Any, qa_callback: Any, patch_callback: Any = None) -> Any:
        """Stand-in for the shell runner: always fails, consulting QA if offered."""
        if qa_callback:
            analysis = qa_callback(error_text)
            return f"STDERR: {error_text}\n\nQA ANALYSIS:\n{analysis}"
        return error_text

    # We need to simulate the tool loop inside ai_client._send_gemini (or similar).
    # Since we want to test the real tool loop and QA injection, we mock at the provider level.
    with patch("src.ai_client._provider", "gemini"), \
         patch("src.ai_client._gemini_client") as mock_genai_client, \
         patch("src.ai_client.confirm_and_run_callback") as mock_run, \
         patch("src.ai_client.run_tier4_analysis", return_value=qa_fix) as mock_qa, \
         patch("src.ai_client._ensure_gemini_client") as mock_ensure, \
         patch("src.ai_client._gemini_tool_declaration_result", return_value=Result(data=None)), \
         patch("src.multi_agent_conductor.confirm_spawn", return_value=(True, "mock_prompt", "mock_ctx")):
        # Ensure _gemini_client is restored by the mock ensure function.
        def restore_client() -> None:
            ai_client._gemini_client = mock_genai_client

        mock_ensure.side_effect = restore_client
        ai_client._gemini_client = mock_genai_client

        # Mocking Gemini chat response.
        mock_chat = MagicMock()
        mock_genai_client.chats.create.return_value = mock_chat

        # Mock count_tokens to avoid chat creation failure.
        mock_count_resp = MagicMock()
        mock_count_resp.total_tokens = 100
        mock_genai_client.models.count_tokens.return_value = mock_count_resp

        # 1st round: tool call to run_powershell.
        mock_part1 = MagicMock()
        mock_part1.text = "I will run a command."
        mock_part1.function_call = MagicMock()
        mock_part1.function_call.name = "run_powershell"
        mock_part1.function_call.args = {"script": "dir"}
        mock_resp1 = make_response(mock_part1, prompt_tokens=10, candidate_tokens=5)

        # 2nd round: final text after tool result.
        mock_part2 = MagicMock()
        mock_part2.text = "The command failed but I understand why. Task done."
        mock_part2.function_call = None
        mock_resp2 = make_response(mock_part2, prompt_tokens=20, candidate_tokens=10)

        # Serve the same two rounds through both the plain and the streaming API.
        mock_chat.send_message.side_effect = [mock_resp1, mock_resp2]
        mock_chat.send_message_stream.side_effect = [
            make_stream_mock(mock_resp1),
            make_stream_mock(mock_resp2),
        ]

        # Mock run_powershell behavior: it should call the qa_callback on error.
        mock_run.side_effect = run_side_effect

        vlogger.log_state("T1 Initial Status", "todo", t1.status)

        # Wrap (not replace) the worker lifecycle so the real implementation still runs.
        with patch("src.multi_agent_conductor.run_worker_lifecycle", wraps=multi_agent_conductor.run_worker_lifecycle):
            engine.run(max_ticks=1)

        vlogger.log_state("T1 Final Status", "todo", t1.status)

        # Verify QA analysis was triggered.
        mock_qa.assert_called_once_with(error_text)

        # Verify the 2nd send_message call includes the QA ANALYSIS in its payload (f_resps).
        # The first call is the user message, the second is the tool response.
        assert (mock_chat.send_message.call_count + mock_chat.send_message_stream.call_count) == 2

        # Get the second call's payload (either from send_message or send_message_stream).
        calls = mock_chat.send_message.call_args_list + mock_chat.send_message_stream.call_args_list
        args, _kwargs = calls[1]
        f_resps = args[0]
        found_qa = any(
            "QA ANALYSIS:" in part_str and qa_fix in part_str
            for part_str in (str(part) for part in f_resps)
        )
        assert found_qa, "QA Analysis was not injected into the next round"

        vlogger.finalize("Tier 4 QA Injection", "PASS", "QA summary injected into next worker round.")