Compare commits

..

8 Commits

10 changed files with 500 additions and 65 deletions

View File

@@ -16,6 +16,10 @@ To serve as an expert-level utility for personal developer use on small projects
- **Tier 3 (Worker):** Surgical code implementation and TDD using `gemini-2.5-flash` or `deepseek-v3`. Operates statelessly with tool access and dependency skeletons.
- **Tier 4 (QA):** Error analysis and diagnostics using `gemini-2.5-flash` or `deepseek-v3`. Operates statelessly with tool access.
- **MMA Delegation Engine:** Route tasks, ensuring role-scoped context and detailed observability via timestamped sub-agent logs. Supports dynamic ticket creation and dependency resolution via an automated Dispatcher Loop.
- **MMA Observability Dashboard:** A high-density control center within the GUI for monitoring and managing the 4-Tier architecture.
- **Track Browser:** Real-time visualization of all implementation tracks with status indicators and progress bars.
- **Hierarchical Task DAG:** An interactive, tree-based visualizer for the active track's task dependencies, featuring color-coded state tracking (Ready, Running, Blocked, Done) and manual retry/skip overrides.
- **Strategy Visualization:** Dedicated real-time output streams for Tier 1 (Strategic Planning) and Tier 2/3 (Execution) agents, allowing the user to follow the agent's reasoning chains alongside the task DAG.
- **Track-Scoped State Management:** Segregates discussion history and task progress into per-track state files (e.g., `conductor/tracks/<track_id>/state.toml`). This prevents global context pollution and ensures the Tech Lead session is isolated to the specific track's objective.
- **Native DAG Execution Engine:** Employs a Python-based Directed Acyclic Graph (DAG) engine to manage complex task dependencies, supporting automated topological sorting and robust cycle detection.
- **Programmable Execution State Machine:** Governing the transition between "Auto-Queue" (autonomous worker spawning) and "Step Mode" (explicit manual approval for each task transition).

View File

@@ -26,7 +26,7 @@ This file tracks all major tracks for the project. Each track has its own detail
---
- [ ] **Track: MMA Dashboard Visualization Overhaul**
- [x] **Track: MMA Dashboard Visualization Overhaul**
*Link: [./tracks/mma_dashboard_visualization_overhaul/](./tracks/mma_dashboard_visualization_overhaul/)*
---

View File

@@ -1,16 +1,16 @@
# Implementation Plan: MMA Dashboard Visualization Overhaul
## Phase 1: Track Browser Panel
- [ ] Task: Implement a list view in the MMA Dashboard that reads from the `tracks` directory.
- [ ] Task: Add functionality to select an active track and load its state into the UI.
- [ ] Task: Display progress bars based on task completion within the active track.
## Phase 1: Track Browser Panel [checkpoint: 2b1cfbb]
- [x] Task: Implement a list view in the MMA Dashboard that reads from the `tracks` directory. 2b1cfbb
- [x] Task: Add functionality to select an active track and load its state into the UI. 2b1cfbb
- [x] Task: Display progress bars based on task completion within the active track. 2b1cfbb
## Phase 2: DAG Visualizer Component
- [ ] Task: Design the layout for the Task DAG using DearPyGui Node Editor or collapsible Tree Nodes.
- [ ] Task: Write the data-binding logic to map the backend Python DAG (from Track 1) to the UI visualizer.
- [ ] Task: Add visual indicators (colors/icons) for Task statuses (Ready, Blocked, Done).
## Phase 2: DAG Visualizer Component [checkpoint: 7252d75]
- [x] Task: Design the layout for the Task DAG using DearPyGui Node Editor or collapsible Tree Nodes. 7252d75
- [x] Task: Write the data-binding logic to map the backend Python DAG (from Track 1) to the UI visualizer. 7252d75
- [x] Task: Add visual indicators (colors/icons) for Task statuses (Ready, Blocked, Done). 7252d75
## Phase 3: Live Output Streams
- [ ] Task: Refactor the AI response handling to support multiple concurrent UI text streams.
- [ ] Task: Bind the output of Tier 1 (Planning) to a designated "Strategy" text box.
- [ ] Task: Bind the output of Tier 2 and spawned Tier 3/4 workers to the active Task's detail view in the DAG.
## Phase 3: Live Output Streams [checkpoint: 25b72fb]
- [x] Task: Refactor the AI response handling to support multiple concurrent UI text streams. 25b72fb
- [x] Task: Bind the output of Tier 1 (Planning) to a designated "Strategy" text box. 25b72fb
- [x] Task: Bind the output of Tier 2 and spawned Tier 3/4 workers to the active Task's detail view in the DAG. 25b72fb

