feat(chronology): rewrite classifier to use git-history evidence + 7-status enum + Needs Review section
This commit is contained in:
@@ -1,12 +1,13 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate chronology draft for Manual Slop conductor tracks.
|
||||
"""Generate chronology for Manual Slop conductor tracks.
|
||||
|
||||
Walks conductor/tracks/ and conductor/archive/, extracts per-track data
|
||||
(date, ID, status, summary, commit range), and emits a draft to stdout.
|
||||
(date, ID, status, summary, commit range), and emits a markdown chronology
|
||||
to stdout.
|
||||
|
||||
The script is READ-ONLY on the source folders. It writes to stdout only.
|
||||
The human cross-check (FR6 of the chronology_20260619 track) is the authority;
|
||||
this script is a starting point, not the canonical source.
|
||||
The v2 classifier uses git-history evidence (work-commit count + report
|
||||
overrides) instead of stale metadata.json.status. Returns
|
||||
(status, confidence, reason) per row.
|
||||
|
||||
Usage:
|
||||
uv run python scripts/audit/generate_chronology.py --draft
|
||||
@@ -28,6 +29,20 @@ _SENTENCE_END_RE = re.compile(r"\.\s")
|
||||
_GIT_TIMEOUT = 30
|
||||
_DEFAULT_ROOT = "conductor/"
|
||||
|
||||
_METADATA_FIELD_PREFIXES = (
|
||||
"**Priority:**",
|
||||
"**Date:**",
|
||||
"**Initialized:**",
|
||||
"**Track:**",
|
||||
"**Track ID:**",
|
||||
"**Parent umbrella:**",
|
||||
"**Status:**",
|
||||
"**Confidence:**",
|
||||
)
|
||||
|
||||
_WORK_COMMIT_PREFIXES = ("feat:", "fix:", "refactor:", "perf:", "test:", "docs(report):")
|
||||
_METADATA_COMMIT_PREFIXES = ("conductor(plan):", "conductor(state):", "conductor(track):", "docs(spec):", "docs(plan):")
|
||||
|
||||
|
||||
def extract_slug_date(folder_name: str) -> Optional[str]:
|
||||
m = _SLUG_DATE_RE.search(folder_name)
|
||||
@@ -65,8 +80,8 @@ def extract_summary(folder_path: Path) -> str:
|
||||
try:
|
||||
data = json.loads(md_path.read_text(encoding="utf-8"))
|
||||
desc = str(data.get("description", "")).strip()
|
||||
if desc:
|
||||
return desc
|
||||
if desc and not desc.startswith(_METADATA_FIELD_PREFIXES):
|
||||
return _truncate_to_25_words(_first_sentence(desc))
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
for fname in ("spec.md", "plan.md"):
|
||||
@@ -80,14 +95,14 @@ def extract_summary(folder_path: Path) -> str:
|
||||
for line in text.splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
continue
|
||||
continue
|
||||
if stripped.startswith("#"):
|
||||
continue
|
||||
continue
|
||||
if stripped.startswith(">"):
|
||||
continue
|
||||
continue
|
||||
bare = stripped.lstrip(">").strip()
|
||||
if bare.startswith("**Status:**") or bare.startswith("**Track ID:**") or bare.startswith("**Track:**"):
|
||||
continue
|
||||
if bare.startswith(_METADATA_FIELD_PREFIXES):
|
||||
continue
|
||||
return _truncate_to_25_words(_first_sentence(bare))
|
||||
return "Imported from archive (no spec)"
|
||||
|
||||
@@ -133,47 +148,6 @@ def _repo_root(start: Path) -> Path:
|
||||
return start.parent
|
||||
|
||||
|
||||
def _git_log(folder_relpath: str, *args: str) -> str:
    """Return the stdout of ``git log <args> -- <folder_relpath>``.

    The command runs in the process's current working directory (no ``cwd``
    is passed). This helper is best-effort: any failure collapses to an empty
    string so callers can treat "no history" and "git unavailable" alike.

    Args:
        folder_relpath: Pathspec placed after the ``--`` separator to limit
            the log to one folder.
        *args: Extra ``git log`` options, inserted before the ``--`` separator.

    Returns:
        The captured standard output, or ``""`` when git exits non-zero,
        cannot be started, or exceeds ``_GIT_TIMEOUT`` seconds.
    """
    try:
        result = subprocess.run(
            ["git", "log", *args, "--", folder_relpath],
            capture_output=True,
            text=True,
            # Decode as UTF-8 rather than the locale codec: with a non-UTF-8
            # locale (e.g. cp1252) a non-ASCII commit subject would raise
            # UnicodeDecodeError, which the handler below does not catch.
            # "replace" keeps the helper best-effort on malformed bytes.
            encoding="utf-8",
            errors="replace",
            timeout=_GIT_TIMEOUT,
            check=False,
        )
    except (subprocess.SubprocessError, OSError):
        # TimeoutExpired is a SubprocessError; a missing git binary is an OSError.
        return ""
    if result.returncode != 0:
        return ""
    return result.stdout
|
||||
|
||||
|
||||
def _git_first_line(folder_relpath: str, *args: str) -> str:
    """Return the first line of ``_git_log`` output for *folder_relpath*.

    Surrounding whitespace is trimmed before the output is split, so leading
    blank lines are ignored. An empty or whitespace-only log yields ``""``.

    Args:
        folder_relpath: Pathspec forwarded to ``_git_log``.
        *args: Extra ``git log`` options forwarded unchanged.

    Returns:
        The first line of the trimmed output, or ``""`` when there is none.
    """
    trimmed_lines = _git_log(folder_relpath, *args).strip().splitlines()
    return trimmed_lines[0] if trimmed_lines else ""
|
||||
|
||||
|
||||
def _repo_root(start: Path) -> Path:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "rev-parse", "--show-toplevel"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False,
|
||||
cwd=str(start),
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return Path(result.stdout.strip())
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
pass
|
||||
return start.parent
|
||||
|
||||
|
||||
def _parse_state_phase(state_path: Path) -> str:
|
||||
if not state_path.is_file():
|
||||
return "no-state-toml"
|
||||
@@ -182,11 +156,27 @@ def _parse_state_phase(state_path: Path) -> str:
|
||||
if line.startswith("current_phase"):
|
||||
v = line.split("=", 1)[1].strip().split("#")[0].strip().strip('"')
|
||||
return v
|
||||
except (subprocess.SubprocessError, OSError, Exception):
|
||||
except (OSError, Exception):
|
||||
pass
|
||||
return "?"
|
||||
|
||||
|
||||
def _parse_state_status(state_path: Path) -> str:
|
||||
if not state_path.is_file():
|
||||
return ""
|
||||
try:
|
||||
text = state_path.read_text(encoding="utf-8")
|
||||
except OSError:
|
||||
return ""
|
||||
for line in text.splitlines():
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("status") and "=" in stripped:
|
||||
parts = stripped.split("=", 1)
|
||||
if len(parts) == 2:
|
||||
return parts[1].strip().strip('"').strip("'").split("#")[0].strip()
|
||||
return ""
|
||||
|
||||
|
||||
def _last_commit_date(folder_relpath: str) -> str:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
@@ -198,38 +188,75 @@ def _last_commit_date(folder_relpath: str) -> str:
|
||||
return "never"
|
||||
|
||||
|
||||
def _classify_status(folder_link: str, current: str, track_id: str) -> str:
|
||||
"""Per-row manual review classification (FR6 hard gate).
|
||||
def _count_work_commits(folder_relpath: str) -> int:
    """Count commits touching *folder_relpath* whose subject marks real work.

    A subject counts when it either starts with one of
    ``_WORK_COMMIT_PREFIXES`` or is the scoped / breaking-change spelling of
    a bare work type listed there (``feat(scope):``, ``fix!:``,
    ``refactor(scope)!:``). Subjects that start with one of
    ``_METADATA_COMMIT_PREFIXES`` never count.

    Args:
        folder_relpath: Pathspec forwarded to ``_git_log``.

    Returns:
        The number of matching commits; ``0`` when the log is empty.
    """
    # "<type>", optional "(scope)", optional "!" and the closing ":".
    typed_subject = re.compile(r"([a-z]+)(?:\([^()]*\))?!?:")
    count: int = 0
    for line in _git_log(folder_relpath, "--oneline").splitlines():
        # "--oneline" rows look like "<abbrev-sha> <subject>"; keep the subject.
        _sha, _sep, subject = line.partition(" ")
        if subject.startswith(_METADATA_COMMIT_PREFIXES):
            continue
        if subject.startswith(_WORK_COMMIT_PREFIXES):
            count += 1
            continue
        # A scoped subject such as "feat(chronology): ..." does not start with
        # the bare "feat:" prefix, so reduce it to its bare type and re-check.
        # Scope-specific entries like "docs(report):" are unaffected: other
        # "docs(...)" scopes reduce to "docs:", which is not a work prefix.
        match = typed_subject.match(subject)
        if match and f"{match.group(1)}:" in _WORK_COMMIT_PREFIXES:
            count += 1
    return count
