Private
Public Access
0
0
Files
manual_slop/tests/test_parallel_execution.py
T
ed 8f11340b38 refactor(consumers): migrate 85 'from src.models import' sites to direct subsystem imports
Per post_module_taxonomy_de_cruft_20260627 Phase 2 (FR7). Each
'from src.models import X' for a moved class is rewritten to
'from src.<destination> import X':

  Ticket, Track, WorkerContext, TrackState, TrackMetadata,
    ThinkingSegment, EMPTY_TRACK_STATE            -> src.mma
  ProjectContext, ProjectMeta, ProjectOutput, ProjectFiles,
    ProjectScreenshots, ProjectDiscussion, EMPTY_PROJECT_CONTEXT -> src.project
  FileItem, Preset, ContextPreset, ContextFileEntry,
    NamedViewPreset                                -> src.project_files
  Tool, ToolPreset                                 -> src.tool_presets
  BiasProfile                                      -> src.tool_bias
  TextEditorConfig, ExternalEditorConfig,
    EMPTY_TEXT_EDITOR_CONFIG                       -> src.external_editor
  Persona                                          -> src.personas
  WorkspaceProfile                                -> src.workspace_manager
  MCPServerConfig, MCPConfiguration, VectorStoreConfig,
    RAGConfig, load_mcp_config                      -> src.mcp_client

NOT touched (kept on src.models; Phase 3 or Phase 4 will move them):
  GenerateRequest, ConfirmRequest, DEFAULT_TOOL_CATEGORIES, Metadata, PROVIDERS

Migration was performed by the one-time script
scripts/tier2/artifacts/post_module_taxonomy_de_cruft_20260627/migrate_imports.py
which uses a class-to-module map and re.sub() to rewrite each
'from src.models import X' line.

Total: 85 import lines rewritten across 71 files.

Note: this commit depends on the v2 SHIPPED work
(origin/tier2/module_taxonomy_refactor_20260627) being merged into
this branch NEXT. On master (without the v2 SHIPPED commits), the
destination modules do not exist and these imports would fail.
2026-06-26 13:34:03 -04:00

113 lines
3.2 KiB
Python

import threading
import time
import sys
import pytest
from unittest.mock import MagicMock
from src.multi_agent_conductor import WorkerPool
def test_worker_pool_limit():
max_workers = 2
pool = WorkerPool(max_workers=max_workers)
def slow_task(event):
event.set()
time.sleep(0.5)
event1 = threading.Event()
event2 = threading.Event()
event3 = threading.Event()
# Spawn 2 tasks
t1 = pool.spawn("t1", slow_task, (event1,))
t2 = pool.spawn("t2", slow_task, (event2,))
assert t1 is not None
assert t2 is not None
assert pool.get_active_count() == 2
assert pool.is_full() is True
# Try to spawn a 3rd task
t3 = pool.spawn("t3", slow_task, (event3,))
assert not t3.is_alive()
assert pool.get_active_count() == 2
# Wait for tasks to finish
event1.wait()
event2.wait()
pool.join_all()
assert pool.get_active_count() == 0
assert pool.is_full() is False
def test_worker_pool_tracking():
pool = WorkerPool(max_workers=4)
def task(ticket_id):
time.sleep(0.1)
pool.spawn("ticket_1", task, ("ticket_1",))
pool.spawn("ticket_2", task, ("ticket_2",))
assert "ticket_1" in pool._active
assert "ticket_2" in pool._active
pool.join_all()
assert len(pool._active) == 0
def test_worker_pool_completion_cleanup():
pool = WorkerPool(max_workers=4)
def fast_task():
pass
pool.spawn("t1", fast_task, ())
time.sleep(0.2) # Give it time to finish and run finally block
assert pool.get_active_count() == 0
assert "t1" not in pool._active
from unittest.mock import patch
from src.mma import Track, Ticket
from src.multi_agent_conductor import ConductorEngine
@patch('src.multi_agent_conductor.run_worker_lifecycle')
def test_conductor_engine_pool_integration(mock_lifecycle):
# Create 4 independent tickets
tickets = [
Ticket(id=f"t{i}", description=f"task {i}", status="todo")
for i in range(4)
]
track = Track(id="test_track", description="test", tickets=tickets)
# Set up engine with auto_queue and explicit max_workers=2.
# ConductorEngine no longer reads config itself; the caller passes max_workers.
engine = ConductorEngine(track, auto_queue=True, max_workers=2)
sys.stderr.write(f"[TEST] engine.pool.max_workers = {engine.pool.max_workers}\n")
assert engine.pool.max_workers == 2
# Slow down lifecycle to capture parallel state
def slow_lifecycle(ticket, *args, **kwargs):
# Set to in_progress immediately to simulate the status change
# (The engine usually does this, but we want to be sure)
time.sleep(0.5)
ticket.status = "completed"
mock_lifecycle.side_effect = slow_lifecycle
# Run exactly 1 tick
engine.run(max_ticks=1)
# Verify only 2 were marked in_progress/spawned
# Because we only ran for one tick, and there were 4 ready tasks,
# it should have tried to spawn as many as possible (limit 2).
in_progress = [tk for tk in tickets if tk.status == "in_progress"]
# Also count those that already finished if the sleep was too short
completed = [tk for tk in tickets if tk.status == "completed"]
sys.stderr.write(f"[TEST] in_progress={len(in_progress)} completed={len(completed)}\n")
assert len(in_progress) + len(completed) == 2
assert engine.pool.get_active_count() <= 2
# Cleanup: wait for mock threads to finish or join_all
engine.pool.join_all()