"""
Models - Core data structures for MMA orchestration and project configuration.

This module defines the primary dataclasses used throughout the Manual Slop
application for representing tasks, tracks, and execution context.

Key Data Structures:
    - Ticket: Atomic unit of work with status, dependencies, and context requirements
    - Track: Collection of tickets with a shared goal
    - WorkerContext: Execution context for a Tier 3 worker
    - Metadata: Track metadata (id, name, status, timestamps)
    - TrackState: Serializable track state with discussion history
    - FileItem: File configuration with auto-aggregate and force-full flags

Status Machine (Ticket):
    todo -> in_progress -> completed
      |          |
      v          v
    blocked    blocked

Serialization:
    Every dataclass except WorkerContext provides to_dict() and a from_dict()
    classmethod for TOML/JSON persistence via project_manager.py.

Thread Safety:
    These dataclasses are NOT thread-safe. Callers must synchronize mutations
    if sharing instances across threads (e.g., during ConductorEngine execution).

Configuration Integration:
    - load_config() / save_config() read/write the global config.toml
    - AGENT_TOOL_NAMES defines the canonical list of MCP tools available to agents

See Also:
    - docs/guide_mma.md for MMA orchestration documentation
    - src/dag_engine.py for TrackDAG and ExecutionEngine
    - src/multi_agent_conductor.py for ConductorEngine
    - src/project_manager.py for persistence layer
"""
from __future__ import annotations

import datetime
import re
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from src.paths import get_config_path

# Resolved once at import time; load_config()/save_config() read and write this file.
CONFIG_PATH = get_config_path()


def load_config() -> dict[str, Any]:
    """Read the TOML file at CONFIG_PATH and return its contents as a dict.

    Raises:
        OSError: if the file cannot be opened.
        tomllib.TOMLDecodeError: if the file is not valid TOML.
    """
    with open(CONFIG_PATH, "rb") as f:
        return tomllib.load(f)


def save_config(config: dict[str, Any]) -> None:
    """Serialize ``config`` as TOML and overwrite the file at CONFIG_PATH."""
    # Local import: tomli_w is only needed when writing, so importing this
    # module (and reading the config) does not require it.
    import tomli_w

    with open(CONFIG_PATH, "wb") as f:
        tomli_w.dump(config, f)


# Canonical list of MCP tool names available to agents.
AGENT_TOOL_NAMES = [
    "run_powershell",
    "read_file",
    "list_directory",
    "search_files",
    "web_search",
    "fetch_url",
    "get_file_summary",
    "py_get_skeleton",
    "py_get_code_outline",
    "py_get_definition",
    "py_get_signature",
    "py_get_class_summary",
    "py_get_var_declaration",
    "py_get_docstring",
    "py_find_usages",
    "py_get_imports",
    "py_check_syntax",
    "py_get_hierarchy",
]