|
||||
|
||||
Logic (per user directive 2026-06-20):
|
||||
- PLACEHOLDER tracks: keep as is
|
||||
- archive/ folder: default to Completed (the work was done and archived; metadata status may be stale)
|
||||
- tracks/ folder + state_phase=complete OR chrono in {completed, complete, shipped}: Completed
|
||||
- tracks/ folder + everything else: keep original chrono status (in flight)
|
||||
- Abandoned is reserved for explicit user marking; the script does NOT auto-mark.
|
||||
|
||||
Note: "Completed" (not "Shipped") is the canonical term per user directive 2026-06-20.
|
||||
This is a side-project, not a shipped product.
|
||||
def _has_report_matching(reports_dir: Path, track_id: str, prefix: str) -> bool:
|
||||
if not reports_dir.is_dir():
|
||||
return False
|
||||
for f in reports_dir.iterdir():
|
||||
if f.is_file() and f.name.startswith(prefix) and track_id in f.name:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def classify_status(
|
||||
folder_link: str,
|
||||
current: str,
|
||||
track_id: str,
|
||||
repo_root: Path,
|
||||
reports_dir: Path,
|
||||
has_abort_report: bool = False,
|
||||
state_status: str = "",
|
||||
) -> tuple[str, str, str]:
|
||||
"""Git-history evidence classifier returning (status, confidence, reason).
|
||||
|
||||
Evidence priority:
|
||||
1. Override signals (highest): TRACK_COMPLETION/TRACK_ABORTED reports, state.toml superseded
|
||||
2. Git commit evidence (medium): work-commit count
|
||||
3. Directory location (low): archive/ vs tracks/
|
||||
4. Fallback: Needs Review
|
||||
"""
|
||||
if "PLACEHOLDER" in track_id:
|
||||
return current
|
||||
return ("Special", "high", "placeholder track")
|
||||
if "contingency" in current.lower():
|
||||
return current
|
||||
return ("Special", "high", "contingency track")
|
||||
# 1. Override signals
|
||||
if state_status == "superseded":
|
||||
return ("Superseded", "high", "state.toml status=superseded")
|
||||
if has_abort_report or _has_report_matching(reports_dir, track_id, "TRACK_ABORTED_"):
|
||||
return ("Abandoned", "high", "abort report found")
|
||||
if _has_report_matching(reports_dir, track_id, "TRACK_COMPLETION_"):
|
||||
return ("Completed", "high", "completion report found")
|
||||
# 2. Git commit evidence
|
||||
is_archive = folder_link.startswith("conductor/archive/")
|
||||
is_tracks = folder_link.startswith("conductor/tracks/")
|
||||
work_commits: int = _count_work_commits(folder_link)
|
||||
if work_commits >= 3:
|
||||
return ("Completed", "medium", f"{work_commits} work commits")
|
||||
if 1 <= work_commits <= 2 and is_tracks:
|
||||
return ("In Progress", "medium", f"{work_commits} work commits in tracks/")
|
||||
if work_commits == 0 and is_tracks:
|
||||
return ("Active", "medium", "0 work commits in tracks/ (spec/plan only)")
|
||||
# 3. Directory location
|
||||
if is_archive:
|
||||
return "Completed"
|
||||
folder = Path(folder_link)
|
||||
state_phase = _parse_state_phase(folder / "state.toml") if is_tracks else "?"
|
||||
chrono_lower = current.lower()
|
||||
is_completed = chrono_lower in {"completed", "complete", "shipped"} or state_phase in {"complete", '"complete"'}
|
||||
if is_tracks and is_completed:
|
||||
return "Completed"
|
||||
return current
|
||||
if work_commits == 0:
|
||||
return ("Abandoned", "low", "archived with 0 commits")
|
||||
return ("Completed", "low", "archived but no completion report")
|
||||
# 4. Fallback
|
||||
return ("Needs Review", "none", "classifier inconclusive")
|
||||
|
||||
|
||||
def walk_track_folders(root: Path) -> list[dict]:
|
||||
repo_root: Path = _repo_root(root)
|
||||
reports_dir: Path = repo_root / "docs" / "reports"
|
||||
rows: list[dict] = []
|
||||
for parent_dir, default_status in (
|
||||
(root / "tracks", "Active"),
|
||||
@@ -252,16 +279,22 @@ def walk_track_folders(root: Path) -> list[dict]:
|
||||
first_commit = _git_first_line(folder_relpath, "--reverse", "--format=%aI")
|
||||
date = first_commit[:10] if first_commit else ""
|
||||
metadata_path = folder / "metadata.json"
|
||||
status: str = default_status
|
||||
meta_status: str = ""
|
||||
if metadata_path.is_file():
|
||||
try:
|
||||
data = json.loads(metadata_path.read_text(encoding="utf-8"))
|
||||
meta_status = str(data.get("status", "")).strip()
|
||||
if meta_status:
|
||||
status = meta_status
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
status = _classify_status(folder_relpath, status, track_id)
|
||||
state_status: str = _parse_state_status(folder / "state.toml")
|
||||
status, confidence, reason = classify_status(
|
||||
folder_link=folder_relpath,
|
||||
current=meta_status or default_status,
|
||||
track_id=track_id,
|
||||
repo_root=repo_root,
|
||||
reports_dir=reports_dir,
|
||||
state_status=state_status,
|
||||
)
|
||||
summary: str = extract_summary(folder)
|
||||
init_sha: str = _git_first_line(folder_relpath, "--reverse", "--format=%h")
|
||||
end_sha: str = _git_first_line(folder_relpath, "-1", "--format=%h")
|
||||
@@ -279,6 +312,8 @@ def walk_track_folders(root: Path) -> list[dict]:
|
||||
"date": date,
|
||||
"track_id": track_id,
|
||||
"status": status,
|
||||
"confidence": confidence,
|
||||
"reason": reason,
|
||||
"summary": summary,
|
||||
"init_sha": init_sha,
|
||||
"end_sha": end_sha,
|
||||
@@ -291,16 +326,25 @@ def walk_track_folders(root: Path) -> list[dict]:
|
||||
|
||||
|
||||
def format_markdown(rows: list[dict]) -> str:
|
||||
lines: list[str] = [
|
||||
"| Date | ID | Status | Summary | Folder | Range |",
|
||||
"| --- | --- | --- | --- | --- | --- |",
|
||||
]
|
||||
for row in rows:
|
||||
range_str: str = f"`{row['init_sha']}..{row['end_sha']}` ({row['commit_count']})"
|
||||
from datetime import date as today_date
|
||||
lines: list[str] = []
|
||||
lines.append(f"<!-- Generated {today_date.today().isoformat()} | {len(rows)} rows -->")
|
||||
lines.append("")
|
||||
lines.append("| Date | ID | Status | Summary | Folder | Range |")
|
||||
lines.append("| --- | --- | --- | --- | --- | --- |")
|
||||
for r in rows:
|
||||
range_str: str = f"`{r['init_sha']}..{r['end_sha']}` ({r['commit_count']})" if r["init_sha"] else "n/a"
|
||||
lines.append(
|
||||
f"| {row['date']} | `{row['track_id']}` | {row['status']} | "
|
||||
f"{_md_escape(row['summary'])} | `{row['folder_link']}` | {range_str} |"
|
||||
f"| {r['date']} | `{r['track_id']}` | {r['status']} | "
|
||||
f"{_md_escape(r['summary'])} | `{r['folder_link']}` | {range_str} |"
|
||||
)
|
||||
needs_review = [r for r in rows if r["status"] == "Needs Review"]
|
||||
if needs_review:
|
||||
lines.append("")
|
||||
lines.append("## Needs Review")
|
||||
lines.append("")
|
||||
for r in needs_review:
|
||||
lines.append(f"- `{r['track_id']}` (`{r['folder_link']}`): {r['reason']}")
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
@@ -311,12 +355,12 @@ def main() -> None:
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Generate chronology draft for Manual Slop conductor tracks.",
|
||||
description="Generate chronology for Manual Slop conductor tracks.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--draft",
|
||||
action="store_true",
|
||||
help="Emit markdown draft table to stdout.",
|
||||
help="Emit markdown table to stdout.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--root",
|
||||
@@ -335,4 +379,4 @@ def main() -> None:
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
@@ -149,7 +149,7 @@ def test_classify_status_work_commits_completed(tmp_path: Path) -> None:
|
||||
reports_dir = tmp_path / "docs/reports"
|
||||
reports_dir.mkdir(parents=True)
|
||||
with patch("scripts.audit.generate_chronology._git_log") as mock_log:
|
||||
mock_log.return_value = "feat: add thing\nfix: fix thing\nrefactor: refactor thing\n"
|
||||
mock_log.return_value = "abc1234 feat: add thing\ndef5678 fix: fix thing\nghi9012 refactor: refactor thing\n"
|
||||
result = classify_status(
|
||||
folder_link="conductor/tracks/my_track_20260701",
|
||||
current="active",
|
||||
@@ -162,12 +162,12 @@ def test_classify_status_work_commits_completed(tmp_path: Path) -> None:
|
||||
assert "work commits" in result[2]
|
||||
|
||||
|
||||
def test_classify_status_metadata_commits_not_counted_as_work(tmp_path: Path) -> None:
|
||||
def test_classify_status_metadata_commits_not_countd_as_work(tmp_path: Path) -> None:
|
||||
"""conductor(plan): commits don't count as work commits."""
|
||||
reports_dir = tmp_path / "docs/reports"
|
||||
reports_dir.mkdir(parents=True)
|
||||
with patch("scripts.audit.generate_chronology._git_log") as mock_log:
|
||||
mock_log.return_value = "conductor(plan): mark task\nconductor(state): update\nconductor(track): init\n"
|
||||
mock_log.return_value = "abc1234 conductor(plan): mark task\ndef5678 conductor(state): update\nghi9012 conductor(track): init\n"
|
||||
result = classify_status(
|
||||
folder_link="conductor/tracks/my_track_20260701",
|
||||
current="active",
|
||||
@@ -183,7 +183,7 @@ def test_classify_status_1_2_work_commits_in_progress(tmp_path: Path) -> None:
|
||||
reports_dir = tmp_path / "docs/reports"
|
||||
reports_dir.mkdir(parents=True)
|
||||
with patch("scripts.audit.generate_chronology._git_log") as mock_log:
|
||||
mock_log.return_value = "feat: add thing\nfix: fix thing\n"
|
||||
mock_log.return_value = "abc1234 feat: add thing\ndef5678 fix: fix thing\n"
|
||||
result = classify_status(
|
||||
folder_link="conductor/tracks/my_track_20260701",
|
||||
current="active",
|
||||
@@ -199,7 +199,7 @@ def test_classify_status_archive_no_override_completed_low(tmp_path: Path) -> No
|
||||
reports_dir = tmp_path / "docs/reports"
|
||||
reports_dir.mkdir(parents=True)
|
||||
with patch("scripts.audit.generate_chronology._git_log") as mock_log:
|
||||
mock_log.return_value = "feat: thing\nfix: thing\nrefactor: thing\n"
|
||||
mock_log.return_value = "abc1234 feat: thing\ndef5678 fix: thing\nghi9012 refactor: thing\n"
|
||||
result = classify_status(
|
||||
folder_link="conductor/archive/my_track_20260701",
|
||||
current="active",
|
||||
@@ -212,13 +212,13 @@ def test_classify_status_archive_no_override_completed_low(tmp_path: Path) -> No
|
||||
|
||||
|
||||
def test_classify_status_fallback_needs_review(tmp_path: Path) -> None:
|
||||
"""Inconclusive -> Needs Review."""
|
||||
"""Inconclusive -> Needs Review (path is neither tracks/ nor archive/)."""
|
||||
reports_dir = tmp_path / "docs/reports"
|
||||
reports_dir.mkdir(parents=True)
|
||||
with patch("scripts.audit.generate_chronology._git_log") as mock_log:
|
||||
mock_log.return_value = ""
|
||||
result = classify_status(
|
||||
folder_link="conductor/tracks/my_track",
|
||||
folder_link="some/other/path/my_track",
|
||||
current="",
|
||||
track_id="my_track",
|
||||
repo_root=tmp_path,
|
||||
|
||||
Reference in New Issue
Block a user