# project_manager.py
"""
Note(Gemini):
Handles loading/saving of project .toml configurations.
Also handles serializing the discussion history into the TOML format using a
special @timestamp prefix to preserve the exact sequence of events.
"""
import datetime
import json
import re
import subprocess
import tomllib
from pathlib import Path
from typing import Any, Optional, TYPE_CHECKING, Union

import tomli_w

if TYPE_CHECKING:
    from models import TrackState

TS_FMT: str = "%Y-%m-%dT%H:%M:%S"

# Roles assumed when a caller / legacy config supplies none.
_LEGACY_ROLES: tuple[str, ...] = ("User", "AI", "Vendor API", "System")
# Roles given to freshly created projects and used when parsing track history.
_DEFAULT_ROLES: tuple[str, ...] = (*_LEGACY_ROLES, "Reasoning")

# Markdown task-list lines in plan.md: "- [ ] ...", "- [x] ...", "- [~] ...".
_PLAN_TASK_RE = re.compile(r"^[ \t]*- \[[ x~]\] .*", re.MULTILINE)
_PLAN_DONE_RE = re.compile(r"^[ \t]*- \[x\] .*", re.MULTILINE)


def now_ts() -> str:
    """Return the current local time formatted with TS_FMT."""
    return datetime.datetime.now().strftime(TS_FMT)


def parse_ts(s: str) -> Optional[datetime.datetime]:
    """Parse a TS_FMT timestamp string; return None if it does not parse."""
    try:
        return datetime.datetime.strptime(s, TS_FMT)
    except (ValueError, TypeError):
        # ValueError: wrong format; TypeError: not a string at all.
        return None


# ── entry serialisation ──────────────────────────────────────────────────────

def entry_to_str(entry: dict[str, Any]) -> str:
    """Serialise a disc entry dict -> stored string.

    The stored form is "<role>:\\n<content>", preceded by an "@<ts>" line
    when the entry has a non-empty "ts".
    """
    ts = entry.get("ts", "")
    role = entry.get("role", "User")
    content = entry.get("content", "")
    if ts:
        return f"@{ts}\n{role}:\n{content}"
    return f"{role}:\n{content}"


def str_to_entry(raw: str, roles: list[str]) -> dict[str, Any]:
    """Parse a stored string back to a disc entry dict.

    Args:
        raw: Stored entry text, optionally starting with an "@<ts>" line.
        roles: Known role names; when empty, a built-in default list is used.

    Returns:
        A dict with keys "role", "content", "collapsed" (always False) and
        "ts" ("" when no timestamp line was present). When the first line is
        not a recognised role header, the role is "User" and the whole
        remaining text becomes the content.
    """
    ts = ""
    rest = raw
    if rest.startswith("@"):
        nl = rest.find("\n")
        # Only treat the first line as a timestamp if something follows it.
        if nl != -1:
            ts = rest[1:nl]
            rest = rest[nl + 1:]

    known = roles or list(_LEGACY_ROLES)
    # Accepts "Role", "Role:", "[Role]" and "[Role]:" (case-insensitive).
    role_pat = re.compile(
        r"^(?:\[)?(" + "|".join(re.escape(r) for r in known) + r")(?:\])?:?\s*$",
        re.IGNORECASE,
    )

    header, _, body = rest.partition("\n")
    matched_role = "User"
    content = rest.strip()
    m = role_pat.match(header.strip())
    if m:
        raw_role = m.group(1)
        # Normalise to the canonical spelling from the known-roles list.
        matched_role = next(
            (r for r in known if r.lower() == raw_role.lower()), raw_role
        )
        content = body.strip()
    return {"role": matched_role, "content": content, "collapsed": False, "ts": ts}


# ── git helpers ──────────────────────────────────────────────────────────────

def _run_git(args: list[str], git_dir: str) -> str:
    """Run `git <args>` in git_dir; return stripped stdout, or "" on any failure."""
    try:
        r = subprocess.run(
            ["git", *args],
            capture_output=True, text=True, cwd=git_dir, timeout=5,
        )
        return r.stdout.strip() if r.returncode == 0 else ""
    except Exception:
        # Deliberately best-effort: a missing git binary, bad directory or
        # timeout must never break project loading.
        return ""


