Private
Public Access
0
0
Files
manual_slop/scripts/audit/generate_chronology.py
T

339 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""Generate chronology draft for Manual Slop conductor tracks.
Walks conductor/tracks/ and conductor/archive/, extracts per-track data
(date, ID, status, summary, commit range), and emits a draft to stdout.
The script is READ-ONLY on the source folders. It writes to stdout only.
The human cross-check (FR6 of the chronology_20260619 track) is the authority;
this script is a starting point, not the canonical source.
Usage:
uv run python scripts/audit/generate_chronology.py --draft
uv run python scripts/audit/generate_chronology.py --root conductor/
uv run python scripts/audit/generate_chronology.py # JSON dump
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from pathlib import Path
from typing import Optional
# Matches a run of exactly eight digits at the very end of a folder name;
# extract_slug_date() reformats the match as YYYY-MM-DD.
_SLUG_DATE_RE = re.compile(r"\d{8}$")
# Matches a period followed by one whitespace character; _first_sentence()
# uses the first match as the end of the first sentence.
_SENTENCE_END_RE = re.compile(r"\.\s")
# Timeout, in seconds, passed to the `git log` subprocess calls.
_GIT_TIMEOUT = 30
# Default value of the --root command-line option.
_DEFAULT_ROOT = "conductor/"
def extract_slug_date(folder_name: str) -> Optional[str]:
    """Turn a trailing 8-digit suffix of *folder_name* into ``YYYY-MM-DD``.

    Args:
        folder_name: Track folder name, e.g. ``chronology_20260619``.

    Returns:
        The suffix re-punctuated as ``YYYY-MM-DD``, or ``None`` when the name
        does not end in eight digits. The digits are not validated as a real
        calendar date.
    """
    match = _SLUG_DATE_RE.search(folder_name)
    if match is None:
        return None
    digits: str = match.group(0)
    year, month, day = digits[:4], digits[4:6], digits[6:]
    return "-".join((year, month, day))
def _md_escape(text: str) -> str:
return text.replace("|", "\\|").replace("\n", " ").replace("\r", " ")
def _to_posix(path_str: str) -> str:
return path_str.replace("\\", "/")
def _first_sentence(line: str) -> str:
    """Return the first sentence of *line*, stripped of surrounding whitespace.

    The sentence ends at the first period that is followed by whitespace, and
    that period is kept. When no such period exists the whole line is returned.
    """
    hit = _SENTENCE_END_RE.search(line)
    # Keep the period itself, hence the +1; without a match keep everything.
    cut = len(line) if hit is None else hit.start() + 1
    return line[:cut].strip()
def _truncate_to_25_words(text: str) -> str:
words: list[str] = text.split()
if len(words) <= 25:
return text
return " ".join(words[:25]) + "\u2026"
def extract_summary(folder_path: Path) -> str:
    """Return a short human-readable summary for one track folder.

    Sources are tried in this order:

    1. The ``description`` field of ``metadata.json``, stripped of surrounding
       whitespace and otherwise returned as-is.
    2. The first content line of ``spec.md``, then ``plan.md``. Blank lines,
       headings (``#``), blockquotes (``>``) and lines starting with
       ``**Status:**``, ``**Track ID:**`` or ``**Track:**`` are skipped. The
       first remaining line is reduced to its first sentence and to at most
       25 words.
    3. The fixed placeholder ``"Imported from archive (no spec)"``.

    A file that is unreadable, not valid UTF-8, or not valid JSON is treated
    as if it were absent, so one bad folder cannot abort the whole walk.

    Args:
        folder_path: Directory of a single track.

    Returns:
        The summary text; never empty.
    """
    md_path = folder_path / "metadata.json"
    if md_path.is_file():
        try:
            data = json.loads(md_path.read_text(encoding="utf-8"))
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            data = None
        # Only a JSON object can carry a description; a list or scalar at the
        # top level is ignored instead of raising AttributeError on .get().
        if isinstance(data, dict):
            raw = data.get("description")
            # JSON null must not become the literal summary "None".
            desc = "" if raw is None else str(raw).strip()
            if desc:
                return desc
    skipped_prefixes = ("#", ">", "**Status:**", "**Track ID:**", "**Track:**")
    for fname in ("spec.md", "plan.md"):
        fpath = folder_path / fname
        if not fpath.is_file():
            continue
        try:
            text = fpath.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue
        for line in text.splitlines():
            stripped = line.strip()
            if not stripped or stripped.startswith(skipped_prefixes):
                continue
            return _truncate_to_25_words(_first_sentence(stripped))
    return "Imported from archive (no spec)"
