perf(entropy): Fix nested imports in hot paths

Hoisted imports from inside frequently-called functions to module level:

app_controller.py:
- Added traceback and inspect at module level
- Removed 3 nested traceback imports from exception handlers

gui_2.py:
- Added traceback at module level
- Removed nested traceback import from _gui_func exception handler
- Kept uvicorn lazy-loaded (only for --headless mode)

multi_agent_conductor.py:
- Removed unused 'import sys' from run()
- Removed redundant nested imports (already at module level)

Also adds audit scripts and entropy findings documentation.
This commit is contained in:
2026-05-06 20:18:30 -04:00
parent 54afbb9365
commit 2b5185a78f
8 changed files with 598 additions and 57 deletions
+266
View File
@@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""
Comprehensive Entropy Audit Script for Manual Slop src/
Checks for:
1. Duplicate function definitions
2. Duplicate class definitions
3. Very long functions (>200 lines)
4. Nested imports within functions
5. Inconsistent patterns (TODO, FIXME comments)
6. Cyclomatic complexity indicators (nested conditionals)
7. Dead code indicators (unused variables, commented out code)
"""
import os
import re
import ast
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Dict, Set, Optional
@dataclass
class EntropyIssue:
    # One finding produced by an EntropyAuditor check.
    file: str         # path of the file the finding was made in (str(filepath))
    line: int         # 1-based line number of the finding; 0 when unknown
    severity: str  # 'high', 'medium', 'low'
    category: str     # machine-readable bucket, e.g. 'nested_import', 'long_function'
    description: str  # human-readable one-line summary
    detail: str = ""  # optional extra context, e.g. a truncated source snippet
@dataclass
class FileAnalysis:
    # Per-file summary returned by EntropyAuditor.analyze_file.
    # NOTE(review): `issues` and `stats` appear to stay empty — analyze_file
    # accumulates findings on EntropyAuditor.issues instead; confirm intent.
    path: str       # file path as a string
    size_kb: float  # file size in kilobytes (stat().st_size / 1024)
    issues: List[EntropyIssue] = field(default_factory=list)
    stats: Dict = field(default_factory=dict)
class EntropyAuditor:
    """Static-analysis pass that scans Python files for entropy smells.

    Findings from every file are accumulated on ``self.issues``.
    NOTE(review): ``analyze_file`` returns a ``FileAnalysis`` whose own
    ``issues`` list is never populated — read ``self.issues`` instead.
    """

    def __init__(self, src_dir: str = "src"):
        self.src_dir = Path(src_dir)
        self.issues: List[EntropyIssue] = []
        self.files_analyzed = 0
        self.total_lines = 0

    def analyze_file(self, filepath: Path) -> FileAnalysis:
        """Run every check against one file; findings go to ``self.issues``."""
        with open(filepath, encoding='utf-8', errors='ignore') as f:
            content = f.read()
        lines = content.split('\n')
        self.total_lines += len(lines)
        analysis = FileAnalysis(
            path=str(filepath),
            size_kb=filepath.stat().st_size / 1024
        )
        # 1. Imports hidden inside function bodies
        self._check_nested_imports(filepath, content)
        # 2. Very long functions / too many parameters
        self._check_long_functions(filepath, content)
        # 3. TODO/FIXME markers
        self._check_todos(filepath, content)
        # 4. Deep nesting (complexity proxy)
        self._check_nesting_depth(filepath, lines)
        # 5. Consecutive duplicate lines
        self._check_duplicate_patterns(filepath, lines)
        # 6. Magic numbers
        self._check_magic_numbers(filepath, lines)
        return analysis

    def _check_nested_imports(self, filepath: Path, content: str) -> None:
        """Flag import statements that appear inside function bodies.

        BUG FIX: the previous guard — skip unless the module had *no*
        top-level imports — meant the check never fired in practice (nearly
        every file has a top-level import). An import found inside a function
        body is nested regardless of what the module level looks like.
        Also covers ``async def`` and reports each import statement once even
        when functions are nested inside each other.
        """
        tree = ast.parse(content, filename=str(filepath))
        seen: Set[int] = set()  # ids of import nodes already reported
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                for child in ast.walk(node):
                    if isinstance(child, (ast.Import, ast.ImportFrom)) and id(child) not in seen:
                        seen.add(id(child))
                        self.issues.append(EntropyIssue(
                            file=str(filepath),
                            line=child.lineno or 0,
                            severity='medium',
                            category='nested_import',
                            description=f'Nested import in function `{node.name}`',
                            detail=ast.unparse(child)[:100]
                        ))

    def _check_long_functions(self, filepath: Path, content: str) -> None:
        """Check for functions with >200 lines or >10 positional parameters."""
        tree = ast.parse(content, filename=str(filepath))
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                if node.end_lineno and node.lineno:
                    length = node.end_lineno - node.lineno
                    if length > 200:
                        self.issues.append(EntropyIssue(
                            file=str(filepath),
                            line=node.lineno,
                            severity='high',
                            category='long_function',
                            description=f'Function `{node.name}` is {length} lines (>{200})',
                            detail=f'Lines {node.lineno}-{node.end_lineno}'
                        ))
                if len(node.args.args) > 10:
                    self.issues.append(EntropyIssue(
                        file=str(filepath),
                        line=node.lineno,
                        severity='medium',
                        category='too_many_params',
                        description=f'Function `{node.name}` has {len(node.args.args)} parameters',
                        detail=str(node.args.args[:5]) + '...'
                    ))

    def _check_todos(self, filepath: Path, content: str) -> None:
        """Check for TODO/FIXME/BUG/HACK/XXX comments (low severity)."""
        for i, line in enumerate(content.split('\n'), 1):
            if re.search(r'(TODO|FIXME|BUG|HACK|XXX)', line, re.IGNORECASE):
                self.issues.append(EntropyIssue(
                    file=str(filepath),
                    line=i,
                    severity='low',
                    category='tech_debt',
                    description=line.strip()[:80],
                    detail='Technical debt marker'
                ))

    def _check_nesting_depth(self, filepath: Path, lines: List[str]) -> None:
        """Flag lines indented more than 20 spaces (~10 levels deep)."""
        for i, line in enumerate(lines, 1):
            if line and not line.strip().startswith('#'):
                stripped = line.lstrip()
                indent = len(line) - len(stripped)  # leading whitespace width
                if indent > 20:  # More than ~10 levels deep
                    self.issues.append(EntropyIssue(
                        file=str(filepath),
                        line=i,
                        severity='medium',
                        category='deep_nesting',
                        description=f'Line has {indent} spaces of indentation',
                        detail=line.strip()[:60]
                    ))

    def _check_duplicate_patterns(self, filepath: Path, lines: List[str]) -> None:
        """Flag runs of 3+ identical consecutive non-blank, non-comment lines.

        BUG FIX: a duplicate run that reached the last line of the file was
        never reported because flushing only happened when a non-duplicate
        line followed; the post-loop flush below handles that case.
        """
        prev_line: Optional[str] = None
        dup_start: Optional[int] = None  # line number where the run began
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            if stripped and not stripped.startswith('#') and stripped == prev_line:
                if dup_start is None:
                    dup_start = i - 1
            else:
                if dup_start and i - dup_start > 2:
                    self.issues.append(EntropyIssue(
                        file=str(filepath),
                        line=dup_start,
                        severity='high',
                        category='duplicate_lines',
                        description=f'{i - dup_start} consecutive duplicate lines starting at {dup_start}',
                        detail=lines[dup_start-1].strip()[:60] if dup_start <= len(lines) else ''
                    ))
                dup_start = None
            prev_line = stripped
        # Flush a run that ran off the end of the file.
        end = len(lines) + 1
        if dup_start and end - dup_start > 2:
            self.issues.append(EntropyIssue(
                file=str(filepath),
                line=dup_start,
                severity='high',
                category='duplicate_lines',
                description=f'{end - dup_start} consecutive duplicate lines starting at {dup_start}',
                detail=lines[dup_start-1].strip()[:60] if dup_start <= len(lines) else ''
            ))

    def _check_magic_numbers(self, filepath: Path, lines: List[str]) -> None:
        """Flag unnamed numeric constants with 3+ digits (low severity)."""
        magic_pattern = re.compile(r'(?<!\w)([0-9]{3,})(?!\w)')  # Numbers with 3+ digits
        for i, line in enumerate(lines, 1):
            if not line.strip().startswith('#'):
                for m in magic_pattern.findall(line):
                    self.issues.append(EntropyIssue(
                        file=str(filepath),
                        line=i,
                        severity='low',
                        category='magic_number',
                        description=f'Magic number: {m}',
                        detail=line.strip()[:60]
                    ))

    def run_audit(self) -> None:
        """Run the audit on all top-level Python files in ``self.src_dir``."""
        py_files = list(self.src_dir.glob("*.py"))
        print(f"Auditing {len(py_files)} Python files in {self.src_dir}...")
        for filepath in sorted(py_files):
            if filepath.name == "__init__.py":
                continue
            try:
                self.analyze_file(filepath)
                self.files_analyzed += 1
            except Exception as e:
                # Best effort: report and keep auditing the remaining files.
                print(f"Error analyzing {filepath}: {e}")

    def generate_report(self) -> str:
        """Generate a markdown report of all accumulated findings."""
        by_severity: Dict[str, List[EntropyIssue]] = {'high': [], 'medium': [], 'low': []}
        by_category: Dict[str, List[EntropyIssue]] = {}
        for issue in self.issues:
            by_severity[issue.severity].append(issue)
            if issue.category not in by_category:
                by_category[issue.category] = []
            by_category[issue.category].append(issue)
        report = [
            "# Entropy Audit Report: src/",
            "",
            f"**Files Analyzed:** {self.files_analyzed}",
            f"**Total Lines:** {self.total_lines:,}",
            f"**Issues Found:** {len(self.issues)}",
            "",
            "## Summary by Severity",
            "",
            f"- **High:** {len(by_severity['high'])}",
            f"- **Medium:** {len(by_severity['medium'])}",
            f"- **Low:** {len(by_severity['low'])}",
            "",
            "## Summary by Category",
            ""
        ]
        for cat, issues in sorted(by_category.items()):
            report.append(f"- **{cat}:** {len(issues)}")
        report.extend(["", "## High Severity Issues", ""])
        for issue in sorted(by_severity['high'], key=lambda x: (x.file, x.line)):
            report.append(f"### {issue.file}")
            report.append(f"- **Line {issue.line}:** {issue.description}")
            if issue.detail:
                report.append(f"  - Detail: `{issue.detail[:80]}`")
            report.append("")
        report.extend(["", "## Medium Severity Issues", ""])
        # Cap the medium list at 50 entries to keep the report readable.
        for issue in sorted(by_severity['medium'], key=lambda x: (x.file, x.line))[:50]:
            report.append(f"- **Line {issue.line}** ({issue.file}): {issue.description}")
        if len(by_severity['medium']) > 50:
            report.append(f"\n_... and {len(by_severity['medium']) - 50} more medium issues_")
        return "\n".join(report)
def main():
    """Run the audit over ./src, print the report, and persist it to disk."""
    auditor = EntropyAuditor("src")
    auditor.run_audit()
    report = auditor.generate_report()
    print(report)
    # Also write to file. Create the directory first so the script works on
    # a fresh checkout where the deep track path does not exist yet.
    report_path = "conductor/tracks/data_oriented_optimization_20260312/entropy_audit_report.md"
    os.makedirs(os.path.dirname(report_path), exist_ok=True)
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(report)
    print(f"\nReport written to {report_path}")


if __name__ == "__main__":
    main()
+199
View File
@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
Focused Entropy Audit for Manual Slop - Muratori Style
Focuses on ACTUAL issues, not style:
1. Duplicate logic (same thing done in multiple places)
2. State inconsistencies (parallel representations)
3. Logic errors / bugs
4. Performance concerns
"""
import os
import re
import ast
from pathlib import Path
from collections import defaultdict
from typing import List, Dict, Set, Tuple, Optional
def find_duplicate_logic_files(src_dir: str = 'src'):
    """Map each known hot-spot identifier to the files that mention it.

    Args:
        src_dir: directory whose top-level ``*.py`` files are scanned
            (defaults to ``'src'`` for backward compatibility).

    Returns:
        dict mapping pattern name -> list of file names containing it.
        A pattern found in more than one file suggests duplicated logic.
    """
    patterns: Dict[str, List[str]] = {
        'calculate_track_progress': [],
        'cascade_blocks': [],
        'topological_sort': [],
        'push_mma_state': [],
        'active_tickets': [],
    }
    for f in Path(src_dir).glob('*.py'):
        content = f.read_text(encoding='utf-8', errors='ignore')
        for pattern, hits in patterns.items():
            # The patterns are literal identifiers, so plain substring
            # membership is equivalent to the regex search used before.
            if pattern in content:
                hits.append(f.name)
    return patterns