225
gui_2.py
View File

@@ -336,6 +336,10 @@ class App:
agent_tools_cfg = self.project.get("agent", {}).get("tools", {})
self.ui_agent_tools: dict[str, bool] = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
# MMA Tracks
self.tracks: list[dict] = []
self.mma_streams: dict[str, str] = {}
# Prior session log viewing
self.is_viewing_prior_session = False
self.prior_session_entries: list[dict] = []
@@ -762,6 +766,9 @@ class App:
agent_tools_cfg = proj.get("agent", {}).get("tools", {})
self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
# MMA Tracks
self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir)
# Restore MMA state
mma_sec = proj.get("mma", {})
self.ui_epic_input = mma_sec.get("epic", "")
@@ -790,6 +797,40 @@ class App:
if track_history:
self.disc_entries = _parse_history_entries(track_history, self.disc_roles)
def _cb_load_track(self, track_id: str):
    """Load a persisted track into the UI: active track, tickets, and history.

    Reads the track's state via project_manager, normalizes its tasks to
    Ticket instances, swaps in the track-scoped discussion history, and
    updates the status line. Errors are reported via ai_status, not raised.
    """
    state = project_manager.load_track_state(track_id, self.ui_files_base_dir)
    if not state:
        return
    try:
        from dataclasses import asdict
        # Persisted tasks may be raw dicts or Ticket objects; normalize to Ticket.
        normalized = [Ticket(**task) if isinstance(task, dict) else task
                      for task in state.tasks]
        self.active_track = Track(
            id=state.metadata.id,
            description=state.metadata.name,
            tickets=normalized,
        )
        # The UI table consumes plain dicts.
        self.active_tickets = [task if isinstance(task, dict) else asdict(task)
                               for task in normalized]
        # Track-scoped history replaces the current discussion (empty if none saved).
        history = project_manager.load_track_history(track_id, self.ui_files_base_dir)
        self.disc_entries = _parse_history_entries(history, self.disc_roles) if history else []
        self._recalculate_session_usage()
        self.ai_status = f"Loaded track: {state.metadata.name}"
    except Exception as e:
        self.ai_status = f"Load track error: {e}"
        print(f"Error loading track {track_id}: {e}")
