8f11340b38
Per post_module_taxonomy_de_cruft_20260627 Phase 2 (FR7). Each
'from src.models import X' for a moved class is rewritten to
'from src.<destination> import X':
Ticket, Track, WorkerContext, TrackState, TrackMetadata,
ThinkingSegment, EMPTY_TRACK_STATE -> src.mma
ProjectContext, ProjectMeta, ProjectOutput, ProjectFiles,
ProjectScreenshots, ProjectDiscussion, EMPTY_PROJECT_CONTEXT -> src.project
FileItem, Preset, ContextPreset, ContextFileEntry,
NamedViewPreset -> src.project_files
Tool, ToolPreset -> src.tool_presets
BiasProfile -> src.tool_bias
TextEditorConfig, ExternalEditorConfig,
EMPTY_TEXT_EDITOR_CONFIG -> src.external_editor
Persona -> src.personas
WorkspaceProfile -> src.workspace_manager
MCPServerConfig, MCPConfiguration, VectorStoreConfig,
RAGConfig, load_mcp_config -> src.mcp_client
NOT touched (kept on src.models; Phase 3 or Phase 4 will move them):
GenerateRequest, ConfirmRequest, DEFAULT_TOOL_CATEGORIES, Metadata, PROVIDERS
Migration was performed by the one-time script
scripts/tier2/artifacts/post_module_taxonomy_de_cruft_20260627/migrate_imports.py
which uses a class-to-module map and re.sub() to rewrite each
'from src.models import X' line.
Total: 85 import lines rewritten across 71 files.
Note: this commit depends on the v2 SHIPPED work
(origin/tier2/module_taxonomy_refactor_20260627) being merged into
this branch NEXT. On master (without the v2 SHIPPED commits), the
destination modules do not exist and these imports would fail.
148 lines
6.9 KiB
Python
148 lines
6.9 KiB
Python
from typing import Any
|
|
import pytest
|
|
from unittest.mock import MagicMock, patch
|
|
from src.mma import Ticket, Track
|
|
from src import multi_agent_conductor
|
|
from src.multi_agent_conductor import ConductorEngine
|
|
from src import ai_client
|
|
from src.result_types import Result
|
|
|
|
@pytest.mark.asyncio
async def test_headless_verification_full_run(vlogger) -> None:
    """
    Headless end-to-end run of a track with two dependent tickets.

    Steps:
      1. Build a ConductorEngine around a Track whose second Ticket
         depends on the first.
      2. Drive a full execution with engine.run().
      3. Replace ai_client.send with a mock that returns a successful Result.
      4. Check that 'Context Amnesia' is maintained: the session is reset
         once for each ticket.
    """
    from src.events import SyncEventQueue

    first_ticket = Ticket(
        id="T1",
        description="Task 1",
        status="todo",
        assigned_to="worker1",
    )
    second_ticket = Ticket(
        id="T2",
        description="Task 2",
        status="todo",
        assigned_to="worker1",
        depends_on=["T1"],
    )
    track = Track(
        id="track_verify",
        description="Verification Track",
        tickets=[first_ticket, second_ticket],
    )
    engine = ConductorEngine(track=track, event_queue=SyncEventQueue(), auto_queue=True)

    vlogger.log_state("T1 Status Initial", "todo", first_ticket.status)
    vlogger.log_state("T2 Status Initial", "todo", second_ticket.status)

    # The mocked reply must not contain "BLOCKED".
    worker_reply = Result(data="Task completed successfully.")
    spawn_approval = (True, "mock_prompt", "mock_ctx")

    # Patch the names where they are USED: inside multi_agent_conductor.
    with patch("src.multi_agent_conductor.ai_client.send", return_value=worker_reply) as mock_send, \
         patch("src.multi_agent_conductor.ai_client.reset_session") as mock_reset, \
         patch("src.multi_agent_conductor.confirm_spawn", return_value=spawn_approval):
        engine.run()

        vlogger.log_state("T1 Status Final", "todo", first_ticket.status)
        vlogger.log_state("T2 Status Final", "todo", second_ticket.status)

        # Both tickets must have been driven to completion.
        assert first_ticket.status == "completed"
        assert second_ticket.status == "completed"
        # One ai_client.send call per ticket.
        assert mock_send.call_count == 2
        # Context Amnesia: one reset_session call per ticket.
        assert mock_reset.call_count == 2
        vlogger.finalize(
            "Headless full run with Context Amnesia",
            "PASS",
            "Tickets completed and session reset twice.",
        )
