Private
Public Access
0
0
Files
manual_slop/scripts/audit/generate_chronology.py
T

440 lines
13 KiB
Python

#!/usr/bin/env python3
"""Generate chronology for Manual Slop conductor tracks.
Walks the tracks/ and archive/ folders under the conductor root, extracts
per-track data (date, ID, status, summary, commit range), and writes the
chronology to stdout: a markdown table with --draft, a JSON dump otherwise.
The v2 classifier uses git-history evidence (work-commit count + report
overrides) instead of stale metadata.json.status. Returns
(status, confidence, reason) per row.
Usage:
    uv run python scripts/audit/generate_chronology.py --draft            # markdown table
    uv run python scripts/audit/generate_chronology.py --root conductor/  # JSON dump, explicit root
    uv run python scripts/audit/generate_chronology.py                    # JSON dump
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from pathlib import Path
from typing import Optional
# Trailing run of eight digits in a folder name (re-punctuated as YYYY-MM-DD).
_SLUG_DATE_RE = re.compile(r"\d{8}$")

# A period followed by whitespace; marks where the first sentence ends.
_SENTENCE_END_RE = re.compile(r"\.\s")

# Timeout, in seconds, applied to the `git log` subprocess calls.
_GIT_TIMEOUT = 30

# Default value of the --root command-line option.
_DEFAULT_ROOT = "conductor/"

# Bold markdown field labels; text starting with one of these is never used
# as a track summary.
_METADATA_FIELD_PREFIXES = (
    "**Priority:**",
    "**Date:**",
    "**Initialized:**",
    "**Track:**",
    "**Track ID:**",
    "**Parent umbrella:**",
    "**Status:**",
    "**Confidence:**",
    "**Ancestors:**",
)

# Commit-subject prefixes that count as work when followed by ":" or "(".
_WORK_COMMIT_PREFIXES = (
    "feat",
    "fix",
    "refactor",
    "perf",
    "test",
    "docs(report)",
)

# Commit-subject prefixes that mark metadata-only commits.
_METADATA_COMMIT_PREFIXES = (
    "conductor(plan):",
    "conductor(state):",
    "conductor(track):",
    "docs(spec):",
    "docs(plan):",
)
def _is_work_commit(msg: str) -> bool:
    """Return True when *msg* is the subject of a work commit.

    A subject qualifies when it starts with one of ``_WORK_COMMIT_PREFIXES``
    (feat/fix/refactor/perf/test/docs(report)) immediately followed by ``:``
    or ``(``, and does not start with any of ``_METADATA_COMMIT_PREFIXES``.
    """
    # Metadata-only subjects are rejected regardless of any other match.
    if msg.startswith(_METADATA_COMMIT_PREFIXES):
        return False
    return any(
        msg.startswith((prefix + ":", prefix + "("))
        for prefix in _WORK_COMMIT_PREFIXES
    )
def extract_slug_date(folder_name: str) -> Optional[str]:
    """Return the trailing eight digits of *folder_name* as ``YYYY-MM-DD``.

    Returns None when the name does not end in eight digits. The digits are
    only re-punctuated; they are not validated as a calendar date.
    """
    match = _SLUG_DATE_RE.search(folder_name)
    if match is None:
        return None
    digits = match.group(0)
    year, month, day = digits[:4], digits[4:6], digits[6:]
    return "-".join((year, month, day))
def _md_escape(text: str) -> str:
    """Make *text* safe for a single markdown table cell.

    Pipes are backslash-escaped; each newline or carriage return becomes one
    space.
    """
    replacements = {ord("|"): "\\|", ord("\n"): " ", ord("\r"): " "}
    return text.translate(replacements)
def _to_posix(path_str: str) -> str:
    """Return *path_str* with every backslash turned into a forward slash."""
    return "/".join(path_str.split("\\"))
def _first_sentence(line: str) -> str:
    """Return the first sentence of *line*, stripped of outer whitespace.

    The sentence ends at the first period that is followed by whitespace
    (the period is kept). Without such a period the whole line is returned.
    """
    match = _SENTENCE_END_RE.search(line)
    end = len(line) if match is None else match.start() + 1
    return line[:end].strip()
def _truncate_to_25_words(text: str) -> str:
    """Limit *text* to 25 whitespace-separated words.

    Text within the limit is returned unchanged (original spacing included).
    Longer text is rebuilt from its first 25 words joined by single spaces,
    followed by a horizontal ellipsis character.
    """
    limit = 25
    tokens = text.split()
    if len(tokens) > limit:
        return " ".join(tokens[:limit]) + "\u2026"
    return text
def extract_summary(folder_path: Path) -> str:
    """Return a one-sentence summary (at most 25 words) for a track folder.

    Sources are tried in order:

    1. The ``description`` field of ``metadata.json``, unless it is missing,
       null, empty, or starts with a metadata field label.
    2. The first usable line of ``spec.md``, then of ``plan.md``. Blank
       lines, headings (``#``), blockquotes (``>``) and lines starting with
       a metadata field label are skipped.

    Files that cannot be read, decoded as UTF-8, or parsed are skipped
    instead of raising, so one bad folder cannot abort a whole run.

    Args:
        folder_path: Directory of a single track.

    Returns:
        The summary text, or ``"Imported from archive (no spec)"`` when no
        source yields one.
    """
    md_path = folder_path / "metadata.json"
    if md_path.is_file():
        try:
            data = json.loads(md_path.read_text(encoding="utf-8"))
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            data = None
        # Only a JSON object can carry a description, and a null description
        # must not be stringified into the literal summary "None".
        raw_desc = data.get("description") if isinstance(data, dict) else None
        desc = "" if raw_desc is None else str(raw_desc).strip()
        if desc and not desc.startswith(_METADATA_FIELD_PREFIXES):
            return _truncate_to_25_words(_first_sentence(desc))

    for fname in ("spec.md", "plan.md"):
        fpath = folder_path / fname
        if not fpath.is_file():
            continue
        try:
            text = fpath.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue
        for line in text.splitlines():
            stripped = line.strip()
            # Skip blanks, headings and blockquotes.
            if not stripped or stripped.startswith(("#", ">")):
                continue
            if stripped.startswith(_METADATA_FIELD_PREFIXES):
                continue
            return _truncate_to_25_words(_first_sentence(stripped))

    return "Imported from archive (no spec)"
def _git_log(folder_relpath: str, *args: str) -> str:
    """Run ``git log <args> -- <folder_relpath>`` and return its stdout.

    Best effort: returns "" when git exits non-zero, exceeds
    ``_GIT_TIMEOUT`` seconds, or cannot be started.

    Args:
        folder_relpath: Pathspec placed after ``--``.
        *args: Extra ``git log`` options placed before ``--``.

    Returns:
        The captured stdout as text, or "" on any failure.
    """
    cmd = ["git", "log", *args, "--", folder_relpath]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            # Decode explicitly instead of with the locale codec (text=True):
            # git writes log output as UTF-8 by default, and a subject that
            # the locale codec cannot decode would otherwise raise an
            # uncaught UnicodeDecodeError.
            encoding="utf-8",
            errors="replace",
            timeout=_GIT_TIMEOUT,
            check=False,
        )
    except (subprocess.SubprocessError, OSError):
        return ""
    return result.stdout if result.returncode == 0 else ""
def _git_log_multi(*folder_relpaths: str) -> str:
    """Run ``git log --oneline`` over several pathspecs in one subprocess.

    Best effort: returns "" when git exits non-zero, exceeds
    ``_GIT_TIMEOUT`` seconds, or cannot be started.

    Args:
        *folder_relpaths: Pathspecs placed after ``--``.

    Returns:
        The captured stdout as text, or "" on any failure.
    """
    cmd = ["git", "log", "--oneline", "--", *folder_relpaths]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            # Decode explicitly instead of with the locale codec (text=True):
            # git writes log output as UTF-8 by default, and a subject that
            # the locale codec cannot decode would otherwise raise an
            # uncaught UnicodeDecodeError.
            encoding="utf-8",
            errors="replace",
            timeout=_GIT_TIMEOUT,
            check=False,
        )
    except (subprocess.SubprocessError, OSError):
        return ""
    return result.stdout if result.returncode == 0 else ""
def _git_first_line(folder_relpath: str, *args: str) -> str:
    """Return the first line of the stripped ``_git_log`` output.

    Returns "" when the log output is empty or whitespace only.
    """
    lines = _git_log(folder_relpath, *args).strip().splitlines()
    return lines[0] if lines else ""
def _repo_root(start: Path) -> Path:
    """Return the git top-level directory as seen from *start*.

    Runs ``git rev-parse --show-toplevel`` with *start* as the working
    directory (10 second timeout). Falls back to ``start.parent`` when git
    cannot be run, fails, or prints nothing.
    """
    cmd = ["git", "rev-parse", "--show-toplevel"]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=10,
            check=False,
            cwd=str(start),
        )
    except (subprocess.SubprocessError, OSError):
        return start.parent
    toplevel = proc.stdout.strip()
    if proc.returncode == 0 and toplevel:
        return Path(toplevel)
    return start.parent
def _parse_state_phase(state_path: Path) -> str:
    """Return the ``current_phase`` value from a state.toml file.

    This is a line-based scan, not a full TOML parse. The first line whose
    key is exactly ``current_phase`` wins; everything from the first ``#``
    on is dropped and surrounding quotes are removed.

    Args:
        state_path: Path to the state.toml file.

    Returns:
        The phase value; ``"no-state-toml"`` when the path is not a file;
        ``"?"`` when the file cannot be read or decoded, or has no
        ``current_phase`` key.
    """
    if not state_path.is_file():
        return "no-state-toml"
    try:
        text = state_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return "?"
    for line in text.splitlines():
        key, sep, value = line.partition("=")
        # Require an exact key so that e.g. "current_phase_started = ..."
        # is not mistaken for the phase itself.
        if not sep or key.strip() != "current_phase":
            continue
        return value.split("#")[0].strip().strip('"').strip("'").strip()
    return "?"
def _parse_state_status(state_path: Path) -> str:
    """Return the ``status`` value from a state.toml file.

    This is a line-based scan, not a full TOML parse. The first line whose
    key is exactly ``status`` wins; everything from the first ``#`` on is
    dropped and surrounding quotes are removed.

    Args:
        state_path: Path to the state.toml file.

    Returns:
        The status value, or "" when the path is not a file, cannot be read
        or decoded, or has no ``status`` key.
    """
    if not state_path.is_file():
        return ""
    try:
        text = state_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return ""
    for line in text.splitlines():
        key, sep, value = line.partition("=")
        # Require an exact key so that e.g. "status_reason = ..." does not
        # shadow the real status line.
        if not sep or key.strip() != "status":
            continue
        return value.split("#")[0].strip().strip('"').strip("'").strip()
    return ""
def _last_commit_date(folder_relpath: str) -> str:
    """Return the short date of the newest commit touching *folder_relpath*.

    Runs ``git log -1 --format=%ad --date=short -- <folder_relpath>``.

    Returns:
        The date text printed by git, or ``"never"`` when no date is
        available: git cannot be started, exceeds ``_GIT_TIMEOUT`` seconds,
        exits non-zero, or prints nothing for the path.
    """
    cmd = ["git", "log", "-1", "--format=%ad", "--date=short", "--", folder_relpath]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=_GIT_TIMEOUT,
            check=False,
        )
    except (subprocess.SubprocessError, OSError):
        return "never"
    if result.returncode != 0:
        return "never"
    # Use the same sentinel as the failure paths rather than "" when git
    # prints no date.
    return result.stdout.strip() or "never"
def _count_work_commits_from_log(log: str) -> int:
    """Count the work commits in ``git log --oneline`` style text.

    Each line is split at its first space; the remainder is treated as the
    commit subject (empty when the line has no space) and tested with
    ``_is_work_commit``.
    """
    subjects = (entry.partition(" ")[2] for entry in log.splitlines())
    return sum(1 for subject in subjects if _is_work_commit(subject))
def _count_work_commits(folder_relpath: str) -> int:
    """Return how many work commits ``git log --oneline`` lists for the path."""
    return _count_work_commits_from_log(_git_log(folder_relpath, "--oneline"))
def _has_report_matching(reports_dir: Path, track_id: str, prefix: str) -> bool:
    """Tell whether *reports_dir* holds a report file for *track_id*.

    A match is a regular file directly inside *reports_dir* whose name starts
    with *prefix* and contains *track_id*. A missing or non-directory
    *reports_dir* yields False.
    """
    if not reports_dir.is_dir():
        return False
    return any(
        entry.is_file() and entry.name.startswith(prefix) and track_id in entry.name
        for entry in reports_dir.iterdir()
    )
def classify_status(
    folder_link: str,
    current: str,
    track_id: str,
    repo_root: Path,
    reports_dir: Path,
    has_abort_report: bool = False,
    state_status: str = "",
    work_commits: int = -1,
) -> tuple[str, str, str]:
    """Classify one track and explain the decision.

    Rules are tried in this order; the first match wins:

    0. Special tracks: "PLACEHOLDER" in *track_id*, or "contingency" in
       *current* (case-insensitive).
    1. Overrides (confidence "high"): the state.toml status, then an abort
       report, then a completion report.
    2. Work-commit count (confidence "medium").
    3. Location under conductor/archive/ (confidence "low").
    4. Otherwise "Needs Review" (confidence "none").

    Args:
        folder_link: POSIX-style track path; its prefix decides whether the
            track counts as living in conductor/tracks/ or conductor/archive/.
        current: Status text inspected only for the word "contingency".
        track_id: Track identifier, also matched against report file names.
        repo_root: Not read by this function.
        reports_dir: Directory scanned for TRACK_ABORTED_* and
            TRACK_COMPLETION_* files naming *track_id*.
        has_abort_report: True to treat an abort report as already found.
        state_status: Status string taken from state.toml ("" when absent).
        work_commits: Pre-computed work-commit count; a negative value makes
            the function count them via ``_count_work_commits``.

    Returns:
        A ``(status, confidence, reason)`` tuple.
    """
    # 0. Special tracks
    if "PLACEHOLDER" in track_id:
        return "Special", "high", "placeholder track"
    if "contingency" in current.lower():
        return "Special", "high", "contingency track"

    # 1a. state.toml is human-set, so a recognised value is trusted outright.
    state_overrides = {
        "superseded": ("Superseded", "state.toml status=superseded"),
        "completed": ("Completed", "state.toml status=completed"),
        "complete": ("Completed", "state.toml status=complete"),
        "shipped": ("Completed", "state.toml status=shipped"),
        "abandoned": ("Abandoned", "state.toml status=abandoned"),
        "archived": ("Completed", "state.toml status=archived (treated as completed)"),
    }
    override = state_overrides.get(state_status)
    if override is not None:
        label, why = override
        return label, "high", why

    # 1b. Report files
    if has_abort_report or _has_report_matching(reports_dir, track_id, "TRACK_ABORTED_"):
        return "Abandoned", "high", "abort report found"
    if _has_report_matching(reports_dir, track_id, "TRACK_COMPLETION_"):
        return "Completed", "high", "completion report found"

    # 2. Git commit evidence
    in_tracks = folder_link.startswith("conductor/tracks/")
    in_archive = folder_link.startswith("conductor/archive/")
    if work_commits < 0:
        work_commits = _count_work_commits(folder_link)
    if work_commits >= 3:
        return "Completed", "medium", f"{work_commits} work commits"
    if in_tracks:
        if 1 <= work_commits <= 2:
            return "In Progress", "medium", f"{work_commits} work commits in tracks/"
        if work_commits == 0:
            return "Active", "medium", "0 work commits in tracks/ (spec/plan only)"

    # 3. Directory location. Moving a track into archive/ is itself treated
    # as the completion signal: abandoned tracks are left in place or deleted
    # rather than archived, and an abort report would already have matched
    # above. The work usually lands in src/ rather than in the track folder,
    # so a low commit count on the folder is not evidence against completion.
    if in_archive:
        return "Completed", "low", "archived (work in src/, not track folder)"

    # 4. Fallback
    return "Needs Review", "none", "classifier inconclusive"
def walk_track_folders(root: Path) -> list[dict]:
    """Collect one chronology row per track folder under *root*.

    Scans the sub-directories of ``root/tracks`` and then ``root/archive``.
    For each one it gathers the git log, a date, the first and last commit
    hashes, the classifier verdict and a summary.

    A ``metadata.json`` that cannot be read, decoded, or parsed, or that is
    not a JSON object, is ignored instead of aborting the walk.

    Args:
        root: The conductor root directory.

    Returns:
        A list of dicts with the keys ``date``, ``track_id``, ``status``,
        ``confidence``, ``reason``, ``summary``, ``init_sha``, ``end_sha``,
        ``commit_count`` and ``folder_link``. Rows are ordered by date,
        newest first; rows with equal dates are ordered by track id.
    """
    repo_root: Path = _repo_root(root)
    reports_dir: Path = repo_root / "docs" / "reports"
    archive_prefix = "conductor/archive/"
    rows: list[dict] = []

    sections = (
        (root / "tracks", "Active"),
        (root / "archive", "Completed"),
    )
    for parent_dir, default_status in sections:
        if not parent_dir.is_dir():
            continue
        for folder in sorted(parent_dir.iterdir()):
            if not folder.is_dir():
                continue

            # Repo-relative POSIX path, computed once and used both as the
            # git pathspec and as the folder link in the row. Falls back to
            # the path as given when the folder is not under repo_root.
            try:
                rel_path = folder.relative_to(repo_root)
            except ValueError:
                rel_path = folder
            folder_relpath = _to_posix(str(rel_path))
            track_id: str = folder.name

            # Fetch the oneline log once. For archive folders include the
            # original tracks/ path as well: `git mv` preserves history, but
            # `git log -- <archive_path>` alone may miss pre-move commits.
            if folder_relpath.startswith(archive_prefix):
                original_path = folder_relpath.replace(archive_prefix, "conductor/tracks/", 1)
                oneline_log = _git_log_multi(folder_relpath, original_path)
            else:
                oneline_log = _git_log(folder_relpath, "--oneline")
            log_lines = [entry for entry in oneline_log.splitlines() if entry.strip()]

            # The date embedded in the folder name wins; otherwise use the
            # author date of the oldest commit that git reports for the path.
            date = extract_slug_date(track_id) or ""
            if not date and log_lines:
                first_commit = _git_first_line(folder_relpath, "--reverse", "--format=%aI")
                date = first_commit[:10]

            # git log is newest-first: the last line is the oldest commit.
            init_sha = log_lines[-1].split(" ", 1)[0] if log_lines else ""
            end_sha = log_lines[0].split(" ", 1)[0] if log_lines else ""

            meta_status = ""
            metadata_path = folder / "metadata.json"
            if metadata_path.is_file():
                try:
                    meta = json.loads(metadata_path.read_text(encoding="utf-8"))
                except (ValueError, OSError):
                    # ValueError covers JSONDecodeError and UnicodeDecodeError.
                    meta = None
                # Only a JSON object can carry a status, and a null status
                # must not be stringified into "None".
                if isinstance(meta, dict) and meta.get("status") is not None:
                    meta_status = str(meta["status"]).strip()

            status, confidence, reason = classify_status(
                folder_link=folder_relpath,
                current=meta_status or default_status,
                track_id=track_id,
                repo_root=repo_root,
                reports_dir=reports_dir,
                state_status=_parse_state_status(folder / "state.toml"),
                work_commits=_count_work_commits_from_log(oneline_log),
            )

            rows.append({
                "date": date,
                "track_id": track_id,
                "status": status,
                "confidence": confidence,
                "reason": reason,
                "summary": extract_summary(folder),
                "init_sha": init_sha,
                "end_sha": end_sha,
                "commit_count": len(log_lines),
                "folder_link": folder_relpath,
            })

    # Two stable sorts: track id is the tie-breaker, date (newest first) is
    # the primary order.
    rows.sort(key=lambda r: r["track_id"])
    rows.sort(key=lambda r: r["date"], reverse=True)
    return rows
def format_markdown(rows: list[dict]) -> str:
    """Render chronology rows as a markdown document.

    The output is a generated-on comment line, a six-column table with one
    line per row, and, when any row has status "Needs Review", a trailing
    "## Needs Review" bullet list giving each such row's reason. The text
    ends with a newline.
    """
    from datetime import date as today_date

    header = [
        f"<!-- Generated {today_date.today().isoformat()} | {len(rows)} rows -->",
        "",
        "| Date | ID | Status | Summary | Folder | Range |",
        "| --- | --- | --- | --- | --- | --- |",
    ]

    body: list[str] = []
    for row in rows:
        # Rows without any commit have no hash range to show.
        if row["init_sha"]:
            commit_range = f"`{row['init_sha']}..{row['end_sha']}` ({row['commit_count']})"
        else:
            commit_range = "n/a"
        body.append(
            f"| {row['date']} | `{row['track_id']}` | {row['status']} | "
            f"{_md_escape(row['summary'])} | `{row['folder_link']}` | {commit_range} |"
        )

    flagged = [
        f"- `{row['track_id']}` (`{row['folder_link']}`): {row['reason']}"
        for row in rows
        if row["status"] == "Needs Review"
    ]
    tail = ["", "## Needs Review", "", *flagged] if flagged else []

    return "\n".join([*header, *body, *tail]) + "\n"
def main() -> None:
    """Command-line entry point: build the chronology and write it to stdout.

    Options:
        --draft  Write the markdown table; without it the rows are written
                 as JSON with an indent of 2.
        --root   Conductor root directory; a relative value is resolved
                 against the current working directory.
    """
    # Switch stdout to UTF-8 where the stream supports it; a stream that
    # refuses is left as it is.
    if hasattr(sys.stdout, "reconfigure"):
        try:
            sys.stdout.reconfigure(encoding="utf-8")
        except (OSError, ValueError):
            pass

    parser = argparse.ArgumentParser(
        description="Generate chronology for Manual Slop conductor tracks.",
    )
    parser.add_argument("--draft", action="store_true", help="Emit markdown table to stdout.")
    parser.add_argument(
        "--root",
        default=_DEFAULT_ROOT,
        help=f"Path to conductor root (default: {_DEFAULT_ROOT}).",
    )
    options = parser.parse_args()

    root = Path(options.root)
    if not root.is_absolute():
        root = Path.cwd() / root

    rows = walk_track_folders(root)
    output = format_markdown(rows) if options.draft else json.dumps(rows, indent=2)
    sys.stdout.write(output)


if __name__ == "__main__":
    main()