wip multi-project
This commit is contained in:
@@ -1,31 +1,5 @@
|
||||
# project_manager.py
|
||||
"""
|
||||
Project-level config management for manual_slop.
|
||||
|
||||
A project file (e.g. manual_slop.toml) stores everything project-specific:
|
||||
[project] name, git_dir
|
||||
[output] namespace, output_dir
|
||||
[files] base_dir, paths
|
||||
[screenshots] base_dir, paths
|
||||
[discussion]
|
||||
roles = [...]
|
||||
active = "main"
|
||||
[discussion.discussions.main]
|
||||
git_commit = ""
|
||||
last_updated = ""
|
||||
history = [...] # each entry: "@ISO\\nRole:\\ncontent"
|
||||
|
||||
The global config.toml keeps:
|
||||
[projects] paths = [...], active = "..."
|
||||
[ai] provider, model
|
||||
[theme] palette, font_path, font_size, scale
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import datetime
|
||||
import tomllib
|
||||
import tomli_w
|
||||
import re
|
||||
import subprocess, datetime, tomllib, tomli_w, re
|
||||
from pathlib import Path
|
||||
|
||||
TS_FMT = "%Y-%m-%dT%H:%M:%S"
|
||||
@@ -33,13 +7,14 @@ TS_FMT = "%Y-%m-%dT%H:%M:%S"
|
||||
def now_ts() -> str:
|
||||
return datetime.datetime.now().strftime(TS_FMT)
|
||||
|
||||
def parse_ts(s: str) -> datetime.datetime | None:
|
||||
try:
|
||||
return datetime.datetime.strptime(s, TS_FMT)
|
||||
except Exception:
|
||||
return None
|
||||
def parse_ts(s: str):
|
||||
try: return datetime.datetime.strptime(s, TS_FMT)
|
||||
except: return None
|
||||
|
||||
# ── entry serialisation ───────────────────────────────────────────────────────
|
||||
|
||||
def entry_to_str(entry: dict) -> str:
|
||||
"""Serialise a disc entry dict -> stored string. Always uses real newlines."""
|
||||
ts = entry.get("ts", "")
|
||||
role = entry.get("role", "User")
|
||||
content = entry.get("content", "")
|
||||
@@ -48,99 +23,106 @@ def entry_to_str(entry: dict) -> str:
|
||||
return f"{role}:\n{content}"
|
||||
|
||||
def str_to_entry(raw: str, roles: list[str]) -> dict:
|
||||
"""Parse a stored string back to a disc entry dict."""
|
||||
ts = ""
|
||||
rest = raw
|
||||
# Strip optional leading @timestamp line
|
||||
if rest.startswith("@"):
|
||||
nl = rest.find("\n")
|
||||
if nl != -1:
|
||||
ts = rest[1:nl]
|
||||
rest = rest[nl + 1:]
|
||||
known = roles if roles else ["User", "AI", "Vendor API", "System"]
|
||||
known = roles or ["User", "AI", "Vendor API", "System"]
|
||||
role_pat = re.compile(
|
||||
r"^(?:\\[)?(" + "|".join(re.escape(r) for r in known) + r")(?:\\])?:?\\s*$",
|
||||
r"^(?:\[)?(" + "|".join(re.escape(r) for r in known) + r")(?:\])?:?\s*$",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
lines = rest.split("\n", 1)
|
||||
parts = rest.split("\n", 1)
|
||||
matched_role = "User"
|
||||
content = rest.strip()
|
||||
if lines:
|
||||
m = role_pat.match(lines[0].strip())
|
||||
if parts:
|
||||
m = role_pat.match(parts[0].strip())
|
||||
if m:
|
||||
raw_role = m.group(1)
|
||||
matched_role = next((r for r in known if r.lower() == raw_role.lower()), raw_role)
|
||||
content = lines[1].strip() if len(lines) > 1 else ""
|
||||
content = parts[1].strip() if len(parts) > 1 else ""
|
||||
return {"role": matched_role, "content": content, "collapsed": False, "ts": ts}
|
||||
|
||||
# ── git helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
def get_git_commit(git_dir: str) -> str:
|
||||
try:
|
||||
r = subprocess.run(["git", "rev-parse", "HEAD"],
|
||||
capture_output=True, text=True, cwd=git_dir, timeout=5)
|
||||
if r.returncode == 0:
|
||||
return r.stdout.strip()
|
||||
return r.stdout.strip() if r.returncode == 0 else ""
|
||||
except Exception:
|
||||
pass
|
||||
return ""
|
||||
return ""
|
||||
|
||||
def get_git_log(git_dir: str, n: int = 5) -> str:
|
||||
try:
|
||||
r = subprocess.run(["git", "log", "--oneline", f"-{n}"],
|
||||
capture_output=True, text=True, cwd=git_dir, timeout=5)
|
||||
if r.returncode == 0:
|
||||
return r.stdout.strip()
|
||||
return r.stdout.strip() if r.returncode == 0 else ""
|
||||
except Exception:
|
||||
pass
|
||||
return ""
|
||||
return ""
|
||||
|
||||
# ── default structures ────────────────────────────────────────────────────────
|
||||
|
||||
def default_discussion() -> dict:
|
||||
return {"git_commit": "", "last_updated": now_ts(), "history": []}
|
||||
|
||||
def default_project(name: str = "unnamed") -> dict:
|
||||
return {
|
||||
"project": {"name": name, "git_dir": ""},
|
||||
"output": {"namespace": name, "output_dir": "./md_gen"},
|
||||
"files": {"base_dir": ".", "paths": []},
|
||||
"screenshots": {"base_dir": ".", "paths": []},
|
||||
"discussion": {
|
||||
"roles": ["User", "AI", "Vendor API", "System"],
|
||||
"active": "main",
|
||||
"project": {"name": name, "git_dir": ""},
|
||||
"output": {"namespace": name, "output_dir": "./md_gen"},
|
||||
"files": {"base_dir": ".", "paths": []},
|
||||
"screenshots": {"base_dir": ".", "paths": []},
|
||||
"discussion": {
|
||||
"roles": ["User", "AI", "Vendor API", "System"],
|
||||
"active": "main",
|
||||
"discussions": {"main": default_discussion()},
|
||||
},
|
||||
}
|
||||
|
||||
def load_project(path: str | Path) -> dict:
|
||||
# ── load / save ───────────────────────────────────────────────────────────────
|
||||
|
||||
def load_project(path) -> dict:
|
||||
with open(path, "rb") as f:
|
||||
return tomllib.load(f)
|
||||
|
||||
def save_project(proj: dict, path: str | Path):
|
||||
def save_project(proj: dict, path):
|
||||
with open(path, "wb") as f:
|
||||
tomli_w.dump(proj, f)
|
||||
|
||||
# ── migration helper ─────────────────────────────────────────────────────────
|
||||
|
||||
def migrate_from_legacy_config(cfg: dict) -> dict:
|
||||
"""Build a project dict from a legacy config.toml. Does NOT save it."""
|
||||
"""Build a fresh project dict from a legacy flat config.toml. Does NOT save."""
|
||||
name = cfg.get("output", {}).get("namespace", "project")
|
||||
proj = default_project(name)
|
||||
if "output" in cfg: proj["output"] = dict(cfg["output"])
|
||||
if "files" in cfg: proj["files"] = dict(cfg["files"])
|
||||
if "screenshots" in cfg: proj["screenshots"] = dict(cfg["screenshots"])
|
||||
disc_cfg = cfg.get("discussion", {})
|
||||
roles = disc_cfg.get("roles", ["User", "AI", "Vendor API", "System"])
|
||||
old_history = disc_cfg.get("history", [])
|
||||
proj["discussion"]["roles"] = roles
|
||||
proj["discussion"]["discussions"]["main"]["history"] = old_history
|
||||
proj["discussion"]["discussions"]["main"]["last_updated"] = now_ts()
|
||||
for key in ("output", "files", "screenshots"):
|
||||
if key in cfg:
|
||||
proj[key] = dict(cfg[key])
|
||||
disc = cfg.get("discussion", {})
|
||||
proj["discussion"]["roles"] = disc.get("roles", ["User", "AI", "Vendor API", "System"])
|
||||
main = proj["discussion"]["discussions"]["main"]
|
||||
main["history"] = disc.get("history", [])
|
||||
main["last_updated"] = now_ts()
|
||||
return proj
|
||||
|
||||
# ── flat config for aggregate.run() ─────────────────────────────────────────
|
||||
|
||||
def flat_config(proj: dict, disc_name: str | None = None) -> dict:
|
||||
"""Return a flat config dict compatible with aggregate.run()."""
|
||||
disc_section = proj.get("discussion", {})
|
||||
name = disc_name or disc_section.get("active", "main")
|
||||
disc_data = disc_section.get("discussions", {}).get(name, {})
|
||||
disc_sec = proj.get("discussion", {})
|
||||
name = disc_name or disc_sec.get("active", "main")
|
||||
disc_data = disc_sec.get("discussions", {}).get(name, {})
|
||||
return {
|
||||
"output": proj.get("output", {}),
|
||||
"files": proj.get("files", {}),
|
||||
"screenshots": proj.get("screenshots", {}),
|
||||
"discussion": {
|
||||
"roles": disc_section.get("roles", []),
|
||||
"discussion": {
|
||||
"roles": disc_sec.get("roles", []),
|
||||
"history": disc_data.get("history", []),
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user