refactor(sdm): Refine SDM tags to 'External Only' and update core files. Pruned internal references to conserve tokens.

This commit is contained in:
2026-05-09 14:31:13 -04:00
parent e9ebcb859a
commit 696c08692e
5 changed files with 720 additions and 59 deletions
+173
View File
@@ -0,0 +1,173 @@
import ast
import json
import os
import pathlib
import sys
import re
from typing import List, Dict, Any, Optional, Tuple
def find_closing_quotes_pos(line: str) -> Tuple[int, str]:
pos_double = line.rfind('"""')
pos_single = line.rfind("'''")
if pos_double != -1 and pos_single != -1:
if pos_double > pos_single: return pos_double, '"""'
else: return pos_single, "'''"
elif pos_double != -1:
return pos_double, '"""'
elif pos_single != -1:
return pos_single, "'''"
return -1, ""
class SdmDocstringInjectorVisitor(ast.NodeVisitor):
def __init__(self, file_path: str, sdm_tags_map: Dict[str, Any], lines: List[str]):
self.file_path = file_path
self.sdm_tags_map = sdm_tags_map
self.lines = lines
self.targets_to_modify = []
self.current_class_name = None
self.project_root = pathlib.Path.cwd().resolve()
def get_rel_path(self, path):
p = pathlib.Path(path).resolve()
try:
return str(p.relative_to(self.project_root)).replace("\\", "/")
except (ValueError, RuntimeError):
return str(p).replace("\\", "/")
def _get_sdm_tags(self, name: str, node_type: str, parent_class_name: Optional[str] = None) -> List[str]:
relative_file_path = self.get_rel_path(self.file_path)
file_data = self.sdm_tags_map.get(relative_file_path)
if not file_data: return []
tags = []
if node_type == 'ClassDef':
class_data = file_data.get('classes', {}).get(name, {})
class_tag = class_data.get('class_tag')
if class_tag: tags.append(class_tag)
elif node_type in ('FunctionDef', 'AsyncFunctionDef'):
if parent_class_name:
class_data = file_data.get('classes', {}).get(parent_class_name, {})
tag = class_data.get('methods', {}).get(name)
if tag: tags.append(tag)
else:
tag = file_data.get('functions', {}).get(name)
if tag: tags.append(tag)
return tags
def _process_node(self, node, node_type: str):
if not node.body: return
sdm_tags = self._get_sdm_tags(node.name, node_type, self.current_class_name)
first_body_node = node.body[0]
if (node.lineno == first_body_node.lineno): return
docstring_node = None
if isinstance(node.body[0], ast.Expr) and \
isinstance(node.body[0].value, ast.Constant) and isinstance(node.body[0].value.value, str):
docstring_node = node.body[0].value
# Use col_offset of the first body node for exact matching
body_indent_count = first_body_node.col_offset
if docstring_node:
self.targets_to_modify.append({
'type': 'append', 'node': node, 'name': node.name, 'sdm_tags': sdm_tags,
'start_lineno': docstring_node.lineno, 'end_lineno': docstring_node.end_lineno,
'indent_count': body_indent_count, 'existing_doc': docstring_node.value
})
elif sdm_tags:
self.targets_to_modify.append({
'type': 'new', 'node': node, 'name': node.name, 'sdm_tags': sdm_tags,
'insert_lineno': first_body_node.lineno, 'indent_count': body_indent_count
})
def visit_ClassDef(self, node):
self._process_node(node, 'ClassDef')
old_class = self.current_class_name
self.current_class_name = node.name
self.generic_visit(node)
self.current_class_name = old_class
def visit_FunctionDef(self, node):
self._process_node(node, 'FunctionDef')
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node):
self._process_node(node, 'AsyncFunctionDef')
self.generic_visit(node)
def strip_tags(docstring: str) -> str:
lines = docstring.splitlines()
new_lines = []
for line in lines:
if re.search(r'\[C:.*\]|\[M:.*\]|\[U:.*\]|\[VARS:.*\]', line): continue
new_lines.append(line)
while new_lines and not new_lines[-1].strip(): new_lines.pop()
return "\n".join(new_lines)
def process_file(py_file_path: pathlib.Path, sdm_tags_map):
try:
with open(py_file_path, 'r', encoding='utf-8') as f: content = f.read()
lines = content.splitlines()
if not lines: return
try: tree = ast.parse(content)
except SyntaxError: return
visitor = SdmDocstringInjectorVisitor(str(py_file_path.resolve()), sdm_tags_map, lines)
visitor.visit(tree)
if not visitor.targets_to_modify: return
visitor.targets_to_modify.sort(key=lambda t: t['node'].lineno, reverse=True)
modified_lines = lines[:]
file_modified = False
for target in visitor.targets_to_modify:
sdm_tags = target['sdm_tags']
indent = " " * target['indent_count']
if target['type'] == 'append':
clean_doc = strip_tags(target['existing_doc'])
if sdm_tags:
prepared_tags = [f"{indent}{line}" for t in sdm_tags for line in t.splitlines()]
new_content = (clean_doc + "\n" + "\n".join(prepared_tags)) if clean_doc.strip() else "\n".join(prepared_tags)
else:
new_content = clean_doc
start_idx = target['start_lineno'] - 1
end_idx = target['end_lineno'] - 1
first_line, last_line = modified_lines[start_idx], modified_lines[end_idx]
q_start_pos = first_line.find('"""')
if q_start_pos == -1: q_start_pos = first_line.find("'''")
q_end_pos, q_type = find_closing_quotes_pos(last_line)
if q_start_pos != -1 and q_end_pos != -1:
q_prefix, q_suffix = first_line[:q_start_pos + 3], last_line[q_end_pos:]
if "\n" in new_content or (start_idx != end_idx):
replacement = [q_prefix] + [f"{indent}{l}" for l in new_content.splitlines()] + [f"{indent}{q_suffix}"]
else:
replacement = [f"{q_prefix}{new_content}{q_suffix}"]
modified_lines[start_idx:end_idx+1] = replacement
file_modified = True
elif sdm_tags:
prepared_tags = [f"{indent}{line}" for t in sdm_tags for line in t.splitlines()]
new_doc = [f'{indent}"""', "\n".join(prepared_tags), f'{indent}"""']
insert_idx = target['insert_lineno'] - 1
while insert_idx > 0 and not modified_lines[insert_idx-1].strip(): insert_idx -= 1
modified_lines[insert_idx:insert_idx] = new_doc
file_modified = True
if file_modified:
with open(py_file_path, 'w', encoding='utf-8') as f: f.write("\n".join(modified_lines))
except Exception as e: print(f"Error processing {py_file_path}: {e}", file=sys.stderr)
def main():
sdm_report_path = "sdm_report_refined.json"
if not pathlib.Path(sdm_report_path).exists():
print(f"Error: {sdm_report_path} not found.", file=sys.stderr); sys.exit(1)
with open(sdm_report_path, 'r', encoding='utf-8') as f: sdm_tags_map = json.load(f)
targets = sys.argv[1:]
if not targets:
for d in ["src", "simulation", "tests"]:
sd = pathlib.Path(d)
if sd.exists():
for f in sd.rglob("*.py"): process_file(f, sdm_tags_map)
else:
for t in targets:
tp = pathlib.Path(t)
if tp.is_file(): process_file(tp, sdm_tags_map)
elif tp.is_dir():
for f in tp.rglob("*.py"): process_file(f, sdm_tags_map)
if __name__ == "__main__":
main()
+206
View File
@@ -0,0 +1,206 @@
import ast
import os
import json
import sys
import pathlib
class SDMMapper:
def __init__(self):
self.files = {} # path -> {"functions": {}, "classes": {}}
self.functions_global = {} # name -> {"file": str, "class": str, "callers": set()}
self.current_file = ""
self.current_class = None
self.current_function = None
self.project_root = pathlib.Path.cwd().resolve()
def get_rel_path(self, path):
p = pathlib.Path(path).resolve()
try:
return str(p.relative_to(self.project_root)).replace("\\", "/")
except (ValueError, RuntimeError):
return str(p).replace("\\", "/")
def collect_symbols(self, dirs):
for d in dirs:
if not os.path.exists(d): continue
for root, _, files in os.walk(d):
for f in files:
if f.endswith(".py"):
path = os.path.join(root, f)
rel_path = self.get_rel_path(path)
try:
with open(path, "r", encoding="utf-8-sig") as file:
tree = ast.parse(file.read(), filename=path)
if rel_path not in self.files:
self.files[rel_path] = {"functions": {}, "classes": {}}
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
full_name = node.name
# In first pass, we just note definitions.
# Class-member identification happens in visit_ClassDef.
pass
elif isinstance(node, ast.ClassDef):
if node.name not in self.files[rel_path]["classes"]:
self.files[rel_path]["classes"][node.name] = {"methods": {}, "variables": {}}
except Exception as e:
print(f"Error collecting symbols from {path}: {e}", file=sys.stderr)
def analyze_files(self, dirs):
for d in dirs:
if not os.path.exists(d): continue
for root, _, files in os.walk(d):
for f in files:
if f.endswith(".py"):
self.analyze_file(os.path.join(root, f))
def analyze_file(self, path):
self.current_file = self.get_rel_path(path)
if self.current_file not in self.files:
self.files[self.current_file] = {"functions": {}, "classes": {}}
try:
with open(path, "r", encoding="utf-8-sig") as file:
tree = ast.parse(file.read(), filename=path)
visitor = SDMVisitor(self)
visitor.visit(tree)
except Exception as e:
print(f"Error analyzing {path}: {e}", file=sys.stderr)
class SDMVisitor(ast.NodeVisitor):
def __init__(self, mapper):
self.mapper = mapper
self.current_class = None
self.current_function = None
def visit_ClassDef(self, node):
old_class = self.current_class
self.current_class = node.name
if self.current_class not in self.mapper.files[self.mapper.current_file]["classes"]:
self.mapper.files[self.mapper.current_file]["classes"][self.current_class] = {"methods": {}, "variables": {}}
self.generic_visit(node)
self.current_class = old_class
def visit_FunctionDef(self, node):
old_func = self.current_function
self.current_function = node.name
full_name = f"{self.current_class}.{node.name}" if self.current_class else node.name
if full_name not in self.mapper.functions_global:
self.mapper.functions_global[full_name] = {
"file": self.mapper.current_file,
"class": self.current_class,
"callers": set()
}
self.generic_visit(node)
self.current_function = old_func
def visit_AsyncFunctionDef(self, node):
self.visit_FunctionDef(node)
def visit_Call(self, node):
name = None
if isinstance(node.func, ast.Name):
name = node.func.id
elif isinstance(node.func, ast.Attribute):
name = node.func.attr
if name:
# Try to find if it's a known function/method
potential_matches = [n for n in self.mapper.functions_global if n == name or n.endswith("." + name)]
for match in potential_matches:
match_file = self.mapper.functions_global[match]["file"]
# EXTERNAL FILTER: Only add caller if it's from a different file
if match_file != self.mapper.current_file:
caller_name = f"{self.current_class}.{self.current_function}" if self.current_class else (self.current_function or "module")
# Include file name for external clarity
self.mapper.functions_global[match]["callers"].add(f"{self.mapper.current_file}:{caller_name}")
self.generic_visit(node)
def visit_Attribute(self, node):
if isinstance(node.value, ast.Name) and node.value.id == "self" and self.current_class:
attr_name = node.attr
class_data = self.mapper.files[self.mapper.current_file]["classes"][self.current_class]
if attr_name not in class_data["variables"]:
class_data["variables"][attr_name] = {"mutations": [], "usages": set()}
if isinstance(node.ctx, ast.Store):
class_data["variables"][attr_name]["mutations"].append({
"file": self.mapper.current_file,
"line": node.lineno,
"method": self.current_function
})
elif isinstance(node.ctx, ast.Load):
class_data["variables"][attr_name]["usages"].add(self.mapper.current_file)
self.generic_visit(node)
def main():
target = "."
if len(sys.argv) > 1:
target = sys.argv[1]
mapper = SDMMapper()
dirs = ["src", "simulation", "tests"]
if os.path.isfile(target):
mapper.collect_symbols(dirs)
mapper.analyze_file(target)
else:
search_dirs = [target] if target in dirs else dirs
mapper.collect_symbols(search_dirs)
mapper.analyze_files(search_dirs)
# Build the final grouped report
report = {}
# 1. Add functions/methods
for full_name, data in mapper.functions_global.items():
f_path = data["file"]
if f_path not in report: report[f_path] = {"functions": {}, "classes": {}}
# External callers only
callers = sorted(list(data["callers"]))
if not callers:
continue
tag = f"[C: {', '.join(callers)}]"
if data["class"]:
c_name = data["class"]
if c_name not in report[f_path]["classes"]:
report[f_path]["classes"][c_name] = {"methods": {}, "variables": {}}
m_name = full_name.split(".")[-1]
report[f_path]["classes"][c_name]["methods"][m_name] = tag
else:
report[f_path]["functions"][full_name] = tag
# 2. Add class variables
for f_path, f_data in mapper.files.items():
if f_path not in report: continue
for c_name, c_data in f_data["classes"].items():
if c_name not in report[f_path]["classes"]:
report[f_path]["classes"][c_name] = {"methods": {}, "variables": {}}
class_vars_summary = []
for v_name, v_data in c_data["variables"].items():
# EXTERNAL FILTER: Only include mutations/usages from different files
ext_muts = [f"{m['file']}:{m['line']}, {m['method']}" for m in v_data["mutations"] if m['file'] != f_path]
ext_usages = [u for u in v_data["usages"] if u != f_path]
if not ext_muts and not ext_usages:
continue
m_tag = f"[M: {'; '.join(ext_muts or ['None'])}]"
u_tag = f"[U: {', '.join(sorted(list(ext_usages or ['None'])))}]"
tag = f"{m_tag} {u_tag}"
report[f_path]["classes"][c_name]["variables"][v_name] = tag
class_vars_summary.append(f"{v_name}: {tag}")
if class_vars_summary:
report[f_path]["classes"][c_name]["class_tag"] = "\n".join(class_vars_summary)
print(json.dumps(report, indent=1))
if __name__ == "__main__":
main()