def get_git_commit(git_dir: str) -> str:
    """Return the HEAD commit hash of the repo at git_dir, or "" on failure."""
    return _run_git(["rev-parse", "HEAD"], git_dir)


def get_git_log(git_dir: str, n: int = 5) -> str:
    """Return the last n commits as `git log --oneline` text, or "" on failure."""
    return _run_git(["log", "--oneline", f"-{n}"], git_dir)


# ── default structures ───────────────────────────────────────────────────────

def default_discussion() -> dict[str, Any]:
    """Return an empty discussion record stamped with the current time."""
    return {"git_commit": "", "last_updated": now_ts(), "history": []}


def default_project(name: str = "unnamed") -> dict[str, Any]:
    """Return a fresh project dict populated with default settings."""
    return {
        "project": {"name": name, "git_dir": "", "system_prompt": "", "main_context": ""},
        "output": {"output_dir": "./md_gen"},
        "files": {"base_dir": ".", "paths": [], "tier_assignments": {}},
        "screenshots": {"base_dir": ".", "paths": []},
        "gemini_cli": {"binary_path": "gemini"},
        "deepseek": {"reasoning_effort": "medium"},
        "agent": {
            "tools": {
                "run_powershell": True,
                "read_file": True,
                "list_directory": True,
                "search_files": True,
                "get_file_summary": True,
                "web_search": True,
                "fetch_url": True,
            }
        },
        "discussion": {
            "roles": list(_DEFAULT_ROLES),
            "active": "main",
            "discussions": {"main": default_discussion()},
        },
        "mma": {
            "epic": "",
            "active_track_id": "",
            "tracks": [],
        },
    }


# ── load / save ──────────────────────────────────────────────────────────────

def get_history_path(project_path: Union[str, Path]) -> Path:
    """Return the Path to the sibling history TOML file for a given project."""
    p = Path(project_path)
    return p.parent / f"{p.stem}_history.toml"


def load_project(path: Union[str, Path]) -> dict[str, Any]:
    """
    Load a project TOML file.
    Automatically migrates legacy 'discussion' keys to a sibling history file.

    The returned dict carries the discussion data under "discussion" whenever
    it was found, either inline (legacy) or in the sibling history file.
    """
    with open(path, "rb") as f:
        proj = tomllib.load(f)
    hist_path = get_history_path(path)
    if "discussion" in proj:
        # Legacy layout: move the inline section out to the history file and
        # rewrite the project file without it.
        disc = proj.pop("discussion")
        with open(hist_path, "wb") as f:
            tomli_w.dump(disc, f)
        save_project(proj, path)
        proj["discussion"] = disc
    elif hist_path.exists():
        proj["discussion"] = load_history(path)
    return proj


def load_history(project_path: Union[str, Path]) -> dict[str, Any]:
    """Load the segregated discussion history from its dedicated TOML file.

    Returns an empty dict when the history file does not exist.
    """
    hist_path = get_history_path(project_path)
    if hist_path.exists():
        with open(hist_path, "rb") as f:
            return tomllib.load(f)
    return {}


def clean_nones(data: Any) -> Any:
    """Recursively remove None values from a dictionary/list.

    Dicts and lists are rebuilt (the input is not mutated); any other value
    is returned unchanged.
    """
    if isinstance(data, dict):
        return {k: clean_nones(v) for k, v in data.items() if v is not None}
    elif isinstance(data, list):
        return [clean_nones(v) for v in data if v is not None]
    return data