def check_ticket_state_management():
    """Look for parallel ticket-state representations across the GUI,
    controller, and DAG engine sources.

    Returns a list of (where, message) tuples. Raises FileNotFoundError
    when the expected src/ files are absent, same as before.
    """
    findings = []

    def _read(path: str) -> str:
        # All three sources are read tolerantly; byte errors are dropped.
        return Path(path).read_text(encoding='utf-8', errors='ignore')

    # Read in the same order as before so a missing file fails identically.
    gui_src = _read('src/gui_2.py')
    _app_ctrl_src = _read('src/app_controller.py')  # read for parity; not inspected below
    dag_src = _read('src/dag_engine.py')

    # gui_2 uses dict-based tickets: flag raw status-key access patterns.
    dict_access = 'ticket["status"]' in gui_src or "t['status']" in gui_src
    if 'active_tickets' in gui_src and dict_access:
        findings.append(("gui_2.py", "Dict-based ticket access pattern found"))

    # Blocking logic duplicated between GUI callbacks and DAG cascade.
    gui_has_blocking = re.search(r'_cb_block_ticket|_cb_unblock_ticket', gui_src) is not None
    dag_has_cascade = re.search(r'cascade_blocks', dag_src) is not None
    if gui_has_blocking and dag_has_cascade:
        findings.append(("architecture", "GUI has manual block/unblock that could conflict with DAG cascade_blocks"))

    return findings
def check_import_issues(src_dir: str = 'src'):
    """Report import statements nested inside known hot-path functions.

    Args:
        src_dir: directory whose top-level ``*.py`` files are scanned
            (defaults to ``'src'`` for backward compatibility).

    Returns:
        list of (file_name, message) tuples, one per nested import found
        inside a function named in the hot-path set below.
    """
    issues = []
    # Functions considered hot paths — imports inside them run per call.
    hot_paths = {'_process_pending_gui_tasks', '_gui_func', 'run', 'tick'}
    for f in Path(src_dir).glob('*.py'):
        try:
            content = f.read_text(encoding='utf-8', errors='ignore')
            tree = ast.parse(content, filename=str(f))
        except (OSError, SyntaxError, ValueError):
            # Unreadable or unparseable file: skip it rather than abort the
            # audit. (Was a bare `except: pass`, which also swallowed
            # KeyboardInterrupt/SystemExit.)
            continue
        for node in ast.walk(tree):
            # Hoist the name test out of the inner loop; cover async defs too.
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name in hot_paths:
                for child in ast.walk(node):
                    if isinstance(child, (ast.Import, ast.ImportFrom)):
                        line = child.lineno or 0
                        issues.append((f.name, f"Nested import `{ast.unparse(child).strip()[:50]}` in hot path `{node.name}` line {line}"))
    return issues