def _save_active_project(self):
if self.active_project_path:
try:
@@ -931,10 +972,20 @@ class App:
elif action == "handle_ai_response":
payload = task.get("payload", {})
self.ai_response = payload.get("text", "")
self.ai_status = payload.get("status", "done")
text = payload.get("text", "")
stream_id = payload.get("stream_id")
if stream_id:
self.mma_streams[stream_id] = text
if stream_id == "Tier 1":
if "status" in payload:
self.ai_status = payload["status"]
else:
self.ai_response = text
self.ai_status = payload.get("status", "done")
self._trigger_blink = True
if self.ui_auto_add_history:
if self.ui_auto_add_history and not stream_id:
role = payload.get("role", "AI")
with self._pending_history_adds_lock:
self._pending_history_adds.append({
@@ -2116,11 +2167,18 @@ class App:
tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history)
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "handle_ai_response",
"payload": {
"text": json.dumps(tracks, indent=2),
"stream_id": "Tier 1",
"status": "Epic tracks generated."
}
})
self._pending_gui_tasks.append({
"action": "show_track_proposal",
"payload": tracks
})
self.ai_status = "Epic tracks generated."
except Exception as e:
self.ai_status = f"Epic plan error: {e}"
print(f"ERROR in _cb_plan_epic background task: {e}")
@@ -2752,7 +2810,36 @@ class App:
)
def _render_mma_dashboard(self):
# 1. Global Controls
# 1. Track Browser
imgui.text("Track Browser")
if imgui.begin_table("mma_tracks_table", 4, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable):
imgui.table_setup_column("Title")
imgui.table_setup_column("Status")
imgui.table_setup_column("Progress")
imgui.table_setup_column("Actions")
imgui.table_headers_row()
for track in self.tracks:
imgui.table_next_row()
imgui.table_next_column()
imgui.text(track.get("title", "Untitled"))
imgui.table_next_column()
imgui.text(track.get("status", "unknown"))
imgui.table_next_column()
progress = track.get("progress", 0.0)
imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{int(progress*100)}%")
imgui.table_next_column()
if imgui.button(f"Load##{track.get('id')}"):
self._cb_load_track(track.get("id"))
imgui.end_table()
imgui.separator()
# 2. Global Controls
changed, self.mma_step_mode = imgui.checkbox("Step Mode (HITL)", self.mma_step_mode)
if changed:
# We could push an event here if the engine needs to know immediately
@@ -2768,7 +2855,7 @@ class App:
# 2. Active Track Info
if self.active_track:
imgui.text(f"Track: {self.active_track.get('title', 'Unknown')}")
imgui.text(f"Track: {self.active_track.description}")
# Progress bar
tickets = self.active_tickets
@@ -2802,49 +2889,107 @@ class App:
imgui.separator()
# 4. Ticket Queue
imgui.text("Ticket Queue")
if imgui.begin_table("mma_tickets", 4, imgui.TableFlags_.borders_inner_h | imgui.TableFlags_.resizable):
imgui.table_setup_column("ID", imgui.TableColumnFlags_.width_fixed, 80)
imgui.table_setup_column("Target", imgui.TableColumnFlags_.width_stretch)
imgui.table_setup_column("Status", imgui.TableColumnFlags_.width_fixed, 100)
imgui.table_setup_column("Actions", imgui.TableColumnFlags_.width_fixed, 120)
imgui.table_headers_row()
imgui.separator()
imgui.text("Strategy (Tier 1)")
strategy_text = self.mma_streams.get("Tier 1", "")
imgui.input_text_multiline("##mma_strategy", strategy_text, imgui.ImVec2(-1, 150), imgui.InputTextFlags_.read_only)
# 4. Task DAG Visualizer
imgui.text("Task DAG")
if self.active_track:
tickets_by_id = {t.get('id'): t for t in self.active_tickets}
all_ids = set(tickets_by_id.keys())
# Build children map
children_map = {}
for t in self.active_tickets:
tid = t.get('id', '??')
imgui.table_next_row()
imgui.table_next_column()
imgui.text(str(tid))
for dep in t.get('depends_on', []):
if dep not in children_map: children_map[dep] = []
children_map[dep].append(t.get('id'))
imgui.table_next_column()
imgui.text(str(t.get('target_file', 'general')))
# Roots are those whose depends_on elements are NOT in all_ids
roots = []
for t in self.active_tickets:
deps = t.get('depends_on', [])
has_local_dep = any(d in all_ids for d in deps)
if not has_local_dep:
roots.append(t)
imgui.table_next_column()
status = t.get('status', 'pending').upper()
rendered = set()
for root in roots:
self._render_ticket_dag_node(root, tickets_by_id, children_map, rendered)
else:
imgui.text_disabled("No active MMA track.")
if status == 'RUNNING':
imgui.push_style_color(imgui.Col_.text, vec4(255, 255, 0)) # Yellow
elif status == 'COMPLETE':
imgui.push_style_color(imgui.Col_.text, vec4(0, 255, 0)) # Green
elif status == 'BLOCKED' or status == 'ERROR':
imgui.push_style_color(imgui.Col_.text, vec4(255, 0, 0)) # Red
elif status == 'PAUSED':
imgui.push_style_color(imgui.Col_.text, vec4(255, 165, 0)) # Orange
def _render_ticket_dag_node(self, ticket, tickets_by_id, children_map, rendered):
tid = ticket.get('id', '??')
target = ticket.get('target_file', 'general')
status = ticket.get('status', 'pending').upper()
imgui.text(status)
# Determine color
status_color = vec4(200, 200, 200) # Gray (TODO)
if status == 'RUNNING':
status_color = vec4(255, 255, 0) # Yellow
elif status == 'COMPLETE':
status_color = vec4(0, 255, 0) # Green
elif status in ['BLOCKED', 'ERROR']:
status_color = vec4(255, 0, 0) # Red
elif status == 'PAUSED':
status_color = vec4(255, 165, 0) # Orange
if status in ['RUNNING', 'COMPLETE', 'BLOCKED', 'ERROR', 'PAUSED']:
imgui.pop_style_color()
flags = imgui.TreeNodeFlags_.open_on_arrow | imgui.TreeNodeFlags_.open_on_double_click | imgui.TreeNodeFlags_.default_open
children = children_map.get(tid, [])
if not children:
flags |= imgui.TreeNodeFlags_.leaf
imgui.table_next_column()
if imgui.button(f"Retry##{tid}"):
self._cb_ticket_retry(tid)
imgui.same_line()
if imgui.button(f"Skip##{tid}"):
self._cb_ticket_skip(tid)
# Check if already rendered elsewhere to avoid infinite recursion or duplicate subtrees
is_duplicate = tid in rendered
imgui.end_table()
node_open = imgui.tree_node_ex(f"##{tid}", flags)
# Detail View / Tooltip
if imgui.is_item_hovered():
imgui.begin_tooltip()
imgui.text_colored(C_KEY, f"ID: {tid}")
imgui.text_colored(C_LBL, f"Target: {target}")
imgui.text_colored(C_LBL, f"Description:")
imgui.same_line()
imgui.text_wrapped(ticket.get('description', 'N/A'))
deps = ticket.get('depends_on', [])
if deps:
imgui.text_colored(C_LBL, f"Depends on: {', '.join(deps)}")
stream_key = f"Tier 3: {tid}"
if stream_key in self.mma_streams:
imgui.separator()
imgui.text_colored(C_KEY, "Worker Stream:")
imgui.text_wrapped(self.mma_streams[stream_key])
imgui.end_tooltip()
imgui.same_line()
imgui.text_colored(C_KEY, tid)
imgui.same_line(150)
imgui.text_disabled(str(target))
imgui.same_line(400)
imgui.text_colored(status_color, status)
imgui.same_line(500)
if imgui.button(f"Retry##{tid}"):
self._cb_ticket_retry(tid)
imgui.same_line()
if imgui.button(f"Skip##{tid}"):
self._cb_ticket_skip(tid)
if node_open:
if not is_duplicate:
rendered.add(tid)
for child_id in children:
child = tickets_by_id.get(child_id)
if child:
self._render_ticket_dag_node(child, tickets_by_id, children_map, rendered)
else:
imgui.text_disabled(" (shown above)")
imgui.tree_pop()
def _render_tool_calls_panel(self):
imgui.text("Tool call history")