def save_project(
    proj: dict[str, Any],
    path: Union[str, Path],
    disc_data: Optional[dict[str, Any]] = None,
) -> None:
    """
    Save the project TOML.
    If 'discussion' is present in proj, it is moved to the sibling history file.

    Args:
        proj: Project dict; not mutated.
        path: Destination of the project TOML file.
        disc_data: Discussion data to write to the history file. When None,
            proj["discussion"] is used if present. Nothing is written to the
            history file when the resulting data is empty.
    """
    # clean_nones returns a fresh dict, so popping below cannot affect the
    # caller's object.
    proj = clean_nones(proj)
    embedded = proj.pop("discussion", None)
    if embedded is not None and disc_data is None:
        disc_data = embedded
    with open(path, "wb") as f:
        tomli_w.dump(proj, f)
    if disc_data:
        disc_data = clean_nones(disc_data)
        hist_path = get_history_path(path)
        with open(hist_path, "wb") as f:
            tomli_w.dump(disc_data, f)


# ── migration helper ─────────────────────────────────────────────────────────

def migrate_from_legacy_config(cfg: dict[str, Any]) -> dict[str, Any]:
    """Build a fresh project dict from a legacy flat config.toml. Does NOT save."""
    name = cfg.get("output", {}).get("namespace", "project")
    proj = default_project(name)
    for key in ("output", "files", "screenshots"):
        if key in cfg:
            proj[key] = dict(cfg[key])
    disc = cfg.get("discussion", {})
    # Copy the lists so later edits to the project never write through into
    # the caller's legacy config.
    proj["discussion"]["roles"] = list(disc.get("roles", _LEGACY_ROLES))
    main_disc = proj["discussion"]["discussions"]["main"]
    main_disc["history"] = list(disc.get("history", []))
    main_disc["last_updated"] = now_ts()
    return proj


# ── flat config for aggregate.run() ─────────────────────────────────────────

def flat_config(
    proj: dict[str, Any],
    disc_name: Optional[str] = None,
    track_id: Optional[str] = None,
) -> dict[str, Any]:
    """Return a flat config dict compatible with aggregate.run().

    The history comes from the track's state file when track_id is given;
    otherwise from the discussion named disc_name, defaulting to the
    project's active discussion.
    """
    disc_sec = proj.get("discussion", {})
    if track_id:
        history = load_track_history(track_id, proj.get("files", {}).get("base_dir", "."))
    else:
        name = disc_name or disc_sec.get("active", "main")
        disc_data = disc_sec.get("discussions", {}).get(name, {})
        history = disc_data.get("history", [])
    return {
        "project": proj.get("project", {}),
        "output": proj.get("output", {}),
        "files": proj.get("files", {}),
        "screenshots": proj.get("screenshots", {}),
        "discussion": {
            "roles": disc_sec.get("roles", []),
            "history": history,
        },
    }


# ── track state persistence ─────────────────────────────────────────────────

def save_track_state(track_id: str, state: 'TrackState', base_dir: Union[str, Path] = ".") -> None:
    """
    Saves a TrackState object to conductor/tracks/<track_id>/state.toml.
    The track directory is created if it does not exist yet.
    """
    track_dir = Path(base_dir) / "conductor" / "tracks" / track_id
    track_dir.mkdir(parents=True, exist_ok=True)
    state_file = track_dir / "state.toml"
    # TOML has no null, so None values are stripped before dumping.
    data = clean_nones(state.to_dict())
    with open(state_file, "wb") as f:
        tomli_w.dump(data, f)


def load_track_state(track_id: str, base_dir: Union[str, Path] = ".") -> Optional['TrackState']:
    """
    Loads a TrackState object from conductor/tracks/<track_id>/state.toml.
    Returns None when the state file does not exist.
    """
    state_file = Path(base_dir) / "conductor" / "tracks" / track_id / "state.toml"
    if not state_file.exists():
        return None
    # Imported lazily (and only once a file is known to exist) so that a
    # missing track never requires the models module.
    from models import TrackState
    with open(state_file, "rb") as f:
        data = tomllib.load(f)
    return TrackState.from_dict(data)


def load_track_history(track_id: str, base_dir: Union[str, Path] = ".") -> list[str]:
    """
    Loads the discussion history for a specific track from its state.toml.
    Returns a list of entry strings formatted with @timestamp.
    Returns an empty list when the track has no state.
    """
    state = load_track_state(track_id, base_dir)
    if not state:
        return []
    history: list[str] = []
    for entry in state.discussion:
        e = dict(entry)
        ts = e.get("ts")
        # Timestamps may arrive as datetime objects; entry_to_str expects text.
        if isinstance(ts, datetime.datetime):
            e["ts"] = ts.strftime(TS_FMT)
        history.append(entry_to_str(e))
    return history


