Compare commits

...

5 Commits

6 changed files with 464 additions and 199 deletions

View File

@@ -58,3 +58,4 @@ For deep implementation details when planning or implementing tracks, consult `d
- **Gemini CLI Integration:** Allows using the `gemini` CLI as a headless backend provider. This enables leveraging Gemini subscriptions with advanced features like persistent sessions, while maintaining full "Human-in-the-Loop" safety through a dedicated bridge for synchronous tool call approvals within the Manual Slop GUI. Now features full functional parity with the direct API, including accurate token estimation, safety settings, and robust system instruction handling.
- **Context & Token Visualization:** Detailed UI panels for monitoring real-time token usage, history depth, and **visual cache awareness** (tracking specific files currently live in the provider's context cache).
- **On-Demand Definition Lookup:** Allows developers to request specific class or function definitions during discussions using `@SymbolName` syntax. Injected definitions feature syntax highlighting, intelligent collapsing for long blocks, and a **[Source]** button for instant navigation to the full file.
- **Manual Ticket Queue Management:** Provides a dedicated GUI panel for granular control over the implementation queue. Features include color-coded priority assignment (High, Medium, Low), multi-select bulk operations (Execute, Skip, Block), and interactive drag-and-drop reordering with real-time Directed Acyclic Graph (DAG) validation.

View File

@@ -66,7 +66,7 @@ This file tracks all major tracks for the project. Each track has its own detail
### Manual UX Controls
14. [ ] **Track: Manual Ticket Queue Management**
14. [~] **Track: Manual Ticket Queue Management**
*Link: [./tracks/ticket_queue_mgmt_20260306/](./tracks/ticket_queue_mgmt_20260306/)*
15. [ ] **Track: Kill/Abort Running Workers**

View File

@@ -5,10 +5,10 @@
## Phase 1: Priority Field
Focus: Add priority to Ticket model
- [ ] Task 1.1: Initialize MMA Environment
- [x] Task 1.1: Initialize MMA Environment
- Run `activate_skill mma-orchestrator` before starting
- [ ] Task 1.2: Add priority field to Ticket
- [x] Task 1.2: Add priority field to Ticket (035c74e)
- WHERE: `src/models.py` `Ticket` dataclass
- WHAT: Add `priority: str = "medium"` field
- HOW:
@@ -20,7 +20,7 @@ Focus: Add priority to Ticket model
```
- CODE STYLE: 1-space indentation
- [ ] Task 1.3: Update Ticket serialization
- [x] Task 1.3: Update Ticket serialization (035c74e)
- WHERE: `src/models.py` `Ticket.to_dict()` and `from_dict()`
- WHAT: Include priority in serialization
- HOW: Add `priority` to dict conversion
@@ -28,7 +28,7 @@ Focus: Add priority to Ticket model
## Phase 2: Priority UI
Focus: Add priority dropdown to ticket display
- [ ] Task 2.1: Add priority dropdown
- [x] Task 2.1: Add priority dropdown (a22603d)
- WHERE: `src/gui_2.py` ticket rendering
- WHAT: Dropdown for priority selection
- HOW:
@@ -42,7 +42,7 @@ Focus: Add priority dropdown to ticket display
imgui.end_combo()
```
- [ ] Task 2.2: Add color coding
- [x] Task 2.2: Add color coding (a22603d)
- WHERE: `src/gui_2.py` ticket rendering
- WHAT: Color-code priority display
- HOW:
@@ -54,7 +54,7 @@ Focus: Add priority dropdown to ticket display
## Phase 3: Multi-Select
Focus: Enable ticket selection for bulk operations
- [ ] Task 3.1: Add selection state
- [x] Task 3.1: Add selection state (a22603d)
- WHERE: `src/gui_2.py` or `src/app_controller.py`
- WHAT: Track selected ticket IDs
- HOW:
@@ -62,7 +62,7 @@ Focus: Enable ticket selection for bulk operations
self._selected_tickets: set[str] = set()
```
- [ ] Task 3.2: Add checkbox per ticket
- [x] Task 3.2: Add checkbox per ticket (a22603d)
- WHERE: `src/gui_2.py` ticket list rendering
- WHAT: Checkbox for selection
- HOW:
@@ -76,7 +76,7 @@ Focus: Enable ticket selection for bulk operations
imgui.same_line()
```
- [ ] Task 3.3: Add select all/none buttons
- [x] Task 3.3: Add select all/none buttons (a22603d)
- WHERE: `src/gui_2.py` ticket list header
- WHAT: Buttons to select/deselect all
- HOW:
@@ -91,7 +91,7 @@ Focus: Enable ticket selection for bulk operations
## Phase 4: Bulk Actions
Focus: Execute bulk operations on selected tickets
- [ ] Task 4.1: Add bulk action buttons
- [x] Task 4.1: Add bulk action buttons (a22603d)
- WHERE: `src/gui_2.py` ticket list area
- WHAT: Execute, Skip, Block buttons
- HOW:
@@ -112,7 +112,7 @@ Focus: Execute bulk operations on selected tickets
## Phase 5: Drag-Drop (Optional)
Focus: Allow ticket reordering
- [ ] Task 5.1: Implement drag-drop reordering
- [x] Task 5.1: Implement drag-drop reordering (a22603d)
- WHERE: `src/gui_2.py` ticket list
- WHAT: Drag tickets to reorder
- HOW: Use imgui drag-drop API
@@ -121,11 +121,11 @@ Focus: Allow ticket reordering
## Phase 6: Testing
Focus: Verify all functionality
- [ ] Task 6.1: Write unit tests
- [x] Task 6.1: Write unit tests (a22603d)
- WHERE: `tests/test_ticket_queue.py` (new file)
- WHAT: Test priority serialization, bulk operations
- HOW: Create mock tickets, verify state changes
- [ ] Task 6.2: Conductor - Phase Verification
- [x] Task 6.2: Conductor - Phase Verification (a22603d)
- Run: `uv run pytest tests/test_ticket_queue.py -v`
- Manual: Verify UI controls work

View File

@@ -105,6 +105,8 @@ class App:
self.node_editor_config = ed.Config()
self.node_editor_ctx = ed.create_editor(self.node_editor_config)
self.ui_selected_ticket_id: Optional[str] = None
self.ui_selected_tickets: set[str] = set()
self.ui_new_ticket_priority: str = "medium"
self._autofocus_response_tab = False
gui_cfg = self.config.get("gui", {})
self.ui_separate_message_panel = gui_cfg.get("separate_message_panel", False)
@@ -1930,6 +1932,137 @@ class App:
self._scroll_tool_calls_to_bottom = False
if self.perf_profiling_enabled: self.perf_monitor.end_component("_render_tool_calls_panel")
def bulk_execute(self) -> None:
for tid in self.ui_selected_tickets:
t = next((t for t in self.active_tickets if str(t.get('id', '')) == tid), None)
if t: t['status'] = 'ready'
self._push_mma_state_update()
def bulk_skip(self) -> None:
for tid in self.ui_selected_tickets:
t = next((t for t in self.active_tickets if str(t.get('id', '')) == tid), None)
if t: t['status'] = 'completed'
self._push_mma_state_update()
def bulk_block(self) -> None:
for tid in self.ui_selected_tickets:
t = next((t for t in self.active_tickets if str(t.get('id', '')) == tid), None)
if t: t['status'] = 'blocked'
self._push_mma_state_update()
def _reorder_ticket(self, src_idx: int, dst_idx: int) -> None:
if src_idx == dst_idx: return
new_tickets = list(self.active_tickets)
ticket = new_tickets.pop(src_idx)
new_tickets.insert(dst_idx, ticket)
# Validate dependencies: a ticket cannot be placed before any of its dependencies
id_to_idx = {str(t.get('id', '')): i for i, t in enumerate(new_tickets)}
valid = True
for i, t in enumerate(new_tickets):
deps = t.get('depends_on', [])
for d_id in deps:
if d_id in id_to_idx and id_to_idx[d_id] >= i:
valid = False
break
if not valid: break
if valid:
self.active_tickets = new_tickets
self._push_mma_state_update()
def _render_ticket_queue(self) -> None:
imgui.text("Ticket Queue Management")
if not self.active_track:
imgui.text_disabled("No active track.")
return
# Select All / None
if imgui.button("Select All"):
self.ui_selected_tickets = {str(t.get('id', '')) for t in self.active_tickets}
imgui.same_line()
if imgui.button("Select None"):
self.ui_selected_tickets.clear()
imgui.same_line()
imgui.spacing()
imgui.same_line()
# Bulk Actions
if imgui.button("Bulk Execute"):
self.bulk_execute()
imgui.same_line()
if imgui.button("Bulk Skip"):
self.bulk_skip()
imgui.same_line()
if imgui.button("Bulk Block"):
self.bulk_block()
# Table
flags = imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable | imgui.TableFlags_.scroll_y
if imgui.begin_table("ticket_queue_table", 5, flags, imgui.ImVec2(0, 300)):
imgui.table_setup_column("Select", imgui.TableColumnFlags_.width_fixed, 40)
imgui.table_setup_column("ID", imgui.TableColumnFlags_.width_fixed, 80)
imgui.table_setup_column("Priority", imgui.TableColumnFlags_.width_fixed, 100)
imgui.table_setup_column("Status", imgui.TableColumnFlags_.width_fixed, 100)
imgui.table_setup_column("Description", imgui.TableColumnFlags_.width_stretch)
imgui.table_headers_row()
for i, t in enumerate(self.active_tickets):
tid = str(t.get('id', ''))
imgui.table_next_row()
# Select
imgui.table_next_column()
is_sel = tid in self.ui_selected_tickets
changed, is_sel = imgui.checkbox(f"##sel_{tid}", is_sel)
if changed:
if is_sel: self.ui_selected_tickets.add(tid)
else: self.ui_selected_tickets.discard(tid)
# ID
imgui.table_next_column()
is_selected = (tid == self.ui_selected_ticket_id)
opened, _ = imgui.selectable(f"{tid}##drag_{tid}", is_selected)
if opened:
self.ui_selected_ticket_id = tid
if imgui.begin_drag_drop_source():
imgui.set_drag_drop_payload("TICKET_REORDER", i)
imgui.text(f"Moving {tid}")
imgui.end_drag_drop_source()
if imgui.begin_drag_drop_target():
payload = imgui.accept_drag_drop_payload("TICKET_REORDER")
if payload:
src_idx = int(payload.data)
self._reorder_ticket(src_idx, i)
imgui.end_drag_drop_target()
# Priority
imgui.table_next_column()
prio = t.get('priority', 'medium')
p_col = vec4(180, 180, 180) # gray
if prio == 'high': p_col = vec4(255, 100, 100) # red
elif prio == 'medium': p_col = vec4(255, 255, 100) # yellow
imgui.push_style_color(imgui.Col_.text, p_col)
if imgui.begin_combo(f"##prio_{tid}", prio, imgui.ComboFlags_.height_small):
for p_opt in ['high', 'medium', 'low']:
if imgui.selectable(p_opt, p_opt == prio)[0]:
t['priority'] = p_opt
self._push_mma_state_update()
imgui.end_combo()
imgui.pop_style_color()
# Status
imgui.table_next_column()
imgui.text(t.get('status', 'todo'))
# Description
imgui.table_next_column()
imgui.text(t.get('description', ''))
imgui.end_table()
def _render_mma_dashboard(self) -> None:
if self.perf_profiling_enabled: self.perf_monitor.start_component("_render_mma_dashboard")
# Task 5.3: Dense Summary Line
@@ -2178,6 +2311,8 @@ class App:
imgui.pop_id()
imgui.separator()
self._render_ticket_queue()
imgui.separator()
# 4. Task DAG Visualizer
imgui.text("Task DAG")
if self.active_track and self.node_editor_ctx:
@@ -2292,11 +2427,19 @@ class App:
_, self.ui_new_ticket_desc = imgui.input_text_multiline("Description##new_ticket", self.ui_new_ticket_desc, imgui.ImVec2(-1, 60))
_, self.ui_new_ticket_target = imgui.input_text("Target File##new_ticket", self.ui_new_ticket_target)
_, self.ui_new_ticket_deps = imgui.input_text("Depends On (IDs, comma-separated)##new_ticket", self.ui_new_ticket_deps)
imgui.text("Priority:")
imgui.same_line()
if imgui.begin_combo("##new_prio", self.ui_new_ticket_priority):
for p_opt in ['high', 'medium', 'low']:
if imgui.selectable(p_opt, p_opt == self.ui_new_ticket_priority)[0]:
self.ui_new_ticket_priority = p_opt
imgui.end_combo()
if imgui.button("Create"):
new_ticket = {
"id": self.ui_new_ticket_id,
"description": self.ui_new_ticket_desc,
"status": "todo",
"priority": self.ui_new_ticket_priority,
"assigned_to": "tier3-worker",
"target_file": self.ui_new_ticket_target,
"depends_on": [d.strip() for d in self.ui_new_ticket_deps.split(",") if d.strip()]
@@ -2318,6 +2461,15 @@ class App:
ticket = next((t for t in self.active_tickets if str(t.get('id', '')) == self.ui_selected_ticket_id), None)
if ticket:
imgui.text(f"Status: {ticket.get('status', 'todo')}")
prio = ticket.get('priority', 'medium')
imgui.text("Priority:")
imgui.same_line()
if imgui.begin_combo(f"##edit_prio_{ticket.get('id')}", prio):
for p_opt in ['high', 'medium', 'low']:
if imgui.selectable(p_opt, p_opt == prio)[0]:
ticket['priority'] = p_opt
self._push_mma_state_update()
imgui.end_combo()
imgui.text(f"Target: {ticket.get('target_file', '')}")
deps = ticket.get('depends_on', [])
imgui.text(f"Depends on: {', '.join(deps)}")

View File

@@ -9,230 +9,233 @@ from src.paths import get_config_path
CONFIG_PATH = get_config_path()
def load_config() -> dict[str, Any]:
with open(CONFIG_PATH, "rb") as f:
return tomllib.load(f)
with open(CONFIG_PATH, "rb") as f:
return tomllib.load(f)
def save_config(config: dict[str, Any]) -> None:
import tomli_w
with open(CONFIG_PATH, "wb") as f:
tomli_w.dump(config, f)
import tomli_w
with open(CONFIG_PATH, "wb") as f:
tomli_w.dump(config, f)
AGENT_TOOL_NAMES = [
"run_powershell",
"read_file",
"list_directory",
"search_files",
"web_search",
"fetch_url",
"get_file_summary",
"py_get_skeleton",
"py_get_code_outline",
"py_get_definition",
"py_get_signature",
"py_get_class_summary",
"py_get_var_declaration",
"py_get_docstring",
"py_find_usages",
"py_get_imports",
"py_check_syntax",
"py_get_hierarchy"
"run_powershell",
"read_file",
"list_directory",
"search_files",
"web_search",
"fetch_url",
"get_file_summary",
"py_get_skeleton",
"py_get_code_outline",
"py_get_definition",
"py_get_signature",
"py_get_class_summary",
"py_get_var_declaration",
"py_get_docstring",
"py_find_usages",
"py_get_imports",
"py_check_syntax",
"py_get_hierarchy"
]
def parse_history_entries(history_strings: list[str], roles: list[str]) -> list[dict[str, Any]]:
import re
entries = []
for raw in history_strings:
ts = ""
rest = raw
if rest.startswith("@"):
nl = rest.find("\n")
if nl != -1:
ts = rest[1:nl]
rest = rest[nl + 1:]
known = roles or ["User", "AI", "Vendor API", "System"]
role_pat = re.compile(r"^(" + "|".join(re.escape(r) for r in known) + r"):", re.IGNORECASE)
match = role_pat.match(rest)
role = match.group(1) if match else "User"
if match:
content = rest[match.end():].strip()
else:
content = rest
entries.append({"role": role, "content": content, "collapsed": True, "ts": ts})
return entries
import re
entries = []
for raw in history_strings:
ts = ""
rest = raw
if rest.startswith("@"):
nl = rest.find("\n")
if nl != -1:
ts = rest[1:nl]
rest = rest[nl + 1:]
known = roles or ["User", "AI", "Vendor API", "System"]
role_pat = re.compile(r"^(" + "|".join(re.escape(r) for r in known) + r"):", re.IGNORECASE)
match = role_pat.match(rest)
role = match.group(1) if match else "User"
if match:
content = rest[match.end():].strip()
else:
content = rest
entries.append({"role": role, "content": content, "collapsed": True, "ts": ts})
return entries
@dataclass
class Ticket:
id: str
description: str
status: str = "todo"
assigned_to: str = "unassigned"
target_file: Optional[str] = None
target_symbols: List[str] = field(default_factory=list)
context_requirements: List[str] = field(default_factory=list)
depends_on: List[str] = field(default_factory=list)
blocked_reason: Optional[str] = None
step_mode: bool = False
retry_count: int = 0
id: str
description: str
status: str = "todo"
assigned_to: str = "unassigned"
priority: str = "medium"
target_file: Optional[str] = None
target_symbols: List[str] = field(default_factory=list)
context_requirements: List[str] = field(default_factory=list)
depends_on: List[str] = field(default_factory=list)
blocked_reason: Optional[str] = None
step_mode: bool = False
retry_count: int = 0
def mark_blocked(self, reason: str) -> None:
self.status = "blocked"
self.blocked_reason = reason
def mark_blocked(self, reason: str) -> None:
self.status = "blocked"
self.blocked_reason = reason
def mark_complete(self) -> None:
self.status = "completed"
def mark_complete(self) -> None:
self.status = "completed"
def get(self, key: str, default: Any = None) -> Any:
return getattr(self, key, default)
def get(self, key: str, default: Any = None) -> Any:
return getattr(self, key, default)
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"status": self.status,
"assigned_to": self.assigned_to,
"target_file": self.target_file,
"target_symbols": self.target_symbols,
"context_requirements": self.context_requirements,
"depends_on": self.depends_on,
"blocked_reason": self.blocked_reason,
"step_mode": self.step_mode,
"retry_count": self.retry_count,
}
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"status": self.status,
"assigned_to": self.assigned_to,
"priority": self.priority,
"target_file": self.target_file,
"target_symbols": self.target_symbols,
"context_requirements": self.context_requirements,
"depends_on": self.depends_on,
"blocked_reason": self.blocked_reason,
"step_mode": self.step_mode,
"retry_count": self.retry_count,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Ticket":
return cls(
id=data["id"],
description=data.get("description", ""),
status=data.get("status", "todo"),
assigned_to=data.get("assigned_to", ""),
target_file=data.get("target_file"),
target_symbols=data.get("target_symbols", []),
context_requirements=data.get("context_requirements", []),
depends_on=data.get("depends_on", []),
blocked_reason=data.get("blocked_reason"),
step_mode=data.get("step_mode", False),
retry_count=data.get("retry_count", 0),
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Ticket":
return cls(
id=data["id"],
description=data.get("description", ""),
status=data.get("status", "todo"),
assigned_to=data.get("assigned_to", "unassigned"),
priority=data.get("priority", "medium"),
target_file=data.get("target_file"),
target_symbols=data.get("target_symbols", []),
context_requirements=data.get("context_requirements", []),
depends_on=data.get("depends_on", []),
blocked_reason=data.get("blocked_reason"),
step_mode=data.get("step_mode", False),
retry_count=data.get("retry_count", 0),
)
@dataclass
class Track:
id: str
description: str
tickets: List[Ticket] = field(default_factory=list)
id: str
description: str
tickets: List[Ticket] = field(default_factory=list)
def get_executable_tickets(self) -> List[Ticket]:
from src.dag_engine import TrackDAG
dag = TrackDAG(self.tickets)
return dag.get_ready_tasks()
def get_executable_tickets(self) -> List[Ticket]:
from src.dag_engine import TrackDAG
dag = TrackDAG(self.tickets)
return dag.get_ready_tasks()
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"tickets": [t.to_dict() for t in self.tickets],
}
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"tickets": [t.to_dict() for t in self.tickets],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Track":
return cls(
id=data["id"],
description=data.get("description", ""),
tickets=[Ticket.from_dict(t) for t in data.get("tickets", [])],
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Track":
return cls(
id=data["id"],
description=data.get("description", ""),
tickets=[Ticket.from_dict(t) for t in data.get("tickets", [])],
)
@dataclass
class WorkerContext:
ticket_id: str
model_name: str
messages: List[Dict[str, Any]] = field(default_factory=list)
ticket_id: str
model_name: str
messages: List[Dict[str, Any]] = field(default_factory=list)
@dataclass
class Metadata:
id: str
name: str
status: Optional[str] = None
created_at: Optional[datetime.datetime] = None
updated_at: Optional[datetime.datetime] = None
id: str
name: str
status: Optional[str] = None
created_at: Optional[datetime.datetime] = None
updated_at: Optional[datetime.datetime] = None
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Metadata":
created = data.get("created_at")
updated = data.get("updated_at")
if isinstance(created, str):
try:
created = datetime.datetime.fromisoformat(created)
except ValueError:
created = None
if isinstance(updated, str):
try:
updated = datetime.datetime.fromisoformat(updated)
except ValueError:
updated = None
return cls(
id=data["id"],
name=data.get("name", ""),
status=data.get("status"),
created_at=created,
updated_at=updated,
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Metadata":
created = data.get("created_at")
updated = data.get("updated_at")
if isinstance(created, str):
try:
created = datetime.datetime.fromisoformat(created)
except ValueError:
created = None
if isinstance(updated, str):
try:
updated = datetime.datetime.fromisoformat(updated)
except ValueError:
updated = None
return cls(
id=data["id"],
name=data.get("name", ""),
status=data.get("status"),
created_at=created,
updated_at=updated,
)
@dataclass
class TrackState:
metadata: Metadata
discussion: List[str] = field(default_factory=list)
tasks: List[Ticket] = field(default_factory=list)
metadata: Metadata
discussion: List[str] = field(default_factory=list)
tasks: List[Ticket] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
serialized_discussion = []
for item in self.discussion:
if isinstance(item, dict):
new_item = dict(item)
if "ts" in new_item and isinstance(new_item["ts"], datetime.datetime):
new_item["ts"] = new_item["ts"].isoformat()
serialized_discussion.append(new_item)
else:
serialized_discussion.append(item)
return {
"metadata": self.metadata.to_dict(),
"discussion": serialized_discussion,
"tasks": [t.to_dict() for t in self.tasks],
}
def to_dict(self) -> Dict[str, Any]:
serialized_discussion = []
for item in self.discussion:
if isinstance(item, dict):
new_item = dict(item)
if "ts" in new_item and isinstance(new_item["ts"], datetime.datetime):
new_item["ts"] = new_item["ts"].isoformat()
serialized_discussion.append(new_item)
else:
serialized_discussion.append(item)
return {
"metadata": self.metadata.to_dict(),
"discussion": serialized_discussion,
"tasks": [t.to_dict() for t in self.tasks],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TrackState":
discussion = data.get("discussion", [])
parsed_discussion = []
for item in discussion:
if isinstance(item, dict):
new_item = dict(item)
ts = new_item.get("ts")
if isinstance(ts, str):
try:
new_item["ts"] = datetime.datetime.fromisoformat(ts)
except ValueError:
pass
parsed_discussion.append(new_item)
else:
parsed_discussion.append(item)
return cls(
metadata=Metadata.from_dict(data["metadata"]),
discussion=parsed_discussion,
tasks=[Ticket.from_dict(t) for t in data.get("tasks", [])],
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TrackState":
discussion = data.get("discussion", [])
parsed_discussion = []
for item in discussion:
if isinstance(item, dict):
new_item = dict(item)
ts = new_item.get("ts")
if isinstance(ts, str):
try:
new_item["ts"] = datetime.datetime.fromisoformat(ts)
except ValueError:
pass
parsed_discussion.append(new_item)
else:
parsed_discussion.append(item)
return cls(
metadata=Metadata.from_dict(data["metadata"]),
discussion=parsed_discussion,
tasks=[Ticket.from_dict(t) for t in data.get("tasks", [])],
)
@dataclass
class FileItem:

109
tests/test_ticket_queue.py Normal file
View File

@@ -0,0 +1,109 @@
import pytest
from unittest.mock import patch
from src.models import Ticket
def test_ticket_priority_default():
ticket = Ticket(id="T1", description="Test ticket")
assert ticket.priority == "medium"
def test_ticket_priority_custom():
ticket_high = Ticket(id="T2", description="High priority", priority="high")
assert ticket_high.priority == "high"
ticket_low = Ticket(id="T3", description="Low priority", priority="low")
assert ticket_low.priority == "low"
def test_ticket_to_dict_priority():
ticket = Ticket(id="T4", description="To dict test", priority="high")
d = ticket.to_dict()
assert "priority" in d
assert d["priority"] == "high"
def test_ticket_from_dict_priority():
data = {
"id": "T5",
"description": "From dict test",
"priority": "low",
"status": "todo"
}
ticket = Ticket.from_dict(data)
assert ticket.priority == "low"
def test_ticket_from_dict_default_priority():
data = {
"id": "T6",
"description": "No priority in dict"
}
ticket = Ticket.from_dict(data)
assert ticket.priority == "medium"
class TestBulkOperations:
def test_bulk_execute(self, mock_app):
mock_app.active_tickets = [
{"id": "T1", "status": "todo"},
{"id": "T2", "status": "todo"},
{"id": "T3", "status": "todo"}
]
mock_app.ui_selected_tickets = {"T1", "T3"}
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
mock_app.bulk_execute()
assert mock_app.active_tickets[0]["status"] == "ready"
assert mock_app.active_tickets[1]["status"] == "todo"
assert mock_app.active_tickets[2]["status"] == "ready"
mock_push.assert_called_once()
def test_bulk_skip(self, mock_app):
mock_app.active_tickets = [
{"id": "T1", "status": "todo"},
{"id": "T2", "status": "todo"}
]
mock_app.ui_selected_tickets = {"T1"}
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
mock_app.bulk_skip()
assert mock_app.active_tickets[0]["status"] == "completed"
assert mock_app.active_tickets[1]["status"] == "todo"
mock_push.assert_called_once()
def test_bulk_block(self, mock_app):
mock_app.active_tickets = [
{"id": "T1", "status": "todo"},
{"id": "T2", "status": "todo"}
]
mock_app.ui_selected_tickets = {"T1", "T2"}
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
mock_app.bulk_block()
assert mock_app.active_tickets[0]["status"] == "blocked"
assert mock_app.active_tickets[1]["status"] == "blocked"
mock_push.assert_called_once()
class TestReorder:
def test_reorder_ticket_valid(self, mock_app):
mock_app.active_tickets = [
{"id": "T1", "depends_on": []},
{"id": "T2", "depends_on": []},
{"id": "T3", "depends_on": ["T1"]}
]
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
# Move T1 to index 1: [T2, T1, T3]. T3 depends on T1. T1 index 1 < T3 index 2. VALID.
mock_app._reorder_ticket(0, 1)
assert mock_app.active_tickets[0]["id"] == "T2"
assert mock_app.active_tickets[1]["id"] == "T1"
assert mock_app.active_tickets[2]["id"] == "T3"
mock_push.assert_called_once()
def test_reorder_ticket_invalid(self, mock_app):
mock_app.active_tickets = [
{"id": "T1", "depends_on": []},
{"id": "T2", "depends_on": ["T1"]}
]
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
# Move T1 after T2: [T2, T1]. T2 depends on T1, but T1 is now at index 1 while T2 is at index 0.
# Violation: dependency T1 (index 1) is not before T2 (index 0).
mock_app._reorder_ticket(0, 1)
# Should NOT change
assert mock_app.active_tickets[0]["id"] == "T1"
assert mock_app.active_tickets[1]["id"] == "T2"
mock_push.assert_not_called()