View File

@@ -11,6 +11,8 @@ output_dir = "./md_gen"
base_dir = "."
paths = []
[files.tier_assignments]
[screenshots]
base_dir = "."
paths = []

View File

@@ -8,5 +8,5 @@ active = "main"
[discussions.main]
git_commit = ""
last_updated = "2026-02-27T18:57:49"
last_updated = "2026-02-27T22:56:03"
history = []

View File

@@ -10,6 +10,7 @@ import datetime
import tomllib
import tomli_w
import re
import json
from pathlib import Path
TS_FMT = "%Y-%m-%dT%H:%M:%S"
@@ -309,3 +310,85 @@ def save_track_history(track_id: str, history: list, base_dir: str | Path = ".")
entries = [str_to_entry(h, roles) for h in history]
state.discussion = entries
save_track_state(track_id, state, base_dir)
def get_all_tracks(base_dir: str | Path = ".") -> list[dict]:
"""
Scans the conductor/tracks/ directory and returns a list of dictionaries
containing track metadata: 'id', 'title', 'status', 'complete', 'total',
and 'progress' (0.0 to 1.0).
Handles missing or malformed metadata.json or state.toml by falling back
to available info or defaults.
"""
from models import TrackState
tracks_dir = Path(base_dir) / "conductor" / "tracks"
if not tracks_dir.exists():
return []
results = []
for entry in tracks_dir.iterdir():
if not entry.is_dir():
continue
track_id = entry.name
track_info = {
"id": track_id,
"title": track_id,
"status": "unknown",
"complete": 0,
"total": 0,
"progress": 0.0
}
state_found = False
# Try loading state.toml
try:
state = load_track_state(track_id, base_dir)
if state:
track_info["id"] = state.metadata.id or track_id
track_info["title"] = state.metadata.name or track_id
track_info["status"] = state.metadata.status or "unknown"
track_info["complete"] = len([t for t in state.tasks if t.status == "completed"])
track_info["total"] = len(state.tasks)
if track_info["total"] > 0:
track_info["progress"] = track_info["complete"] / track_info["total"]
state_found = True
except Exception:
pass
if not state_found:
# Try loading metadata.json
metadata_file = entry / "metadata.json"
if metadata_file.exists():
try:
with open(metadata_file, "r") as f:
data = json.load(f)
track_info["id"] = data.get("id", data.get("track_id", track_id))
track_info["title"] = data.get("title", data.get("name", data.get("description", track_id)))
track_info["status"] = data.get("status", "unknown")
except Exception:
pass
# Try parsing plan.md for complete/total if state was missing or empty
if track_info["total"] == 0:
plan_file = entry / "plan.md"
if plan_file.exists():
try:
with open(plan_file, "r", encoding="utf-8") as f:
content = f.read()
# Simple regex to count tasks
# - [ ] Task: ...
# - [x] Task: ...
# - [~] Task: ...
tasks = re.findall(r"^[ \t]*- \[[ x~]\] .*", content, re.MULTILINE)
completed_tasks = re.findall(r"^[ \t]*- \[x\] .*", content, re.MULTILINE)
track_info["total"] = len(tasks)
track_info["complete"] = len(completed_tasks)
if track_info["total"] > 0:
track_info["progress"] = float(track_info["complete"]) / track_info["total"]
except Exception:
pass
results.append(track_info)
return results

