2026-03-06 15:57:39 -05:00
parent 49ae811be9
commit bf24164b1f
4 changed files with 435 additions and 212 deletions

# Implementation Plan: Cost & Token Analytics Panel (cost_token_analytics_20260306)
## Overview
Real-time cost tracking panel displaying cost per model, session totals, and breakdown by tier. Uses existing `cost_tracker.py` which is implemented but has no GUI representation.
> **Reference:** [Spec](./spec.md) | [Architecture Guide](../../../docs/guide_architecture.md)
## Phase 1: Foundation & Research
Focus: Verify existing infrastructure
- [ ] Task 1.1: Initialize MMA Environment
- Run `activate_skill mma-orchestrator` before starting
- [ ] Task 1.2: Verify cost_tracker.py implementation
- WHERE: `src/cost_tracker.py`
- WHAT: Confirm `MODEL_PRICING` list structure
- HOW: Use `manual-slop_py_get_definition` on `estimate_cost`
- OUTPUT: Document exact regex-based matching
- **Note**: `estimate_cost` loops through patterns, Unknown models return 0.0.
- **SHA verification**: Run `uv run pytest tests/test_cost_tracker.py -v`
- COMMAND: `uv run pytest tests/test_cost_panel.py tests/test_conductor_engine_v2.py tests/test_cost_tracker.py -v` (batched: 4 files max due to complex threading issues)
- **Example Announcement:** "I will now run the automated test suite to verify the phase. **Command:** `uv run pytest tests/test_specific_feature.py` (substitute actual file)"
- Execute the announced command.
- Run potentially slow simulation tests in batches of at most 4 test files at a time. Use `--timeout=60`, or `--timeout=120` when tests in the batch are known to be slow (e.g., simulation tests); increase the timeout as needed.
- **Example Announcement:** "I will now run the automated test suite to verify the phase. **Command:** `uv run pytest tests/test_cache_panel.py tests/test_conductor_engine_v2.py tests/test_cost_tracker.py tests/test_cost_panel.py -v`"
- **CRITICAL:** Running the full suite at once frequently causes random timeouts or threading access violations; batch runs (max 4 test files) so a hung GUI test does not force waiting out the full timeout.
- For each remaining code file, verify a corresponding test file exists.
- If a test file is missing, create one. Before writing the test, review the existing `@pytest` decorators (e.g., `@pytest.mark.integration`), naming conventions, and testing style. The new tests **must** validate the functionality described in this phase's tasks (`plan.md`).
- Use the `live_gui` fixture to interact with a real instance of the application via the Hook API; `test_gui2_events.py` and `test_gui2_parity.py` already exercise this pattern.
- For each code file over 50 lines, use `py_get_skeleton`, `py_get_code_outline`, or `py_get_definition` first to map the architecture. When uncertain about threading, event flow, data structures, or module interactions, consult the deep-dive docs in `docs/` (last updated: 08e003a):
- **[docs/guide_architecture.md](../docs/guide_architecture.md):** Threading model, event system, AI client, HITL mechanism.
- **[docs/guide_mma.md](../docs/guide_mma.md):** Ticket/Track/WorkerContext data structures, DAG engine algorithms, ConductorEngine execution loop, Tier 2 ticket generation, Tier 3 worker lifecycle with context amnesia.
- **[docs/guide_simulations.md](../docs/guide_simulations.md):** `live_gui` fixture and Puppeteer pattern, mock provider protocol, visual verification patterns.
- Use `get_file_summary` first to decide whether you need a file's full content. Additional deep-dive docs in `docs/` (last updated: 08e003a):
- **[docs/guide_tools.md](../docs/guide_tools.md):** MCP Bridge 3-layer security model, 26-tool inventory with parameters, Hook API endpoint reference (GET/POST), ApiHookClient method reference.
- **[docs/guide_meta_boundary.md](../docs/guide_meta_boundary.md):** The critical distinction between the Application's Strict-HITL environment and the Meta-Tooling environment used to build it.
- **Application Layer** (`gui_2.py`, `app_controller.py`): Threads run in `src/` directory. Events flow through `SyncEventQueue` and `EventEmitter` for decoupled communication.
- **`api_hooks.py`**: HTTP server exposing internal state via a REST API when launched with the `--enable-test-hooks` flag (otherwise used only by the CLI adapter); uses `SyncEventQueue` to push events to the GUI.
- **ApiHookClient** (`api_hook_client.py`): Client for interacting with the running application via the Hook API.
- `get_status()`: Health check endpoint
- `get_mma_status()`: Returns full MMA engine status
- `get_gui_state()`: Returns full GUI state
- `get_value(item)`: Gets a GUI value by mapped field name
- `get_performance()`: Returns performance metrics
- `click(item, user_data)`: Simulates a button click
- `set_value(item, value)`: Sets a GUI value
- `select_tab(item, value)`: Selects a specific tab
- `reset_session()`: Resets the session via button click
- **MMA Prompts** (`mma_prompts.py`): Structured system prompts for MMA tiers
- **ConductorTechLead** (`conductor_tech_lead.py`): Generates tickets from track brief
- **models.py** (`models.py`): Data structures (Ticket, Track, TrackState, WorkerContext)
- **dag_engine.py** (`dag_engine.py`): DAG execution engine with cycle detection and topological sorting
- **multi_agent_conductor.py** (`multi_agent_conductor.py`): MMA orchestration engine
- **shell_runner.py** (`shell_runner.py`): Sandboxed PowerShell execution
- **file_cache.py** (`file_cache.py`): AST parser with tree-sitter
- **summarize.py** (`summarize.py`): Heuristic file summaries
- **outline_tool.py** (`outline_tool.py`): Code outlining with line ranges
- **theme.py** / **theme_2.py** (`theme.py`, `theme_2.py`): ImGui theme/color palettes
- **log_registry.py** (`log_registry.py`): Session log registry with TOML persistence
- **log_pruner.py** (`log_pruner.py`): Automated log pruning
- **performance_monitor.py** (`performance_monitor.py`): FPS, frame time, CPU tracking
- **gui_2.py**: Main GUI (79KB) - Primary ImGui interface
- **ai_client.py**: Multi-provider LLM abstraction (71KB)
- **mcp_client.py**: 26 MCP-style tools (48KB)
- **app_controller.py**: Headless controller (82KB) - FastAPI for headless mode
- **project_manager.py**: Project configuration management (13KB)
- **aggregate.py**: Context aggregation (14KB)
- **session_logger.py**: Session logging (6KB)
- **gemini_cli_adapter.py**: CLI subprocess adapter (6KB)
- **events.py**: Event system (3KB)
- **cost_tracker.py**: Cost estimation (1KB)
## Current State Audit (as of {commit_sha})
### Already Implemented (DO NOT re-implement)
#### cost_tracker.py (src/cost_tracker.py)
- **`MODEL_PRICING` list**: List of (regex_pattern, rates_dict) tuples
```python
MODEL_PRICING = [
(r"gemini-2\.5-flash-lite", {"input_per_mtok": 0.075, "output_per_mtok": 0.30}),
(r"gemini-2\.5-flash", {"input_per_mtok": 0.15, "output_per_mtok": 0.60}),
(r"gemini-3-flash-preview", {"input_per_mtok": 0.15, "output_per_mtok": 0.60}),
(r"gemini-3\.1-pro-preview", {"input_per_mtok": 3.50, "output_per_mtok": 10.50}),
(r"claude-.*-sonnet", {"input_per_mtok": 3.0, "output_per_mtok": 15.0}),
(r"deepseek-v3", {"input_per_mtok": 0.27, "output_per_mtok": 1.10}),
]
```
- **`estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float`**: Uses regex match, returns 0.0 for unknown models
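The matching behavior described above can be sketched as follows. This is an illustrative reimplementation with a subset of the pricing table, not the project's actual `cost_tracker.py` source:

```python
import re

# Illustrative subset of the pricing table; rates mirror the spec above
MODEL_PRICING = [
 (r"gemini-2\.5-flash-lite", {"input_per_mtok": 0.075, "output_per_mtok": 0.30}),
 (r"claude-.*-sonnet", {"input_per_mtok": 3.0, "output_per_mtok": 15.0}),
]

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
 # First pattern that matches wins; unknown models fall through to 0.0
 for pattern, rates in MODEL_PRICING:
  if re.search(pattern, model):
   return (input_tokens / 1_000_000) * rates["input_per_mtok"] \
        + (output_tokens / 1_000_000) * rates["output_per_mtok"]
 return 0.0
```

Because patterns are tried in order, the more specific `flash-lite` pattern must precede any broader `flash` pattern.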
#### MMA Tier Usage Tracking (multi_agent_conductor.py)
- **`ConductorEngine.tier_usage`** (initialized in `ConductorEngine.__init__`, `multi_agent_conductor.py` lines 50-60) already tracks per-tier token counts AND model:
```python
self.tier_usage = {
"Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"},
 # ... "Tier 2" and "Tier 3" entries follow the same shape (elided in this excerpt) ...
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
}
```
- **Key insight**: The model name is already tracked per tier in `tier_usage[tier]["model"]`
- Updated in `run_worker_lifecycle()` from comms_log token counts
### Gaps to Fill (This Track's Scope)
- No GUI panel to display cost information
- No session-level cost accumulation
- No per-tier cost breakdown in UI
## Architectural Constraints
### Non-Blocking Updates
- Cost calculations MUST NOT block UI thread
- Token counts are read from existing tier_usage - no new tracking needed
- Use cached values, update on state change events
### Cross-Thread Data Access
- `tier_usage` is updated on worker threads
- GUI reads via MMA state updates through `_pending_gui_tasks` pattern
- Already synchronized through existing state update mechanism
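A minimal sketch of this hand-off pattern: worker threads enqueue callables, and the GUI thread drains them once per frame. The names are illustrative; the real `_pending_gui_tasks` plumbing in `gui_2.py` may differ:

```python
import queue
from typing import Callable

_pending_gui_tasks: "queue.Queue[Callable[[], None]]" = queue.Queue()

def post_to_gui(fn: Callable[[], None]) -> None:
 # Safe to call from any worker thread
 _pending_gui_tasks.put(fn)

def drain_gui_tasks() -> None:
 # Called on the GUI thread once per frame; drains everything queued so far
 while True:
  try:
   fn = _pending_gui_tasks.get_nowait()
  except queue.Empty:
   break
  fn()
```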
## Architecture Reference
### Key Integration Points
| File | Lines | Purpose |
|------|-------|---------|
| `src/cost_tracker.py` | 10-40 | `MODEL_PRICING`, `estimate_cost()` |
| `src/multi_agent_conductor.py` | ~50-60 | `tier_usage` dict with input/output/model |
| `src/gui_2.py` | ~2700-2800 | `_render_mma_dashboard()` - existing tier usage display |
### Cost Calculation Pattern
```python
from src import cost_tracker
usage = engine.tier_usage["Tier 3"]
cost = cost_tracker.estimate_cost(
usage["model"], # Already tracked!
usage["input"],
usage["output"]
)
```
## Functional Requirements
### FR1: Session Cost Accumulation
- Track total cost for the current session in App/AppController state
- Reset on session reset
- Sum of all tier costs
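FR1 could be satisfied with a small accumulator like the sketch below; `SessionCostTracker` is a hypothetical name, not existing project API:

```python
import threading

class SessionCostTracker:
 # Hypothetical helper: accumulates per-call costs, resets with the session
 def __init__(self) -> None:
  self._lock = threading.Lock()
  self._total = 0.0

 def add(self, cost: float) -> None:
  with self._lock:
   self._total += cost

 def reset(self) -> None:
  with self._lock:
   self._total = 0.0

 @property
 def total(self) -> float:
  with self._lock:
   return self._total
```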
### FR2: Per-Tier Cost Display
- Show cost per MMA tier using existing `tier_usage[tier]["model"]` for model
- Show input/output tokens alongside cost
- Calculate using `cost_tracker.estimate_cost()`
### FR3: Real-Time Updates
- Update cost display when MMA state changes
- Hook into existing `mma_state_update` event handling
- No polling - event-driven
- **Per-ticket breakdown available** (already tracked by tier)
- **Cost per model**: group and display costs by model name (Gemini, Anthropic, DeepSeek)
- **Total session cost**: accumulate and display the session total
- **Uses existing `cost_tracker.py` functions**
## Non-Functional Requirements
| Requirement | Constraint |
|-------------|------------|
| Frame Time Impact | <1ms when panel visible |
| Memory Overhead | <1KB for session cost state |
| Thread Safety | Read tier_usage via state updates only |
## Testing Requirements
### Unit Tests
- Test unknown model returns 0.0
- Test session cost accumulation
### Integration Tests (via `live_gui` fixture)
- Verify cost panel displays after API call
- Verify costs update after MMA execution
- Verify session reset clears costs
- **NO mocking** of `cost_tracker` internals
- Use real state
- Test artifacts go to `tests/artifacts/`
## Out of Scope
- Historical cost tracking across sessions
- Cost budgeting/alerts
- Export cost reports
- API cost for web searches (no token counts available)
## Acceptance Criteria
- [ ] Cost panel displays in GUI
- [ ] Per-tier cost shown with token counts
- [ ] Tier breakdown accurate using existing `tier_usage`
- [ ] Total session cost accumulates correctly
- [ ] Panel updates on MMA state changes
- [ ] Uses existing `cost_tracker.estimate_cost()`
- [ ] Session reset clears costs
- [ ] 1-space indentation maintained

# Implementation Plan: True Parallel Worker Execution (true_parallel_worker_execution_20260306)
## Phase 1: Verify Existing Implementation
Focus: Understand current parallel execution and add pool management
- [ ] Task 1.1: Initialize MMA Environment
- Run `activate_skill mma-orchestrator` before starting
- [ ] Task 1.2: Verify current parallel execution
- where: `src/multi_agent_conductor.py` `ConductorEngine.run()` method (lines ~80-150)
- what: Confirm parallel spawning exists
- how: Use `manual-slop_py_get_definition` on `ConductorEngine.run`
- finding: Parallel execution already implemented using `threading.Thread`
- note: All ready tickets spawn immediately, no limit
- [ ] Task 1.3: Identify what's missing
- where: `src/multi_agent_conductor.py`
- what: Document gaps for pool management
- gaps:
- No max_workers limit (all ready spawn)
- No worker tracking (threads launched but not tracked)
- No configuration (can't set pool size)
## Phase 2: Worker Pool Management
Focus: Add configurable worker pool with tracking
- [ ] Task 2.1: Add worker pool configuration
- where: `src/multi_agent_conductor.py` or new module
- what: Add `_worker_pool` class or configuration
- how:
```python
class WorkerPool:
 def __init__(self, max_workers: int = 4):
  self.max_workers = max_workers
  self._active: dict[int, threading.Thread] = {}
  self._lock = threading.Lock()

 def spawn(self, target: Callable, args: tuple) -> threading.Thread | None:
  with self._lock:
   if len(self._active) >= self.max_workers:
    return None  # Pool full
   t = threading.Thread(target=target, args=args, daemon=True)
   t.start()
   self._active[t.ident] = t
   return t

 def join_all(self, timeout: float | None = None) -> None:
  with self._lock:
   threads = list(self._active.values())
  for t in threads:
   t.join(timeout=timeout)
  with self._lock:
   self._active.clear()

 def get_active_count(self) -> int:
  with self._lock:
   return len(self._active)
```
- code style: 1-space indentation
- [ ] Task 2.2: Add max_workers config to config.toml
- where: `config.toml`
- what: Add `[mma] max_workers = 4`
- how: Add section to config.toml
- safety: Default to 4 if missing
- [ ] Task 2.3: Integrate pool with ConductorEngine
- where: `src/multi_agent_conductor.py` `ConductorEngine.__init__`
- what: Initialize WorkerPool instead of raw thread spawning
- how: Replace direct thread creation with `self.pool.spawn()`
- safety: Check return value for pool-full case
## Phase 3: Thread Safety
Focus: Ensure safe concurrent access
- [ ] Task 3.1: Add locks to tier_usage updates
- where: `src/multi_agent_conductor.py`
- what: Protect tier_usage updates with lock
- how:
```python
self._tier_lock = threading.Lock()

def update_tier_usage(self, tier: str, usage: dict) -> None:
 with self._tier_lock:
  self.tier_usage[tier]["input"] += usage.get("input", 0)
  self.tier_usage[tier]["output"] += usage.get("output", 0)
```
- safety: Already updated via comms_log; this adds an explicit lock
- [ ] Task 3.2: Verify no race conditions
- where: `tests/test_parallel_execution.py` (new)
- what: Write tests for concurrent status updates
- how: Create multiple threads updating same ticket, verify atomic updates
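Task 3.2's check can be a self-contained stress test along these lines (standalone sketch; the real test would exercise `ConductorEngine` rather than a local dict):

```python
import threading

def test_concurrent_tier_updates() -> None:
 tier_usage = {"Tier 3": {"input": 0, "output": 0}}
 lock = threading.Lock()

 def update(n: int) -> None:
  for _ in range(n):
   with lock:  # Without this lock the += read-modify-write can interleave
    tier_usage["Tier 3"]["input"] += 1

 workers = [threading.Thread(target=update, args=(1000,)) for _ in range(8)]
 for t in workers:
  t.start()
 for t in workers:
  t.join()
 assert tier_usage["Tier 3"]["input"] == 8000
```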
## Phase 4: Testing & Verification
Focus: Verify all functionality
- [ ] Task 4.1: Write unit tests for worker pool
- where: `tests/test_parallel_execution.py` (new)
- what: Test pool limits, worker tracking
- how: Mock worker functions, verify pool behavior
- [ ] Task 4.2: Write integration test
- where: `tests/test_parallel_execution.py`
- what: Test with live_gui
- how: Create multiple independent tickets, verify parallel execution
- verify: All 4 complete independently
- [ ] Task 4.3: Conductor - Phase Verification
- run: `uv run pytest tests/test_parallel_execution.py -v --timeout=60`
- verify no race conditions
- verify pool limits enforced
## Implementation Notes
### Current Parallel Pattern (existing)
```python
# In ConductorEngine.run():
threads = []
for ticket in to_run:
 t = threading.Thread(target=run_worker_lifecycle, args=(...), daemon=True)
 threads.append(t)
 t.start()
for t in threads:
 t.join()  # Blocks until ALL complete
```
- **Issue**: No limit on concurrent workers
- **Issue**: No tracking of individual threads
- **Issue**: join() blocks the main loop
### Proposed Changes
1. Wrap thread management in `WorkerPool` class
2. Add `max_workers` config option
3. Track active workers with thread references
4. Non-blocking: Return immediately when pool full
### Files Modified
- `src/multi_agent_conductor.py`: Add WorkerPool, update ConductorEngine.run()
- `config.toml`: Add `[mma] max_workers = 4`
- `tests/test_parallel_execution.py`: New test file
### Code Style Checklist
- [ ] 1-space indentation throughout
- [ ] CRLF line endings on Windows
- [ ] No comments unless documenting API
- [ ] Type hints on all public methods

# Track Specification: True Parallel Worker Execution (true_parallel_worker_execution_20260306)
## Overview
Add worker pool management and configurable concurrency limits to the DAG engine. Currently workers execute in parallel per tick but with no limits or tracking; this track adds max_workers configuration, worker tracking, and proper pool management.
## Current State Audit
### Already Implemented (DO NOT re-implement)
#### Parallel Execution EXISTS (multi_agent_conductor.py)
- **`ConductorEngine.run()` already spawns parallel workers**:
```python
threads = []
for ticket in to_run:
 t = threading.Thread(
  target=run_worker_lifecycle,
  args=(ticket, context, context_files, self.event_queue, self, md_content),
  daemon=True
 )
 threads.append(t)
 t.start()
for t in threads:
 t.join()
```
- **Current behavior**: ALL ready tickets spawn immediately, no limit
- **Limitation**: All threads join before next tick - blocks until all complete
#### Thread Safety (existing)
- **`_send_lock`** in ai_client.py serializes API calls
- **Ticket status updates** via `engine.update_task_status()`
### Gaps to Fill (This Track's Scope)
- No max_workers limit - spawns unlimited threads
- No worker tracking - thread references discarded after join
- No configurable pool size
- No graceful degradation under load
## Architectural Constraints
### Thread Safety (CRITICAL)
- **`_send_lock` already exists**: All AI calls serialized automatically
- **Ticket status updates MUST be atomic**: Use lock on status changes
- **File access MUST be protected**: Concurrent reads OK, writes need coordination
- **DAG state MUST be protected**: `get_ready_tasks()` returns snapshot
### Worker Pool Limits
- Default: 4 concurrent workers
- Configurable via config.toml
- Respect API rate limits
### Worker Pool Pattern
- Use `threading.Semaphore` to limit concurrent workers
- Track active threads in `_active_workers: dict[str, Thread]`
- Non-blocking: don't wait for all to complete before next tick
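The non-blocking semaphore gate described above might look like this sketch (illustrative names, not the project's implementation):

```python
import threading

_pool_sem = threading.Semaphore(4)  # max_workers = 4

def try_spawn(target, args: tuple = ()) -> "threading.Thread | None":
 # Non-blocking acquire: if the pool is full, return None and let the
 # caller retry on the next tick instead of waiting.
 if not _pool_sem.acquire(blocking=False):
  return None

 def wrapper() -> None:
  try:
   target(*args)
  finally:
   _pool_sem.release()  # Free the slot even if the worker raised

 t = threading.Thread(target=wrapper, daemon=True)
 t.start()
 return t
```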
## Architecture Reference
@@ -52,85 +55,102 @@ Implement true concurrency for the DAG engine to spawn parallel Tier 3 workers.
| File | Lines | Purpose |
|------|-------|---------|
| `src/multi_agent_conductor.py` | 100-150 | `ConductorEngine.run()` - add pool logic |
| `src/multi_agent_conductor.py` | 50-60 | `__init__` - add `_worker_semaphore`, `_active_workers` |
| `src/dag_engine.py` | 50-100 | `ExecutionEngine` - ready task query |
| `src/ai_client.py` | ~50 | `_send_lock` - API serialization |
| `src/gui_2.py` | ~170 | `_pending_gui_tasks_lock` - GUI updates |
| `config.toml` | N/A | Add `[mma] max_workers = 4` |
### Proposed Pool Pattern
```python
# In ConductorEngine.__init__:
self._max_workers: int = 4  # Configurable
self._worker_semaphore = threading.Semaphore(self._max_workers)
self._active_workers: dict[str, threading.Thread] = {}
self._workers_lock = threading.Lock()

# In run():
ready_tasks = self.engine.tick()
# Limit to available semaphore slots
available = min(len(ready_tasks), self._max_workers - len(self._active_workers))
to_spawn = ready_tasks[:available]
for ticket in to_spawn:
 def worker_wrapper(ticket=ticket):  # Default arg binds ticket per iteration
  with self._worker_semaphore:
   run_worker_lifecycle(ticket, ...)
  with self._workers_lock:
   self._active_workers.pop(ticket.id, None)
 t = threading.Thread(target=worker_wrapper, daemon=True)
 with self._workers_lock:
  self._active_workers[ticket.id] = t
 t.start()
# Don't join - let them complete independently
time.sleep(0.5)  # Yield before next tick
```
## Functional Requirements
### FR1: Configurable Max Workers
- Default: 4 concurrent workers
- Read from config.toml: `[mma] max_workers = 4`
- Clamp to reasonable range (1-16)
### FR2: Worker Pool with Semaphore
- Use `threading.Semaphore(max_workers)` to limit concurrency
- Workers acquire semaphore on start, release on completion
- No spawning if semaphore exhausted
### FR3: Worker Tracking
- Store thread references in `_active_workers[ticket_id]`
- Remove on completion
- Enable kill functionality (coordinates with kill_abort_workers track)
### FR4: Non-Blocking Execution
- Don't join all threads before next tick
- Check completed workers, spawn new ready tasks
- Continue until all complete or blocked
## Non-Functional Requirements
| Requirement | Constraint |
|-------------|------------|
| Throughput | Configurable via max_workers |
| Memory | Per-worker stack space (~8MB each) |
| API Rate | Respect provider rate limits |
## Testing Requirements
### Unit Tests
- Test semaphore limits workers correctly
- Test worker tracking dict updates
- Test config loading for max_workers
### Integration Tests (via `live_gui` fixture)
- Create 10 independent tickets, set max_workers=4
- Verify only 4 run at a time
- Verify no race conditions on status updates
### Stress Tests
- 20+ workers with pool of 4
- Verify no deadlock
- Verify no memory leak
## Dependencies
- **Coordinates with**: `kill_abort_workers_20260306` (uses worker tracking)
- **Coordinates with**: `mma_multiworker_viz_20260306` (displays worker status)
## Out of Scope
- GPU parallelism (not applicable)
- Distributed execution (single machine only)
- Priority-based scheduling (separate track)
## Acceptance Criteria
- [ ] max_workers configurable via config.toml
- [ ] Semaphore limits concurrent workers
- [ ] Worker tracking dict maintained
- [ ] No race conditions on ticket status
- [ ] Non-blocking execution (no join-all)
- [ ] >80% test coverage for pool code
- [ ] 1-space indentation maintained

# Track Specification: Visual DAG & Interactive Ticket Editing (visual_dag_ticket_editing_20260306)
## Overview
Replace linear ticket list with interactive node graph using ImGui Bundle node editor. Users can visually drag dependency lines, split nodes, or delete tasks before execution.
## Current State Audit
### Already Implemented (DO NOT re-implement)
- **`imgui_bundle`**: Includes node editor capability
- **`_render_ticket_dag_node()`**: Renders ticket nodes (simple tree view)
- **`dag_engine.py`**: DAG validation, cycle detection (`has_cycle()`)
### Gaps to Fill (This Track's Scope)
- No true node editor integration for visual graph
- No visual dependency lines between nodes
- No drag-to-connect for creating dependencies
- No edit validation before saving
## Functional Requirements
### FR1: Node Editor Integration
- Use `imgui.node_editor` context for ticket visualization
- Render each ticket as a node with:
- Title: ticket.id
- Color: based on status (todo=gray, running=yellow, blocked=red, done=green)
- Position: auto-layout or manual positioning
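The status-to-color mapping in FR1 could be expressed as a small lookup; the RGBA values below are placeholders, not the project's actual theme palette:

```python
# Placeholder RGBA values for each ticket status (todo=gray, running=yellow,
# blocked=red, done=green); the real palette lives in theme.py / theme_2.py
STATUS_COLORS = {
 "todo": (0.5, 0.5, 0.5, 1.0),
 "running": (0.9, 0.8, 0.2, 1.0),
 "blocked": (0.8, 0.2, 0.2, 1.0),
 "done": (0.2, 0.8, 0.3, 1.0),
}

def node_color(status: str) -> tuple:
 # Unknown statuses render as "todo" gray rather than raising
 return STATUS_COLORS.get(status, STATUS_COLORS["todo"])
```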
### FR2: Visual Dependencies
- Draw lines between nodes based on `depends_on` field
- Arrow direction: from dependency to dependent
- Line style: curved or straight
### FR3: Interactive Editing
- Drag nodes to reposition
- Click-drag from output port to input port to create dependency
- Right-click to remove dependency
- Validate DAG (no cycles) before saving changes
### FR4: DAG Validation
- Use existing `dag_engine.TrackDAG.has_cycle()`
- Show error dialog if cycle detected
- Prevent save if validation fails
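The cycle check itself is a standard three-color DFS. A standalone sketch (the project's `TrackDAG.has_cycle()` signature may differ):

```python
def has_cycle(edges: dict) -> bool:
 # edges maps ticket id -> list of ids it depends on
 WHITE, GRAY, BLACK = 0, 1, 2
 color: dict = {}

 def visit(node: str) -> bool:
  color[node] = GRAY
  for dep in edges.get(node, ()):
   c = color.get(dep, WHITE)
   if c == GRAY:
    return True  # Back edge: dependency chain loops onto itself
   if c == WHITE and visit(dep):
    return True
  color[node] = BLACK
  return False

 return any(color.get(n, WHITE) == WHITE and visit(n) for n in edges)
```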
## Non-Functional Requirements
| Requirement | Constraint |
|-------------|------------|
| Frame Rate | 60fps with 50+ nodes |
| Response Time | <100ms for node interaction |
## Architecture Reference
### Key Integration Points
| File | Lines | Purpose |
|------|-------|---------|
| `src/gui_2.py` | 1670-1700 | `_render_ticket_dag_node()` |
| `src/dag_engine.py` | 30-50 | `TrackDAG`, `has_cycle()` |
| `src/models.py` | 30-60 | `Ticket`, `depends_on` |
### ImGui Node Editor Pattern
```python
from imgui_bundle import imgui_node_editor as ed

# Begin node editor context
ed.begin("Ticket DAG")
# Create nodes
for ticket in tickets:
 ed.begin_node(ticket.id)
 ed.set_node_position(ticket.id, x, y)
 # ... render node content
 ed.end_node()
# Create links for dependencies
for ticket in tickets:
 for dep in ticket.depends_on:
  ed.link(dep, ticket.id)
ed.end()
```
## Dependencies
- **Coordinates with**: `true_parallel_worker_execution_20260306` (for real-time status updates)
## Out of Scope
- Automatic layout algorithms (manual positioning for now)
- Saving layout to disk
- Zoom/pan controls
## Acceptance Criteria
- [ ] Node editor displays all tickets as connected nodes
- [ ] Users can drag nodes to reposition
- [ ] Users can create/remove dependencies via drag
- [ ] Visual changes sync to backend Ticket state
- [ ] DAG validity enforced (no cycles allowed)
- [ ] 60fps maintained with 50+ nodes
- [ ] 1-space indentation maintained