def _git_log(folder_relpath: str, *args: str) -> str:
try:
result = subprocess.run(
["git", "log", *args, "--", folder_relpath],
capture_output=True,
text=True,
timeout=_GIT_TIMEOUT,
check=False,
)
if result.returncode != 0:
return ""
return result.stdout
except (subprocess.SubprocessError, OSError):
return ""
def _git_first_line(folder_relpath: str, *args: str) -> str:
out = _git_log(folder_relpath, *args)
stripped = out.strip()
if not stripped:
return ""
return stripped.splitlines()[0]
def _repo_root(start: Path) -> Path:
try:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
timeout=10,
check=False,
cwd=str(start),
)
if result.returncode == 0 and result.stdout.strip():
return Path(result.stdout.strip())
except (subprocess.SubprocessError, OSError):
pass
return start.parent
def _git_log(folder_relpath: str, *args: str) -> str:
try:
result = subprocess.run(
["git", "log", *args, "--", folder_relpath],
capture_output=True,
text=True,
timeout=_GIT_TIMEOUT,
check=False,
)
if result.returncode != 0:
return ""
return result.stdout
except (subprocess.SubprocessError, OSError):
return ""
def _git_first_line(folder_relpath: str, *args: str) -> str:
out = _git_log(folder_relpath, *args)
stripped = out.strip()
if not stripped:
return ""
return stripped.splitlines()[0]
def _repo_root(start: Path) -> Path:
try:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
timeout=10,
check=False,
cwd=str(start),
)
if result.returncode == 0 and result.stdout.strip():
return Path(result.stdout.strip())
except (subprocess.SubprocessError, OSError):
pass
return start.parent
def _parse_state_phase(state_path: Path) -> str:
if not state_path.is_file():
return "no-state-toml"
try:
for line in state_path.read_text(encoding="utf-8").splitlines():
if line.startswith("current_phase"):
v = line.split("=", 1)[1].strip().split("#")[0].strip().strip('"')
return v
except (subprocess.SubprocessError, OSError, Exception):
pass
return "?"
def _last_commit_date(folder_relpath: str) -> str:
    """Return the date of the newest commit touching *folder_relpath*.

    Args:
        folder_relpath: Path limiter passed to ``git log`` after ``--``.

    Returns:
        The date as printed by ``git log --date=short``, or the sentinel
        ``"never"`` when no date is available: git could not be started,
        timed out, exited non-zero, or printed nothing.
    """
    try:
        result = subprocess.run(
            ["git", "log", "-1", "--format=%ad", "--date=short", "--", folder_relpath],
            capture_output=True, text=True, timeout=_GIT_TIMEOUT, check=False,
        )
    except (subprocess.SubprocessError, OSError):
        return "never"
    if result.returncode != 0:
        return "never"
    # Use the same sentinel for "no commits" as for a failed call, so callers
    # only have one no-date value to check for.
    return result.stdout.strip() or "never"
def _classify_status(folder_link: str, current: str, track_id: str) -> str:
"""Per-row manual review classification (FR6 hard gate).
Logic (per user directive 2026-06-20):
- PLACEHOLDER tracks: keep as is
- archive/ folder: default to Completed (the work was done and archived; metadata status may be stale)
- tracks/ folder + state_phase=complete OR chrono in {completed, complete, shipped}: Completed
- tracks/ folder + everything else: keep original chrono status (in flight)
- Abandoned is reserved for explicit user marking; the script does NOT auto-mark.
Note: "Completed" (not "Shipped") is the canonical term per user directive 2026-06-20.
This is a side-project, not a shipped product.
"""
if "PLACEHOLDER" in track_id:
return current
if "contingency" in current.lower():
return current
is_archive = folder_link.startswith("conductor/archive/")
is_tracks = folder_link.startswith("conductor/tracks/")
if is_archive:
return "Completed"
folder = Path(folder_link)
state_phase = _parse_state_phase(folder / "state.toml") if is_tracks else "?"
chrono_lower = current.lower()
is_completed = chrono_lower in {"completed", "complete", "shipped"} or state_phase in {"complete", '"complete"'}
if is_tracks and is_completed:
return "Completed"
return current
def _metadata_status(folder: Path, default_status: str) -> str:
    """Return the non-empty ``status`` from ``metadata.json``, else *default_status*."""
    metadata_path = folder / "metadata.json"
    if not metadata_path.is_file():
        return default_status
    try:
        data = json.loads(metadata_path.read_text(encoding="utf-8"))
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return default_status
    if not isinstance(data, dict):
        return default_status
    raw = data.get("status")
    # JSON null must not become the literal status "None".
    meta_status = "" if raw is None else str(raw).strip()
    return meta_status or default_status


def _git_history(folder_relpath: str) -> list[tuple[str, str]]:
    """Return ``(short_sha, author_date_iso)`` per commit touching the folder, newest first."""
    history: list[tuple[str, str]] = []
    for line in _git_log(folder_relpath, "--format=%h %aI").splitlines():
        parts = line.split(maxsplit=1)
        if len(parts) == 2:
            history.append((parts[0], parts[1]))
    return history


