Private
Public Access
0
0

feat(audit): implement Phase 7 cross-audit integration + Phase 8.1 DSL arity

Phase 7: read_input_json (stdlib I/O boundary), INPUT_JSON_CONTRACTS
(6 input sources), find_enclosing_function (3-tier mapping tier 1),
compute_result_coverage (cross-check of doeh), compute_type_alias_coverage
(cross-check of dss), aggregate_cross_audit_findings (per-aggregate
bucketing), run_all_cross_audit_reads (convenience).

Phase 8 Task 8.1: DSL_WORD_ARITY_V2 (14 new tagged words).

15 new unit tests passing. 111 total tests passing.

Phase 8 Tasks 8.2-8.5 (4 renderers + parser) next.
This commit is contained in:
2026-06-22 01:49:14 -04:00
parent ae5dcb775e
commit e59334a303
6 changed files with 840 additions and 1 deletions
+208 -1
View File
@@ -578,4 +578,211 @@ def compute_decomposition_cost(
batch_size=None,
struct_field_count=struct_field_count,
struct_frozen=struct_frozen,
)
)
import json
import json
def read_input_json(path: str) -> Result[dict]:
"""Read a JSON file and return Result[dict].
Per error_handling.md stdlib I/O boundary pattern: catches
OSError (missing/permission denied) and json.JSONDecodeError (malformed
JSON), converts to ErrorInfo.
"""
p = Path(path)
try:
raw = p.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError) as e:
return Result(
data={},
errors=[ErrorInfo(
kind=ErrorKind.NOT_FOUND,
message=f"Cannot read {path}: {e}",
source="read_input_json",
original=e,
)],
)
try:
data = json.loads(raw)
except json.JSONDecodeError as e:
return Result(
data={},
errors=[ErrorInfo(
kind=ErrorKind.INVALID_INPUT,
message=f"Malformed JSON in {path}: {e}",
source="read_input_json",
original=e,
)],
)
if not isinstance(data, dict):
return Result(
data={},
errors=[ErrorInfo(
kind=ErrorKind.INVALID_INPUT,
message=f"JSON root in {path} is not a dict",
source="read_input_json",
)],
)
return Result(data=data)
INPUT_JSON_CONTRACTS: dict[str, dict[str, str]] = {
"audit_weak_types": {
"producer": "scripts/audit_weak_types.py --json",
"filename": "audit_weak_types.json",
},
"audit_exception_handling": {
"producer": "scripts/audit_exception_handling.py --json",
"filename": "audit_exception_handling.json",
},
"audit_optional_in_3_files": {
"producer": "scripts/audit_optional_in_3_files.py --json",
"filename": "audit_optional_in_3_files.json",
},
"audit_no_models_config_io": {
"producer": "scripts/audit_no_models_config_io.py --json",
"filename": "audit_no_models_config_io.json",
},
"audit_main_thread_imports": {
"producer": "scripts/audit_main_thread_imports.py --json",
"filename": "audit_main_thread_imports.json",
},
"type_registry": {
"producer": "scripts/generate_type_registry.py --json",
"filename": "type_registry.json",
},
}
def find_enclosing_function(
file: str,
line: int,
function_refs: list[FunctionRef],
) -> FunctionRef | None:
"""Tier 1 of the 3-tier mapping: find the function ref at (file, line)."""
candidates = [r for r in function_refs if r.file == file and r.line <= line]
if not candidates:
return None
return max(candidates, key=lambda r: r.line)
def compute_result_coverage(
producers: list[FunctionRef],
consumers: list[FunctionRef],
branches_on_errors: set[str],
) -> ResultCoverage:
"""Compute the per-aggregate result coverage.
result_producers: total number of producers (the caller is responsible
for filtering to Result[T] producers; this function reports the raw
count).
result_consumers: consumers whose fqname is in branches_on_errors
(the caller passes the set from AST analysis).
"""
total_producers = len(producers)
result_producers = total_producers
total_consumers = len(consumers)
result_consumers = len({c.fqname for c in consumers if c.fqname in branches_on_errors})
if total_producers > 0 and result_producers == total_producers:
pct_p = 100
else:
pct_p = (result_producers / total_producers * 100) if total_producers > 0 else 0
pct_c = (result_consumers / total_consumers * 100) if total_consumers > 0 else 0
summary = f"{result_producers}/{total_producers} producers return Result[T] ({pct_p:.0f}%); {result_consumers}/{total_consumers} consumers branch on .errors ({pct_c:.0f}%)"
return ResultCoverage(
total_producers=total_producers,
result_producers=result_producers,
total_consumers=total_consumers,
result_consumers=result_consumers,
summary=summary,
)
def compute_type_alias_coverage(total_sites: int, typed_sites: int) -> TypeAliasCoverage:
"""Compute the per-aggregate type alias coverage."""
untyped = total_sites - typed_sites
pct_typed = (typed_sites / total_sites * 100) if total_sites > 0 else 0
pct_untyped = (untyped / total_sites * 100) if total_sites > 0 else 0
summary = f"{total_sites} total sites; {typed_sites} typed ({pct_typed:.0f}%); {untyped} untyped ({pct_untyped:.0f}%)"
return TypeAliasCoverage(
total_sites=total_sites,
typed_sites=typed_sites,
untyped_sites=untyped,
summary=summary,
)
def aggregate_cross_audit_findings(
audit_name: str,
findings: list[dict],
example_file: str,
example_line: int,
) -> CrossAuditFindings:
"""Aggregate audit findings into a per-aggregate CrossAuditFindings.
Returns all-empty CrossAuditFindings when findings is empty (the
empty audit case is represented by 5 empty tuples, not 5 tuples
of zero-count CrossAuditFinding entries).
"""
empty = ()
if not findings:
return CrossAuditFindings(weak_types=empty, exception_handling=empty, optional_in_baseline=empty, config_io_ownership=empty, import_graph=empty)
site_count = len(findings)
note = f"{site_count} sites in producer+consumer functions"
finding = CrossAuditFinding(
audit_script=audit_name,
site_count=site_count,
example_file=example_file,
example_line=example_line,
note=note,
)
buckets = {
"audit_weak_types": "weak_types",
"audit_exception_handling": "exception_handling",
"audit_optional_in_3_files": "optional_in_baseline",
"audit_no_models_config_io": "config_io_ownership",
"audit_main_thread_imports": "import_graph",
}
field = buckets.get(audit_name)
if field is None:
return CrossAuditFindings(weak_types=empty, exception_handling=empty, optional_in_baseline=empty, config_io_ownership=empty, import_graph=empty)
kwargs = {f: empty for f in buckets.values()}
kwargs[field] = (finding,)
return CrossAuditFindings(**kwargs)
def run_all_cross_audit_reads(audit_inputs_dir: str) -> dict[str, dict]:
"""Read all 6 input JSONs from audit_inputs_dir.
Returns a dict keyed by audit_name. Missing and malformed files
are tolerated (return empty dict).
"""
out: dict[str, dict] = {}
p = Path(audit_inputs_dir)
if not p.exists():
return out
for audit_name, contract in INPUT_JSON_CONTRACTS.items():
json_path = p / contract["filename"]
if not json_path.exists():
out[audit_name] = {}
continue
result = read_input_json(str(json_path))
if result.ok:
out[audit_name] = result.data
else:
out[audit_name] = {}
return out
DSL_WORD_ARITY_V2: dict[str, int] = {
"kind": 1,
"mem-dim": 1,
"fn-ref": 4,
"access-pattern": 1,
"ap-evidence": 4,
"frequency": 1,
"freq-evidence": 4,
"result-coverage": 5,
"type-alias-coverage": 4,
"cross-audit-finding": 5,
"cross-audit-findings": 5,
"decomp-cost": 8,
"opt-candidate": 7,
"is-candidate": 1,
}