Private
Public Access
0
0

fix formatting on scripts

This commit is contained in:
2026-06-07 11:51:36 -04:00
parent 803f87137b
commit b94d949b4d
5 changed files with 263 additions and 266 deletions
+155 -158
View File
@@ -26,202 +26,199 @@ IGNORED_PATHS = {"__pycache__", ".git", "node_modules", "venv", ".venv", "env",
@dataclass
class FileStats:
path: str
total_lines: int = 0
code_lines: int = 0
comment_lines: int = 0
blank_lines: int = 0
classes: list[str] = field(default_factory=list)
functions: list[str] = field(default_factory=list)
methods: list[str] = field(default_factory=list)
top_level_decls: list[str] = field(default_factory=list)
path: str
total_lines: int = 0
code_lines: int = 0
comment_lines: int = 0
blank_lines: int = 0
classes: list[str] = field(default_factory=list)
functions: list[str] = field(default_factory=list)
methods: list[str] = field(default_factory=list)
top_level_decls: list[str] = field(default_factory=list)
@dataclass
class DirStats:
path: str
files: list[FileStats] = field(default_factory=list)
total_lines: int = 0
code_lines: int = 0
comment_lines: int = 0
blank_lines: int = 0
class_count: int = 0
function_count: int = 0
method_count: int = 0
file_count: int = 0
path: str
files: list[FileStats] = field(default_factory=list)
total_lines: int = 0
code_lines: int = 0
comment_lines: int = 0
blank_lines: int = 0
class_count: int = 0
function_count: int = 0
method_count: int = 0
file_count: int = 0
class CodeAnalyzer(ast.NodeVisitor):
def __init__(self) -> None:
self.classes: list[str] = []
self.functions: list[str] = []
self.methods: list[str] = []
self.top_level_decls: list[str] = []
self._in_class: bool = False
self._class_name: str = ""
def __init__(self) -> None:
self.classes: list[str] = []
self.functions: list[str] = []
self.methods: list[str] = []
self.top_level_decls: list[str] = []
self._in_class: bool = False
self._class_name: str = ""
def visit_ClassDef(self, node: ast.ClassDef) -> None:
self.classes.append(node.name)
self.top_level_decls.append(f"class {node.name}")
was_in_class = self._in_class
was_class_name = self._class_name
self._in_class = True
self._class_name = node.name
self.generic_visit(node)
self._in_class = was_in_class
self._class_name = was_class_name
def visit_ClassDef(self, node: ast.ClassDef) -> None:
self.classes.append(node.name)
self.top_level_decls.append(f"class {node.name}")
was_in_class = self._in_class
was_class_name = self._class_name
self._in_class = True
self._class_name = node.name
self.generic_visit(node)
self._in_class = was_in_class
self._class_name = was_class_name
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
name = node.name
if self._in_class:
self.methods.append(f"{self._class_name}.{name}")
else:
self.functions.append(name)
self.top_level_decls.append(f"def {name}")
self.generic_visit(node)
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
name = node.name
if self._in_class:
self.methods.append(f"{self._class_name}.{name}")
else:
self.functions.append(name)
self.top_level_decls.append(f"def {name}")
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
name = node.name
if self._in_class:
self.methods.append(f"{self._class_name}.{name}")
else:
self.functions.append(name)
self.top_level_decls.append(f"async def {name}")
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
name = node.name
if self._in_class:
self.methods.append(f"{self._class_name}.{name}")
else:
self.functions.append(name)
self.top_level_decls.append(f"async def {name}")
self.generic_visit(node)
def analyze_file(path: Path) -> FileStats | None:
try:
content = path.read_text(encoding="utf-8", errors="replace")
except Exception:
return None
try:
content = path.read_text(encoding="utf-8", errors="replace")
except Exception:
return None
stats = FileStats(path=str(path))
lines = content.splitlines()
stats.total_lines = len(lines)
stats = FileStats(path=str(path))
lines = content.splitlines()
stats.total_lines = len(lines)
for line in lines:
stripped = line.strip()
if not stripped:
stats.blank_lines += 1
elif stripped.startswith("#"):
stats.comment_lines += 1
else:
stats.code_lines += 1
for line in lines:
stripped = line.strip()
if not stripped: stats.blank_lines += 1
elif stripped.startswith("#"): stats.comment_lines += 1
else: stats.code_lines += 1
try:
tree = ast.parse(content, filename=str(path))
except Exception:
return stats
try:
tree = ast.parse(content, filename=str(path))
except Exception:
return stats
visitor = CodeAnalyzer()
visitor.visit(tree)
stats.classes = visitor.classes
stats.functions = visitor.functions
stats.methods = visitor.methods
stats.top_level_decls = visitor.top_level_decls
visitor = CodeAnalyzer()
visitor.visit(tree)
stats.classes = visitor.classes
stats.functions = visitor.functions
stats.methods = visitor.methods
stats.top_level_decls = visitor.top_level_decls
return stats
return stats
def walk_python_files(root: Path) -> Iterator[Path]:
for path in root.rglob("*.py"):
if any(ignored in path.parts for ignored in IGNORED_PATHS):
continue
yield path
for path in root.rglob("*.py"):
if any(ignored in path.parts for ignored in IGNORED_PATHS):
continue
yield path
def gather_dir_stats(root: Path) -> DirStats:
stats = DirStats(path=str(root))
for py_file in sorted(walk_python_files(root)):
file_stats = analyze_file(py_file)
if file_stats:
stats.files.append(file_stats)
stats.total_lines += file_stats.total_lines
stats.code_lines += file_stats.code_lines
stats.comment_lines += file_stats.comment_lines
stats.blank_lines += file_stats.blank_lines
stats.class_count += len(file_stats.classes)
stats.function_count += len(file_stats.functions)
stats.method_count += len(file_stats.methods)
stats.file_count += 1
return stats
stats = DirStats(path=str(root))
for py_file in sorted(walk_python_files(root)):
file_stats = analyze_file(py_file)
if file_stats:
stats.files.append(file_stats)
stats.total_lines += file_stats.total_lines
stats.code_lines += file_stats.code_lines
stats.comment_lines += file_stats.comment_lines
stats.blank_lines += file_stats.blank_lines
stats.class_count += len(file_stats.classes)
stats.function_count += len(file_stats.functions)
stats.method_count += len(file_stats.methods)
stats.file_count += 1
return stats
def format_bytes(num_bytes: int) -> str:
if num_bytes < 1024:
return f"{num_bytes}B"
elif num_bytes < 1024 * 1024:
return f"{num_bytes / 1024:.1f}KB"
else:
return f"{num_bytes / (1024 * 1024):.1f}MB"
if num_bytes < 1024:
return f"{num_bytes}B"
elif num_bytes < 1024 * 1024:
return f"{num_bytes / 1024:.1f}KB"
else:
return f"{num_bytes / (1024 * 1024):.1f}MB"
def print_stats(d: DirStats) -> None:
print(f"\n{'=' * 60}")
print(f" {d.path}/")
print(f"{'=' * 60}")
print(f" Files: {d.file_count:,}")
print(f" Lines: {d.total_lines:,} (code: {d.code_lines:,} | comment: {d.comment_lines:,} | blank: {d.blank_lines:,})")
print(f" Classes: {d.class_count:,}")
print(f" Functions: {d.function_count:,}")
print(f" Methods: {d.method_count:,}")
total_decls = d.class_count + d.function_count + d.method_count
print(f" Total decls: {total_decls:,}")
print(f"\n{'=' * 60}")
print(f" {d.path}/")
print(f"{'=' * 60}")
print(f" Files: {d.file_count:,}")
print(f" Lines: {d.total_lines:,} (code: {d.code_lines:,} | comment: {d.comment_lines:,} | blank: {d.blank_lines:,})")
print(f" Classes: {d.class_count:,}")
print(f" Functions: {d.function_count:,}")
print(f" Methods: {d.method_count:,}")
total_decls = d.class_count + d.function_count + d.method_count
print(f" Total decls: {total_decls:,}")
code_bytes = sum(f.total_lines * 50 for f in d.files)
print(f" Est. code size: ~{format_bytes(code_bytes)}")
code_bytes = sum(f.total_lines * 50 for f in d.files)
print(f" Est. code size: ~{format_bytes(code_bytes)}")
print()
if d.files:
print(f" {'File':<35} {'Lines':>8} {'Code':>7} {'Cmts':>6} {'Cls':>5} {'Fn':>5} {'Mth':>5}")
print(f" {'-' * 35} {'-' * 8} {'-' * 7} {'-' * 6} {'-' * 5} {'-' * 5} {'-' * 5}")
for f in sorted(d.files, key=lambda x: x.total_lines, reverse=True)[:20]:
print(f" {os.path.basename(f.path):<35} {f.total_lines:>8,} {f.code_lines:>7,} {f.comment_lines:>6,} {len(f.classes):>5} {len(f.functions):>5} {len(f.methods):>5}")
if len(d.files) > 20:
print(f" ... and {len(d.files) - 20} more files")
print()
if d.files:
print(f" {'File':<35} {'Lines':>8} {'Code':>7} {'Cmts':>6} {'Cls':>5} {'Fn':>5} {'Mth':>5}")
print(f" {'-' * 35} {'-' * 8} {'-' * 7} {'-' * 6} {'-' * 5} {'-' * 5} {'-' * 5}")
for f in sorted(d.files, key=lambda x: x.total_lines, reverse=True)[:20]:
print(f" {os.path.basename(f.path):<35} {f.total_lines:>8,} {f.code_lines:>7,} {f.comment_lines:>6,} {len(f.classes):>5} {len(f.functions):>5} {len(f.methods):>5}")
if len(d.files) > 20:
print(f" ... and {len(d.files) - 20} more files")
def main() -> None:
project_root = Path(__file__).parent.parent
print(f"Manual Slop — Codebase Statistics")
print(f"Generated: {__import__('datetime').datetime.now().isoformat()}")
print(f"Project root: {project_root}")
project_root = Path(__file__).parent.parent
print(f"Manual Slop — Codebase Statistics")
print(f"Generated: {__import__('datetime').datetime.now().isoformat()}")
print(f"Project root: {project_root}")
all_stats: list[DirStats] = []
for dirname in TARGET_DIRS:
dir_path = project_root / dirname
if dir_path.exists():
stats = gather_dir_stats(dir_path)
all_stats.append(stats)
print_stats(stats)
else:
print(f"\n[SKIPPED] {dirname}/ does not exist")
all_stats: list[DirStats] = []
for dirname in TARGET_DIRS:
dir_path = project_root / dirname
if dir_path.exists():
stats = gather_dir_stats(dir_path)
all_stats.append(stats)
print_stats(stats)
else:
print(f"\n[SKIPPED] {dirname}/ does not exist")
if len(all_stats) > 1:
combined = DirStats(path="(all)")
for s in all_stats:
combined.file_count += s.file_count
combined.total_lines += s.total_lines
combined.code_lines += s.code_lines
combined.comment_lines += s.comment_lines
combined.blank_lines += s.blank_lines
combined.class_count += s.class_count
combined.function_count += s.function_count
combined.method_count += s.method_count
print(f"\n{'=' * 60}")
print(f" COMBINED TOTALS")
print(f"{'=' * 60}")
print(f" Files: {combined.file_count:,}")
print(f" Lines: {combined.total_lines:,} (code: {combined.code_lines:,} | comment: {combined.comment_lines:,} | blank: {combined.blank_lines:,})")
print(f" Classes: {combined.class_count:,}")
print(f" Functions: {combined.function_count:,}")
print(f" Methods: {combined.method_count:,}")
total_decls = combined.class_count + combined.function_count + combined.method_count
print(f" Total decls: {total_decls:,}")
if len(all_stats) > 1:
combined = DirStats(path="(all)")
for s in all_stats:
combined.file_count += s.file_count
combined.total_lines += s.total_lines
combined.code_lines += s.code_lines
combined.comment_lines += s.comment_lines
combined.blank_lines += s.blank_lines
combined.class_count += s.class_count
combined.function_count += s.function_count
combined.method_count += s.method_count
print(f"\n{'=' * 60}")
print(f" COMBINED TOTALS")
print(f"{'=' * 60}")
print(f" Files: {combined.file_count:,}")
print(f" Lines: {combined.total_lines:,} (code: {combined.code_lines:,} | comment: {combined.comment_lines:,} | blank: {combined.blank_lines:,})")
print(f" Classes: {combined.class_count:,}")
print(f" Functions: {combined.function_count:,}")
print(f" Methods: {combined.method_count:,}")
total_decls = combined.class_count + combined.function_count + combined.method_count
print(f" Total decls: {total_decls:,}")
print(f"\nDone.")
print(f"\nDone.")
if __name__ == "__main__":
main()
main()
+36 -36
View File
@@ -10,54 +10,54 @@ import sys
from pathlib import Path
TOML_BASENAMES = {
"manual_slop", "config", "credentials",
"presets", "personas", "tool_presets",
"workspace_profiles", "tool_presets",
"manual_slop", "config", "credentials",
"presets", "personas", "tool_presets",
"workspace_profiles", "tool_presets",
}
PATTERNS = [
re.compile(rf'Path\(["\'](?:{"|".join(TOML_BASENAMES)})\.toml["\']'),
re.compile(rf'open\(["\'](?:{"|".join(TOML_BASENAMES)})\.toml["\']'),
re.compile(rf'["\']\.{{1,2}}/(?:{"|".join(TOML_BASENAMES)})\.toml["\']'),
re.compile(rf'Path\(["\']\.\./(?:{"|".join(TOML_BASENAMES)})\.toml["\']'),
re.compile(rf'Path\(["\'](?:{"|".join(TOML_BASENAMES)})\.toml["\']'),
re.compile(rf'open\(["\'](?:{"|".join(TOML_BASENAMES)})\.toml["\']'),
re.compile(rf'["\']\.{{1,2}}/(?:{"|".join(TOML_BASENAMES)})\.toml["\']'),
re.compile(rf'Path\(["\']\.\./(?:{"|".join(TOML_BASENAMES)})\.toml["\']'),
]
EXCLUDE_DIRS = {"artifacts", "logs", "__pycache__", "snapshots"}
def find_violations(tests_dir: Path) -> list[tuple[Path, int, str]]:
violations = []
for test_file in tests_dir.rglob("test_*.py"):
if any(excluded in test_file.parts for excluded in EXCLUDE_DIRS):
continue
try:
content = test_file.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError):
continue
for lineno, line in enumerate(content.splitlines(), start=1):
for pattern in PATTERNS:
if pattern.search(line):
violations.append((test_file, lineno, line.strip()))
break
return violations
violations = []
for test_file in tests_dir.rglob("test_*.py"):
if any(excluded in test_file.parts for excluded in EXCLUDE_DIRS):
continue
try:
content = test_file.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError):
continue
for lineno, line in enumerate(content.splitlines(), start=1):
for pattern in PATTERNS:
if pattern.search(line):
violations.append((test_file, lineno, line.strip()))
break
return violations
def main() -> int:
repo_root = Path(__file__).resolve().parent.parent
tests_dir = repo_root / "tests"
if not tests_dir.exists():
print(f"Tests dir not found: {tests_dir}", file=sys.stderr)
return 1
violations = find_violations(tests_dir)
if not violations:
print("OK: No tests reference real TOML files.")
return 0
print(f"FAIL: {len(violations)} test(s) reference real TOML files:")
for path, lineno, line in violations:
rel = path.relative_to(repo_root)
print(f" {rel}:{lineno}: {line}")
return 1
repo_root = Path(__file__).resolve().parent.parent
tests_dir = repo_root / "tests"
if not tests_dir.exists():
print(f"Tests dir not found: {tests_dir}", file=sys.stderr)
return 1
violations = find_violations(tests_dir)
if not violations:
print("OK: No tests reference real TOML files.")
return 0
print(f"FAIL: {len(violations)} test(s) reference real TOML files:")
for path, lineno, line in violations:
rel = path.relative_to(repo_root)
print(f" {rel}:{lineno}: {line}")
return 1
if __name__ == "__main__":
sys.exit(main())
sys.exit(main())
+3 -3
View File
@@ -13,9 +13,9 @@ LOG_FILE: str = 'logs/errors/mma_delegation.log'
def generate_skeleton(code: str) -> str:
"""
Parses Python code and replaces function/method bodies with '...',
preserving docstrings if present.
"""
Parses Python code and replaces function/method bodies with '...',
preserving docstrings if present.
"""
try:
PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language())
parser = tree_sitter.Parser(PY_LANGUAGE)
+40 -40
View File
@@ -2,46 +2,46 @@ import sys
import json
def main():
while True:
line = sys.stdin.readline()
if not line:
break
try:
req = json.loads(line)
method = req.get("method")
req_id = req.get("id")
if method == "tools/list":
resp = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"tools": [
{"name": "echo", "description": "Echo input", "inputSchema": {"type": "object"}}
]
}
}
elif method == "tools/call":
name = req["params"].get("name")
args = req["params"].get("arguments", {})
if name == "echo":
resp = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [{"type": "text", "text": f"ECHO: {args}"}]
}
}
else:
resp = {"jsonrpc": "2.0", "id": req_id, "error": {"message": "Unknown tool"}}
else:
resp = {"jsonrpc": "2.0", "id": req_id, "error": {"message": "Unknown method"}}
sys.stdout.write(json.dumps(resp) + "\n")
sys.stdout.flush()
except Exception as e:
sys.stderr.write(f"Error: {e}\n")
sys.stderr.flush()
while True:
line = sys.stdin.readline()
if not line:
break
try:
req = json.loads(line)
method = req.get("method")
req_id = req.get("id")
if method == "tools/list":
resp = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"tools": [
{"name": "echo", "description": "Echo input", "inputSchema": {"type": "object"}}
]
}
}
elif method == "tools/call":
name = req["params"].get("name")
args = req["params"].get("arguments", {})
if name == "echo":
resp = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [{"type": "text", "text": f"ECHO: {args}"}]
}
}
else:
resp = {"jsonrpc": "2.0", "id": req_id, "error": {"message": "Unknown tool"}}
else:
resp = {"jsonrpc": "2.0", "id": req_id, "error": {"message": "Unknown method"}}
sys.stdout.write(json.dumps(resp) + "\n")
sys.stdout.flush()
except Exception as e:
sys.stderr.write(f"Error: {e}\n")
sys.stderr.flush()
if __name__ == "__main__":
main()
+29 -29
View File
@@ -3,34 +3,34 @@ import subprocess
import sys
def run_tests():
test_dir = "tests"
test_files = [f for f in os.listdir(test_dir) if f.startswith("test_") and f.endswith(".py")]
test_files.sort()
batch_size = 4
all_failed = []
print(f"Starting test execution of {len(test_files)} files in batches of {batch_size}...")
for i in range(0, len(test_files), batch_size):
batch = test_files[i:i + batch_size]
cmd = ["uv", "run", "pytest", "--maxfail=10"] + [os.path.join(test_dir, f) for f in batch]
print(f"\nBatch {i//batch_size + 1}: {' '.join(batch)}")
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError:
print(f"Batch {i//batch_size + 1} failed.")
all_failed.extend(batch)
if all_failed:
print("\n" + "="*30)
print(f"Total batches with failures: {len(all_failed)//batch_size + 1 if len(all_failed)%batch_size else len(all_failed)//batch_size}")
print("Files in failed batches:")
for f in all_failed:
print(f" - {f}")
print("="*30)
else:
print("\nAll batches passed successfully!")
test_dir = "tests"
test_files = [f for f in os.listdir(test_dir) if f.startswith("test_") and f.endswith(".py")]
test_files.sort()
batch_size = 32
all_failed = []
print(f"Starting test execution of {len(test_files)} files in batches of {batch_size}...")
for i in range(0, len(test_files), batch_size):
batch = test_files[i:i + batch_size]
cmd = ["uv", "run", "pytest", "--maxfail=10"] + [os.path.join(test_dir, f) for f in batch]
print(f"\nBatch {i//batch_size + 1}: {' '.join(batch)}")
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError:
print(f"Batch {i//batch_size + 1} failed.")
all_failed.extend(batch)
if all_failed:
print("\n" + "="*30)
print(f"Total batches with failures: {len(all_failed)//batch_size + 1 if len(all_failed)%batch_size else len(all_failed)//batch_size}")
print("Files in failed batches:")
for f in all_failed:
print(f" - {f}")
print("="*30)
else:
print("\nAll batches passed successfully!")
if __name__ == "__main__":
run_tests()
run_tests()