feat(models): Add priority field to Ticket dataclass and update serialization

This commit is contained in:
2026-03-07 15:27:30 -05:00
parent e9d9cdeb28
commit 035c74ed36
4 changed files with 229 additions and 189 deletions

View File

@@ -9,230 +9,233 @@ from src.paths import get_config_path
CONFIG_PATH = get_config_path()
def load_config() -> dict[str, Any]:
with open(CONFIG_PATH, "rb") as f:
return tomllib.load(f)
with open(CONFIG_PATH, "rb") as f:
return tomllib.load(f)
def save_config(config: dict[str, Any]) -> None:
import tomli_w
with open(CONFIG_PATH, "wb") as f:
tomli_w.dump(config, f)
import tomli_w
with open(CONFIG_PATH, "wb") as f:
tomli_w.dump(config, f)
AGENT_TOOL_NAMES = [
"run_powershell",
"read_file",
"list_directory",
"search_files",
"web_search",
"fetch_url",
"get_file_summary",
"py_get_skeleton",
"py_get_code_outline",
"py_get_definition",
"py_get_signature",
"py_get_class_summary",
"py_get_var_declaration",
"py_get_docstring",
"py_find_usages",
"py_get_imports",
"py_check_syntax",
"py_get_hierarchy"
"run_powershell",
"read_file",
"list_directory",
"search_files",
"web_search",
"fetch_url",
"get_file_summary",
"py_get_skeleton",
"py_get_code_outline",
"py_get_definition",
"py_get_signature",
"py_get_class_summary",
"py_get_var_declaration",
"py_get_docstring",
"py_find_usages",
"py_get_imports",
"py_check_syntax",
"py_get_hierarchy"
]
def parse_history_entries(history_strings: list[str], roles: list[str]) -> list[dict[str, Any]]:
import re
entries = []
for raw in history_strings:
ts = ""
rest = raw
if rest.startswith("@"):
nl = rest.find("\n")
if nl != -1:
ts = rest[1:nl]
rest = rest[nl + 1:]
known = roles or ["User", "AI", "Vendor API", "System"]
role_pat = re.compile(r"^(" + "|".join(re.escape(r) for r in known) + r"):", re.IGNORECASE)
match = role_pat.match(rest)
role = match.group(1) if match else "User"
if match:
content = rest[match.end():].strip()
else:
content = rest
entries.append({"role": role, "content": content, "collapsed": True, "ts": ts})
return entries
import re
entries = []
for raw in history_strings:
ts = ""
rest = raw
if rest.startswith("@"):
nl = rest.find("\n")
if nl != -1:
ts = rest[1:nl]
rest = rest[nl + 1:]
known = roles or ["User", "AI", "Vendor API", "System"]
role_pat = re.compile(r"^(" + "|".join(re.escape(r) for r in known) + r"):", re.IGNORECASE)
match = role_pat.match(rest)
role = match.group(1) if match else "User"
if match:
content = rest[match.end():].strip()
else:
content = rest
entries.append({"role": role, "content": content, "collapsed": True, "ts": ts})
return entries
@dataclass
class Ticket:
id: str
description: str
status: str = "todo"
assigned_to: str = "unassigned"
target_file: Optional[str] = None
target_symbols: List[str] = field(default_factory=list)
context_requirements: List[str] = field(default_factory=list)
depends_on: List[str] = field(default_factory=list)
blocked_reason: Optional[str] = None
step_mode: bool = False
retry_count: int = 0
id: str
description: str
status: str = "todo"
assigned_to: str = "unassigned"
priority: str = "medium"
target_file: Optional[str] = None
target_symbols: List[str] = field(default_factory=list)
context_requirements: List[str] = field(default_factory=list)
depends_on: List[str] = field(default_factory=list)
blocked_reason: Optional[str] = None
step_mode: bool = False
retry_count: int = 0
def mark_blocked(self, reason: str) -> None:
self.status = "blocked"
self.blocked_reason = reason
def mark_blocked(self, reason: str) -> None:
self.status = "blocked"
self.blocked_reason = reason
def mark_complete(self) -> None:
self.status = "completed"
def mark_complete(self) -> None:
self.status = "completed"
def get(self, key: str, default: Any = None) -> Any:
return getattr(self, key, default)
def get(self, key: str, default: Any = None) -> Any:
return getattr(self, key, default)
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"status": self.status,
"assigned_to": self.assigned_to,
"target_file": self.target_file,
"target_symbols": self.target_symbols,
"context_requirements": self.context_requirements,
"depends_on": self.depends_on,
"blocked_reason": self.blocked_reason,
"step_mode": self.step_mode,
"retry_count": self.retry_count,
}
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"status": self.status,
"assigned_to": self.assigned_to,
"priority": self.priority,
"target_file": self.target_file,
"target_symbols": self.target_symbols,
"context_requirements": self.context_requirements,
"depends_on": self.depends_on,
"blocked_reason": self.blocked_reason,
"step_mode": self.step_mode,
"retry_count": self.retry_count,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Ticket":
return cls(
id=data["id"],
description=data.get("description", ""),
status=data.get("status", "todo"),
assigned_to=data.get("assigned_to", ""),
target_file=data.get("target_file"),
target_symbols=data.get("target_symbols", []),
context_requirements=data.get("context_requirements", []),
depends_on=data.get("depends_on", []),
blocked_reason=data.get("blocked_reason"),
step_mode=data.get("step_mode", False),
retry_count=data.get("retry_count", 0),
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Ticket":
return cls(
id=data["id"],
description=data.get("description", ""),
status=data.get("status", "todo"),
assigned_to=data.get("assigned_to", "unassigned"),
priority=data.get("priority", "medium"),
target_file=data.get("target_file"),
target_symbols=data.get("target_symbols", []),
context_requirements=data.get("context_requirements", []),
depends_on=data.get("depends_on", []),
blocked_reason=data.get("blocked_reason"),
step_mode=data.get("step_mode", False),
retry_count=data.get("retry_count", 0),
)
@dataclass
class Track:
id: str
description: str
tickets: List[Ticket] = field(default_factory=list)
id: str
description: str
tickets: List[Ticket] = field(default_factory=list)
def get_executable_tickets(self) -> List[Ticket]:
from src.dag_engine import TrackDAG
dag = TrackDAG(self.tickets)
return dag.get_ready_tasks()
def get_executable_tickets(self) -> List[Ticket]:
from src.dag_engine import TrackDAG
dag = TrackDAG(self.tickets)
return dag.get_ready_tasks()
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"tickets": [t.to_dict() for t in self.tickets],
}
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"tickets": [t.to_dict() for t in self.tickets],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Track":
return cls(
id=data["id"],
description=data.get("description", ""),
tickets=[Ticket.from_dict(t) for t in data.get("tickets", [])],
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Track":
return cls(
id=data["id"],
description=data.get("description", ""),
tickets=[Ticket.from_dict(t) for t in data.get("tickets", [])],
)
@dataclass
class WorkerContext:
ticket_id: str
model_name: str
messages: List[Dict[str, Any]] = field(default_factory=list)
ticket_id: str
model_name: str
messages: List[Dict[str, Any]] = field(default_factory=list)
@dataclass
class Metadata:
id: str
name: str
status: Optional[str] = None
created_at: Optional[datetime.datetime] = None
updated_at: Optional[datetime.datetime] = None
id: str
name: str
status: Optional[str] = None
created_at: Optional[datetime.datetime] = None
updated_at: Optional[datetime.datetime] = None
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Metadata":
created = data.get("created_at")
updated = data.get("updated_at")
if isinstance(created, str):
try:
created = datetime.datetime.fromisoformat(created)
except ValueError:
created = None
if isinstance(updated, str):
try:
updated = datetime.datetime.fromisoformat(updated)
except ValueError:
updated = None
return cls(
id=data["id"],
name=data.get("name", ""),
status=data.get("status"),
created_at=created,
updated_at=updated,
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Metadata":
created = data.get("created_at")
updated = data.get("updated_at")
if isinstance(created, str):
try:
created = datetime.datetime.fromisoformat(created)
except ValueError:
created = None
if isinstance(updated, str):
try:
updated = datetime.datetime.fromisoformat(updated)
except ValueError:
updated = None
return cls(
id=data["id"],
name=data.get("name", ""),
status=data.get("status"),
created_at=created,
updated_at=updated,
)
@dataclass
class TrackState:
metadata: Metadata
discussion: List[str] = field(default_factory=list)
tasks: List[Ticket] = field(default_factory=list)
metadata: Metadata
discussion: List[str] = field(default_factory=list)
tasks: List[Ticket] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
serialized_discussion = []
for item in self.discussion:
if isinstance(item, dict):
new_item = dict(item)
if "ts" in new_item and isinstance(new_item["ts"], datetime.datetime):
new_item["ts"] = new_item["ts"].isoformat()
serialized_discussion.append(new_item)
else:
serialized_discussion.append(item)
return {
"metadata": self.metadata.to_dict(),
"discussion": serialized_discussion,
"tasks": [t.to_dict() for t in self.tasks],
}
def to_dict(self) -> Dict[str, Any]:
serialized_discussion = []
for item in self.discussion:
if isinstance(item, dict):
new_item = dict(item)
if "ts" in new_item and isinstance(new_item["ts"], datetime.datetime):
new_item["ts"] = new_item["ts"].isoformat()
serialized_discussion.append(new_item)
else:
serialized_discussion.append(item)
return {
"metadata": self.metadata.to_dict(),
"discussion": serialized_discussion,
"tasks": [t.to_dict() for t in self.tasks],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TrackState":
discussion = data.get("discussion", [])
parsed_discussion = []
for item in discussion:
if isinstance(item, dict):
new_item = dict(item)
ts = new_item.get("ts")
if isinstance(ts, str):
try:
new_item["ts"] = datetime.datetime.fromisoformat(ts)
except ValueError:
pass
parsed_discussion.append(new_item)
else:
parsed_discussion.append(item)
return cls(
metadata=Metadata.from_dict(data["metadata"]),
discussion=parsed_discussion,
tasks=[Ticket.from_dict(t) for t in data.get("tasks", [])],
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TrackState":
discussion = data.get("discussion", [])
parsed_discussion = []
for item in discussion:
if isinstance(item, dict):
new_item = dict(item)
ts = new_item.get("ts")
if isinstance(ts, str):
try:
new_item["ts"] = datetime.datetime.fromisoformat(ts)
except ValueError:
pass
parsed_discussion.append(new_item)
else:
parsed_discussion.append(item)
return cls(
metadata=Metadata.from_dict(data["metadata"]),
discussion=parsed_discussion,
tasks=[Ticket.from_dict(t) for t in data.get("tasks", [])],
)
@dataclass
class FileItem: