#!/usr/bin/env python3
"""Generate chronology draft for Manual Slop conductor tracks.

Walks conductor/tracks/ and conductor/archive/, extracts per-track data
(date, ID, status, summary, commit range), and emits a draft to stdout.

The script is READ-ONLY on the source folders. It writes to stdout only.
The human cross-check (FR6 of the chronology_20260619 track) is the
authority; this script is a starting point, not the canonical source.

Usage:
    uv run python scripts/audit/generate_chronology.py --draft
    uv run python scripts/audit/generate_chronology.py --root conductor/
    uv run python scripts/audit/generate_chronology.py   # JSON dump
"""
from __future__ import annotations

import argparse
import json
import re
import subprocess
import sys
from pathlib import Path
from typing import Optional

_SLUG_DATE_RE = re.compile(r"\d{8}$")
_SENTENCE_END_RE = re.compile(r"\.\s")
_GIT_TIMEOUT = 30
_DEFAULT_ROOT = "conductor/"

# Leading markers of spec/plan lines that carry bookkeeping, not a summary.
_SKIPPED_PREFIXES = ("**Status:**", "**Track ID:**", "**Track:**")


def extract_slug_date(folder_name: str) -> Optional[str]:
    """Return the trailing 8-digit stamp of *folder_name* as ``YYYY-MM-DD``.

    Returns ``None`` when the name does not end in eight digits. The digits
    are only re-punctuated; they are not validated as a calendar date.
    """
    m = _SLUG_DATE_RE.search(folder_name)
    if not m:
        return None
    raw: str = m.group(0)
    return f"{raw[:4]}-{raw[4:6]}-{raw[6:]}"


def _md_escape(text: str) -> str:
    """Make *text* safe for a markdown table cell (escape pipes, drop newlines)."""
    return text.replace("|", "\\|").replace("\n", " ").replace("\r", " ")


def _to_posix(path_str: str) -> str:
    """Return *path_str* with backslashes replaced by forward slashes."""
    return path_str.replace("\\", "/")


def _first_sentence(line: str) -> str:
    """Return *line* up to and including the first period followed by whitespace.

    When no such period exists the whole line is returned, stripped.
    """
    m = _SENTENCE_END_RE.search(line)
    if m:
        return line[: m.start() + 1].strip()
    return line.strip()


def _truncate_to_25_words(text: str) -> str:
    """Cap *text* at 25 whitespace-separated words, appending an ellipsis if cut."""
    words: list[str] = text.split()
    if len(words) <= 25:
        return text
    return " ".join(words[:25]) + "\u2026"


