feat(conductor): Add pause/resume mechanism to ConductorEngine
This commit is contained in:
@@ -90,6 +90,7 @@ class ConductorEngine:
|
|||||||
self._workers_lock = threading.Lock()
|
self._workers_lock = threading.Lock()
|
||||||
self._active_workers: dict[str, threading.Thread] = {}
|
self._active_workers: dict[str, threading.Thread] = {}
|
||||||
self._abort_events: dict[str, threading.Event] = {}
|
self._abort_events: dict[str, threading.Event] = {}
|
||||||
|
self._pause_event: threading.Event = threading.Event()
|
||||||
self._tier_usage_lock = threading.Lock()
|
self._tier_usage_lock = threading.Lock()
|
||||||
|
|
||||||
def update_usage(self, tier: str, input_tokens: int, output_tokens: int) -> None:
|
def update_usage(self, tier: str, input_tokens: int, output_tokens: int) -> None:
|
||||||
@@ -98,6 +99,14 @@ class ConductorEngine:
|
|||||||
self.tier_usage[tier]["input"] += input_tokens
|
self.tier_usage[tier]["input"] += input_tokens
|
||||||
self.tier_usage[tier]["output"] += output_tokens
|
self.tier_usage[tier]["output"] += output_tokens
|
||||||
|
|
||||||
|
def pause(self) -> None:
|
||||||
|
"""Pauses the pipeline execution."""
|
||||||
|
self._pause_event.set()
|
||||||
|
|
||||||
|
def resume(self) -> None:
|
||||||
|
"""Resumes the pipeline execution."""
|
||||||
|
self._pause_event.clear()
|
||||||
|
|
||||||
def kill_worker(self, ticket_id: str) -> None:
|
def kill_worker(self, ticket_id: str) -> None:
|
||||||
"""Sets the abort event for a worker and attempts to join its thread."""
|
"""Sets the abort event for a worker and attempts to join its thread."""
|
||||||
if ticket_id in self._abort_events:
|
if ticket_id in self._abort_events:
|
||||||
@@ -164,11 +173,14 @@ class ConductorEngine:
|
|||||||
md_content: The full markdown context (history + files) for AI workers.
|
md_content: The full markdown context (history + files) for AI workers.
|
||||||
max_ticks: Optional limit on number of iterations (for testing).
|
max_ticks: Optional limit on number of iterations (for testing).
|
||||||
"""
|
"""
|
||||||
self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
|
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
tick_count = 0
|
tick_count = 0
|
||||||
while True:
|
while True:
|
||||||
|
if self._pause_event.is_set():
|
||||||
|
self._push_state(status="paused", active_tier="Paused")
|
||||||
|
time.sleep(0.5)
|
||||||
|
continue
|
||||||
|
self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
|
||||||
if max_ticks is not None and tick_count >= max_ticks:
|
if max_ticks is not None and tick_count >= max_ticks:
|
||||||
break
|
break
|
||||||
tick_count += 1
|
tick_count += 1
|
||||||
|
|||||||
24
tests/test_pipeline_pause.py
Normal file
24
tests/test_pipeline_pause.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
from src.models import Ticket, Track
|
||||||
|
from src.multi_agent_conductor import ConductorEngine
|
||||||
|
|
||||||
|
def test_conductor_engine_has_pause_event():
|
||||||
|
track = Track(id="test", description="Test", tickets=[])
|
||||||
|
engine = ConductorEngine(track)
|
||||||
|
assert hasattr(engine, '_pause_event'), "ConductorEngine must have _pause_event"
|
||||||
|
assert engine._pause_event.is_set() == False, "Pause event should start unset (not paused)"
|
||||||
|
|
||||||
|
def test_pause_method():
|
||||||
|
track = Track(id="test", description="Test", tickets=[])
|
||||||
|
engine = ConductorEngine(track)
|
||||||
|
engine.pause()
|
||||||
|
assert engine._pause_event.is_set() == True, "Pause should set the event"
|
||||||
|
|
||||||
|
def test_resume_method():
|
||||||
|
track = Track(id="test", description="Test", tickets=[])
|
||||||
|
engine = ConductorEngine(track)
|
||||||
|
engine.pause()
|
||||||
|
assert engine._pause_event.is_set() == True
|
||||||
|
engine.resume()
|
||||||
|
assert engine._pause_event.is_set() == False, "Resume should clear the event"
|
||||||
Reference in New Issue
Block a user