feat(chronology): add draft-only helper script (FR5)
This commit is contained in:
@@ -0,0 +1,235 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate chronology draft for Manual Slop conductor tracks.
|
||||
|
||||
Walks conductor/tracks/ and conductor/archive/, extracts per-track data
|
||||
(date, ID, status, summary, commit range), and emits a draft to stdout.
|
||||
|
||||
The script is READ-ONLY on the source folders. It writes to stdout only.
|
||||
The human cross-check (FR6 of the chronology_20260619 track) is the authority;
|
||||
this script is a starting point, not the canonical source.
|
||||
|
||||
Usage:
|
||||
uv run python scripts/audit/generate_chronology.py --draft
|
||||
uv run python scripts/audit/generate_chronology.py --root conductor/
|
||||
uv run python scripts/audit/generate_chronology.py # JSON dump
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
_SLUG_DATE_RE = re.compile(r"\d{8}$")
|
||||
_SENTENCE_END_RE = re.compile(r"\.\s")
|
||||
_GIT_TIMEOUT = 30
|
||||
_DEFAULT_ROOT = "conductor/"
|
||||
|
||||
|
||||
def extract_slug_date(folder_name: str) -> Optional[str]:
|
||||
m = _SLUG_DATE_RE.search(folder_name)
|
||||
if not m:
|
||||
return None
|
||||
raw: str = m.group(0)
|
||||
return f"{raw[:4]}-{raw[4:6]}-{raw[6:]}"
|
||||
|
||||
|
||||
def _md_escape(text: str) -> str:
|
||||
return text.replace("|", "\\|").replace("\n", " ").replace("\r", " ")
|
||||
|
||||
|
||||
def _to_posix(path_str: str) -> str:
|
||||
return path_str.replace("\\", "/")
|
||||
|
||||
|
||||
def _first_sentence(line: str) -> str:
|
||||
m = _SENTENCE_END_RE.search(line)
|
||||
if m:
|
||||
return line[: m.start() + 1].strip()
|
||||
return line.strip()
|
||||
|
||||
|
||||
def _truncate_to_25_words(text: str) -> str:
|
||||
words: list[str] = text.split()
|
||||
if len(words) <= 25:
|
||||
return text
|
||||
return " ".join(words[:25]) + "\u2026"
|
||||
|
||||
|
||||
def extract_summary(folder_path: Path) -> str:
|
||||
md_path = folder_path / "metadata.json"
|
||||
if md_path.is_file():
|
||||
try:
|
||||
data = json.loads(md_path.read_text(encoding="utf-8"))
|
||||
desc = str(data.get("description", "")).strip()
|
||||
if desc:
|
||||
return desc
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
for fname in ("spec.md", "plan.md"):
|
||||
fpath = folder_path / fname
|
||||
if not fpath.is_file():
|
||||
continue
|
||||
try:
|
||||
text = fpath.read_text(encoding="utf-8")
|
||||
except OSError:
|
||||
continue
|
||||
for line in text.splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped or stripped.startswith("#"):
|
||||
continue
|
||||
return _truncate_to_25_words(_first_sentence(stripped))
|
||||
return "Imported from archive (no spec)"
|
||||
|
||||
|
||||
def _git_log(folder_relpath: str, *args: str) -> str:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "log", *args, "--", folder_relpath],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=_GIT_TIMEOUT,
|
||||
check=False,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return ""
|
||||
return result.stdout
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
return ""
|
||||
|
||||
|
||||
def _git_first_line(folder_relpath: str, *args: str) -> str:
|
||||
out = _git_log(folder_relpath, *args)
|
||||
stripped = out.strip()
|
||||
if not stripped:
|
||||
return ""
|
||||
return stripped.splitlines()[0]
|
||||
|
||||
|
||||
def _repo_root(start: Path) -> Path:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "rev-parse", "--show-toplevel"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False,
|
||||
cwd=str(start),
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return Path(result.stdout.strip())
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
pass
|
||||
return start.parent
|
||||
|
||||
|
||||
def walk_track_folders(root: Path) -> list[dict]:
|
||||
repo_root: Path = _repo_root(root)
|
||||
rows: list[dict] = []
|
||||
for parent_dir, default_status in (
|
||||
(root / "tracks", "Active"),
|
||||
(root / "archive", "Shipped"),
|
||||
):
|
||||
if not parent_dir.is_dir():
|
||||
continue
|
||||
for folder in sorted(parent_dir.iterdir()):
|
||||
if not folder.is_dir():
|
||||
continue
|
||||
try:
|
||||
folder_relpath = _to_posix(str(folder.relative_to(repo_root)))
|
||||
except ValueError:
|
||||
folder_relpath = _to_posix(str(folder))
|
||||
track_id: str = folder.name
|
||||
slug_date = extract_slug_date(track_id)
|
||||
if slug_date:
|
||||
date = slug_date
|
||||
else:
|
||||
first_commit = _git_first_line(folder_relpath, "--reverse", "--format=%aI")
|
||||
date = first_commit[:10] if first_commit else ""
|
||||
status: str = default_status
|
||||
metadata_path = folder / "metadata.json"
|
||||
if metadata_path.is_file():
|
||||
try:
|
||||
data = json.loads(metadata_path.read_text(encoding="utf-8"))
|
||||
meta_status = str(data.get("status", "")).strip()
|
||||
if meta_status:
|
||||
status = meta_status
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
summary: str = extract_summary(folder)
|
||||
init_sha: str = _git_first_line(folder_relpath, "--reverse", "--format=%h")
|
||||
end_sha: str = _git_first_line(folder_relpath, "-1", "--format=%h")
|
||||
if init_sha and end_sha:
|
||||
range_log = _git_log(folder_relpath, "--oneline", f"{init_sha}..{end_sha}")
|
||||
commit_count: int = range_log.count("\n") + (1 if init_sha != end_sha else 0)
|
||||
else:
|
||||
fallback_log = _git_log(folder_relpath, "--oneline")
|
||||
commit_count = fallback_log.count("\n")
|
||||
try:
|
||||
folder_link = _to_posix(str(folder.relative_to(repo_root)))
|
||||
except ValueError:
|
||||
folder_link = _to_posix(str(folder))
|
||||
rows.append({
|
||||
"date": date,
|
||||
"track_id": track_id,
|
||||
"status": status,
|
||||
"summary": summary,
|
||||
"init_sha": init_sha,
|
||||
"end_sha": end_sha,
|
||||
"commit_count": commit_count,
|
||||
"folder_link": folder_link,
|
||||
})
|
||||
rows.sort(key=lambda r: r["track_id"])
|
||||
rows.sort(key=lambda r: r["date"], reverse=True)
|
||||
return rows
|
||||
|
||||
|
||||
def format_markdown(rows: list[dict]) -> str:
|
||||
lines: list[str] = [
|
||||
"| Date | ID | Status | Summary | Folder | Range |",
|
||||
"| --- | --- | --- | --- | --- | --- |",
|
||||
]
|
||||
for row in rows:
|
||||
range_str: str = f"`{row['init_sha']}..{row['end_sha']}` ({row['commit_count']})"
|
||||
lines.append(
|
||||
f"| {row['date']} | `{row['track_id']}` | {row['status']} | "
|
||||
f"{_md_escape(row['summary'])} | `{row['folder_link']}` | {range_str} |"
|
||||
)
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def main() -> None:
|
||||
if hasattr(sys.stdout, "reconfigure"):
|
||||
try:
|
||||
sys.stdout.reconfigure(encoding="utf-8")
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Generate chronology draft for Manual Slop conductor tracks.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--draft",
|
||||
action="store_true",
|
||||
help="Emit markdown draft table to stdout.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--root",
|
||||
default=_DEFAULT_ROOT,
|
||||
help=f"Path to conductor root (default: {_DEFAULT_ROOT}).",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
root = Path(args.root)
|
||||
if not root.is_absolute():
|
||||
root = Path.cwd() / root
|
||||
rows = walk_track_folders(root)
|
||||
if args.draft:
|
||||
sys.stdout.write(format_markdown(rows))
|
||||
else:
|
||||
sys.stdout.write(json.dumps(rows, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user