def _load_metadata(folder_path: Path) -> dict:
    """Return the parsed ``metadata.json`` of *folder_path*, or ``{}``.

    Best effort: a missing file, an unreadable or undecodable file, invalid
    JSON, or a top-level value that is not a JSON object all yield ``{}``.
    """
    md_path = folder_path / "metadata.json"
    if not md_path.is_file():
        return {}
    try:
        data = json.loads(md_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def _metadata_text(data: dict, key: str) -> str:
    """Return ``data[key]`` as stripped text; a missing or null value gives ``""``."""
    value = data.get(key)
    # A JSON null must not be rendered as the literal string "None".
    return "" if value is None else str(value).strip()


def extract_summary(folder_path: Path) -> str:
    """Return a one-line summary for the track stored in *folder_path*.

    Preference order:
      1. the non-empty ``description`` field of ``metadata.json``;
      2. the first content line of ``spec.md``, then ``plan.md`` -- headings
         (``#``), blockquotes (``>``), blank lines and Status/Track
         bookkeeping lines are skipped -- reduced to its first sentence and
         capped at 25 words;
      3. the fixed placeholder ``"Imported from archive (no spec)"``.
    """
    desc = _metadata_text(_load_metadata(folder_path), "description")
    if desc:
        return desc
    for fname in ("spec.md", "plan.md"):
        fpath = folder_path / fname
        if not fpath.is_file():
            continue
        try:
            text = fpath.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue
        for line in text.splitlines():
            stripped = line.strip()
            if not stripped or stripped.startswith(("#", ">")):
                continue
            if stripped.startswith(_SKIPPED_PREFIXES):
                continue
            return _truncate_to_25_words(_first_sentence(stripped))
    return "Imported from archive (no spec)"


def _git_log(folder_relpath: str, *args: str) -> str:
    """Run ``git log <args> -- <folder_relpath>`` and return its stdout.

    Runs in the process's current working directory. Returns ``""`` when git
    exits non-zero, times out, or cannot be started.
    """
    try:
        result = subprocess.run(
            ["git", "log", *args, "--", folder_relpath],
            capture_output=True,
            text=True,
            timeout=_GIT_TIMEOUT,
            check=False,
        )
        if result.returncode != 0:
            return ""
        return result.stdout
    except (subprocess.SubprocessError, OSError):
        return ""


def _git_first_line(folder_relpath: str, *args: str) -> str:
    """Return the first line of ``_git_log`` output, or ``""`` if there is none."""
    out = _git_log(folder_relpath, *args)
    stripped = out.strip()
    if not stripped:
        return ""
    return stripped.splitlines()[0]


def _repo_root(start: Path) -> Path:
    """Return the git top-level directory as seen from *start*.

    Falls back to ``start.parent`` when git fails or prints nothing.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            capture_output=True,
            text=True,
            timeout=10,
            check=False,
            cwd=str(start),
        )
        if result.returncode == 0 and result.stdout.strip():
            return Path(result.stdout.strip())
    except (subprocess.SubprocessError, OSError):
        pass
    return start.parent


def _parse_state_phase(state_path: Path) -> str:
    """Return the ``current_phase`` value found in a ``state.toml`` file.

    This is a line scan, not a TOML parse: the first line starting with
    ``current_phase`` wins; the text after ``=`` is cut at the first ``#``
    and stripped of whitespace and double quotes.

    Returns ``"no-state-toml"`` when the file does not exist, and ``"?"``
    when it cannot be read, has no such line, or that line has no ``=``.
    """
    if not state_path.is_file():
        return "no-state-toml"
    try:
        text = state_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return "?"
    for line in text.splitlines():
        if not line.startswith("current_phase"):
            continue
        _key, sep, value = line.partition("=")
        if not sep:
            # Malformed assignment: report unknown rather than guessing.
            return "?"
        return value.split("#")[0].strip().strip('"')
    return "?"


def _last_commit_date(folder_relpath: str) -> str:
    """Return the short date of the latest commit touching *folder_relpath*.

    Returns git's stdout stripped (which is ``""`` when git prints nothing;
    the exit code is not inspected) and ``"never"`` when git times out or
    cannot be started.
    """
    try:
        result = subprocess.run(
            ["git", "log", "-1", "--format=%ad", "--date=short", "--", folder_relpath],
            capture_output=True,
            text=True,
            timeout=_GIT_TIMEOUT,
            check=False,
        )
        return result.stdout.strip()
    except (subprocess.SubprocessError, OSError):
        return "never"


def _classify_status(folder_link: str, current: str, track_id: str) -> str:
    """Per-row manual review classification (FR6 hard gate).

    Logic (per user directive 2026-06-20):
      - PLACEHOLDER tracks: keep as is
      - archive/ folder: default to Completed (the work was done and
        archived; metadata status may be stale)
      - tracks/ folder + state_phase=complete OR chrono in
        {completed, complete, shipped}: Completed
      - tracks/ folder + everything else: keep original chrono status
        (in flight)
      - Abandoned is reserved for explicit user marking; the script does
        NOT auto-mark.

    Note: "Completed" (not "Shipped") is the canonical term per user
    directive 2026-06-20. This is a side-project, not a shipped product.

    *folder_link* is matched against the ``conductor/archive/`` and
    ``conductor/tracks/`` prefixes, and ``state.toml`` is opened relative to
    it, i.e. relative to the current working directory.
    """
    if "PLACEHOLDER" in track_id:
        return current
    if "contingency" in current.lower():
        return current
    is_archive = folder_link.startswith("conductor/archive/")
    is_tracks = folder_link.startswith("conductor/tracks/")
    if is_archive:
        return "Completed"
    folder = Path(folder_link)
    state_phase = _parse_state_phase(folder / "state.toml") if is_tracks else "?"
    chrono_lower = current.lower()
    is_completed = (
        chrono_lower in {"completed", "complete", "shipped"}
        or state_phase in {"complete", '"complete"'}
    )
    if is_tracks and is_completed:
        return "Completed"
    return current


def walk_track_folders(root: Path) -> list[dict]:
    """Collect one row per track folder under ``root/tracks`` and ``root/archive``.

    Each row is a dict with the keys ``date``, ``track_id``, ``status``,
    ``summary``, ``init_sha``, ``end_sha``, ``commit_count`` and
    ``folder_link``. Rows are ordered by date descending, with ties broken
    by track ID ascending.
    """
    repo_root: Path = _repo_root(root)
    rows: list[dict] = []
    for parent_dir, default_status in (
        (root / "tracks", "Active"),
        (root / "archive", "Completed"),
    ):
        if not parent_dir.is_dir():
            continue
        for folder in sorted(parent_dir.iterdir()):
            if not folder.is_dir():
                continue
            try:
                folder_relpath = _to_posix(str(folder.relative_to(repo_root)))
            except ValueError:
                # Folder lies outside the detected repo root; use it as given.
                folder_relpath = _to_posix(str(folder))

            track_id: str = folder.name
            slug_date = extract_slug_date(track_id)
            if slug_date:
                date = slug_date
            else:
                # No date in the folder name: fall back to the first commit.
                first_commit = _git_first_line(folder_relpath, "--reverse", "--format=%aI")
                date = first_commit[:10] if first_commit else ""

            meta_status = _metadata_text(_load_metadata(folder), "status")
            status: str = meta_status or default_status
            status = _classify_status(folder_relpath, status, track_id)

            summary: str = extract_summary(folder)
            init_sha: str = _git_first_line(folder_relpath, "--reverse", "--format=%h")
            end_sha: str = _git_first_line(folder_relpath, "-1", "--format=%h")
            if init_sha and end_sha:
                if init_sha == end_sha:
                    # `A..A` is an empty range, yet the folder has one commit.
                    commit_count: int = 1
                else:
                    range_log = _git_log(
                        folder_relpath, "--oneline", f"{init_sha}..{end_sha}"
                    )
                    # The range excludes init_sha itself, hence the +1.
                    commit_count = range_log.count("\n") + 1
            else:
                fallback_log = _git_log(folder_relpath, "--oneline")
                commit_count = fallback_log.count("\n")

            rows.append({
                "date": date,
                "track_id": track_id,
                "status": status,
                "summary": summary,
                "init_sha": init_sha,
                "end_sha": end_sha,
                "commit_count": commit_count,
                "folder_link": folder_relpath,
            })
    # Two stable passes: the secondary key first, then the primary key.
    rows.sort(key=lambda r: r["track_id"])
    rows.sort(key=lambda r: r["date"], reverse=True)
    return rows


def format_markdown(rows: list[dict]) -> str:
    """Render *rows* as a markdown table, one row per track, newline-terminated."""
    lines: list[str] = [
        "| Date | ID | Status | Summary | Folder | Range |",
        "| --- | --- | --- | --- | --- | --- |",
    ]
    for row in rows:
        range_str: str = f"`{row['init_sha']}..{row['end_sha']}` ({row['commit_count']})"
        lines.append(
            f"| {row['date']} | `{row['track_id']}` | {row['status']} | "
            f"{_md_escape(row['summary'])} | `{row['folder_link']}` | {range_str} |"
        )
    return "\n".join(lines) + "\n"


def main() -> None:
    """Parse the command line and write the chronology to stdout.

    With ``--draft`` a markdown table is written; otherwise the rows are
    dumped as indented JSON. A relative ``--root`` is resolved against the
    current working directory.
    """
    if hasattr(sys.stdout, "reconfigure"):
        try:
            # Summaries may contain non-ASCII text (e.g. the ellipsis).
            sys.stdout.reconfigure(encoding="utf-8")
        except (OSError, ValueError):
            pass
    parser = argparse.ArgumentParser(
        description="Generate chronology draft for Manual Slop conductor tracks.",
    )
    parser.add_argument(
        "--draft",
        action="store_true",
        help="Emit markdown draft table to stdout.",
    )
    parser.add_argument(
        "--root",
        default=_DEFAULT_ROOT,
        help=f"Path to conductor root (default: {_DEFAULT_ROOT}).",
    )
    args = parser.parse_args()
    root = Path(args.root)
    if not root.is_absolute():
        root = Path.cwd() / root
    rows = walk_track_folders(root)
    if args.draft:
        sys.stdout.write(format_markdown(rows))
    else:
        sys.stdout.write(json.dumps(rows, indent=2))


if __name__ == "__main__":
    main()