def _as_list(value: Any) -> List[Any]:
    """Return a fresh list built from ``value``.

    ``None`` becomes an empty list, a bare string becomes a one-item list
    (instead of being split into characters), and any other iterable is
    shallow-copied so the result never aliases the caller's container.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    return list(value)


def _parse_iso_datetime(value: Any, fallback: Any) -> Any:
    """Convert an ISO-8601 string to ``datetime.datetime``.

    Non-string values (e.g. an existing datetime or ``None``) are returned
    unchanged. A string that ``datetime.fromisoformat`` rejects yields
    ``fallback``.
    """
    if not isinstance(value, str):
        return value
    try:
        return datetime.datetime.fromisoformat(value)
    except ValueError:
        return fallback


def parse_history_entries(history_strings: list[str], roles: list[str]) -> list[dict[str, Any]]:
    """Parse raw discussion-history strings into entry dicts.

    Each raw string may start with an ``@<timestamp>`` header line, followed by
    a body that may start with ``<role>:``. A string that starts with ``@`` but
    contains no newline is not treated as having a timestamp header.

    Args:
        history_strings: Raw history entries.
        roles: Recognized role names. When empty or None, the defaults
            ``User``, ``AI``, ``Vendor API`` and ``System`` are used.

    Returns:
        One dict per input string with keys ``role``, ``content``, ``collapsed``
        (always True) and ``ts`` (the header text after ``@``, or ``""``).
        When no known role prefix is found, the role is ``"User"`` and the
        content is the body left untouched (not stripped).
    """
    known = roles or ["User", "AI", "Vendor API", "System"]
    # Built once per call: the pattern depends only on ``roles``, not on the
    # individual entries.
    role_pat = re.compile(
        r"^(" + "|".join(re.escape(r) for r in known) + r"):",
        re.IGNORECASE,
    )
    entries = []
    for raw in history_strings:
        ts = ""
        rest = raw
        if rest.startswith("@"):
            nl = rest.find("\n")
            if nl != -1:
                ts = rest[1:nl]
                rest = rest[nl + 1:]
        match = role_pat.match(rest)
        if match:
            # NOTE: matching is case-insensitive and the role is returned as
            # written in the entry, not normalized to the spelling in ``known``.
            role = match.group(1)
            content = rest[match.end():].strip()
        else:
            role = "User"
            content = rest
        entries.append({"role": role, "content": content, "collapsed": True, "ts": ts})
    return entries


@dataclass
class Ticket:
    """Atomic unit of work with status, dependencies, and context requirements.

    ``status`` follows the machine described in the module docstring
    (todo -> in_progress -> completed, with blocked reachable from todo and
    in_progress). The methods below only ever set "blocked", "todo" and
    "completed"; other transitions are made by callers.
    """

    id: str
    description: str
    status: str = "todo"
    assigned_to: str = "unassigned"
    priority: str = "medium"
    target_file: Optional[str] = None
    target_symbols: List[str] = field(default_factory=list)
    context_requirements: List[str] = field(default_factory=list)
    # Ids of tickets this one depends on.
    depends_on: List[str] = field(default_factory=list)
    # Human-readable reason set by mark_blocked()/mark_manual_block().
    blocked_reason: Optional[str] = None
    step_mode: bool = False
    retry_count: int = 0
    # True only while the ticket is blocked through mark_manual_block().
    manual_block: bool = False
    model_override: Optional[str] = None

    def mark_blocked(self, reason: str) -> None:
        """Set status to "blocked" and record ``reason``."""
        self.status = "blocked"
        self.blocked_reason = reason

    def mark_manual_block(self, reason: str) -> None:
        """Block the ticket manually; the reason is stored with a "[MANUAL] " prefix."""
        self.status = "blocked"
        self.blocked_reason = f"[MANUAL] {reason}"
        self.manual_block = True

    def clear_manual_block(self) -> None:
        """Undo a manual block, returning the ticket to "todo".

        Does nothing when the ticket is not manually blocked, so a block set
        through mark_blocked() is left in place.
        """
        if self.manual_block:
            self.status = "todo"
            self.blocked_reason = None
            self.manual_block = False

    def mark_complete(self) -> None:
        """Set status to "completed"."""
        self.status = "completed"

    def get(self, key: str, default: Any = None) -> Any:
        """Dict-style attribute access: return attribute ``key`` or ``default``."""
        return getattr(self, key, default)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of the ticket.

        List fields are copied so that later mutation of the ticket does not
        change a snapshot that was already taken (and vice versa).
        """
        return {
            "id": self.id,
            "description": self.description,
            "status": self.status,
            "assigned_to": self.assigned_to,
            "priority": self.priority,
            "target_file": self.target_file,
            "target_symbols": _as_list(self.target_symbols),
            "context_requirements": _as_list(self.context_requirements),
            "depends_on": _as_list(self.depends_on),
            "blocked_reason": self.blocked_reason,
            "step_mode": self.step_mode,
            "retry_count": self.retry_count,
            "manual_block": self.manual_block,
            "model_override": self.model_override,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Ticket":
        """Build a Ticket from a dict such as one produced by to_dict().

        Only ``id`` is required; every other key falls back to the dataclass
        default. List fields are copied (and None is normalized to an empty
        list) so the ticket never shares list objects with ``data``.

        Raises:
            KeyError: if ``data`` has no "id" key.
        """
        return cls(
            id=data["id"],
            description=data.get("description", ""),
            status=data.get("status", "todo"),
            assigned_to=data.get("assigned_to", "unassigned"),
            priority=data.get("priority", "medium"),
            target_file=data.get("target_file"),
            target_symbols=_as_list(data.get("target_symbols")),
            context_requirements=_as_list(data.get("context_requirements")),
            depends_on=_as_list(data.get("depends_on")),
            blocked_reason=data.get("blocked_reason"),
            step_mode=data.get("step_mode", False),
            retry_count=data.get("retry_count", 0),
            manual_block=data.get("manual_block", False),
            model_override=data.get("model_override"),
        )


@dataclass
class Track:
    """Collection of tickets with a shared goal."""

    id: str
    description: str
    tickets: List[Ticket] = field(default_factory=list)

    def get_executable_tickets(self) -> List[Ticket]:
        """Return the tickets that TrackDAG reports as ready.

        Builds a TrackDAG over ``self.tickets`` and returns its
        get_ready_tasks() result.
        """
        # Imported here rather than at module level, presumably to avoid a
        # circular import with src.dag_engine -- TODO confirm.
        from src.dag_engine import TrackDAG

        dag = TrackDAG(self.tickets)
        return dag.get_ready_tasks()

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form of the track with every ticket serialized."""
        return {
            "id": self.id,
            "description": self.description,
            "tickets": [t.to_dict() for t in self.tickets],
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Track":
        """Build a Track from a dict; a missing or None "tickets" means no tickets.

        Raises:
            KeyError: if ``data`` (or any ticket dict) has no "id" key.
        """
        return cls(
            id=data["id"],
            description=data.get("description", ""),
            tickets=[Ticket.from_dict(t) for t in data.get("tickets") or []],
        )


@dataclass
class WorkerContext:
    """Execution context for a Tier 3 worker (no dict serialization provided)."""

    ticket_id: str
    model_name: str
    messages: List[Dict[str, Any]] = field(default_factory=list)


@dataclass
class Metadata:
    """Track metadata: id, name, status and creation/update timestamps."""

    id: str
    name: str
    status: Optional[str] = None
    created_at: Optional[datetime.datetime] = None
    updated_at: Optional[datetime.datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict; timestamps become ISO-8601 strings or None."""
        return {
            "id": self.id,
            "name": self.name,
            "status": self.status,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Metadata":
        """Build Metadata from a dict.

        String timestamps are parsed with datetime.fromisoformat(); a string
        that fails to parse becomes None. Non-string values (such as an
        already-parsed datetime) are kept as given.

        Raises:
            KeyError: if ``data`` has no "id" key.
        """
        return cls(
            id=data["id"],
            name=data.get("name", ""),
            status=data.get("status"),
            created_at=_parse_iso_datetime(data.get("created_at"), None),
            updated_at=_parse_iso_datetime(data.get("updated_at"), None),
        )


@dataclass
class TrackState:
    """Serializable track state: metadata, discussion history and tickets."""

    metadata: Metadata
    # Entries are either raw strings or entry dicts (which may carry a "ts"
    # timestamp); to_dict()/from_dict() handle both shapes.
    discussion: List[Union[str, Dict[str, Any]]] = field(default_factory=list)
    tasks: List[Ticket] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form of the state.

        Dict entries in ``discussion`` are shallow-copied and a datetime "ts"
        is converted to an ISO-8601 string; other entries pass through as-is.
        """
        serialized_discussion: List[Any] = []
        for item in self.discussion:
            if isinstance(item, dict):
                # Copy so the live entry keeps its datetime object.
                new_item = dict(item)
                if isinstance(new_item.get("ts"), datetime.datetime):
                    new_item["ts"] = new_item["ts"].isoformat()
                serialized_discussion.append(new_item)
            else:
                serialized_discussion.append(item)
        return {
            "metadata": self.metadata.to_dict(),
            "discussion": serialized_discussion,
            "tasks": [t.to_dict() for t in self.tasks],
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TrackState":
        """Build a TrackState from a dict such as one produced by to_dict().

        For dict entries in "discussion", a string "ts" is parsed with
        datetime.fromisoformat(); if parsing fails the original string is
        kept. A missing or None "discussion"/"tasks" means an empty list.

        Raises:
            KeyError: if "metadata" is missing, or it or any task lacks "id".
        """
        parsed_discussion: List[Any] = []
        for item in data.get("discussion") or []:
            if isinstance(item, dict):
                new_item = dict(item)
                ts = new_item.get("ts")
                # Only touch "ts" when it is a string, so entries without a
                # timestamp do not gain a "ts" key.
                if isinstance(ts, str):
                    new_item["ts"] = _parse_iso_datetime(ts, ts)
                parsed_discussion.append(new_item)
            else:
                parsed_discussion.append(item)
        return cls(
            metadata=Metadata.from_dict(data["metadata"]),
            discussion=parsed_discussion,
            tasks=[Ticket.from_dict(t) for t in data.get("tasks") or []],
        )


@dataclass
class FileItem:
    """File configuration with auto-aggregate and force-full flags."""

    path: str
    auto_aggregate: bool = True
    force_full: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return the item as a plain dict."""
        return {
            "path": self.path,
            "auto_aggregate": self.auto_aggregate,
            "force_full": self.force_full,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "FileItem":
        """Build a FileItem from a dict; flags default to True / False.

        Raises:
            KeyError: if ``data`` has no "path" key.
        """
        return cls(
            path=data["path"],
            auto_aggregate=data.get("auto_aggregate", True),
            force_full=data.get("force_full", False),
        )