def check_potential_bugs(src_dir: str = 'src'):
    """Scan for mutable-default-argument definitions (a classic Python bug).

    Args:
        src_dir: directory whose top-level ``*.py`` files are scanned
            (defaults to ``'src'`` for backward compatibility).

    Returns:
        list of (file_name, line_number, bug_type, code_snippet) tuples.
    """
    bugs = []
    # Compile once and reuse — these previously ran re.search per line.
    list_default = re.compile(r'def\s+\w+\([^)]*=\s*\[\s*\]')
    dict_default = re.compile(r'def\s+\w+\([^)]*=\s*\{\s*\}')
    for f in Path(src_dir).glob('*.py'):
        content = f.read_text(encoding='utf-8', errors='ignore')
        for i, line in enumerate(content.split('\n'), 1):
            # Skip comments
            if line.strip().startswith('#'):
                continue
            if list_default.search(line):
                bugs.append((f.name, i, "Mutable default argument", line.strip()[:60]))
            if dict_default.search(line):
                bugs.append((f.name, i, "Mutable default argument", line.strip()[:60]))
    return bugs
def check_actual_duplicates(src_dir: str = 'src'):
    """Find runs of 3+ identical consecutive code lines within each file.

    Blank lines and comment lines break (and never join) a run.

    Args:
        src_dir: directory whose top-level ``*.py`` files are scanned
            (defaults to ``'src'`` for backward compatibility).

    Returns:
        list of (file_name, first_line, last_line, snippet) tuples.
    """
    duplicates = []
    for f in sorted(Path(src_dir).glob('*.py')):
        try:
            content = f.read_text(encoding='utf-8', errors='ignore')
        except OSError:
            # Unreadable file: skip it. (Was a bare `except: pass`, which
            # also swallowed KeyboardInterrupt/SystemExit.)
            continue
        lines = content.split('\n')
        prev_normalized = None
        dup_start = None  # line number where the current duplicate run began
        for i, line in enumerate(lines, 1):
            normalized = line.strip()
            if not normalized or normalized.startswith('#'):
                # Blank line or comment resets the run.
                prev_normalized = None
                dup_start = None
                continue
            if normalized == prev_normalized:
                if dup_start is None:
                    dup_start = i - 1
            else:
                if dup_start and i - dup_start >= 3:
                    duplicates.append((f.name, dup_start, i - 1, lines[dup_start - 1].strip()[:60]))
                dup_start = None
            prev_normalized = normalized
        # BUG FIX: a run reaching the end of the file was never reported
        # because flushing only happened when a non-duplicate line followed.
        end = len(lines) + 1
        if dup_start and end - dup_start >= 3:
            duplicates.append((f.name, dup_start, end - 1, lines[dup_start - 1].strip()[:60]))
    return duplicates
