diff --git a/dag_engine.py b/dag_engine.py index 11660d2..79a52a1 100644 --- a/dag_engine.py +++ b/dag_engine.py @@ -1,13 +1,26 @@ -from typing import List +from typing import List, Optional from models import Ticket class TrackDAG: + """ + Manages a Directed Acyclic Graph of implementation tickets. + Provides methods for dependency resolution, cycle detection, and topological sorting. + """ def __init__(self, tickets: List[Ticket]): + """ + Initializes the TrackDAG with a list of Ticket objects. + Args: + tickets: A list of Ticket instances defining the graph nodes and edges. + """ self.tickets = tickets self.ticket_map = {t.id: t for t in tickets} def get_ready_tasks(self) -> List[Ticket]: - """Returns tickets that are 'todo' and whose dependencies are all 'completed'.""" + """ + Returns a list of tickets that are in 'todo' status and whose dependencies are all 'completed'. + Returns: + A list of Ticket objects ready for execution. + """ ready = [] for ticket in self.tickets: if ticket.status == 'todo': @@ -23,11 +36,16 @@ class TrackDAG: return ready def has_cycle(self) -> bool: - """Returns True if there's a dependency cycle.""" + """ + Performs a Depth-First Search to detect cycles in the dependency graph. + Returns: + True if a cycle is detected, False otherwise. + """ visited = set() rec_stack = set() - def is_cyclic(ticket_id): + def is_cyclic(ticket_id: str) -> bool: + """Internal recursive helper for cycle detection.""" if ticket_id in rec_stack: return True if ticket_id in visited: @@ -53,8 +71,11 @@ class TrackDAG: def topological_sort(self) -> List[str]: """ - Returns a list of ticket IDs in topological order. - Raises ValueError if a cycle is detected. + Returns a list of ticket IDs in topological order (dependencies before dependents). + Returns: + A list of ticket ID strings. + Raises: + ValueError: If a dependency cycle is detected. """ if self.has_cycle(): raise ValueError("Dependency cycle detected") @@ -62,7 +83,8 @@ class TrackDAG: visited = set() stack = [] - def visit(ticket_id): + def visit(ticket_id: str): + """Internal recursive helper for topological sorting.""" if ticket_id in visited: return visited.add(ticket_id) @@ -78,16 +100,26 @@ class TrackDAG: return stack class ExecutionEngine: + """ + A state machine that governs the progression of tasks within a TrackDAG. + Handles automatic queueing and manual task approval. + """ def __init__(self, dag: TrackDAG, auto_queue: bool = False): + """ + Initializes the ExecutionEngine. + Args: + dag: The TrackDAG instance to manage. + auto_queue: If True, ready tasks will automatically move to 'in_progress'. + """ self.dag = dag self.auto_queue = auto_queue def tick(self) -> List[Ticket]: """ - Returns a list of tasks that are currently 'ready' to be executed. - A task is ready if its status is 'todo' and all its dependencies are 'completed'. - If auto_queue=True, it will automatically mark 'ready' tasks as 'in-progress', - unless step_mode=True is set on the task. + Evaluates the DAG and returns a list of tasks that are currently 'ready' for execution. + If auto_queue is enabled, tasks without 'step_mode' will be marked as 'in_progress'. + Returns: + A list of ready Ticket objects. """ ready = self.dag.get_ready_tasks() @@ -100,8 +132,9 @@ class ExecutionEngine: def approve_task(self, task_id: str): """ - Manually approves a task to move it to 'in_progress'. - Typically used for tasks with step_mode=True or when auto_queue is False. + Manually transitions a task from 'todo' to 'in_progress' if its dependencies are met. + Args: + task_id: The ID of the task to approve. """ ticket = self.dag.ticket_map.get(task_id) if ticket and ticket.status == "todo": @@ -112,13 +145,16 @@ class ExecutionEngine: if not dep or dep.status != "completed": all_done = False break - + if all_done: ticket.status = "in_progress" def update_task_status(self, task_id: str, status: str): """ - Updates the status of a specific task within the DAG. + Force-updates the status of a specific task. + Args: + task_id: The ID of the task. + status: The new status string (e.g., 'todo', 'in_progress', 'completed', 'blocked'). """ ticket = self.dag.ticket_map.get(task_id) if ticket: diff --git a/gui_2.py b/gui_2.py index a3f13a3..eef3995 100644 --- a/gui_2.py +++ b/gui_2.py @@ -2088,11 +2088,24 @@ class App: track_id = f"track_{uuid.uuid4().hex[:8]}" track = Track(id=track_id, description=title, tickets=tickets) - # 4. Initialize ConductorEngine and run_linear loop + # Initialize track state in the filesystem + from models import TrackState, Metadata + from datetime import datetime + now = datetime.now() + meta = Metadata(id=track_id, name=title, status="todo", created_at=now, updated_at=now) + state = TrackState(metadata=meta, discussion=[], tasks=tickets) + project_manager.save_track_state(track_id, state, self.ui_files_base_dir) + + # 4. Initialize ConductorEngine and run loop engine = multi_agent_conductor.ConductorEngine(track, self.event_queue) + # Use current full markdown context for the track execution + track_id_param = track.id + flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param) + full_md, _, _ = aggregate.run(flat) + # Schedule the coroutine on the internal event loop - asyncio.run_coroutine_threadsafe(engine.run_linear(), self._loop) + asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop) except Exception as e: self.ai_status = f"Track start error: {e}" print(f"ERROR in _start_track_logic: {e}") diff --git a/multi_agent_conductor.py b/multi_agent_conductor.py index 7606fd9..b5a1f93 100644 --- a/multi_agent_conductor.py +++ b/multi_agent_conductor.py @@ -73,9 +73,11 @@ class ConductorEngine: except KeyError as e: print(f"Missing required field in ticket definition: {e}") - async def run(self): + async def run(self, md_content: str = ""): """ Main execution loop using the DAG engine. + Args: + md_content: The full markdown context (history + files) for AI workers. """ await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)") @@ -91,7 +93,7 @@ class ConductorEngine: await self._push_state(status="done", active_tier=None) else: # Check if any tasks are in-progress or could be ready - if any(t.status == "running" for t in self.track.tickets): + if any(t.status == "in_progress" for t in self.track.tickets): # Wait for async tasks to complete await asyncio.sleep(1) continue @@ -101,14 +103,12 @@ class ConductorEngine: break # 3. Process ready tasks + loop = asyncio.get_event_loop() for ticket in ready_tasks: - # If auto_queue is on and step_mode is off, engine.tick() already marked it 'running' + # If auto_queue is on and step_mode is off, engine.tick() already marked it 'in_progress' # but we need to verify and handle the lifecycle. - if ticket.status != "running" and not ticket.step_mode and self.engine.auto_queue: - # This shouldn't happen with current ExecutionEngine.tick() but for safety: - ticket.status = "running" - - if ticket.status == "running": + if ticket.status == "in_progress" or (not ticket.step_mode and self.engine.auto_queue): + ticket.status = "in_progress" print(f"Executing ticket {ticket.id}: {ticket.description}") await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}") @@ -117,8 +117,20 @@ class ConductorEngine: model_name="gemini-2.5-flash-lite", messages=[] ) - # Note: In a fully async version, we would wrap this in a task - run_worker_lifecycle(ticket, context, event_queue=self.event_queue, engine=self) + + # Offload the blocking lifecycle call to a thread to avoid blocking the async event loop. + # We pass the md_content so the worker has full context. + context_files = ticket.context_requirements if ticket.context_requirements else None + await loop.run_in_executor( + None, + run_worker_lifecycle, + ticket, + context, + context_files, + self.event_queue, + self, + md_content + ) await self._push_state(active_tier="Tier 2 (Tech Lead)") elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue): @@ -126,7 +138,6 @@ class ConductorEngine: print(f"Ticket {ticket.id} is ready and awaiting approval.") await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}") # In a real UI, this would wait for a user event. - # For headless/linear run, we might need a signal. # For now, we'll treat it as a pause point if not auto-queued. pass @@ -170,10 +181,17 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_ return False -def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None, engine: Optional['ConductorEngine'] = None): +def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None, engine: Optional['ConductorEngine'] = None, md_content: str = ""): """ Simulates the lifecycle of a single agent working on a ticket. Calls the AI client and updates the ticket status based on the response. + Args: + ticket: The ticket to process. + context: The worker context. + context_files: List of files to include in the context. + event_queue: Queue for pushing state updates and receiving approvals. + engine: The conductor engine. + md_content: The markdown context (history + files) for AI workers. """ # Enforce Context Amnesia: each ticket starts with a clean slate. ai_client.reset_session() @@ -183,6 +201,11 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: parser = ASTParser(language="python") for i, file_path in enumerate(context_files): try: + abs_path = Path(file_path) + if not abs_path.is_absolute() and engine: + # Resolve relative to project base if possible + # (This is a bit simplified, but helps) + pass with open(file_path, 'r', encoding='utf-8') as f: content = f.read() if i == 0: @@ -206,8 +229,6 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: "start your response with 'BLOCKED' and explain why." ) - # In a real scenario, we would pass md_content from the aggregator - # and manage the conversation history in the context. # HITL Clutch: pass the queue and ticket_id to confirm_execution def clutch_callback(payload: str) -> bool: if not event_queue: @@ -215,7 +236,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: return confirm_execution(payload, event_queue, ticket.id) response = ai_client.send( - md_content="", + md_content=md_content, user_message=user_message, base_dir=".", pre_tool_callback=clutch_callback if ticket.step_mode else None,