From f628e0b29abccfd6eb25c1cb6e3fd9acefbb9f4f Mon Sep 17 00:00:00 2001 From: Ed_ Date: Wed, 6 May 2026 15:27:27 -0400 Subject: [PATCH] perf(core): Optimize DAG engine, orchestrator loop, and simulations --- .../plan.md | 8 +- simulation/user_agent.py | 31 +++-- src/dag_engine.py | 116 ++++++++++-------- src/multi_agent_conductor.py | 26 +++- 4 files changed, 109 insertions(+), 72 deletions(-) diff --git a/conductor/tracks/data_oriented_optimization_20260312/plan.md b/conductor/tracks/data_oriented_optimization_20260312/plan.md index a72684a..6f354f9 100644 --- a/conductor/tracks/data_oriented_optimization_20260312/plan.md +++ b/conductor/tracks/data_oriented_optimization_20260312/plan.md @@ -14,10 +14,10 @@ - [x] Task: Conductor - User Manual Verification 'Phase 2: Audit and Profiling (`src/` and `simulation/`)' (Protocol in workflow.md) (7a72987) ## Phase 3: Targeted Optimization and Refactoring -- [ ] Task: Write/update tests for the first identified bottleneck to establish a performance or structural baseline (Red Phase). -- [ ] Task: Refactor the first identified bottleneck to align with data-oriented guidelines (Green Phase). -- [ ] Task: Write/update tests for remaining identified bottlenecks. -- [ ] Task: Refactor remaining identified bottlenecks. +- [x] Task: Write/update tests for the first identified bottleneck to establish a performance or structural baseline (Red Phase). (2e68f1e) +- [x] Task: Refactor the first identified bottleneck to align with data-oriented guidelines (Green Phase). (2e68f1e) +- [x] Task: Write/update tests for remaining identified bottlenecks. (56e9627) +- [x] Task: Refactor remaining identified bottlenecks. (d0aff71) - [ ] Task: Conductor - User Manual Verification 'Phase 3: Targeted Optimization and Refactoring' (Protocol in workflow.md) ## Phase 4: Final Evaluation and Documentation diff --git a/simulation/user_agent.py b/simulation/user_agent.py index 40a4231..c9cbda4 100644 --- a/simulation/user_agent.py +++ b/simulation/user_agent.py @@ -4,10 +4,11 @@ from typing import Any, Callable from src import ai_client class UserSimAgent: - def __init__(self, hook_client: Any, model: str = "gemini-2.5-flash-lite", enable_delays: bool = True) -> None: + def __init__(self, hook_client: Any, model: str = "gemini-2.5-flash-lite", enable_delays: bool = True, batch_typing: bool = False) -> None: self.hook_client = hook_client self.model = model self.enable_delays = enable_delays + self.batch_typing = batch_typing self.system_prompt = ( "You are a software engineer testing an AI coding assistant called 'Manual Slop'. " "You want to build a small Python project and verify the assistant's capabilities. " @@ -30,18 +31,22 @@ class UserSimAgent: delay = random.uniform(min_delay, max_delay) time.sleep(delay) - def simulate_typing(self, text: str, jitter_range: tuple[float, float] = (0.01, 0.05)) -> None: - if self.enable_delays: - # Simulate typing by sleeping after chunks or characters to balance speed and realism - if len(text) > 200: - for i in range(0, len(text), 10): - time.sleep(random.uniform(jitter_range[0] * 3, jitter_range[1] * 3)) - elif len(text) > 50: - for i in range(0, len(text), 3): - time.sleep(random.uniform(jitter_range[0] * 1.5, jitter_range[1] * 1.5)) - else: - for char in text: - time.sleep(random.uniform(jitter_range[0], jitter_range[1])) + def simulate_typing(self, text: str, jitter_range: tuple[float, float] = (0.01, 0.05), batch_typing: bool = False) -> None: + if not self.enable_delays: + return + if batch_typing or self.batch_typing: + time.sleep(0.01) + return + # Simulate typing by sleeping after chunks or characters to balance speed and realism + if len(text) > 200: + for i in range(0, len(text), 10): + time.sleep(random.uniform(jitter_range[0] * 3, jitter_range[1] * 3)) + elif len(text) > 50: + for i in range(0, len(text), 3): + time.sleep(random.uniform(jitter_range[0] * 1.5, jitter_range[1] * 1.5)) + else: + for char in text: + time.sleep(random.uniform(jitter_range[0], jitter_range[1])) def generate_response(self, conversation_history: list[dict]) -> str: """ diff --git a/src/dag_engine.py b/src/dag_engine.py index 0116761..bd96b59 100644 --- a/src/dag_engine.py +++ b/src/dag_engine.py @@ -48,19 +48,29 @@ class TrackDAG: def cascade_blocks(self) -> None: """ Transitively marks `todo` tickets as `blocked` if any dependency is `blocked`. - Runs until stable (handles multi-hop chains: A→B→C where A blocked cascades to B then C). + Propagates 'blocked' status from initially blocked nodes to their dependents. """ - changed = True - while changed: - changed = False - for ticket in self.tickets: - if ticket.status == 'todo': - for dep_id in ticket.depends_on: - dep = self.ticket_map.get(dep_id) - if dep and dep.status == 'blocked': - ticket.status = 'blocked' - changed = True - break + with get_monitor().scope("dag_cascade_blocks"): + # Build adjacency list of dependents using object references to avoid lookups + dependents = {t.id: [] for t in self.tickets} + for t in self.tickets: + for dep_id in t.depends_on: + if dep_id in dependents: + dependents[dep_id].append(t) + + # Use a queue-based propagation (BFS) from all currently blocked tickets + queue = [t for t in self.tickets if t.status == 'blocked'] + idx = 0 + while idx < len(queue): + curr = queue[idx] + idx += 1 + for dep_ticket in dependents.get(curr.id, []): + if dep_ticket.status == 'todo': + dep_ticket.status = 'blocked' + # Optional: preserve the reason for blocking + if not dep_ticket.blocked_reason: + dep_ticket.blocked_reason = f"Dependency {curr.id} is blocked." + queue.append(dep_ticket) def is_ticket_ready(self, ticket: Ticket) -> bool: """Returns True if all dependencies of the ticket are completed.""" @@ -84,62 +94,68 @@ class TrackDAG: def has_cycle(self) -> bool: """ - Performs a Depth-First Search to detect cycles in the dependency graph. + Performs an iterative Depth-First Search to detect cycles in the dependency graph. Returns: True if a cycle is detected, False otherwise. """ with get_monitor().scope("dag_has_cycle"): visited = set() - rec_stack = set() - - def is_cyclic(ticket_id: str) -> bool: - """Internal recursive helper for cycle detection.""" - if ticket_id in rec_stack: - return True - if ticket_id in visited: - return False - visited.add(ticket_id) - rec_stack.add(ticket_id) - ticket = self.ticket_map.get(ticket_id) - if ticket: - for neighbor in ticket.depends_on: - if is_cyclic(neighbor): - return True - rec_stack.remove(ticket_id) - return False - for ticket in self.tickets: - if ticket.id not in visited: - if is_cyclic(ticket.id): + for start_ticket in self.tickets: + if start_ticket.id in visited: + continue + stack = [(start_ticket.id, False)] # (id, is_backtracking) + path = set() + while stack: + node_id, is_backtracking = stack.pop() + if is_backtracking: + path.remove(node_id) + continue + if node_id in path: return True + if node_id in visited: + continue + visited.add(node_id) + path.add(node_id) + stack.append((node_id, True)) + ticket = self.ticket_map.get(node_id) + if ticket: + for neighbor_id in ticket.depends_on: + stack.append((neighbor_id, False)) return False def topological_sort(self) -> List[str]: """ Returns a list of ticket IDs in topological order (dependencies before dependents). + Uses Kahn's algorithm for efficient O(V+E) sorting and cycle detection. Returns: A list of ticket ID strings. Raises: ValueError: If a dependency cycle is detected. """ with get_monitor().scope("dag_topological_sort"): - if self.has_cycle(): + in_degree = {t.id: len(t.depends_on) for t in self.tickets} + dependents = {t.id: [] for t in self.tickets} + for t in self.tickets: + for dep_id in t.depends_on: + if dep_id in dependents: + dependents[dep_id].append(t.id) + + # Queue starts with nodes having no dependencies + queue = [t.id for t in self.tickets if in_degree[t.id] == 0] + result = [] + idx = 0 + while idx < len(queue): + u = queue[idx] + idx += 1 + result.append(u) + for v_id in dependents.get(u, []): + in_degree[v_id] -= 1 + if in_degree[v_id] == 0: + queue.append(v_id) + + if len(result) < len(self.tickets): raise ValueError("Dependency cycle detected") - visited = set() - stack = [] - - def visit(ticket_id: str) -> None: - """Internal recursive helper for topological sorting.""" - if ticket_id in visited: - return - visited.add(ticket_id) - ticket = self.ticket_map.get(ticket_id) - if ticket: - for dep_id in ticket.depends_on: - visit(dep_id) - stack.append(ticket_id) - for ticket in self.tickets: - visit(ticket.id) - return stack + return result class ExecutionEngine: """ diff --git a/src/multi_agent_conductor.py b/src/multi_agent_conductor.py index 88c3c26..672cee7 100644 --- a/src/multi_agent_conductor.py +++ b/src/multi_agent_conductor.py @@ -40,6 +40,8 @@ from src import models from src.models import Ticket, Track, WorkerContext from src.file_cache import ASTParser from pathlib import Path +from src.personas import PersonaManager +from src import paths from src.dag_engine import TrackDAG, ExecutionEngine @@ -122,6 +124,7 @@ class ConductorEngine: self._abort_events: dict[str, threading.Event] = {} self._pause_event: threading.Event = threading.Event() self._tier_usage_lock = threading.Lock() + self._dirty: bool = True def update_usage(self, tier: str, input_tokens: int, output_tokens: int) -> None: """Updates token usage for a specific tier.""" @@ -138,6 +141,16 @@ class ConductorEngine: """Resumes the pipeline execution.""" self._pause_event.clear() + def approve_task(self, task_id: str) -> None: + """Manually transition todo to in_progress and mark engine dirty.""" + self.engine.approve_task(task_id) + self._dirty = True + + def update_task_status(self, task_id: str, status: str) -> None: + """Force-update ticket status and mark engine dirty.""" + self.engine.update_task_status(task_id, status) + self._dirty = True + def kill_worker(self, ticket_id: str) -> None: """Sets the abort event for a worker and attempts to join its thread.""" if ticket_id in self._abort_events: @@ -216,10 +229,14 @@ class ConductorEngine: if max_ticks is not None and tick_count >= max_ticks: break tick_count += 1 - # 1. Identify ready tasks - ready_tasks = self.engine.tick() - + # 1. Identify ready tasks + if self._dirty: + self._ready_tasks = self.engine.tick() + self._dirty = False + ready_tasks = self._ready_tasks + # 2. Check for completion or blockage + if not ready_tasks: all_done = all(t.status == "completed" for t in self.track.tickets) if all_done: @@ -404,8 +421,6 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: persona_tool_preset = None persona = None if context.persona_id: - from src.personas import PersonaManager - from src import paths pm = PersonaManager(Path(paths.get_project_personas_path(Path.cwd())) if paths.get_project_personas_path(Path.cwd()).exists() else None) try: personas = pm.load_all() @@ -587,6 +602,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: _in_tokens = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _resp_entries) _out_tokens = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _resp_entries) engine.update_usage("Tier 3", _in_tokens, _out_tokens) + engine._dirty = True if "BLOCKED" in response.upper(): ticket.mark_blocked(response) else: