277 lines
7.2 KiB
Python
277 lines
7.2 KiB
Python
from __future__ import annotations
|
|
import tomllib
|
|
import datetime
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional, Dict, Any, Union
|
|
from pathlib import Path
|
|
from src.paths import get_config_path
|
|
|
|
# Location of the configuration file, resolved once at import time by calling
# get_config_path(). load_config() and save_config() both open this path.
CONFIG_PATH = get_config_path()
|
|
|
|
def load_config() -> dict[str, Any]:
    """Parse the TOML file at ``CONFIG_PATH`` and return its contents.

    Returns:
        The mapping produced by ``tomllib.load``.

    Any error from opening or parsing the file propagates to the caller.
    """
    # tomllib only accepts binary file objects, hence the "rb" mode.
    with open(CONFIG_PATH, "rb") as config_file:
        parsed = tomllib.load(config_file)
    return parsed
|
|
|
|
def save_config(config: dict[str, Any]) -> None:
    """Serialize *config* to TOML and overwrite the file at ``CONFIG_PATH``.

    The document is rendered in memory *before* the target file is opened
    for writing. Opening with ``"wb"`` truncates the file immediately, so
    serializing first ensures that a value tomli_w cannot encode raises
    without wiping the configuration already on disk.

    Args:
        config: The configuration mapping to persist.

    Any error raised by tomli_w or by the file write propagates to the caller.
    """
    # Function-scope import: importing this module does not require tomli_w;
    # it is only needed when a config is actually saved.
    import tomli_w

    # tomli_w.dump() writes UTF-8 encoded chunks; encode the rendered text the
    # same way so the bytes on disk are unchanged.
    payload = tomli_w.dumps(config).encode()
    with open(CONFIG_PATH, "wb") as f:
        f.write(payload)
|
|
|
|
# Ordered list of tool name strings.
# NOTE(review): presumably the identifiers of the tools made available to
# agents — confirm against the code that consumes this list.
AGENT_TOOL_NAMES = [
    # Shell, filesystem and web entries.
    "run_powershell",
    "read_file",
    "list_directory",
    "search_files",
    "web_search",
    "fetch_url",
    "get_file_summary",
    # Entries sharing the "py_" prefix.
    "py_get_skeleton",
    "py_get_code_outline",
    "py_get_definition",
    "py_get_signature",
    "py_get_class_summary",
    "py_get_var_declaration",
    "py_get_docstring",
    "py_find_usages",
    "py_get_imports",
    "py_check_syntax",
    "py_get_hierarchy",
]
|
|
|
|
def parse_history_entries(history_strings: list[str], roles: list[str]) -> list[dict[str, Any]]:
    """Convert raw history strings into structured entry dicts.

    Each raw string may start with an ``@<timestamp>`` line followed by a
    newline, and the remaining text may start with ``<role>:``.

    Args:
        history_strings: The raw entries to parse.
        roles: Role names to recognise as prefixes. When empty (or otherwise
            falsy) the defaults "User", "AI", "Vendor API" and "System" are
            used.

    Returns:
        One dict per input string with the keys ``role``, ``content``,
        ``collapsed`` (always ``True``) and ``ts``. ``ts`` is the text after
        the leading ``@`` up to the first newline, or ``""`` when there is no
        such header. ``role`` is the prefix exactly as written in the text
        (matching is case-insensitive), or ``"User"`` when no known role
        prefix is present. ``content`` is stripped only when a role prefix
        was found; otherwise the remaining text is returned untouched.
    """
    import re

    # The role list and its pattern do not depend on the entry being parsed,
    # so build them once instead of on every loop iteration.
    known = roles or ["User", "AI", "Vendor API", "System"]
    role_pat = re.compile(r"^(" + "|".join(re.escape(r) for r in known) + r"):", re.IGNORECASE)

    entries = []
    for raw in history_strings:
        ts = ""
        rest = raw
        if rest.startswith("@"):
            nl = rest.find("\n")
            # A leading "@" without any newline is not a timestamp header;
            # the whole string is then treated as content.
            if nl != -1:
                ts = rest[1:nl]
                rest = rest[nl + 1:]
        match = role_pat.match(rest)
        if match:
            role = match.group(1)
            content = rest[match.end():].strip()
        else:
            role = "User"
            content = rest
        entries.append({"role": role, "content": content, "collapsed": True, "ts": ts})
    return entries
|
|
|
|
@dataclass
class Ticket:
    """A single unit of work with status tracking and dict (de)serialization.

    Attributes:
        id: Ticket identifier (required).
        description: Free-text description (required).
        status: Defaults to "todo"; the methods below also set "blocked" and
            "completed".
        assigned_to: Defaults to "unassigned".
        priority: Defaults to "medium".
        target_file: Optional file path string.
        target_symbols: List of strings, empty by default.
        context_requirements: List of strings, empty by default.
        depends_on: List of strings, empty by default.
            NOTE(review): presumably ids of prerequisite tickets — confirm.
        blocked_reason: Set by ``mark_blocked`` / ``mark_manual_block``.
        step_mode: Boolean flag, off by default.
        retry_count: Integer counter starting at 0.
        manual_block: True while a block placed via ``mark_manual_block`` is
            in effect.
        model_override: Optional string.
    """

    id: str
    description: str
    status: str = "todo"
    assigned_to: str = "unassigned"
    priority: str = "medium"
    target_file: Optional[str] = None
    target_symbols: List[str] = field(default_factory=list)
    context_requirements: List[str] = field(default_factory=list)
    depends_on: List[str] = field(default_factory=list)
    blocked_reason: Optional[str] = None
    step_mode: bool = False
    retry_count: int = 0
    manual_block: bool = False
    model_override: Optional[str] = None

    def mark_blocked(self, reason: str) -> None:
        """Set the status to "blocked" and record *reason*."""
        self.status = "blocked"
        self.blocked_reason = reason

    def mark_manual_block(self, reason: str) -> None:
        """Block the ticket, prefix the reason with "[MANUAL]" and set ``manual_block``."""
        self.status = "blocked"
        self.blocked_reason = f"[MANUAL] {reason}"
        self.manual_block = True

    def clear_manual_block(self) -> None:
        """Undo a manual block, returning the ticket to "todo".

        Does nothing when ``manual_block`` is not set, so a block placed via
        ``mark_blocked`` is left in place.
        """
        if self.manual_block:
            self.status = "todo"
            self.blocked_reason = None
            self.manual_block = False

    def mark_complete(self) -> None:
        """Set the status to "completed"."""
        self.status = "completed"

    def get(self, key: str, default: Any = None) -> Any:
        """Dict-style accessor: return attribute *key*, or *default* if absent."""
        return getattr(self, key, default)

    def to_dict(self) -> Dict[str, Any]:
        """Return the ticket as a plain dict.

        The list-valued fields are copied so that mutating the returned dict
        cannot silently change this ticket.
        """
        return {
            "id": self.id,
            "description": self.description,
            "status": self.status,
            "assigned_to": self.assigned_to,
            "priority": self.priority,
            "target_file": self.target_file,
            "target_symbols": list(self.target_symbols or []),
            "context_requirements": list(self.context_requirements or []),
            "depends_on": list(self.depends_on or []),
            "blocked_reason": self.blocked_reason,
            "step_mode": self.step_mode,
            "retry_count": self.retry_count,
            "manual_block": self.manual_block,
            "model_override": self.model_override,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Ticket":
        """Build a ticket from a dict such as the one produced by ``to_dict``.

        Only ``"id"`` is required; every other key falls back to the field's
        default. List-valued fields are copied (so the ticket does not share
        list objects with *data*) and an explicit ``None`` is treated as an
        empty list.

        Raises:
            KeyError: If *data* has no ``"id"`` key.
        """
        return cls(
            id=data["id"],
            description=data.get("description", ""),
            status=data.get("status", "todo"),
            assigned_to=data.get("assigned_to", "unassigned"),
            priority=data.get("priority", "medium"),
            target_file=data.get("target_file"),
            target_symbols=list(data.get("target_symbols") or []),
            context_requirements=list(data.get("context_requirements") or []),
            depends_on=list(data.get("depends_on") or []),
            blocked_reason=data.get("blocked_reason"),
            step_mode=data.get("step_mode", False),
            retry_count=data.get("retry_count", 0),
            manual_block=data.get("manual_block", False),
            model_override=data.get("model_override"),
        )
|
|
|
|
|
|
@dataclass
class Track:
    """A described collection of tickets, identified by ``id``."""

    id: str
    description: str
    tickets: List[Ticket] = field(default_factory=list)

    def get_executable_tickets(self) -> List[Ticket]:
        """Return whatever ``TrackDAG.get_ready_tasks()`` reports for this track's tickets."""
        # Function-scope import, kept from the original code.
        # NOTE(review): presumably avoids a circular import with
        # src.dag_engine — confirm.
        from src.dag_engine import TrackDAG

        return TrackDAG(self.tickets).get_ready_tasks()

    def to_dict(self) -> Dict[str, Any]:
        """Return the track as a plain dict, with each ticket serialized via ``to_dict``."""
        serialized_tickets = [ticket.to_dict() for ticket in self.tickets]
        return {
            "id": self.id,
            "description": self.description,
            "tickets": serialized_tickets,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Track":
        """Build a track from a dict; only ``"id"`` is required.

        Raises:
            KeyError: If *data* has no ``"id"`` key.
        """
        track_id = data["id"]
        summary = data.get("description", "")
        raw_tickets = data.get("tickets", [])
        return cls(
            id=track_id,
            description=summary,
            tickets=[Ticket.from_dict(raw) for raw in raw_tickets],
        )
|
|
|
|
|
|
@dataclass
class WorkerContext:
    """Plain record pairing a ticket id and a model name with a message list."""

    # Both identifiers are required positional fields.
    ticket_id: str
    model_name: str
    # Each instance gets its own fresh, initially empty list.
    messages: List[Dict[str, Any]] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class Metadata:
    """Identity record: id, name, optional status and two optional timestamps.

    ``to_dict`` renders the timestamps as ISO-8601 strings and ``from_dict``
    parses such strings back into ``datetime`` objects.
    """

    id: str
    name: str
    status: Optional[str] = None
    created_at: Optional[datetime.datetime] = None
    updated_at: Optional[datetime.datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict; unset timestamps become ``None``."""
        created_iso = self.created_at.isoformat() if self.created_at else None
        updated_iso = self.updated_at.isoformat() if self.updated_at else None
        return {
            "id": self.id,
            "name": self.name,
            "status": self.status,
            "created_at": created_iso,
            "updated_at": updated_iso,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Metadata":
        """Build a record from a dict; only ``"id"`` is required.

        String timestamps are parsed with ``datetime.fromisoformat``; a string
        that fails to parse becomes ``None``. Non-string values (including
        ``None`` and ``datetime`` objects) are stored unchanged.

        Raises:
            KeyError: If *data* has no ``"id"`` key.
        """

        def _coerce_timestamp(value: Any) -> Any:
            # Only strings are parsed; everything else passes straight through.
            if not isinstance(value, str):
                return value
            try:
                return datetime.datetime.fromisoformat(value)
            except ValueError:
                return None

        return cls(
            id=data["id"],
            name=data.get("name", ""),
            status=data.get("status"),
            created_at=_coerce_timestamp(data.get("created_at")),
            updated_at=_coerce_timestamp(data.get("updated_at")),
        )
|
|
|
|
|
|
@dataclass
class TrackState:
    """Track state: metadata, a discussion log and a list of tickets.

    Attributes:
        metadata: The ``Metadata`` record for this state.
        discussion: Log entries. Each entry is either a plain string or a
            dict; dict entries may carry a ``"ts"`` value that is converted
            between ``datetime`` and ISO-8601 text by ``to_dict`` and
            ``from_dict``.
        tasks: The tickets held by this state.
    """

    metadata: Metadata
    # Annotated as str-or-dict because both to_dict() and from_dict() handle
    # dict entries explicitly; a bare List[str] contradicted that.
    discussion: List[Union[str, Dict[str, Any]]] = field(default_factory=list)
    tasks: List[Ticket] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the state as a plain dict.

        Dict discussion entries are shallow-copied, and a ``datetime`` under
        ``"ts"`` is replaced by its ISO-8601 string in the copy. Non-dict
        entries are passed through as-is.
        """
        serialized_discussion = []
        for item in self.discussion:
            if isinstance(item, dict):
                # Work on a copy so the stored entry keeps its datetime.
                new_item = dict(item)
                if "ts" in new_item and isinstance(new_item["ts"], datetime.datetime):
                    new_item["ts"] = new_item["ts"].isoformat()
                serialized_discussion.append(new_item)
            else:
                serialized_discussion.append(item)
        return {
            "metadata": self.metadata.to_dict(),
            "discussion": serialized_discussion,
            "tasks": [t.to_dict() for t in self.tasks],
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TrackState":
        """Build a state from a dict; ``"metadata"`` is required.

        For dict discussion entries, a string ``"ts"`` is parsed with
        ``datetime.fromisoformat``. A string that does not parse is kept
        unchanged rather than dropped.

        Raises:
            KeyError: If *data* has no ``"metadata"`` key.
        """
        discussion = data.get("discussion", [])
        parsed_discussion = []
        for item in discussion:
            if isinstance(item, dict):
                # Copy so the caller's dict is not modified.
                new_item = dict(item)
                ts = new_item.get("ts")
                if isinstance(ts, str):
                    try:
                        new_item["ts"] = datetime.datetime.fromisoformat(ts)
                    except ValueError:
                        # Deliberate best-effort: keep the original text.
                        pass
                parsed_discussion.append(new_item)
            else:
                parsed_discussion.append(item)
        return cls(
            metadata=Metadata.from_dict(data["metadata"]),
            discussion=parsed_discussion,
            tasks=[Ticket.from_dict(t) for t in data.get("tasks", [])],
        )
|
|
|
|
@dataclass
class FileItem:
    """A file path together with two boolean handling flags.

    ``auto_aggregate`` defaults to ``True`` and ``force_full`` to ``False``.
    """

    path: str
    auto_aggregate: bool = True
    force_full: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return the three fields as a plain dict."""
        return dict(
            path=self.path,
            auto_aggregate=self.auto_aggregate,
            force_full=self.force_full,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "FileItem":
        """Build an item from a dict; only ``"path"`` is required.

        Raises:
            KeyError: If *data* has no ``"path"`` key.
        """
        file_path = data["path"]
        aggregate = data.get("auto_aggregate", True)
        full = data.get("force_full", False)
        return cls(path=file_path, auto_aggregate=aggregate, force_full=full)
|