Files
manual_slop/project_manager.py
2026-02-21 22:06:57 -05:00

150 lines
5.3 KiB
Python

# project_manager.py
import subprocess
import datetime
import tomllib
import tomli_w
import re
from pathlib import Path
TS_FMT = "%Y-%m-%dT%H:%M:%S"
def now_ts() -> str:
return datetime.datetime.now().strftime(TS_FMT)
def parse_ts(s: str):
try:
return datetime.datetime.strptime(s, TS_FMT)
except Exception:
return None
# ── entry serialisation ──────────────────────────────────────────────────────
def entry_to_str(entry: dict) -> str:
"""Serialise a disc entry dict -> stored string."""
ts = entry.get("ts", "")
role = entry.get("role", "User")
content = entry.get("content", "")
if ts:
return f"@{ts}\n{role}:\n{content}"
return f"{role}:\n{content}"
def str_to_entry(raw: str, roles: list[str]) -> dict:
"""Parse a stored string back to a disc entry dict."""
ts = ""
rest = raw
if rest.startswith("@"):
nl = rest.find("\n")
if nl != -1:
ts = rest[1:nl]
rest = rest[nl + 1:]
known = roles or ["User", "AI", "Vendor API", "System"]
role_pat = re.compile(
r"^(?:\[)?(" + "|".join(re.escape(r) for r in known) + r")(?:\])?:?\s*$",
re.IGNORECASE,
)
parts = rest.split("\n", 1)
matched_role = "User"
content = rest.strip()
if parts:
m = role_pat.match(parts[0].strip())
if m:
raw_role = m.group(1)
matched_role = next((r for r in known if r.lower() == raw_role.lower()), raw_role)
content = parts[1].strip() if len(parts) > 1 else ""
return {"role": matched_role, "content": content, "collapsed": False, "ts": ts}
# ── git helpers ──────────────────────────────────────────────────────────────
def get_git_commit(git_dir: str) -> str:
try:
r = subprocess.run(
["git", "rev-parse", "HEAD"],
capture_output=True, text=True, cwd=git_dir, timeout=5,
)
return r.stdout.strip() if r.returncode == 0 else ""
except Exception:
return ""
def get_git_log(git_dir: str, n: int = 5) -> str:
try:
r = subprocess.run(
["git", "log", "--oneline", f"-{n}"],
capture_output=True, text=True, cwd=git_dir, timeout=5,
)
return r.stdout.strip() if r.returncode == 0 else ""
except Exception:
return ""
# ── default structures ───────────────────────────────────────────────────────
def default_discussion() -> dict:
return {"git_commit": "", "last_updated": now_ts(), "history": []}
def default_project(name: str = "unnamed") -> dict:
return {
"project": {"name": name, "git_dir": "", "system_prompt": ""},
"output": {"namespace": name, "output_dir": "./md_gen"},
"files": {"base_dir": ".", "paths": []},
"screenshots": {"base_dir": ".", "paths": []},
"discussion": {
"roles": ["User", "AI", "Vendor API", "System"],
"active": "main",
"discussions": {"main": default_discussion()},
},
}
# ── load / save ──────────────────────────────────────────────────────────────
def load_project(path) -> dict:
with open(path, "rb") as f:
return tomllib.load(f)
def save_project(proj: dict, path):
with open(path, "wb") as f:
tomli_w.dump(proj, f)
# ── migration helper ─────────────────────────────────────────────────────────
def migrate_from_legacy_config(cfg: dict) -> dict:
"""Build a fresh project dict from a legacy flat config.toml. Does NOT save."""
name = cfg.get("output", {}).get("namespace", "project")
proj = default_project(name)
for key in ("output", "files", "screenshots"):
if key in cfg:
proj[key] = dict(cfg[key])
disc = cfg.get("discussion", {})
proj["discussion"]["roles"] = disc.get("roles", ["User", "AI", "Vendor API", "System"])
main_disc = proj["discussion"]["discussions"]["main"]
main_disc["history"] = disc.get("history", [])
main_disc["last_updated"] = now_ts()
return proj
# ── flat config for aggregate.run() ─────────────────────────────────────────
def flat_config(proj: dict, disc_name: str | None = None) -> dict:
"""Return a flat config dict compatible with aggregate.run()."""
disc_sec = proj.get("discussion", {})
name = disc_name or disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(name, {})
return {
"output": proj.get("output", {}),
"files": proj.get("files", {}),
"screenshots": proj.get("screenshots", {}),
"discussion": {
"roles": disc_sec.get("roles", []),
"history": disc_data.get("history", []),
},
}