perf(core): Optimize DAG engine, orchestrator loop, and simulations

This commit is contained in:
2026-05-06 15:27:27 -04:00
parent d0aff71430
commit f628e0b29a
4 changed files with 109 additions and 72 deletions
@@ -14,10 +14,10 @@
- [x] Task: Conductor - User Manual Verification 'Phase 2: Audit and Profiling (`src/` and `simulation/`)' (Protocol in workflow.md) (7a72987)
## Phase 3: Targeted Optimization and Refactoring
- [ ] Task: Write/update tests for the first identified bottleneck to establish a performance or structural baseline (Red Phase).
- [ ] Task: Refactor the first identified bottleneck to align with data-oriented guidelines (Green Phase).
- [ ] Task: Write/update tests for remaining identified bottlenecks.
- [ ] Task: Refactor remaining identified bottlenecks.
- [x] Task: Write/update tests for the first identified bottleneck to establish a performance or structural baseline (Red Phase). (2e68f1e)
- [x] Task: Refactor the first identified bottleneck to align with data-oriented guidelines (Green Phase). (2e68f1e)
- [x] Task: Write/update tests for remaining identified bottlenecks. (56e9627)
- [x] Task: Refactor remaining identified bottlenecks. (d0aff71)
- [ ] Task: Conductor - User Manual Verification 'Phase 3: Targeted Optimization and Refactoring' (Protocol in workflow.md)
## Phase 4: Final Evaluation and Documentation
+18 -13
View File
@@ -4,10 +4,11 @@ from typing import Any, Callable
from src import ai_client
class UserSimAgent:
def __init__(self, hook_client: Any, model: str = "gemini-2.5-flash-lite", enable_delays: bool = True) -> None:
def __init__(self, hook_client: Any, model: str = "gemini-2.5-flash-lite", enable_delays: bool = True, batch_typing: bool = False) -> None:
self.hook_client = hook_client
self.model = model
self.enable_delays = enable_delays
self.batch_typing = batch_typing
self.system_prompt = (
"You are a software engineer testing an AI coding assistant called 'Manual Slop'. "
"You want to build a small Python project and verify the assistant's capabilities. "
@@ -30,18 +31,22 @@ class UserSimAgent:
delay = random.uniform(min_delay, max_delay)
time.sleep(delay)
def simulate_typing(self, text: str, jitter_range: tuple[float, float] = (0.01, 0.05)) -> None:
    """Block for a human-like typing duration based on the length of *text*.

    Long texts sleep once per 10-character chunk (3x jitter), medium texts
    once per 3-character chunk (1.5x jitter), and short texts once per
    character. No-op when delays are disabled.
    """
    if not self.enable_delays:
        return
    lo, hi = jitter_range
    # Choose chunk size and jitter multiplier from the text length to
    # balance speed and realism.
    if len(text) > 200:
        step, scale = 10, 3
    elif len(text) > 50:
        step, scale = 3, 1.5
    else:
        step, scale = 1, 1
    for _ in range(0, len(text), step):
        time.sleep(random.uniform(lo * scale, hi * scale))
def simulate_typing(self, text: str, jitter_range: tuple[float, float] = (0.01, 0.05), batch_typing: bool = False) -> None:
    """Sleep to mimic a human typing *text*.

    No-op when delays are disabled. When batch typing is requested (either
    the per-call flag or the agent-wide setting), a single short sleep
    replaces the per-chunk simulation. Otherwise long texts sleep once per
    10-character chunk (3x jitter), medium texts once per 3-character chunk
    (1.5x jitter), and short texts once per character.
    """
    if not self.enable_delays:
        return
    if batch_typing or self.batch_typing:
        # Batched mode: one token sleep instead of per-chunk pacing.
        time.sleep(0.01)
        return
    low, high = jitter_range
    # Chunk size and jitter multiplier scale with text length to balance
    # speed and realism.
    if len(text) > 200:
        chunk, factor = 10, 3
    elif len(text) > 50:
        chunk, factor = 3, 1.5
    else:
        chunk, factor = 1, 1
    for _ in range(0, len(text), chunk):
        time.sleep(random.uniform(low * factor, high * factor))
def generate_response(self, conversation_history: list[dict]) -> str:
"""
+66 -50
View File
@@ -48,19 +48,29 @@ class TrackDAG:
def cascade_blocks(self) -> None:
    """
    Transitively marks `todo` tickets as `blocked` if any dependency is `blocked`.
    Propagates 'blocked' status from initially blocked nodes to their
    dependents via a breadth-first walk, so multi-hop chains (A→B→C) are
    handled in a single O(V+E) pass.
    """
    with get_monitor().scope("dag_cascade_blocks"):
        # Reverse adjacency: ticket id -> tickets that depend on it,
        # stored as object references to avoid repeated map lookups.
        reverse_deps = {t.id: [] for t in self.tickets}
        for t in self.tickets:
            for dep_id in t.depends_on:
                if dep_id in reverse_deps:
                    reverse_deps[dep_id].append(t)
        # BFS frontier seeded with every currently blocked ticket; a
        # read cursor avoids O(n) pops from the front of the list.
        frontier = [t for t in self.tickets if t.status == 'blocked']
        cursor = 0
        while cursor < len(frontier):
            source = frontier[cursor]
            cursor += 1
            for dependent in reverse_deps.get(source.id, []):
                if dependent.status == 'todo':
                    dependent.status = 'blocked'
                    # Preserve the reason for blocking, if not already set.
                    if not dependent.blocked_reason:
                        dependent.blocked_reason = f"Dependency {source.id} is blocked."
                    frontier.append(dependent)
def is_ticket_ready(self, ticket: Ticket) -> bool:
"""Returns True if all dependencies of the ticket are completed."""
@@ -84,62 +94,68 @@ class TrackDAG:
def has_cycle(self) -> bool:
    """
    Performs an iterative Depth-First Search to detect cycles in the dependency graph.
    Returns:
        True if a cycle is detected, False otherwise.
    """
    with get_monitor().scope("dag_has_cycle"):
        seen = set()
        for root in self.tickets:
            if root.id in seen:
                continue
            # Explicit stack of (node id, backtracking?) frames; the
            # backtracking frame removes the node from the active path.
            frames = [(root.id, False)]
            on_path = set()
            while frames:
                node_id, leaving = frames.pop()
                if leaving:
                    on_path.remove(node_id)
                    continue
                if node_id in on_path:
                    # Reached a node already on the current DFS path: cycle.
                    return True
                if node_id in seen:
                    continue
                seen.add(node_id)
                on_path.add(node_id)
                # Schedule the backtracking frame before the children.
                frames.append((node_id, True))
                node = self.ticket_map.get(node_id)
                if node:
                    for dep_id in node.depends_on:
                        frames.append((dep_id, False))
        return False
def topological_sort(self) -> List[str]:
    """
    Returns a list of ticket IDs in topological order (dependencies before dependents).
    Uses Kahn's algorithm for efficient O(V+E) sorting and cycle detection.
    Returns:
        A list of ticket ID strings.
    Raises:
        ValueError: If a dependency cycle is detected.
    """
    with get_monitor().scope("dag_topological_sort"):
        remaining = {t.id: len(t.depends_on) for t in self.tickets}
        children = {t.id: [] for t in self.tickets}
        for t in self.tickets:
            for dep_id in t.depends_on:
                if dep_id in children:
                    children[dep_id].append(t.id)
        # Seed with every dependency-free ticket; the output order doubles
        # as the work queue, consumed via a read cursor.
        order = [t.id for t in self.tickets if remaining[t.id] == 0]
        cursor = 0
        while cursor < len(order):
            current = order[cursor]
            cursor += 1
            for child_id in children.get(current, []):
                remaining[child_id] -= 1
                if remaining[child_id] == 0:
                    order.append(child_id)
        # Any ticket never reaching in-degree zero sits on a cycle.
        if len(order) < len(self.tickets):
            raise ValueError("Dependency cycle detected")
        return order
class ExecutionEngine:
"""
+21 -5
View File
@@ -40,6 +40,8 @@ from src import models
from src.models import Ticket, Track, WorkerContext
from src.file_cache import ASTParser
from pathlib import Path
from src.personas import PersonaManager
from src import paths
from src.dag_engine import TrackDAG, ExecutionEngine
@@ -122,6 +124,7 @@ class ConductorEngine:
self._abort_events: dict[str, threading.Event] = {}
self._pause_event: threading.Event = threading.Event()
self._tier_usage_lock = threading.Lock()
self._dirty: bool = True
def update_usage(self, tier: str, input_tokens: int, output_tokens: int) -> None:
"""Updates token usage for a specific tier."""
@@ -138,6 +141,16 @@ class ConductorEngine:
"""Resumes the pipeline execution."""
self._pause_event.clear()
def approve_task(self, task_id: str) -> None:
    """Manually move a todo task into in_progress via the execution engine."""
    self.engine.approve_task(task_id)
    # The cached ready-task set is now stale; force a recompute next tick.
    self._dirty = True
def update_task_status(self, task_id: str, status: str) -> None:
    """Force a ticket into the given status via the execution engine."""
    self.engine.update_task_status(task_id, status)
    # The cached ready-task set is now stale; force a recompute next tick.
    self._dirty = True
def kill_worker(self, ticket_id: str) -> None:
"""Sets the abort event for a worker and attempts to join its thread."""
if ticket_id in self._abort_events:
@@ -216,10 +229,14 @@ class ConductorEngine:
if max_ticks is not None and tick_count >= max_ticks:
break
tick_count += 1
# 1. Identify ready tasks
ready_tasks = self.engine.tick()
# 1. Identify ready tasks
if self._dirty:
self._ready_tasks = self.engine.tick()
self._dirty = False
ready_tasks = self._ready_tasks
# 2. Check for completion or blockage
if not ready_tasks:
all_done = all(t.status == "completed" for t in self.track.tickets)
if all_done:
@@ -404,8 +421,6 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
persona_tool_preset = None
persona = None
if context.persona_id:
from src.personas import PersonaManager
from src import paths
pm = PersonaManager(Path(paths.get_project_personas_path(Path.cwd())) if paths.get_project_personas_path(Path.cwd()).exists() else None)
try:
personas = pm.load_all()
@@ -587,6 +602,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
_in_tokens = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _resp_entries)
_out_tokens = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _resp_entries)
engine.update_usage("Tier 3", _in_tokens, _out_tokens)
engine._dirty = True
if "BLOCKED" in response.upper():
ticket.mark_blocked(response)
else: