chore(scripts): remove one-shot migrators and repros
These 6 scripts were one-shot migration tools and repros from past tracks. The migrations are done; the bugs are fixed; the SDM tags are in place. Removed (6 files, ~22 KB): - migrate_cruft.ps1 (2.6 KB) - filesystem cruft migration (done in consolidate_cruft_and_log_taxonomy_20260228) - profile_baseline.py (2.4 KB) - profiling baseline (baselines live in docs/reports/) - repro_history.py (2.3 KB) - repro for fixed history bug (bug fixed in hot_reload_python_20260516) - sdm_injector.py (6.8 KB) - SDM tag injector (tags in place since sdm_docstrings_20260509) - sdm_mapper.py (7.3 KB) - SDM tag mapper (pilot) (tags in place) - update_paths.py (789 B) - sys.path patcher (src/ layout is now standard)
This commit is contained in:
@@ -1,60 +0,0 @@
|
||||
$projectRoot = Resolve-Path (Join-Path $PSScriptRoot "..")
|
||||
$logsDir = Join-Path $projectRoot "logs"
|
||||
$sessionsDir = Join-Path $logsDir "sessions"
|
||||
$agentsDir = Join-Path $logsDir "agents"
|
||||
$errorsDir = Join-Path $logsDir "errors"
|
||||
$testsDir = Join-Path $projectRoot "tests"
|
||||
$artifactsDir = Join-Path $testsDir "artifacts"
|
||||
|
||||
# Ensure target directories exist
|
||||
New-Item -ItemType Directory -Force -Path $sessionsDir | Out-Null
|
||||
New-Item -ItemType Directory -Force -Path $agentsDir | Out-Null
|
||||
New-Item -ItemType Directory -Force -Path $errorsDir | Out-Null
|
||||
New-Item -ItemType Directory -Force -Path $artifactsDir | Out-Null
|
||||
|
||||
Write-Host "Migrating logs and temporary files to new taxonomy..."
|
||||
|
||||
# 1. Move temp files
|
||||
Get-ChildItem -Path $projectRoot -Filter "temp_*" -File | ForEach-Object {
|
||||
Write-Host "Moving $($_.Name) to tests/artifacts/"
|
||||
Move-Item -Path $_.FullName -Destination $artifactsDir -Force
|
||||
}
|
||||
if (Test-Path $testsDir) {
|
||||
Get-ChildItem -Path $testsDir -Filter "temp_*" -File | ForEach-Object {
|
||||
Write-Host "Moving $($_.Name) to tests/artifacts/"
|
||||
Move-Item -Path $_.FullName -Destination $artifactsDir -Force
|
||||
}
|
||||
}
|
||||
|
||||
# 2. Move MMA logs to logs/agents/
|
||||
Get-ChildItem -Path $logsDir -Filter "mma_*.log" -File | ForEach-Object {
|
||||
Write-Host "Moving $($_.Name) to logs/agents/"
|
||||
Move-Item -Path $_.FullName -Destination $agentsDir -Force
|
||||
}
|
||||
|
||||
# 3. Move error/test logs to logs/errors/
|
||||
Get-ChildItem -Path $logsDir -Filter "*.log" -File | Where-Object { $_.Name -like "*test*" -or $_.Name -like "gui_*.log" } | ForEach-Object {
|
||||
Write-Host "Moving $($_.Name) to logs/errors/"
|
||||
Move-Item -Path $_.FullName -Destination $errorsDir -Force
|
||||
}
|
||||
|
||||
# 4. Move log_registry.toml to logs/sessions/
|
||||
if (Test-Path (Join-Path $logsDir "log_registry.toml")) {
|
||||
Write-Host "Moving log_registry.toml to logs/sessions/"
|
||||
Move-Item -Path (Join-Path $logsDir "log_registry.toml") -Destination $sessionsDir -Force
|
||||
}
|
||||
|
||||
# 5. Move session directories to logs/sessions/
|
||||
# Pattern: Starts with 202 (year)
|
||||
Get-ChildItem -Path $logsDir -Directory | Where-Object { $_.Name -match "^202\d" } | ForEach-Object {
|
||||
Write-Host "Moving session directory $($_.Name) to logs/sessions/"
|
||||
Move-Item -Path $_.FullName -Destination $sessionsDir -Force
|
||||
}
|
||||
|
||||
# 6. Move remaining .log files to logs/sessions/
|
||||
Get-ChildItem -Path $logsDir -Filter "*.log" -File | ForEach-Object {
|
||||
Write-Host "Moving session log $($_.Name) to logs/sessions/"
|
||||
Move-Item -Path $_.FullName -Destination $sessionsDir -Force
|
||||
}
|
||||
|
||||
Write-Host "Migration complete."
|
||||
@@ -1,78 +0,0 @@
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# Add project root to sys.path
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||||
if project_root not in sys.path:
|
||||
sys.path.append(project_root)
|
||||
|
||||
from src.performance_monitor import get_monitor
|
||||
from src.aggregate import build_file_items
|
||||
from src.dag_engine import TrackDAG
|
||||
from src.models import Ticket
|
||||
|
||||
def run_aggregation_profile():
|
||||
monitor = get_monitor()
|
||||
base_dir = Path(project_root)
|
||||
# Get 30 python files for profiling
|
||||
files = [str(p.relative_to(base_dir)) for p in base_dir.glob("src/*.py")][:30]
|
||||
with monitor.scope("aggregation"):
|
||||
build_file_items(base_dir, files)
|
||||
|
||||
def run_dag_profile():
|
||||
monitor = get_monitor()
|
||||
tickets = []
|
||||
# Create 60 tickets with multiple dependencies to simulate complexity
|
||||
for i in range(60):
|
||||
deps = []
|
||||
if i > 0: deps.append(f"ticket_{i-1}")
|
||||
if i > 5: deps.append(f"ticket_{i-5}")
|
||||
if i > 10: deps.append(f"ticket_{i-10}")
|
||||
tickets.append(Ticket(
|
||||
id=f"ticket_{i}",
|
||||
description=f"Ticket {i}",
|
||||
depends_on=deps
|
||||
))
|
||||
dag = TrackDAG(tickets)
|
||||
with monitor.scope("dag_operations"):
|
||||
# Run operations 50 times per profile call
|
||||
for _ in range(50):
|
||||
dag.topological_sort()
|
||||
dag.has_cycle()
|
||||
|
||||
def print_metrics():
|
||||
monitor = get_monitor()
|
||||
metrics = monitor.get_metrics()
|
||||
print(f"{'Component':<30} | {'Avg (ms)':<12} | {'Count':<8} | {'Max (ms)':<12} | {'Min (ms)':<12}")
|
||||
print("-" * 85)
|
||||
# Collect all base component names
|
||||
components = set()
|
||||
for key in metrics.keys():
|
||||
if key.startswith("time_") and key.endswith("_ms") and not key.endswith("_avg"):
|
||||
components.add(key[5:-3])
|
||||
|
||||
for comp in sorted(list(components)):
|
||||
avg = metrics.get(f"time_{comp}_ms_avg", 0.0)
|
||||
count = int(metrics.get(f"count_{comp}", 0))
|
||||
max_val = metrics.get(f"max_{comp}_ms", 0.0)
|
||||
min_val = metrics.get(f"min_{comp}_ms", 0.0)
|
||||
print(f"{comp:<30} | {avg:<12.4f} | {count:<8} | {max_val:<12.4f} | {min_val:<12.4f}")
|
||||
|
||||
def main():
|
||||
monitor = get_monitor()
|
||||
monitor.enabled = True
|
||||
print("Starting Profiling Baseline...")
|
||||
print("Running aggregation profile (5 iterations)...")
|
||||
for _ in range(5):
|
||||
run_aggregation_profile()
|
||||
print("Running DAG profile (5 iterations)...")
|
||||
for _ in range(5):
|
||||
run_dag_profile()
|
||||
print("\nBaseline Performance Metrics:")
|
||||
print_metrics()
|
||||
monitor.stop()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,65 +0,0 @@
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
project_root = os.path.abspath(os.getcwd())
|
||||
sys.path.append(project_root)
|
||||
sys.path.append(os.path.join(project_root, 'src'))
|
||||
|
||||
from src import app_controller, gui_2, models, project_manager, events
|
||||
|
||||
def test_repro():
|
||||
print("Initializing App...")
|
||||
app = gui_2.App()
|
||||
ctrl = app.controller
|
||||
|
||||
# 1. Scaffold a fake project
|
||||
proj_path = os.path.abspath("tests/artifacts/repro_project.toml")
|
||||
if os.path.exists(proj_path): os.remove(proj_path)
|
||||
|
||||
print(f"Scaffolding project at {proj_path}...")
|
||||
proj = project_manager.default_project("Repro")
|
||||
project_manager.save_project(proj, proj_path)
|
||||
|
||||
# 2. Load the project
|
||||
ctrl.active_project_path = proj_path
|
||||
ctrl.init_state()
|
||||
|
||||
# 3. Enable history
|
||||
ctrl.ui_auto_add_history = True
|
||||
print(f"ui_auto_add_history set to: {ctrl.ui_auto_add_history}")
|
||||
|
||||
# 4. Simulate a request event
|
||||
print("Triggering user_request event...")
|
||||
event_payload = events.UserRequestEvent(prompt="Hello Test", stable_md="Context", file_items=[], disc_text="", base_dir=".")
|
||||
# The event is processed by _handle_request_event which calls ai_client.send
|
||||
# But ai_client.send emits events via its global emitter.
|
||||
|
||||
from src import ai_client
|
||||
# Simulate ai_client emitting the "request_start" event which should trigger _on_api_event
|
||||
print("Emitting request_start event...")
|
||||
ai_client.events.emit("request_start", payload={
|
||||
"provider": "repro",
|
||||
"model": "repro-model",
|
||||
"message": "Hello Test"
|
||||
}, kind="request")
|
||||
|
||||
# 5. Wait for history processor
|
||||
print("Waiting for history processing...")
|
||||
# Headless mode would call it in queue_fallback, but we can call it manually
|
||||
time.sleep(0.5)
|
||||
ctrl._process_pending_history_adds()
|
||||
|
||||
print(f"History entries: {len(ctrl.disc_entries)}")
|
||||
for e in ctrl.disc_entries:
|
||||
print(f" - {e.get('role')}: {e.get('content')}")
|
||||
|
||||
if len(ctrl.disc_entries) == 0:
|
||||
print("REPRODUCED: History is empty!")
|
||||
else:
|
||||
print("SUCCESS: History updated.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_repro()
|
||||
@@ -1,173 +0,0 @@
|
||||
import ast
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
import re
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
|
||||
def find_closing_quotes_pos(line: str) -> Tuple[int, str]:
|
||||
pos_double = line.rfind('"""')
|
||||
pos_single = line.rfind("'''")
|
||||
if pos_double != -1 and pos_single != -1:
|
||||
if pos_double > pos_single: return pos_double, '"""'
|
||||
else: return pos_single, "'''"
|
||||
elif pos_double != -1:
|
||||
return pos_double, '"""'
|
||||
elif pos_single != -1:
|
||||
return pos_single, "'''"
|
||||
return -1, ""
|
||||
|
||||
class SdmDocstringInjectorVisitor(ast.NodeVisitor):
|
||||
def __init__(self, file_path: str, sdm_tags_map: Dict[str, Any], lines: List[str]):
|
||||
self.file_path = file_path
|
||||
self.sdm_tags_map = sdm_tags_map
|
||||
self.lines = lines
|
||||
self.targets_to_modify = []
|
||||
self.current_class_name = None
|
||||
self.project_root = pathlib.Path.cwd().resolve()
|
||||
|
||||
def get_rel_path(self, path):
|
||||
p = pathlib.Path(path).resolve()
|
||||
try:
|
||||
return str(p.relative_to(self.project_root)).replace("\\", "/")
|
||||
except (ValueError, RuntimeError):
|
||||
return str(p).replace("\\", "/")
|
||||
|
||||
def _get_sdm_tags(self, name: str, node_type: str, parent_class_name: Optional[str] = None) -> List[str]:
|
||||
relative_file_path = self.get_rel_path(self.file_path)
|
||||
file_data = self.sdm_tags_map.get(relative_file_path)
|
||||
if not file_data: return []
|
||||
tags = []
|
||||
if node_type == 'ClassDef':
|
||||
class_data = file_data.get('classes', {}).get(name, {})
|
||||
class_tag = class_data.get('class_tag')
|
||||
if class_tag: tags.append(class_tag)
|
||||
elif node_type in ('FunctionDef', 'AsyncFunctionDef'):
|
||||
if parent_class_name:
|
||||
class_data = file_data.get('classes', {}).get(parent_class_name, {})
|
||||
tag = class_data.get('methods', {}).get(name)
|
||||
if tag: tags.append(tag)
|
||||
else:
|
||||
tag = file_data.get('functions', {}).get(name)
|
||||
if tag: tags.append(tag)
|
||||
return tags
|
||||
|
||||
def _process_node(self, node, node_type: str):
|
||||
if not node.body: return
|
||||
sdm_tags = self._get_sdm_tags(node.name, node_type, self.current_class_name)
|
||||
first_body_node = node.body[0]
|
||||
if (node.lineno == first_body_node.lineno): return
|
||||
|
||||
docstring_node = None
|
||||
if isinstance(node.body[0], ast.Expr) and \
|
||||
isinstance(node.body[0].value, ast.Constant) and isinstance(node.body[0].value.value, str):
|
||||
docstring_node = node.body[0].value
|
||||
|
||||
# Use col_offset of the first body node for exact matching
|
||||
body_indent_count = first_body_node.col_offset
|
||||
|
||||
if docstring_node:
|
||||
self.targets_to_modify.append({
|
||||
'type': 'append', 'node': node, 'name': node.name, 'sdm_tags': sdm_tags,
|
||||
'start_lineno': docstring_node.lineno, 'end_lineno': docstring_node.end_lineno,
|
||||
'indent_count': body_indent_count, 'existing_doc': docstring_node.value
|
||||
})
|
||||
elif sdm_tags:
|
||||
self.targets_to_modify.append({
|
||||
'type': 'new', 'node': node, 'name': node.name, 'sdm_tags': sdm_tags,
|
||||
'insert_lineno': first_body_node.lineno, 'indent_count': body_indent_count
|
||||
})
|
||||
|
||||
def visit_ClassDef(self, node):
|
||||
self._process_node(node, 'ClassDef')
|
||||
old_class = self.current_class_name
|
||||
self.current_class_name = node.name
|
||||
self.generic_visit(node)
|
||||
self.current_class_name = old_class
|
||||
|
||||
def visit_FunctionDef(self, node):
|
||||
self._process_node(node, 'FunctionDef')
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_AsyncFunctionDef(self, node):
|
||||
self._process_node(node, 'AsyncFunctionDef')
|
||||
self.generic_visit(node)
|
||||
|
||||
def strip_tags(docstring: str) -> str:
|
||||
lines = docstring.splitlines()
|
||||
new_lines = []
|
||||
for line in lines:
|
||||
if re.search(r'\[C:.*\]|\[M:.*\]|\[U:.*\]|\[VARS:.*\]', line): continue
|
||||
new_lines.append(line)
|
||||
while new_lines and not new_lines[-1].strip(): new_lines.pop()
|
||||
return "\n".join(new_lines)
|
||||
|
||||
def process_file(py_file_path: pathlib.Path, sdm_tags_map):
|
||||
try:
|
||||
with open(py_file_path, 'r', encoding='utf-8') as f: content = f.read()
|
||||
lines = content.splitlines()
|
||||
if not lines: return
|
||||
try: tree = ast.parse(content)
|
||||
except SyntaxError: return
|
||||
visitor = SdmDocstringInjectorVisitor(str(py_file_path.resolve()), sdm_tags_map, lines)
|
||||
visitor.visit(tree)
|
||||
if not visitor.targets_to_modify: return
|
||||
visitor.targets_to_modify.sort(key=lambda t: t['node'].lineno, reverse=True)
|
||||
modified_lines = lines[:]
|
||||
file_modified = False
|
||||
for target in visitor.targets_to_modify:
|
||||
sdm_tags = target['sdm_tags']
|
||||
indent = " " * target['indent_count']
|
||||
if target['type'] == 'append':
|
||||
clean_doc = strip_tags(target['existing_doc'])
|
||||
if sdm_tags:
|
||||
prepared_tags = [f"{indent}{line}" for t in sdm_tags for line in t.splitlines()]
|
||||
new_content = (clean_doc + "\n" + "\n".join(prepared_tags)) if clean_doc.strip() else "\n".join(prepared_tags)
|
||||
else:
|
||||
new_content = clean_doc
|
||||
start_idx = target['start_lineno'] - 1
|
||||
end_idx = target['end_lineno'] - 1
|
||||
first_line, last_line = modified_lines[start_idx], modified_lines[end_idx]
|
||||
q_start_pos = first_line.find('"""')
|
||||
if q_start_pos == -1: q_start_pos = first_line.find("'''")
|
||||
q_end_pos, q_type = find_closing_quotes_pos(last_line)
|
||||
if q_start_pos != -1 and q_end_pos != -1:
|
||||
q_prefix, q_suffix = first_line[:q_start_pos + 3], last_line[q_end_pos:]
|
||||
if "\n" in new_content or (start_idx != end_idx):
|
||||
replacement = [q_prefix] + [f"{indent}{l}" for l in new_content.splitlines()] + [f"{indent}{q_suffix}"]
|
||||
else:
|
||||
replacement = [f"{q_prefix}{new_content}{q_suffix}"]
|
||||
modified_lines[start_idx:end_idx+1] = replacement
|
||||
file_modified = True
|
||||
elif sdm_tags:
|
||||
prepared_tags = [f"{indent}{line}" for t in sdm_tags for line in t.splitlines()]
|
||||
new_doc = [f'{indent}"""', "\n".join(prepared_tags), f'{indent}"""']
|
||||
insert_idx = target['insert_lineno'] - 1
|
||||
while insert_idx > 0 and not modified_lines[insert_idx-1].strip(): insert_idx -= 1
|
||||
modified_lines[insert_idx:insert_idx] = new_doc
|
||||
file_modified = True
|
||||
if file_modified:
|
||||
with open(py_file_path, 'w', encoding='utf-8') as f: f.write("\n".join(modified_lines))
|
||||
except Exception as e: print(f"Error processing {py_file_path}: {e}", file=sys.stderr)
|
||||
|
||||
def main():
|
||||
sdm_report_path = "sdm_report_refined.json"
|
||||
if not pathlib.Path(sdm_report_path).exists():
|
||||
print(f"Error: {sdm_report_path} not found.", file=sys.stderr); sys.exit(1)
|
||||
with open(sdm_report_path, 'r', encoding='utf-8') as f: sdm_tags_map = json.load(f)
|
||||
targets = sys.argv[1:]
|
||||
if not targets:
|
||||
for d in ["src", "simulation", "tests"]:
|
||||
sd = pathlib.Path(d)
|
||||
if sd.exists():
|
||||
for f in sd.rglob("*.py"): process_file(f, sdm_tags_map)
|
||||
else:
|
||||
for t in targets:
|
||||
tp = pathlib.Path(t)
|
||||
if tp.is_file(): process_file(tp, sdm_tags_map)
|
||||
elif tp.is_dir():
|
||||
for f in tp.rglob("*.py"): process_file(f, sdm_tags_map)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,206 +0,0 @@
|
||||
import ast
|
||||
import os
|
||||
import json
|
||||
import sys
|
||||
import pathlib
|
||||
|
||||
class SDMMapper:
|
||||
def __init__(self):
|
||||
self.files = {} # path -> {"functions": {}, "classes": {}}
|
||||
self.functions_global = {} # name -> {"file": str, "class": str, "callers": set()}
|
||||
self.current_file = ""
|
||||
self.current_class = None
|
||||
self.current_function = None
|
||||
self.project_root = pathlib.Path.cwd().resolve()
|
||||
|
||||
def get_rel_path(self, path):
|
||||
p = pathlib.Path(path).resolve()
|
||||
try:
|
||||
return str(p.relative_to(self.project_root)).replace("\\", "/")
|
||||
except (ValueError, RuntimeError):
|
||||
return str(p).replace("\\", "/")
|
||||
|
||||
def collect_symbols(self, dirs):
|
||||
for d in dirs:
|
||||
if not os.path.exists(d): continue
|
||||
for root, _, files in os.walk(d):
|
||||
for f in files:
|
||||
if f.endswith(".py"):
|
||||
path = os.path.join(root, f)
|
||||
rel_path = self.get_rel_path(path)
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8-sig") as file:
|
||||
tree = ast.parse(file.read(), filename=path)
|
||||
if rel_path not in self.files:
|
||||
self.files[rel_path] = {"functions": {}, "classes": {}}
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
full_name = node.name
|
||||
# In first pass, we just note definitions.
|
||||
# Class-member identification happens in visit_ClassDef.
|
||||
pass
|
||||
elif isinstance(node, ast.ClassDef):
|
||||
if node.name not in self.files[rel_path]["classes"]:
|
||||
self.files[rel_path]["classes"][node.name] = {"methods": {}, "variables": {}}
|
||||
except Exception as e:
|
||||
print(f"Error collecting symbols from {path}: {e}", file=sys.stderr)
|
||||
|
||||
def analyze_files(self, dirs):
|
||||
for d in dirs:
|
||||
if not os.path.exists(d): continue
|
||||
for root, _, files in os.walk(d):
|
||||
for f in files:
|
||||
if f.endswith(".py"):
|
||||
self.analyze_file(os.path.join(root, f))
|
||||
|
||||
def analyze_file(self, path):
|
||||
self.current_file = self.get_rel_path(path)
|
||||
if self.current_file not in self.files:
|
||||
self.files[self.current_file] = {"functions": {}, "classes": {}}
|
||||
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8-sig") as file:
|
||||
tree = ast.parse(file.read(), filename=path)
|
||||
visitor = SDMVisitor(self)
|
||||
visitor.visit(tree)
|
||||
except Exception as e:
|
||||
print(f"Error analyzing {path}: {e}", file=sys.stderr)
|
||||
|
||||
class SDMVisitor(ast.NodeVisitor):
|
||||
def __init__(self, mapper):
|
||||
self.mapper = mapper
|
||||
self.current_class = None
|
||||
self.current_function = None
|
||||
|
||||
def visit_ClassDef(self, node):
|
||||
old_class = self.current_class
|
||||
self.current_class = node.name
|
||||
if self.current_class not in self.mapper.files[self.mapper.current_file]["classes"]:
|
||||
self.mapper.files[self.mapper.current_file]["classes"][self.current_class] = {"methods": {}, "variables": {}}
|
||||
self.generic_visit(node)
|
||||
self.current_class = old_class
|
||||
|
||||
def visit_FunctionDef(self, node):
|
||||
old_func = self.current_function
|
||||
self.current_function = node.name
|
||||
|
||||
full_name = f"{self.current_class}.{node.name}" if self.current_class else node.name
|
||||
if full_name not in self.mapper.functions_global:
|
||||
self.mapper.functions_global[full_name] = {
|
||||
"file": self.mapper.current_file,
|
||||
"class": self.current_class,
|
||||
"callers": set()
|
||||
}
|
||||
|
||||
self.generic_visit(node)
|
||||
self.current_function = old_func
|
||||
|
||||
def visit_AsyncFunctionDef(self, node):
|
||||
self.visit_FunctionDef(node)
|
||||
|
||||
def visit_Call(self, node):
|
||||
name = None
|
||||
if isinstance(node.func, ast.Name):
|
||||
name = node.func.id
|
||||
elif isinstance(node.func, ast.Attribute):
|
||||
name = node.func.attr
|
||||
|
||||
if name:
|
||||
# Try to find if it's a known function/method
|
||||
potential_matches = [n for n in self.mapper.functions_global if n == name or n.endswith("." + name)]
|
||||
for match in potential_matches:
|
||||
match_file = self.mapper.functions_global[match]["file"]
|
||||
# EXTERNAL FILTER: Only add caller if it's from a different file
|
||||
if match_file != self.mapper.current_file:
|
||||
caller_name = f"{self.current_class}.{self.current_function}" if self.current_class else (self.current_function or "module")
|
||||
# Include file name for external clarity
|
||||
self.mapper.functions_global[match]["callers"].add(f"{self.mapper.current_file}:{caller_name}")
|
||||
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_Attribute(self, node):
|
||||
if isinstance(node.value, ast.Name) and node.value.id == "self" and self.current_class:
|
||||
attr_name = node.attr
|
||||
class_data = self.mapper.files[self.mapper.current_file]["classes"][self.current_class]
|
||||
if attr_name not in class_data["variables"]:
|
||||
class_data["variables"][attr_name] = {"mutations": [], "usages": set()}
|
||||
|
||||
if isinstance(node.ctx, ast.Store):
|
||||
class_data["variables"][attr_name]["mutations"].append({
|
||||
"file": self.mapper.current_file,
|
||||
"line": node.lineno,
|
||||
"method": self.current_function
|
||||
})
|
||||
elif isinstance(node.ctx, ast.Load):
|
||||
class_data["variables"][attr_name]["usages"].add(self.mapper.current_file)
|
||||
self.generic_visit(node)
|
||||
|
||||
def main():
|
||||
target = "."
|
||||
if len(sys.argv) > 1:
|
||||
target = sys.argv[1]
|
||||
|
||||
mapper = SDMMapper()
|
||||
dirs = ["src", "simulation", "tests"]
|
||||
|
||||
if os.path.isfile(target):
|
||||
mapper.collect_symbols(dirs)
|
||||
mapper.analyze_file(target)
|
||||
else:
|
||||
search_dirs = [target] if target in dirs else dirs
|
||||
mapper.collect_symbols(search_dirs)
|
||||
mapper.analyze_files(search_dirs)
|
||||
|
||||
# Build the final grouped report
|
||||
report = {}
|
||||
|
||||
# 1. Add functions/methods
|
||||
for full_name, data in mapper.functions_global.items():
|
||||
f_path = data["file"]
|
||||
if f_path not in report: report[f_path] = {"functions": {}, "classes": {}}
|
||||
|
||||
# External callers only
|
||||
callers = sorted(list(data["callers"]))
|
||||
if not callers:
|
||||
continue
|
||||
|
||||
tag = f"[C: {', '.join(callers)}]"
|
||||
if data["class"]:
|
||||
c_name = data["class"]
|
||||
if c_name not in report[f_path]["classes"]:
|
||||
report[f_path]["classes"][c_name] = {"methods": {}, "variables": {}}
|
||||
m_name = full_name.split(".")[-1]
|
||||
report[f_path]["classes"][c_name]["methods"][m_name] = tag
|
||||
else:
|
||||
report[f_path]["functions"][full_name] = tag
|
||||
|
||||
# 2. Add class variables
|
||||
for f_path, f_data in mapper.files.items():
|
||||
if f_path not in report: continue
|
||||
for c_name, c_data in f_data["classes"].items():
|
||||
if c_name not in report[f_path]["classes"]:
|
||||
report[f_path]["classes"][c_name] = {"methods": {}, "variables": {}}
|
||||
|
||||
class_vars_summary = []
|
||||
for v_name, v_data in c_data["variables"].items():
|
||||
# EXTERNAL FILTER: Only include mutations/usages from different files
|
||||
ext_muts = [f"{m['file']}:{m['line']}, {m['method']}" for m in v_data["mutations"] if m['file'] != f_path]
|
||||
ext_usages = [u for u in v_data["usages"] if u != f_path]
|
||||
|
||||
if not ext_muts and not ext_usages:
|
||||
continue
|
||||
|
||||
m_tag = f"[M: {'; '.join(ext_muts or ['None'])}]"
|
||||
u_tag = f"[U: {', '.join(sorted(list(ext_usages or ['None'])))}]"
|
||||
tag = f"{m_tag} {u_tag}"
|
||||
report[f_path]["classes"][c_name]["variables"][v_name] = tag
|
||||
class_vars_summary.append(f"{v_name}: {tag}")
|
||||
|
||||
if class_vars_summary:
|
||||
report[f_path]["classes"][c_name]["class_tag"] = "\n".join(class_vars_summary)
|
||||
|
||||
print(json.dumps(report, indent=1))
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,20 +0,0 @@
|
||||
import os
|
||||
import glob
|
||||
|
||||
# Correcting the pattern definition to be syntactically valid
|
||||
pattern = "sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), \"..\")))"
|
||||
replacement = pattern + "\nsys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), \"..\", \"src\")))"
|
||||
|
||||
# Files to update
|
||||
files = glob.glob("tests/*.py") + glob.glob("simulation/*.py") + glob.glob("scripts/*.py")
|
||||
|
||||
for file_path in files:
|
||||
if not os.path.isfile(file_path):
|
||||
continue
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
if pattern in content and replacement not in content:
|
||||
print(f"Updating {file_path}")
|
||||
new_content = content.replace(pattern, replacement)
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
f.write(new_content)
|
||||
Reference in New Issue
Block a user