View File

@@ -0,0 +1,66 @@
import pytest
from unittest.mock import patch, MagicMock
from gui_2 import App
@pytest.fixture
def app_instance():
    """Build an App with all external side effects patched out.

    Yields (app, mock_project_manager) so tests can drive the app while
    programming and inspecting project_manager calls.
    """
    with (
        patch("gui_2.load_config", return_value={"ai": {}, "projects": {}}),
        patch("gui_2.save_config"),
        patch("gui_2.project_manager") as project_manager_mock,
        patch("gui_2.session_logger"),
        patch("gui_2.immapp.run"),
        patch.object(App, "_load_active_project"),
        patch.object(App, "_fetch_models"),
        patch.object(App, "_load_fonts"),
        patch.object(App, "_post_init"),
    ):
        instance = App()
        # _refresh_from_project reads these; give them minimal valid values.
        instance.project = {}
        instance.ui_files_base_dir = "."
        yield instance, project_manager_mock
def test_mma_dashboard_refresh(app_instance):
    """_refresh_from_project must populate app.tracks via project_manager.get_all_tracks."""
    app, pm = app_instance
    fake_tracks = [
        MagicMock(id="track_1", description="Track 1"),
        MagicMock(id="track_2", description="Track 2"),
    ]
    pm.get_all_tracks.return_value = fake_tracks
    app._refresh_from_project()
    assert hasattr(app, 'tracks'), "App instance should have a 'tracks' attribute"
    assert app.tracks == fake_tracks
    assert len(app.tracks) == 2
    assert app.tracks[0].id == "track_1"
    assert app.tracks[1].id == "track_2"
    # The scan must target the project's configured base directory.
    pm.get_all_tracks.assert_called_with(app.ui_files_base_dir)