def save_track_history(track_id: str, history: list[str], base_dir: Union[str, Path] = ".") -> None:
    """
    Saves the discussion history for a specific track to its state.toml.
    'history' is expected to be a list of formatted strings.
    Does nothing when the track has no existing state.
    """
    state = load_track_state(track_id, base_dir)
    if not state:
        return
    roles = list(_DEFAULT_ROLES)
    state.discussion = [str_to_entry(h, roles) for h in history]
    save_track_state(track_id, state, base_dir)


def _fill_track_from_state(track_info: dict[str, Any], track_id: str, base_dir: Union[str, Path]) -> bool:
    """Populate track_info from state.toml; return True only on full success."""
    try:
        state = load_track_state(track_id, base_dir)
        if not state:
            return False
        track_info["id"] = state.metadata.id or track_id
        track_info["title"] = state.metadata.name or track_id
        track_info["status"] = state.metadata.status or "unknown"
        track_info["complete"] = sum(1 for t in state.tasks if t.status == "completed")
        track_info["total"] = len(state.tasks)
        if track_info["total"] > 0:
            track_info["progress"] = track_info["complete"] / track_info["total"]
        return True
    except Exception:
        # Deliberately broad: a malformed state.toml (or an unavailable
        # models module) must fall through to the metadata.json fallback.
        return False


def _fill_track_from_metadata(track_info: dict[str, Any], track_dir: Path, track_id: str) -> None:
    """Populate id/title/status from metadata.json, if present and readable."""
    metadata_file = track_dir / "metadata.json"
    if not metadata_file.exists():
        return
    try:
        with open(metadata_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # Unreadable, wrongly encoded, or invalid JSON: keep the defaults.
        return
    if not isinstance(data, dict):
        return
    track_info["id"] = data.get("id", data.get("track_id", track_id))
    track_info["title"] = data.get("title", data.get("name", data.get("description", track_id)))
    track_info["status"] = data.get("status", "unknown")


def _fill_track_from_plan(track_info: dict[str, Any], track_dir: Path) -> None:
    """Derive task counts and progress from the checkboxes in plan.md."""
    plan_file = track_dir / "plan.md"
    if not plan_file.exists():
        return
    try:
        with open(plan_file, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, ValueError):
        # Unreadable or not valid UTF-8: keep the existing counts.
        return
    track_info["total"] = len(_PLAN_TASK_RE.findall(content))
    track_info["complete"] = len(_PLAN_DONE_RE.findall(content))
    if track_info["total"] > 0:
        track_info["progress"] = float(track_info["complete"]) / track_info["total"]


def get_all_tracks(base_dir: Union[str, Path] = ".") -> list[dict[str, Any]]:
    """
    Scans the conductor/tracks/ directory and returns a list of dictionaries
    containing track metadata: 'id', 'title', 'status', 'complete', 'total',
    and 'progress' (0.0 to 1.0).
    Handles missing or malformed metadata.json or state.toml by falling back
    to available info or defaults.

    Sources are tried in order: state.toml, then metadata.json (only when no
    state could be loaded); plan.md checkboxes supply the task counts
    whenever the total is still zero.
    """
    tracks_dir = Path(base_dir) / "conductor" / "tracks"
    if not tracks_dir.exists():
        return []
    results: list[dict[str, Any]] = []
    for entry in tracks_dir.iterdir():
        if not entry.is_dir():
            continue
        track_id = entry.name
        track_info: dict[str, Any] = {
            "id": track_id,
            "title": track_id,
            "status": "unknown",
            "complete": 0,
            "total": 0,
            "progress": 0.0,
        }
        if not _fill_track_from_state(track_info, track_id, base_dir):
            _fill_track_from_metadata(track_info, entry, track_id)
        if track_info["total"] == 0:
            _fill_track_from_plan(track_info, entry)
        results.append(track_info)
    return results