Files
manual_slop/tests/test_conductor_abort_event.py
T
ed b5e512f483 feat(sdm): inject structural dependency mapping tags across codebase
Adds [C: caller] tags to functions/methods and [M: mutation] / [U: usage] tags to class variables based on cross-module call analysis.
2026-05-13 22:35:52 -04:00

34 lines
1.2 KiB
Python

import pytest
from unittest.mock import MagicMock, patch
from src.multi_agent_conductor import ConductorEngine
from src.models import Ticket, Track
import threading
def test_conductor_abort_event_populated():
    """
    Verify that a single ConductorEngine.run tick registers an abort event.

    WorkerPool.spawn and ExecutionEngine.tick are both patched: spawn hands
    back a Thread-spec'd mock and tick reports one "todo" ticket. After
    run(max_ticks=1), engine._abort_events must map that ticket's ID to a
    threading.Event.
    """
    with patch('src.multi_agent_conductor.WorkerPool.spawn') as spawn_stub, \
            patch('src.multi_agent_conductor.ExecutionEngine.tick') as tick_stub:
        # Spawning a worker yields a Thread-shaped mock instead of a real thread.
        spawn_stub.return_value = MagicMock(spec=threading.Thread)

        # The patched tick reports exactly one ticket.
        ticket_id = "test-ticket"
        pending = Ticket(id=ticket_id, description="Test description", status="todo")
        tick_stub.return_value = [pending]

        track = Track(id="test-track", description="Test Track", tickets=[pending])

        # auto_queue=True is required by this scenario; presumably it lets the
        # engine dispatch the ticket without manual approval -- TODO confirm.
        engine = ConductorEngine(
            track=track,
            event_queue=MagicMock(),
            auto_queue=True,
        )
        engine.run(max_ticks=1)

        # The engine must now hold an abort Event keyed by the ticket ID.
        abort_events = engine._abort_events
        assert ticket_id in abort_events
        assert isinstance(abort_events[ticket_id], threading.Event)