def test_mma_dashboard_initialization_refresh(app_instance):
    """
    Verifies that a manual _refresh_from_project call populates app.tracks.

    The fixture mocks _load_active_project, so the refresh that would
    normally run during App initialization does not happen automatically;
    this test simulates that initialization-time refresh by calling
    _refresh_from_project directly.
    """
    app, mock_pm = app_instance
    mock_tracks = [MagicMock(id="init_track", description="Initial Track")]
    mock_pm.get_all_tracks.return_value = mock_tracks
    # Simulate the refresh that would happen during a project load
    app._refresh_from_project()
    assert app.tracks == mock_tracks
    assert app.tracks[0].id == "init_track"

View File

@@ -1,4 +1,5 @@
import pytest
import json
from unittest.mock import patch, MagicMock
import threading
import time
@@ -29,9 +30,11 @@ def test_mma_ui_state_initialization(app_instance):
assert hasattr(app_instance, 'ui_epic_input')
assert hasattr(app_instance, 'proposed_tracks')
assert hasattr(app_instance, '_show_track_proposal_modal')
assert hasattr(app_instance, 'mma_streams')
assert app_instance.ui_epic_input == ""
assert app_instance.proposed_tracks == []
assert app_instance._show_track_proposal_modal is False
assert app_instance.mma_streams == {}
def test_process_pending_gui_tasks_show_track_proposal(app_instance):
"""Verifies that the 'show_track_proposal' action correctly updates the UI state."""
@@ -69,16 +72,21 @@ def test_cb_plan_epic_launches_thread(app_instance):
app_instance._cb_plan_epic()
# Wait for the background thread to finish (it should be quick with mocks)
# In a real test, we might need a more robust way to wait, but for now:
max_wait = 5
start_time = time.time()
while len(app_instance._pending_gui_tasks) == 0 and time.time() - start_time < max_wait:
while len(app_instance._pending_gui_tasks) < 2 and time.time() - start_time < max_wait:
time.sleep(0.1)
assert len(app_instance._pending_gui_tasks) > 0
task = app_instance._pending_gui_tasks[0]
assert task['action'] == 'show_track_proposal'
assert task['payload'] == mock_tracks
assert len(app_instance._pending_gui_tasks) == 2
task1 = app_instance._pending_gui_tasks[0]
assert task1['action'] == 'handle_ai_response'
assert task1['payload']['stream_id'] == 'Tier 1'
assert task1['payload']['text'] == json.dumps(mock_tracks, indent=2)
task2 = app_instance._pending_gui_tasks[1]
assert task2['action'] == 'show_track_proposal'
assert task2['payload'] == mock_tracks
mock_get_history.assert_called_once()
mock_gen_tracks.assert_called_once()
@@ -104,3 +112,36 @@ def test_process_pending_gui_tasks_mma_spawn_approval(app_instance):
assert app_instance._mma_spawn_edit_mode is False
assert task["dialog_container"][0] is not None
assert task["dialog_container"][0]._ticket_id == "T1"
def test_handle_ai_response_with_stream_id(app_instance):
    """A stream_id in the payload routes the text into mma_streams."""
    payload = {
        "text": "Tier 1 Strategy Content",
        "stream_id": "Tier 1",
        "status": "Thinking...",
    }
    app_instance._pending_gui_tasks.append(
        {"action": "handle_ai_response", "payload": payload}
    )
    app_instance._process_pending_gui_tasks()
    # Stream-tagged output lands in mma_streams, not the main response box.
    assert app_instance.mma_streams.get("Tier 1") == "Tier 1 Strategy Content"
    assert app_instance.ai_status == "Thinking..."
    assert app_instance.ai_response == ""
