more organization
This commit is contained in:
@@ -62,11 +62,9 @@ class WorkerPool:
|
||||
|
||||
def spawn(self, ticket_id: str, target: Callable, args: tuple) -> Optional[threading.Thread]:
|
||||
"""
|
||||
|
||||
|
||||
Spawns a new worker thread if the pool is not full.
|
||||
Returns the thread object or None if full.
|
||||
[C: tests/test_parallel_execution.py:test_worker_pool_completion_cleanup, tests/test_parallel_execution.py:test_worker_pool_limit, tests/test_parallel_execution.py:test_worker_pool_tracking]
|
||||
Spawns a new worker thread if the pool is not full.
|
||||
Returns the thread object or None if full.
|
||||
[C: tests/test_parallel_execution.py:test_worker_pool_completion_cleanup, tests/test_parallel_execution.py:test_worker_pool_limit, tests/test_parallel_execution.py:test_worker_pool_tracking]
|
||||
"""
|
||||
with self._lock:
|
||||
if len(self._active) >= self.max_workers:
|
||||
@@ -88,7 +86,7 @@ class WorkerPool:
|
||||
|
||||
def join_all(self, timeout: float = None) -> None:
|
||||
"""
|
||||
[C: tests/test_parallel_execution.py:test_conductor_engine_pool_integration, tests/test_parallel_execution.py:test_worker_pool_limit, tests/test_parallel_execution.py:test_worker_pool_tracking]
|
||||
[C: tests/test_parallel_execution.py:test_conductor_engine_pool_integration, tests/test_parallel_execution.py:test_worker_pool_limit, tests/test_parallel_execution.py:test_worker_pool_tracking]
|
||||
"""
|
||||
with self._lock:
|
||||
threads = list(self._active.values())
|
||||
@@ -99,22 +97,20 @@ class WorkerPool:
|
||||
|
||||
def get_active_count(self) -> int:
|
||||
"""
|
||||
[C: tests/test_parallel_execution.py:test_conductor_engine_pool_integration, tests/test_parallel_execution.py:test_worker_pool_completion_cleanup, tests/test_parallel_execution.py:test_worker_pool_limit]
|
||||
[C: tests/test_parallel_execution.py:test_conductor_engine_pool_integration, tests/test_parallel_execution.py:test_worker_pool_completion_cleanup, tests/test_parallel_execution.py:test_worker_pool_limit]
|
||||
"""
|
||||
with self._lock:
|
||||
return len(self._active)
|
||||
|
||||
def is_full(self) -> bool:
|
||||
"""
|
||||
[C: tests/test_parallel_execution.py:test_worker_pool_limit]
|
||||
[C: tests/test_parallel_execution.py:test_worker_pool_limit]
|
||||
"""
|
||||
return self.get_active_count() >= self.max_workers
|
||||
|
||||
class ConductorEngine:
|
||||
"""
|
||||
|
||||
|
||||
Orchestrates the execution of tickets within a track.
|
||||
Orchestrates the execution of tickets within a track.
|
||||
"""
|
||||
|
||||
def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None, auto_queue: bool = False) -> None:
|
||||
@@ -123,74 +119,69 @@ class ConductorEngine:
|
||||
self.tier_usage = {
|
||||
"Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
|
||||
"Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
|
||||
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
|
||||
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
|
||||
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
|
||||
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
|
||||
}
|
||||
self.dag = TrackDAG(self.track.tickets)
|
||||
self.dag = TrackDAG(self.track.tickets)
|
||||
self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
|
||||
|
||||
# Load MMA config
|
||||
try:
|
||||
config = models.load_config()
|
||||
mma_cfg = config.get("mma", {})
|
||||
config = models.load_config()
|
||||
mma_cfg = config.get("mma", {})
|
||||
max_workers = mma_cfg.get("max_workers", 4)
|
||||
except Exception:
|
||||
max_workers = 4
|
||||
|
||||
self.pool = WorkerPool(max_workers=max_workers)
|
||||
self._workers_lock = threading.Lock()
|
||||
self.pool = WorkerPool(max_workers=max_workers)
|
||||
self._workers_lock = threading.Lock()
|
||||
self._active_workers: dict[str, threading.Thread] = {}
|
||||
self._abort_events: dict[str, threading.Event] = {}
|
||||
self._pause_event: threading.Event = threading.Event()
|
||||
self._abort_events: dict[str, threading.Event] = {}
|
||||
self._pause_event: threading.Event = threading.Event()
|
||||
self._tier_usage_lock = threading.Lock()
|
||||
self._dirty: bool = True
|
||||
self._dirty: bool = True
|
||||
|
||||
def update_usage(self, tier: str, input_tokens: int, output_tokens: int) -> None:
|
||||
"""Updates token usage for a specific tier."""
|
||||
with self._tier_usage_lock:
|
||||
if tier in self.tier_usage:
|
||||
self.tier_usage[tier]["input"] += input_tokens
|
||||
self.tier_usage[tier]["input"] += input_tokens
|
||||
self.tier_usage[tier]["output"] += output_tokens
|
||||
|
||||
def pause(self) -> None:
|
||||
"""
|
||||
|
||||
Pauses the pipeline execution.
|
||||
[C: tests/test_pipeline_pause.py:test_pause_method, tests/test_pipeline_pause.py:test_resume_method]
|
||||
Pauses the pipeline execution.
|
||||
[C: tests/test_pipeline_pause.py:test_pause_method, tests/test_pipeline_pause.py:test_resume_method]
|
||||
"""
|
||||
self._pause_event.set()
|
||||
|
||||
def resume(self) -> None:
|
||||
"""
|
||||
|
||||
Resumes the pipeline execution.
|
||||
[C: tests/test_pipeline_pause.py:test_resume_method]
|
||||
Resumes the pipeline execution.
|
||||
[C: tests/test_pipeline_pause.py:test_resume_method]
|
||||
"""
|
||||
self._pause_event.clear()
|
||||
|
||||
def approve_task(self, task_id: str) -> None:
|
||||
"""
|
||||
|
||||
Manually transition todo to in_progress and mark engine dirty.
|
||||
[C: tests/test_execution_engine.py:test_execution_engine_approve_task, tests/test_execution_engine.py:test_execution_engine_step_mode]
|
||||
Manually transition todo to in_progress and mark engine dirty.
|
||||
[C: tests/test_execution_engine.py:test_execution_engine_approve_task, tests/test_execution_engine.py:test_execution_engine_step_mode]
|
||||
"""
|
||||
self.engine.approve_task(task_id)
|
||||
self._dirty = True
|
||||
|
||||
def update_task_status(self, task_id: str, status: str) -> None:
|
||||
"""
|
||||
|
||||
Force-update ticket status and mark engine dirty.
|
||||
[C: tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_manual_unblock_restores_todo, tests/test_execution_engine.py:test_execution_engine_auto_queue, tests/test_execution_engine.py:test_execution_engine_basic_flow, tests/test_execution_engine.py:test_execution_engine_status_persistence, tests/test_execution_engine.py:test_execution_engine_update_nonexistent_task]
|
||||
Force-update ticket status and mark engine dirty.
|
||||
[C: tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_manual_unblock_restores_todo, tests/test_execution_engine.py:test_execution_engine_auto_queue, tests/test_execution_engine.py:test_execution_engine_basic_flow, tests/test_execution_engine.py:test_execution_engine_status_persistence, tests/test_execution_engine.py:test_execution_engine_update_nonexistent_task]
|
||||
"""
|
||||
self.engine.update_task_status(task_id, status)
|
||||
self._dirty = True
|
||||
|
||||
def kill_worker(self, ticket_id: str) -> None:
|
||||
"""
|
||||
|
||||
Sets the abort event for a worker and attempts to join its thread.
|
||||
[C: tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread]
|
||||
Sets the abort event for a worker and attempts to join its thread.
|
||||
[C: tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread]
|
||||
"""
|
||||
if ticket_id in self._abort_events:
|
||||
print(f"[MMA] Setting abort event for {ticket_id}")
|
||||
|
||||
Reference in New Issue
Block a user