Files
manual_slop/tests/test_conductor_engine_abort.py

54 lines
1.5 KiB
Python

import pytest
from unittest.mock import MagicMock
import threading
import time
from src.multi_agent_conductor import ConductorEngine
from src.models import Track
def test_conductor_engine_initializes_empty_worker_and_abort_dicts() -> None:
    """
    A freshly constructed ConductorEngine must expose empty bookkeeping dicts.

    The engine is built around a Track stand-in that carries no tickets, and
    both ``_active_workers`` and ``_abort_events`` are then compared against
    an empty dict.
    """
    # Stand-in for a Track, restricted to the Track spec, with no tickets.
    track_stub = MagicMock(spec=Track)
    track_stub.tickets = []

    conductor = ConductorEngine(track=track_stub)

    # Neither registry may contain entries right after construction.
    assert conductor._active_workers == {}
    assert conductor._abort_events == {}
def test_kill_worker_sets_abort_and_joins_thread() -> None:
    """
    Verify kill_worker against a live worker thread.

    A real thread that blocks on the ticket's abort event is registered in
    ``_active_workers``. After ``kill_worker(ticket_id)`` the test expects:

    * the abort event for the ticket to be set,
    * the worker thread to have finished (i.e. it was joined),
    * the ticket to be gone from ``_active_workers``.

    The worker thread is always released and joined in a ``finally`` block so
    a failing assertion or an exception from ``kill_worker`` cannot leave a
    blocked background thread behind for the rest of the test session.
    """
    mock_track = MagicMock(spec=Track)
    mock_track.tickets = []
    engine = ConductorEngine(track=mock_track)

    ticket_id = "test-ticket"
    abort_event = threading.Event()
    engine._abort_events[ticket_id] = abort_event

    def worker() -> None:
        # Block until aborted; the timeout bounds the wait if the abort never comes.
        abort_event.wait(timeout=2.0)

    thread = threading.Thread(target=worker)
    thread.start()
    try:
        # Register the running thread under the same lock the test later
        # uses to inspect the registry.
        with engine._workers_lock:
            engine._active_workers[ticket_id] = thread

        engine.kill_worker(ticket_id)

        assert abort_event.is_set()
        assert not thread.is_alive()
        with engine._workers_lock:
            assert ticket_id not in engine._active_workers
    finally:
        # Cleanup on every path: wake the worker (no-op if already set) and
        # wait for it to exit so no thread leaks out of this test.
        abort_event.set()
        thread.join(timeout=2.0)