54 lines
1.5 KiB
Python
54 lines
1.5 KiB
Python
import pytest
|
|
from unittest.mock import MagicMock
|
|
import threading
|
|
import time
|
|
from src.multi_agent_conductor import ConductorEngine
|
|
from src.models import Track
|
|
|
|
def test_conductor_engine_initializes_empty_worker_and_abort_dicts() -> None:
    """A newly constructed ConductorEngine exposes empty _active_workers and _abort_events dicts."""
    # A spec'd mock with an empty ticket list stands in for a real Track.
    track_stub = MagicMock(spec=Track)
    track_stub.tickets = []

    conductor = ConductorEngine(track=track_stub)

    # Each bookkeeping mapping must compare equal to an empty dict right after construction.
    active_workers = conductor._active_workers
    assert active_workers == {}
    abort_events = conductor._abort_events
    assert abort_events == {}
|
|
|
|
def test_kill_worker_sets_abort_and_joins_thread() -> None:
    """
    Test kill_worker against a live worker thread.

    Registers a real thread and its abort event under one ticket id, calls
    kill_worker, then asserts that the abort event is set, the thread is no
    longer alive, and the ticket id has been removed from _active_workers.
    """
    mock_track = MagicMock(spec=Track)
    mock_track.tickets = []
    engine = ConductorEngine(track=mock_track)

    ticket_id = "test-ticket"
    abort_event = threading.Event()
    engine._abort_events[ticket_id] = abort_event

    def worker() -> None:
        # Block until the abort event is set; the timeout bounds the wait so
        # the thread ends on its own even if it is never signalled.
        abort_event.wait(timeout=2.0)

    thread = threading.Thread(target=worker)
    thread.start()

    try:
        # Register the running thread the same way the engine is accessed
        # below: under its workers lock.
        with engine._workers_lock:
            engine._active_workers[ticket_id] = thread

        engine.kill_worker(ticket_id)

        assert abort_event.is_set()
        assert not thread.is_alive()
        with engine._workers_lock:
            assert ticket_id not in engine._active_workers
    finally:
        # Release and reap the worker even when kill_worker raises or an
        # assertion fails, so a failing run does not leave a live thread
        # behind for subsequent tests. Both calls are no-ops on success.
        abort_event.set()
        thread.join(timeout=2.0)
|