def _build_row(folder: Path, repo_root: Path, default_status: str) -> dict:
    """Collect the chronology fields for a single track folder."""
    try:
        folder_link = _to_posix(str(folder.relative_to(repo_root)))
    except ValueError:
        # Folder is not below the detected repo root; fall back to its own path.
        folder_link = _to_posix(str(folder))
    track_id: str = folder.name
    # One git call supplies both end SHAs, the first-commit date and the count.
    history = _git_history(folder_link)
    if history:
        end_sha = history[0][0]
        init_sha, first_commit = history[-1]
    else:
        init_sha = end_sha = first_commit = ""
    # The date in the folder name wins; the first commit date is the fallback.
    date = extract_slug_date(track_id) or first_commit[:10]
    status = _classify_status(folder_link, _metadata_status(folder, default_status), track_id)
    return {
        "date": date,
        "track_id": track_id,
        "status": status,
        "summary": extract_summary(folder),
        "init_sha": init_sha,
        "end_sha": end_sha,
        # Counting the log lines directly also gives 1 for a folder whose
        # first and last commit are the same commit.
        "commit_count": len(history),
        "folder_link": folder_link,
    }


def walk_track_folders(root: Path) -> list[dict]:
    """Build one chronology row per track folder under *root*.

    Every sub-directory of ``root/tracks`` (default status "Active") and
    ``root/archive`` (default status "Completed") yields one row. A missing
    parent directory is skipped. Nothing is written to disk.

    Args:
        root: Path of the conductor directory.

    Returns:
        A list of dicts with the keys ``date``, ``track_id``, ``status``,
        ``summary``, ``init_sha``, ``end_sha``, ``commit_count`` and
        ``folder_link``, ordered newest date first with ties broken by
        ascending ``track_id``. Rows without a date sort last.
    """
    repo_root: Path = _repo_root(root)
    rows: list[dict] = []
    for parent_dir, default_status in (
        (root / "tracks", "Active"),
        (root / "archive", "Completed"),
    ):
        if not parent_dir.is_dir():
            continue
        for folder in sorted(parent_dir.iterdir()):
            if folder.is_dir():
                rows.append(_build_row(folder, repo_root, default_status))
    # Two stable sorts: the secondary key first, then the primary key, so rows
    # with the same date stay in ascending track_id order.
    rows.sort(key=lambda r: r["track_id"])
    rows.sort(key=lambda r: r["date"], reverse=True)
    return rows
def format_markdown(rows: list[dict]) -> str:
    """Render chronology rows as a markdown table.

    Args:
        rows: Row dicts with the keys ``date``, ``track_id``, ``status``,
            ``summary``, ``folder_link``, ``init_sha``, ``end_sha`` and
            ``commit_count``.

    Returns:
        The table text: a header row, a separator row, then one line per
        input row, ending with a newline. With no rows, only the header and
        separator are returned.
    """
    lines: list[str] = [
        "| Date | ID | Status | Summary | Folder | Range |",
        "| --- | --- | --- | --- | --- | --- |",
    ]
    for row in rows:
        range_str: str = f"`{row['init_sha']}..{row['end_sha']}` ({row['commit_count']})"
        # Status and summary are free text from the track's files, so both
        # are escaped to keep a stray "|" or newline from breaking the row.
        lines.append(
            f"| {row['date']} | `{row['track_id']}` | {_md_escape(row['status'])} | "
            f"{_md_escape(row['summary'])} | `{row['folder_link']}` | {range_str} |"
        )
    return "\n".join(lines) + "\n"
def main() -> None:
    """Parse the command line and write the chronology to stdout.

    Options:
        --draft: emit the markdown table; without it a JSON array is emitted.
        --root:  conductor directory; a relative path is resolved against the
                 current working directory.
    """
    # Switch stdout to UTF-8 where the stream supports it; failure to do so
    # is not fatal.
    if hasattr(sys.stdout, "reconfigure"):
        try:
            sys.stdout.reconfigure(encoding="utf-8")
        except (OSError, ValueError):
            pass
    parser = argparse.ArgumentParser(
        description="Generate chronology draft for Manual Slop conductor tracks.",
    )
    parser.add_argument(
        "--draft",
        action="store_true",
        help="Emit markdown draft table to stdout.",
    )
    parser.add_argument(
        "--root",
        default=_DEFAULT_ROOT,
        help=f"Path to conductor root (default: {_DEFAULT_ROOT}).",
    )
    args = parser.parse_args()
    root = Path(args.root)
    if not root.is_absolute():
        root = Path.cwd() / root
    rows = walk_track_folders(root)
    if args.draft:
        sys.stdout.write(format_markdown(rows))
    else:
        # End the JSON dump with a newline, as the markdown output already does.
        sys.stdout.write(json.dumps(rows, indent=2) + "\n")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()