Private
Public Access
0
0

feat(audit): implement Phase 8 v2 DSL + Phase 9 run_audit + CLI + MCP

Phase 8: to_dsl_v2 (flat-section writer, 14 sections),
to_markdown (10 sections), to_tree (box-drawing prefix tree),
parse_dsl_v2 (round-trip parser).

Phase 9: AGGREGATES_IN_SCOPE (10) + CANDIDATE_AGGREGATES (3),
synthesize_aggregate_profile (per-aggregate builder, candidate
placeholder path), AuditSummary dataclass, run_audit() main
entry, render_rollups() (4 top-level files: summary,
cross_audit_summary, decomposition_matrix, candidates),
code_path_audit_v2() MCP tool wrapper.

13 new unit tests passing. 124 total tests passing.

Phase 10 (integration tests with synthetic src/) next - may be
deferred to next session if context runs low.
This commit is contained in:
2026-06-22 01:59:07 -04:00
parent db878cfb84
commit c82538474f
3 changed files with 1034 additions and 1 deletions
+429 -1
View File
@@ -785,4 +785,432 @@ DSL_WORD_ARITY_V2: dict[str, int] = {
"decomp-cost": 8,
"opt-candidate": 7,
"is-candidate": 1,
}
}
import re
from datetime import date as date_mod
def _atom(s: str) -> str:
"""Format a string as a postfix DSL atom (bare or quoted)."""
if any(c in s for c in ('"', "'", " ", "\t", "\n", "(", ")", "{", "}")):
return f'"{s}"'
return s
def to_dsl_v2(profile: AggregateProfile, generated_date: str = "") -> str:
"""Serialize an AggregateProfile to v2 postfix DSL (flat sections)."""
lines: list[str] = []
lines.append(f'\\ AggregateProfile: "{profile.name}"')
lines.append(f"\\ generated {generated_date} by src.code_path_audit v2")
lines.append("")
lines.append("\\ === aggregate_kind ===")
lines.append(f' "{profile.aggregate_kind}" kind')
lines.append("")
lines.append("\\ === memory_dim ===")
lines.append(f' "{profile.memory_dim}" mem-dim')
lines.append("")
lines.append(f"\\ === producers ({len(profile.producers)} items) ===")
for p in profile.producers:
lines.append(f' "{p.fqname}" "{p.file}" {p.line} "{p.role}" fn-ref')
lines.append("")
lines.append(f"\\ === consumers ({len(profile.consumers)} items) ===")
for c in profile.consumers:
lines.append(f' "{c.fqname}" "{c.file}" {c.line} "{c.role}" fn-ref')
lines.append("")
lines.append("\\ === access_pattern ===")
lines.append(f' "{profile.access_pattern}" access-pattern')
lines.append("")
lines.append(f"\\ === access_pattern_evidence ({len(profile.access_pattern_evidence)} items) ===")
for ev in profile.access_pattern_evidence:
lines.append(f' "{ev.function.fqname}" "{ev.pattern}" {len(ev.field_accesses)} "{ev.confidence}" ap-evidence')
lines.append("")
lines.append("\\ === frequency ===")
lines.append(f' "{profile.frequency}" frequency')
lines.append("")
lines.append(f"\\ === frequency_evidence ({len(profile.frequency_evidence)} items) ===")
for ev in profile.frequency_evidence:
lines.append(f' "{ev.function.fqname}" "{ev.frequency}" "{ev.source}" "{ev.note}" freq-evidence')
lines.append("")
rc = profile.result_coverage
lines.append("\\ === result_coverage ===")
lines.append(f" {rc.total_producers} {rc.result_producers} {rc.total_consumers} {rc.result_consumers} result-coverage")
lines.append("")
tac = profile.type_alias_coverage
lines.append("\\ === type_alias_coverage ===")
lines.append(f" {tac.total_sites} {tac.typed_sites} {tac.untyped_sites} type-alias-coverage")
lines.append("")
lines.append("\\ === cross_audit_findings ===")
for f in profile.cross_audit_findings.weak_types:
lines.append(f' "{f.audit_script}" {f.site_count} "{f.example_file}" {f.example_line} "{f.note}" cross-audit-finding')
for f in profile.cross_audit_findings.exception_handling:
lines.append(f' "{f.audit_script}" {f.site_count} "{f.example_file}" {f.example_line} "{f.note}" cross-audit-finding')
for f in profile.cross_audit_findings.optional_in_baseline:
lines.append(f' "{f.audit_script}" {f.site_count} "{f.example_file}" {f.example_line} "{f.note}" cross-audit-finding')
for f in profile.cross_audit_findings.config_io_ownership:
lines.append(f' "{f.audit_script}" {f.site_count} "{f.example_file}" {f.example_line} "{f.note}" cross-audit-finding')
for f in profile.cross_audit_findings.import_graph:
lines.append(f' "{f.audit_script}" {f.site_count} "{f.example_file}" {f.example_line} "{f.note}" cross-audit-finding')
lines.append(" 5 cross-audit-findings")
lines.append("")
dc = profile.decomposition_cost
lines.append("\\ === decomposition_cost ===")
batch_size_str = str(dc.batch_size) if dc.batch_size is not None else "nil"
lines.append(f" {dc.current_cost_estimate} {dc.componentize_savings} {dc.unify_savings} \"{dc.recommended_direction}\" \"{dc.recommended_rationale}\" {batch_size_str} {dc.struct_field_count} {str(dc.struct_frozen).lower()} decomp-cost")
lines.append("")
lines.append(f"\\ === optimization_candidates ({len(profile.optimization_candidates)} items) ===")
for cand in profile.optimization_candidates:
lines.append(f' "{cand.candidate}" "{cand.direction}" {len(cand.affected_files)} {cand.estimated_savings_us} "{cand.effort}" "{cand.priority}" "{cand.cross_ref}" opt-candidate')
lines.append("")
lines.append("\\ === is_candidate ===")
lines.append(f" {'true' if profile.is_candidate else 'false'} is-candidate")
return "\n".join(lines)
def to_markdown(profile: AggregateProfile) -> str:
"""Render the per-aggregate markdown (10 sections)."""
lines: list[str] = []
lines.append(f"# Aggregate Profile: {profile.name}")
lines.append("")
lines.append(f"**Aggregate kind:** {profile.aggregate_kind}")
lines.append(f"**Memory dim:** {profile.memory_dim}")
lines.append(f"**Is candidate:** {profile.is_candidate}")
lines.append("")
lines.append("## Pipeline summary")
lines.append("")
lines.append(f"- Producers: {len(profile.producers)}")
lines.append(f"- Consumers: {len(profile.consumers)}")
lines.append("")
lines.append("## Access pattern")
lines.append("")
lines.append(f"**Dominant pattern:** {profile.access_pattern}")
lines.append(f"**Evidence count:** {len(profile.access_pattern_evidence)}")
lines.append("")
lines.append("## Frequency")
lines.append("")
lines.append(f"**Dominant frequency:** {profile.frequency}")
lines.append(f"**Evidence count:** {len(profile.frequency_evidence)}")
lines.append("")
lines.append("## Result coverage")
lines.append("")
lines.append(f"**Summary:** {profile.result_coverage.summary}")
lines.append("")
lines.append("## Type alias coverage")
lines.append("")
lines.append(f"**Summary:** {profile.type_alias_coverage.summary}")
lines.append("")
lines.append("## Cross-audit findings")
lines.append("")
lines.append("| Audit script | Site count | Example | Note |")
lines.append("|---|---|---|---|")
for f in profile.cross_audit_findings.weak_types:
lines.append(f"| {f.audit_script} | {f.site_count} | {f.example_file}:{f.example_line} | {f.note} |")
for f in profile.cross_audit_findings.exception_handling:
lines.append(f"| {f.audit_script} | {f.site_count} | {f.example_file}:{f.example_line} | {f.note} |")
for f in profile.cross_audit_findings.optional_in_baseline:
lines.append(f"| {f.audit_script} | {f.site_count} | {f.example_file}:{f.example_line} | {f.note} |")
for f in profile.cross_audit_findings.config_io_ownership:
lines.append(f"| {f.audit_script} | {f.site_count} | {f.example_file}:{f.example_line} | {f.note} |")
for f in profile.cross_audit_findings.import_graph:
lines.append(f"| {f.audit_script} | {f.site_count} | {f.example_file}:{f.example_line} | {f.note} |")
lines.append("")
lines.append("## Decomposition cost")
lines.append("")
lines.append(f"**Current cost estimate:** {profile.decomposition_cost.current_cost_estimate} us")
lines.append(f"**Componentize savings:** {profile.decomposition_cost.componentize_savings} us")
lines.append(f"**Unify savings:** {profile.decomposition_cost.unify_savings} us")
lines.append(f"**Recommended direction:** {profile.decomposition_cost.recommended_direction}")
lines.append(f"**Rationale:** {profile.decomposition_cost.recommended_rationale}")
lines.append("")
lines.append("## Optimization candidates")
lines.append("")
if profile.optimization_candidates:
for cand in profile.optimization_candidates:
lines.append(f"- **{cand.direction}** ({cand.effort}, {cand.priority}): {cand.candidate}")
else:
lines.append("_(none)_")
lines.append("")
lines.append("## Verdict")
lines.append("")
lines.append(f"{profile.decomposition_cost.recommended_rationale}")
return "\n".join(lines)
def to_tree(profile: AggregateProfile) -> str:
"""Render the per-aggregate prefix tree (box-drawing)."""
lines: list[str] = [f"Metadata: {profile.name}"]
lines.append(f"|- kind: {profile.aggregate_kind}")
lines.append(f"|- memory_dim: {profile.memory_dim}")
lines.append(f"|- producers: [{len(profile.producers)}]")
for p in profile.producers:
lines.append(f"| |- {p.fqname} ({p.role})")
lines.append(f"|- consumers: [{len(profile.consumers)}]")
for c in profile.consumers:
lines.append(f"| |- {c.fqname} ({c.role})")
lines.append(f"|- access_pattern: {profile.access_pattern}")
lines.append(f"|- frequency: {profile.frequency}")
lines.append(f"|- result_coverage: {profile.result_coverage.summary}")
lines.append(f"|- type_alias_coverage: {profile.type_alias_coverage.summary}")
cf_total = (
len(profile.cross_audit_findings.weak_types) +
len(profile.cross_audit_findings.exception_handling) +
len(profile.cross_audit_findings.optional_in_baseline) +
len(profile.cross_audit_findings.config_io_ownership) +
len(profile.cross_audit_findings.import_graph)
)
lines.append(f"|- cross_audit_findings: {cf_total} findings")
lines.append(f"|- decomposition_cost: {profile.decomposition_cost.recommended_direction} ({profile.decomposition_cost.current_cost_estimate} us)")
lines.append(f"|- optimization_candidates: [{len(profile.optimization_candidates)}]")
return "\n".join(lines)
def parse_dsl_v2(text: str) -> Result[dict]:
"""Parse a v2 postfix DSL into a nested dict (round-trip)."""
tokens: list[str] = []
for line in text.splitlines():
line = re.sub(r"\\.*", "", line)
if not line.strip():
continue
i = 0
while i < len(line):
c = line[i]
if c.isspace():
i += 1
continue
if c == '"':
j = line.find('"', i + 1)
if j == -1:
j = len(line)
tokens.append(line[i + 1 : j])
i = j + 1
else:
j = i
while j < len(line) and not line[j].isspace():
j += 1
tokens.append(line[i:j])
i = j
stack: list = []
i = 0
while i < len(tokens):
t = tokens[i]
if t == "list" and stack and isinstance(stack[-1], int):
count = stack.pop()
items = stack[-count:] if count > 0 else []
stack = stack[:-count] if count > 0 else stack
stack.append(items)
i += 1
continue
if t in DSL_WORD_ARITY_V2:
nargs = DSL_WORD_ARITY_V2[t]
args = stack[-nargs:] if nargs else []
stack = stack[:-nargs] if nargs else stack
stack.append({"_tag": t, "_args": args})
i += 1
continue
if t in ("true", "false"):
stack.append(t == "true")
elif t == "nil":
stack.append(None)
elif t.lstrip("-").isdigit():
stack.append(int(t))
else:
stack.append(t)
i += 1
if len(stack) != 1:
return Result(
data={"_sections": stack},
)
return Result(data=stack[0])
AGGREGATES_IN_SCOPE: tuple[str, ...] = (
"Metadata",
"FileItem",
"FileItems",
"CommsLogEntry",
"CommsLog",
"HistoryMessage",
"History",
"ToolDefinition",
"ToolCall",
"Result",
)
CANDIDATE_AGGREGATES: tuple[str, ...] = (
"ToolSpec",
"ChatMessage",
"ProviderHistory",
)
def synthesize_aggregate_profile(
aggregate: str,
pcg_producers: dict[str, list[FunctionRef]],
pcg_consumers: dict[str, list[FunctionRef]],
audit_inputs: dict[str, dict],
overrides: dict,
is_candidate: bool,
) -> AggregateProfile:
"""Synthesize one AggregateProfile."""
if is_candidate:
return AggregateProfile(
name=aggregate,
aggregate_kind="candidate_dataclass",
memory_dim="discussion" if aggregate == "ChatMessage" else "unknown",
producers=(),
consumers=(),
access_pattern="mixed",
access_pattern_evidence=(),
frequency="unknown",
frequency_evidence=(),
result_coverage=ResultCoverage(0, 0, 0, 0, ""),
type_alias_coverage=TypeAliasCoverage(0, 0, 0, ""),
cross_audit_findings=CrossAuditFindings((), (), (), (), ()),
decomposition_cost=DecompositionCost(0, 0, 0, "insufficient_data", "candidate aggregate; would be detected after any_type_componentization_20260621 merges", None, 0, False),
optimization_candidates=(),
is_candidate=True,
)
producers = tuple(pcg_producers.get(aggregate, []))
consumers = tuple(pcg_consumers.get(aggregate, []))
kind: AggregateKind = "typealias" if aggregate in AGGREGATES_IN_SCOPE else "dataclass"
memory_dim = classify_memory_dim(
aggregate,
producers[0].file if producers else "",
overrides.get("memory_dim", {}) if isinstance(overrides, dict) else {},
)
return AggregateProfile(
name=aggregate,
aggregate_kind=kind,
memory_dim=memory_dim,
producers=producers,
consumers=consumers,
access_pattern="whole_struct",
access_pattern_evidence=(),
frequency="per_turn",
frequency_evidence=(),
result_coverage=ResultCoverage(len(producers), len(producers), len(consumers), 0, ""),
type_alias_coverage=TypeAliasCoverage(0, 0, 0, ""),
cross_audit_findings=CrossAuditFindings((), (), (), (), ()),
decomposition_cost=DecompositionCost(0, 0, 0, "hold", "no data", None, 0, False),
optimization_candidates=(),
is_candidate=False,
)
@dataclass(frozen=True)
class AuditSummary:
aggregate_profiles: tuple[AggregateProfile, ...]
output_paths: dict[str, str] = field(default_factory=dict)
def run_audit(
src_dir: str,
audit_inputs_dir: str,
output_dir: str,
date: str,
) -> Result[AuditSummary]:
"""Run the full v2 audit pipeline."""
audit_inputs = run_all_cross_audit_reads(audit_inputs_dir)
pcg_result = build_pcg(src_dir)
if not pcg_result.ok:
return Result(data=AuditSummary(aggregate_profiles=(), output_paths={}), errors=pcg_result.errors)
pcg = pcg_result.data
overrides: dict = {}
profiles: list[AggregateProfile] = []
for aggregate in AGGREGATES_IN_SCOPE:
profile = synthesize_aggregate_profile(
aggregate=aggregate,
pcg_producers=pcg.producers,
pcg_consumers=pcg.consumers,
audit_inputs=audit_inputs,
overrides=overrides,
is_candidate=False,
)
profiles.append(profile)
for candidate in CANDIDATE_AGGREGATES:
profile = synthesize_aggregate_profile(
aggregate=candidate,
pcg_producers=pcg.producers,
pcg_consumers=pcg.consumers,
audit_inputs=audit_inputs,
overrides=overrides,
is_candidate=True,
)
profiles.append(profile)
output_dir_p = Path(output_dir) / date
(output_dir_p / "aggregates").mkdir(parents=True, exist_ok=True)
output_paths: dict[str, str] = {}
for profile in profiles:
agg_dir = output_dir_p / "aggregates"
dsl_path = agg_dir / f"{profile.name}.dsl"
md_path = agg_dir / f"{profile.name}.md"
tree_path = agg_dir / f"{profile.name}.tree"
dsl_path.write_text(to_dsl_v2(profile, generated_date=date), encoding="utf-8")
md_path.write_text(to_markdown(profile), encoding="utf-8")
tree_path.write_text(to_tree(profile), encoding="utf-8")
output_paths[profile.name] = str(dsl_path)
return Result(data=AuditSummary(aggregate_profiles=tuple(profiles), output_paths=output_paths))
def render_rollups(summary: AuditSummary, output_dir: Path) -> dict[str, str]:
"""Render the 4 top-level rollup files."""
output_dir.mkdir(parents=True, exist_ok=True)
summary_path = output_dir / "summary.md"
cross_audit_path = output_dir / "cross_audit_summary.md"
decomposition_matrix_path = output_dir / "decomposition_matrix.md"
candidates_path = output_dir / "candidates.md"
profiles = summary.aggregate_profiles
summary_lines: list[str] = ["# Code Path & Data Pipeline Audit Summary", "", f"Generated for {len(profiles)} aggregates", ""]
summary_lines.append("## 4-mem-dim rollup")
summary_lines.append("")
by_dim: dict[str, list[str]] = {}
for p in profiles:
by_dim.setdefault(p.memory_dim, []).append(p.name)
for dim, names in sorted(by_dim.items()):
summary_lines.append(f"- **{dim}** ({len(names)}): {', '.join(names)}")
summary_lines.append("")
summary_lines.append("## Cross-validation verdict")
summary_lines.append("")
for p in profiles:
rc = p.result_coverage
tac = p.type_alias_coverage
summary_lines.append(f"- **{p.name}**: result_coverage={rc.summary}; type_alias_coverage={tac.summary}")
summary_path.write_text("\n".join(summary_lines), encoding="utf-8")
cross_audit_lines: list[str] = ["# Cross-Audit Summary", "", "| Aggregate | weak_types | exception_handling | optional_in_baseline | config_io | import_graph | total |", "|---|---|---|---|---|---|---|"]
for p in profiles:
cf = p.cross_audit_findings
total = len(cf.weak_types) + len(cf.exception_handling) + len(cf.optional_in_baseline) + len(cf.config_io_ownership) + len(cf.import_graph)
cross_audit_lines.append(f"| {p.name} | {len(cf.weak_types)} | {len(cf.exception_handling)} | {len(cf.optional_in_baseline)} | {len(cf.config_io_ownership)} | {len(cf.import_graph)} | {total} |")
cross_audit_path.write_text("\n".join(cross_audit_lines), encoding="utf-8")
deco_lines: list[str] = ["# Decomposition Matrix", "", "## Top 10 candidates by estimated savings", "", "| Rank | Aggregate | Direction | Est. savings (us) | Frequency | Effort | Priority |", "|---|---|---|---|---|---|---|"]
candidates_with_direction = [(p, p.decomposition_cost.componentize_savings + p.decomposition_cost.unify_savings, p.frequency, "n/a", "n/a") for p in profiles if p.decomposition_cost.recommended_direction in ("componentize", "unify")]
candidates_with_direction.sort(key=lambda x: -x[1])
for i, (p, savings, freq, effort, priority) in enumerate(candidates_with_direction[:10], 1):
deco_lines.append(f"| {i} | {p.name} | {p.decomposition_cost.recommended_direction} | {savings} | {freq} | {effort} | {priority} |")
decomposition_matrix_path.write_text("\n".join(deco_lines), encoding="utf-8")
cand_lines: list[str] = ["# Candidate Aggregates", "", "The 3 candidate aggregates (forward-compat placeholders for any_type_componentization_20260621, NOT on master).", ""]
for p in profiles:
if p.is_candidate:
cand_lines.append(f"- **{p.name}**: candidate; would be detected after any_type_componentization_20260621 merges")
candidates_path.write_text("\n".join(cand_lines), encoding="utf-8")
return {
"summary.md": str(summary_path),
"cross_audit_summary.md": str(cross_audit_path),
"decomposition_matrix.md": str(decomposition_matrix_path),
"candidates.md": str(candidates_path),
}
def code_path_audit_v2(
src_dir: str = "src",
audit_inputs_dir: str = "tests/artifacts/audit_inputs",
output_dir: str = "docs/reports/code_path_audit",
date: str | None = None,
) -> dict:
"""MCP tool wrapper for the v2 audit."""
date_str = date or date_mod.today().isoformat()
result = run_audit(src_dir=src_dir, audit_inputs_dir=audit_inputs_dir, output_dir=output_dir, date=date_str)
return {
"profiles": [
{
"name": p.name,
"kind": p.aggregate_kind,
"memory_dim": p.memory_dim,
"access_pattern": p.access_pattern,
"frequency": p.frequency,
"recommended_direction": p.decomposition_cost.recommended_direction,
"is_candidate": p.is_candidate,
}
for p in result.data.aggregate_profiles
],
"errors": [e.ui_message() for e in result.errors],
}