def test_handle_ai_response_fallback(app_instance):
    """Verifies fallback to ai_response when stream_id is missing."""
    task = {
        "action": "handle_ai_response",
        "payload": {
            "text": "Regular AI Response",
            "status": "done"
        }
    }
    app_instance._pending_gui_tasks.append(task)
    app_instance._process_pending_gui_tasks()
    # Without a stream_id, the legacy single-response path must be used.
    assert app_instance.ai_response == "Regular AI Response"
    assert app_instance.ai_status == "done"
    # No stream bookkeeping should occur for untagged responses.
    assert len(app_instance.mma_streams) == 0

View File

@@ -0,0 +1,94 @@
import pytest
import json
from pathlib import Path
from project_manager import get_all_tracks, save_track_state
from models import TrackState, Metadata, Ticket
from datetime import datetime
def test_get_all_tracks_empty(tmp_path):
    """A base dir with no conductor/tracks directory yields an empty list."""
    result = get_all_tracks(tmp_path)
    assert result == []
def test_get_all_tracks_with_state(tmp_path):
    """A saved state.toml drives id/title/status and task-based progress."""
    track_id = "test_track_1"
    track_dir = tmp_path / "conductor" / "tracks" / track_id
    track_dir.mkdir(parents=True)
    # Persist a state with one completed and one pending task.
    state = TrackState(
        metadata=Metadata(id=track_id, name="Test Track 1", status="in_progress",
                          created_at=datetime.now(), updated_at=datetime.now()),
        discussion=[],
        tasks=[
            Ticket(id="task1", description="desc1", status="completed", assigned_to="user"),
            Ticket(id="task2", description="desc2", status="todo", assigned_to="user"),
        ],
    )
    save_track_state(track_id, state, tmp_path)
    tracks = get_all_tracks(tmp_path)
    assert len(tracks) == 1
    summary = tracks[0]
    assert summary["id"] == track_id
    assert summary["title"] == "Test Track 1"
    assert summary["status"] == "in_progress"
    assert summary["complete"] == 1
    assert summary["total"] == 2
    assert summary["progress"] == 0.5
def test_get_all_tracks_with_metadata_json(tmp_path):
    """Without state.toml, metadata.json + plan.md checkboxes supply the summary."""
    track_id = "test_track_2"
    track_dir = tmp_path / "conductor" / "tracks" / track_id
    track_dir.mkdir(parents=True)
    (track_dir / "metadata.json").write_text(
        json.dumps({"id": track_id, "title": "Test Track 2", "status": "planned"})
    )
    # One done, one pending, one in-progress checkbox -> total 3, complete 1.
    (track_dir / "plan.md").write_text(
        "\n# Plan\n- [x] Task: Task 1\n- [ ] Task: Task 2\n- [~] Task: Task 3\n"
    )
    tracks = get_all_tracks(tmp_path)
    assert len(tracks) == 1
    summary = tracks[0]
    assert summary["id"] == track_id
    assert summary["title"] == "Test Track 2"
    assert summary["status"] == "planned"
    assert summary["complete"] == 1
    assert summary["total"] == 3
    assert pytest.approx(summary["progress"]) == 0.333333
def test_get_all_tracks_malformed(tmp_path):
    """A malformed metadata.json must not raise; the scanner falls back to
    directory-name defaults for the track summary."""
    tracks_dir = tmp_path / "conductor" / "tracks"
    tracks_dir.mkdir(parents=True)
    track_id = "malformed_track"
    track_dir = tracks_dir / track_id
    track_dir.mkdir()
    # Malformed metadata.json
    with open(track_dir / "metadata.json", "w") as f:
        f.write("{ invalid json }")
    tracks = get_all_tracks(tmp_path)
    assert len(tracks) == 1
    track = tracks[0]
    assert track["id"] == track_id
    assert track["status"] == "unknown"
    assert track["complete"] == 0
    assert track["total"] == 0