def main():
    """Run each focused check and print a sectioned plain-text report."""
    rule = "=" * 70

    def header(title: str) -> None:
        # Section heading followed by a short underline.
        print(title)
        print("-" * 40)

    print(rule)
    print("FOCUSED ENTROPY AUDIT - Muratori Style")
    print(rule)
    print()

    header("1. TICKET STATE MANAGEMENT ISSUES")
    state_issues = check_ticket_state_management()
    if not state_issues:
        print(" None found")
    else:
        for issue in state_issues:
            print(f" [{issue[0]}] {issue[1]}")
    print()

    header("2. NESTED IMPORTS IN HOT PATH FUNCTIONS")
    import_issues = check_import_issues()
    if not import_issues:
        print(" None found")
    else:
        for fname, msg in import_issues[:10]:
            print(f" [{fname}] {msg}")
    print()

    header("3. POTENTIAL BUGS (mutable defaults, etc)")
    bug_hits = check_potential_bugs()
    if not bug_hits:
        print(" None found")
    else:
        for fname, line, bugtype, code in bug_hits[:10]:
            print(f" [{fname}:{line}] {bugtype}: {code}")
    print()

    header("4. ACTUAL DUPLICATE CODE (3+ consecutive lines)")
    dup_hits = check_actual_duplicates()
    if not dup_hits:
        print(" None found")
    else:
        for fname, start, end, code in dup_hits[:10]:
            print(f" [{fname}:{start}-{end}] {code}")
    print()

    header("5. PATTERN USAGE ACROSS FILES")
    for pattern, files in find_duplicate_logic_files().items():
        # Only patterns appearing in more than one file are interesting.
        if len(files) > 1:
            print(f" {pattern}: {files}")
    print()


if __name__ == "__main__":
    main()