Files
manual_slop/models.py
Ed_ 9fb01ce5d1 feat(mma): complete Phase 6 and finalize Comprehensive GUI UX track
- Implement Live Worker Streaming: wire ai_client.comms_log_callback to Tier 3 streams
- Add Parallel DAG Execution using asyncio.gather for non-dependent tickets
- Implement Automatic Retry with Model Escalation (Flash-Lite -> Flash -> Pro)
- Add Tier Model Configuration UI to MMA Dashboard with project TOML persistence
- Fix FPS reporting in PerformanceMonitor to prevent transient 0.0 values
- Update Ticket model with retry_count and dictionary-like access
- Stabilize Gemini CLI integration tests and handle script approval events in simulations
- Finalize and verify all 6 phases of the implementation plan
2026-03-01 22:38:43 -05:00

163 lines
4.4 KiB
Python

from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from datetime import datetime
@dataclass
class Ticket:
    """
    Represents a discrete unit of work within a track.
    """

    id: str
    description: str
    # Lifecycle state; values used in this module include "todo", "blocked"
    # and "completed".
    status: str
    assigned_to: str
    target_file: Optional[str] = None
    context_requirements: List[str] = field(default_factory=list)
    # IDs of the tickets that must be "completed" before this one is executable.
    depends_on: List[str] = field(default_factory=list)
    # Reason recorded by mark_blocked().
    blocked_reason: Optional[str] = None
    step_mode: bool = False
    retry_count: int = 0

    def mark_blocked(self, reason: str) -> None:
        """Sets the ticket status to 'blocked' and records the reason."""
        self.status = "blocked"
        self.blocked_reason = reason

    def mark_complete(self) -> None:
        """Sets the ticket status to 'completed'."""
        self.status = "completed"

    def get(self, key: str, default: Any = None) -> Any:
        """Helper to provide dictionary-like access to dataclass fields.

        Returns the attribute called ``key``, or ``default`` when the ticket
        has no such attribute.
        """
        return getattr(self, key, default)

    def to_dict(self) -> Dict[str, Any]:
        """Return the ticket as a plain dict keyed by field name.

        The list fields are copied, so editing the returned dict never
        changes the ticket itself. A list field that is ``None`` is emitted
        as an empty list.
        """
        return {
            "id": self.id,
            "description": self.description,
            "status": self.status,
            "assigned_to": self.assigned_to,
            "target_file": self.target_file,
            "context_requirements": list(self.context_requirements or []),
            "depends_on": list(self.depends_on or []),
            "blocked_reason": self.blocked_reason,
            "step_mode": self.step_mode,
            "retry_count": self.retry_count,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Ticket":
        """Build a ticket from a dict such as the one produced by to_dict().

        Only ``"id"`` is mandatory (``KeyError`` when absent). Every other
        missing key falls back to ``None`` or to the field's default.

        The list fields are copied rather than shared with ``data``, and an
        explicit ``None`` is treated as an empty list so that iterating
        ``depends_on`` later cannot fail.
        """
        return cls(
            id=data["id"],
            description=data.get("description"),
            status=data.get("status"),
            assigned_to=data.get("assigned_to"),
            target_file=data.get("target_file"),
            context_requirements=list(data.get("context_requirements") or []),
            depends_on=list(data.get("depends_on") or []),
            blocked_reason=data.get("blocked_reason"),
            step_mode=data.get("step_mode", False),
            retry_count=data.get("retry_count", 0),
        )
@dataclass
class Track:
    """
    A group of tickets that together make up one architectural track or epic.
    """

    id: str
    description: str
    tickets: List[Ticket] = field(default_factory=list)

    def get_executable_tickets(self) -> List[Ticket]:
        """
        Collect every 'todo' ticket whose dependencies have all reached 'completed'.

        Tickets are returned in the order they appear in ``self.tickets``.
        A dependency ID that matches no ticket in this track counts as not
        completed, so the dependent ticket is left out.
        """
        # One pass to index statuses by ticket ID, so each dependency check
        # below is a single dict lookup.
        status_by_id = {}
        for entry in self.tickets:
            status_by_id[entry.id] = entry.status

        def _dependencies_done(candidate) -> bool:
            # all() stops at the first unfinished dependency; an unknown ID
            # yields None from .get() and therefore also fails the check.
            return all(
                status_by_id.get(needed) == "completed"
                for needed in candidate.depends_on
            )

        return [
            candidate
            for candidate in self.tickets
            if candidate.status == "todo" and _dependencies_done(candidate)
        ]
@dataclass
class WorkerContext:
    """
    Bundle of inputs handed to a Tier 3 Worker for one specific ticket.
    """

    # ID of the ticket this context was assembled for.
    ticket_id: str
    # Name of the model the worker is to use.
    model_name: str
    # Message dicts supplied to the worker; their schema is not defined in
    # this module.
    messages: List[dict]
@dataclass
class Metadata:
    """
    Identifying and bookkeeping fields: id, name, status and two timestamps.
    """

    id: str
    name: str
    status: str
    # Both timestamps may end up as None when built through from_dict();
    # to_dict() tolerates that.
    created_at: datetime
    updated_at: datetime

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, with timestamps as ISO-8601 strings.

        A falsy timestamp (e.g. None) is emitted as None.
        """
        created_text = None
        if self.created_at:
            created_text = self.created_at.isoformat()

        updated_text = None
        if self.updated_at:
            updated_text = self.updated_at.isoformat()

        payload: Dict[str, Any] = {
            "id": self.id,
            "name": self.name,
            "status": self.status,
        }
        payload["created_at"] = created_text
        payload["updated_at"] = updated_text
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Metadata":
        """Rebuild a Metadata from a dict such as the one to_dict() returns.

        ``"id"`` and ``"name"`` are mandatory (``KeyError`` when absent).
        ``"status"`` defaults to None. A missing or falsy timestamp becomes
        None; otherwise it is parsed with ``datetime.fromisoformat``.
        """

        def _timestamp(key: str):
            raw = data.get(key)
            if not raw:
                return None
            return datetime.fromisoformat(raw)

        return cls(
            id=data["id"],
            name=data["name"],
            status=data.get("status"),
            created_at=_timestamp("created_at"),
            updated_at=_timestamp("updated_at"),
        )
@dataclass
class TrackState:
    """
    Serializable state of a track: its metadata, discussion entries and tickets.
    """

    metadata: Metadata
    # Each entry is a flat dict; datetime values are converted to and from
    # ISO-8601 strings by to_dict() / from_dict().
    discussion: List[Dict[str, Any]]
    tasks: List[Ticket]

    @staticmethod
    def _revive_discussion_value(value: Any) -> Any:
        """Turn an ISO-8601 string back into a datetime; pass anything else through.

        The ``'T'`` test is only a cheap pre-filter. Ordinary text such as
        "Tier 3 reply" contains a 'T' as well, so a string that does not
        parse is returned unchanged instead of raising and aborting the load.
        """
        if isinstance(value, str) and "T" in value:
            try:
                return datetime.fromisoformat(value)
            except ValueError:
                return value
        return value

    def to_dict(self) -> Dict[str, Any]:
        """Return the state as a dict with "metadata", "discussion" and "tasks" keys.

        Datetime values found directly inside a discussion entry are written
        as ISO-8601 strings; all other values are passed through untouched.
        """
        return {
            "metadata": self.metadata.to_dict(),
            "discussion": [
                {
                    k: v.isoformat() if isinstance(v, datetime) else v
                    for k, v in item.items()
                }
                for item in self.discussion
            ],
            "tasks": [task.to_dict() for task in self.tasks],
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TrackState":
        """Rebuild a TrackState from a dict such as the one to_dict() returns.

        Raises ``KeyError`` when "metadata", "tasks" or "discussion" is
        missing. Discussion strings that parse as ISO-8601 datetimes become
        datetime objects; every other value, including free text, is kept
        as-is.
        """
        metadata = Metadata.from_dict(data["metadata"])
        tasks = [Ticket.from_dict(task_data) for task_data in data["tasks"]]
        discussion = [
            {k: cls._revive_discussion_value(v) for k, v in item.items()}
            for item in data["discussion"]
        ]
        return cls(
            metadata=metadata,
            discussion=discussion,
            tasks=tasks,
        )