|
|
|
|
@pytest.mark.asyncio
async def test_headless_verification_error_and_qa_interceptor(vlogger) -> None:
    """
    Simulate a shell error and verify the Tier 4 QA interceptor is triggered.

    The Gemini client is mocked at the provider level so the tool loop and QA
    injection are exercised rather than stubbed out. Round one of the mocked
    chat answers with a ``run_powershell`` function call; round two answers
    with plain text. The patched ``confirm_and_run_callback`` reports an error
    and invokes the QA callback it receives. The test then checks that
    ``run_tier4_analysis`` was called with the error text and that its summary
    appears in the payload of the second chat round (the worker's next retry).
    """
    from src.events import SyncEventQueue

    def make_response(part: MagicMock, prompt_tokens: int, candidate_tokens: int) -> MagicMock:
        """Build a mocked Gemini response holding a single content part."""
        # MagicMock(name="STOP") would only label the mock's repr; the
        # ``name`` attribute has to be assigned after construction so that
        # anything reading finish_reason.name sees the string "STOP".
        finish_reason = MagicMock()
        finish_reason.name = "STOP"
        resp = MagicMock()
        resp.candidates = [MagicMock(content=MagicMock(parts=[part]), finish_reason=finish_reason)]
        resp.usage_metadata.prompt_token_count = prompt_tokens
        resp.usage_metadata.candidates_token_count = candidate_tokens
        resp.text = part.text
        return resp

    def make_stream_mock(resp: MagicMock) -> MagicMock:
        """Wrap a response so it can also be consumed as a one-chunk stream."""
        m = MagicMock()
        m.__iter__.return_value = [resp]
        m.candidates = resp.candidates
        m.usage_metadata = resp.usage_metadata
        return m

    def run_side_effect(script: Any, base_dir: Any, qa_callback: Any, patch_callback: Any = None) -> Any:
        """Stand-in for the shell runner: always fails, calling qa_callback if given."""
        if qa_callback:
            analysis = qa_callback("Error: file not found")
            return f"STDERR: Error: file not found\n\nQA ANALYSIS:\n{analysis}"
        return "Error: file not found"

    t1 = Ticket(id="T1", description="Task with error", status="todo", assigned_to="worker1")
    track = Track(id="track_error", description="Error Track", tickets=[t1])
    queue = SyncEventQueue()
    engine = ConductorEngine(track=track, event_queue=queue, auto_queue=True)

    # We need to simulate the tool loop inside ai_client._send_gemini (or similar).
    # Since we want to test the real tool loop and QA injection, we mock at the provider level.
    with patch("src.ai_client._provider", "gemini"), \
         patch("src.ai_client._gemini_client") as mock_genai_client, \
         patch("src.ai_client.confirm_and_run_callback") as mock_run, \
         patch("src.ai_client.run_tier4_analysis", return_value="FIX: Check if path exists.") as mock_qa, \
         patch("src.ai_client._ensure_gemini_client") as mock_ensure, \
         patch("src.ai_client._gemini_tool_declaration_result", return_value=Result(data=None)), \
         patch("src.multi_agent_conductor.confirm_spawn", return_value=(True, "mock_prompt", "mock_ctx")):

        # Ensure _gemini_client is restored by the mock ensure function.
        def restore_client() -> None:
            ai_client._gemini_client = mock_genai_client

        mock_ensure.side_effect = restore_client
        ai_client._gemini_client = mock_genai_client

        # Mocking Gemini chat response.
        mock_chat = MagicMock()
        mock_genai_client.chats.create.return_value = mock_chat

        # Mock count_tokens to avoid chat creation failure.
        mock_count_resp = MagicMock()
        mock_count_resp.total_tokens = 100
        mock_genai_client.models.count_tokens.return_value = mock_count_resp

        # 1st round: tool call to run_powershell.
        mock_part1 = MagicMock()
        mock_part1.text = "I will run a command."
        mock_part1.function_call = MagicMock()
        mock_part1.function_call.name = "run_powershell"
        mock_part1.function_call.args = {"script": "dir"}
        mock_resp1 = make_response(mock_part1, prompt_tokens=10, candidate_tokens=5)

        # 2nd round: final text after the tool result.
        mock_part2 = MagicMock()
        mock_part2.text = "The command failed but I understand why. Task done."
        mock_part2.function_call = None
        mock_resp2 = make_response(mock_part2, prompt_tokens=20, candidate_tokens=10)

        # Serve the same two rounds whether the client sends plainly or streams.
        mock_chat.send_message.side_effect = [mock_resp1, mock_resp2]
        mock_chat.send_message_stream.side_effect = [
            make_stream_mock(mock_resp1),
            make_stream_mock(mock_resp2),
        ]

        # Mock run_powershell behavior: it should call the qa_callback on error.
        mock_run.side_effect = run_side_effect

        vlogger.log_state("T1 Initial Status", "todo", t1.status)

        # Patch engine used in test; wraps= keeps the real lifecycle running.
        with patch("src.multi_agent_conductor.run_worker_lifecycle", wraps=multi_agent_conductor.run_worker_lifecycle):
            engine.run(max_ticks=1)

        vlogger.log_state("T1 Final Status", "todo", t1.status)

        # Verify QA analysis was triggered.
        mock_qa.assert_called_once_with("Error: file not found")

        # Verify the 2nd send call includes the QA ANALYSIS in its payload (f_resps).
        # The first call is the user message, the second is the tool response.
        assert (mock_chat.send_message.call_count + mock_chat.send_message_stream.call_count) == 2

        # Get the second call's payload (either from send_message or send_message_stream).
        calls = mock_chat.send_message.call_args_list + mock_chat.send_message_stream.call_args_list
        args, _ = calls[1]
        f_resps = args[0]

        found_qa = any(
            "QA ANALYSIS:" in part_str and "FIX: Check if path exists." in part_str
            for part_str in map(str, f_resps)
        )
        assert found_qa, "QA Analysis was not injected into the next round"
        vlogger.finalize("Tier 4 QA Injection", "PASS", "QA summary injected into next worker round.")