chore(conductor): Complete Comprehensive Path Mapping & Tooling
This commit is contained in:
+87
-1
@@ -30,7 +30,7 @@ Tool Categories:
|
||||
py_update_definition, py_get_signature, py_set_signature, py_get_class_summary,
|
||||
py_get_var_declaration, py_set_var_declaration
|
||||
- Analysis: get_file_summary, get_git_diff, py_find_usages, py_get_imports,
|
||||
py_check_syntax, py_get_hierarchy, py_get_docstring
|
||||
py_check_syntax, py_get_hierarchy, py_get_docstring, derive_code_path
|
||||
- Network: web_search, fetch_url
|
||||
- Runtime: get_ui_performance
|
||||
|
||||
@@ -947,6 +947,66 @@ def get_tree(path: str, max_depth: int = 2) -> str:
|
||||
return f"ERROR generating tree for '{path}': {e}"
|
||||
# ------------------------------------------------------------------ web tools
|
||||
|
||||
def derive_code_path(target: str, max_depth: int = 5) -> str:
|
||||
"""Recursively traces the execution path of a specific function or method."""
|
||||
from src.file_cache import ASTParser
|
||||
parser = ASTParser("python")
|
||||
found_path, found_code = None, None
|
||||
parts = target.split(".")
|
||||
symbol_name = parts[-1]
|
||||
if len(parts) > 1:
|
||||
possible_file = Path(*parts[:-1]).with_suffix(".py")
|
||||
if possible_file.exists(): found_path = str(possible_file)
|
||||
if not found_path:
|
||||
for root in ["src", "simulation"]:
|
||||
for p in Path(root).rglob("*.py"):
|
||||
if not _is_allowed(p): continue
|
||||
code = p.read_text(encoding="utf-8")
|
||||
if f"def {symbol_name}" in code or f"class {symbol_name}" in code:
|
||||
try:
|
||||
tree = ast.parse(code)
|
||||
if _get_symbol_node(tree, symbol_name):
|
||||
found_path, found_code = str(p), code
|
||||
break
|
||||
except Exception: continue
|
||||
if found_path: break
|
||||
if not found_path: return f"ERROR: could not find definition for '{target}'"
|
||||
if not found_code: found_code = Path(found_path).read_text(encoding="utf-8")
|
||||
visited, output = set(), [f"Code Path for: {target}", "=" * (11 + len(target)), ""]
|
||||
def trace(name, path, code, depth, indent):
|
||||
if depth > max_depth or (name, path) in visited: return
|
||||
visited.add((name, path))
|
||||
defn = parser.get_definition(code, name, path=path)
|
||||
if defn.startswith("ERROR:"):
|
||||
output.append(f"{indent}[!] {name} (Definition not found in {path})")
|
||||
return
|
||||
output.append(f"{indent}-> {name} ({path})")
|
||||
try:
|
||||
node = ast.parse(defn)
|
||||
calls = []
|
||||
for n in ast.walk(node):
|
||||
if isinstance(n, ast.Call):
|
||||
if isinstance(n.func, ast.Name): calls.append(n.func.id)
|
||||
elif isinstance(n.func, ast.Attribute): calls.append(n.func.attr)
|
||||
for call in sorted(set(calls)):
|
||||
if call in ("print", "len", "str", "int", "list", "dict", "set", "range", "enumerate", "isinstance", "getattr", "setattr", "hasattr"): continue
|
||||
c_path, c_code = None, None
|
||||
full_tree = ast.parse(code)
|
||||
if _get_symbol_node(full_tree, call): c_path, c_code = path, code
|
||||
else:
|
||||
for r in ["src", "simulation"]:
|
||||
for p in Path(r).rglob("*.py"):
|
||||
if not _is_allowed(p): continue
|
||||
f_code = p.read_text(encoding="utf-8")
|
||||
if f"def {call}" in f_code:
|
||||
c_path, c_code = str(p), f_code
|
||||
break
|
||||
if c_path: break
|
||||
if c_path: trace(call, c_path, c_code, depth + 1, indent + " ")
|
||||
except Exception as e: output.append(f"{indent} [!] Error parsing calls for {name}: {e}")
|
||||
trace(symbol_name, found_path, found_code, 0, "")
|
||||
return "\n".join(output)
|
||||
|
||||
class _DDGParser(HTMLParser):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
@@ -1283,6 +1343,11 @@ def dispatch(tool_name: str, tool_input: dict[str, Any]) -> str:
|
||||
return py_get_docstring(path, str(tool_input.get("name", "")))
|
||||
if tool_name == "get_tree":
|
||||
return get_tree(path, int(tool_input.get("max_depth", 2)))
|
||||
if tool_name == "derive_code_path":
|
||||
return derive_code_path(str(tool_input.get("target", "")), int(tool_input.get("max_depth", 5)))
|
||||
|
||||
if tool_name == "derive_code_path":
|
||||
return derive_code_path(str(tool_input.get("target", "")), int(tool_input.get("max_depth", 5)))
|
||||
|
||||
# Beads tools
|
||||
if tool_name.startswith("bd_"):
|
||||
@@ -2033,6 +2098,27 @@ MCP_TOOL_SPECS: list[dict[str, Any]] = [
|
||||
"type": "object",
|
||||
"properties": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "derive_code_path",
|
||||
"description": (
|
||||
"Recursively traces the execution path of a specific function or method across multiple files. "
|
||||
"Identifies call chains and data hand-offs to build an intensive technical map."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"target": {
|
||||
"type": "string",
|
||||
"description": "Fully qualified name of the target (e.g., 'src.ai_client.send') or class.method.",
|
||||
},
|
||||
"max_depth": {
|
||||
"type": "integer",
|
||||
"description": "Maximum recursion depth for the call graph (default 5).",
|
||||
},
|
||||
},
|
||||
"required": ["target"],
|
||||
},
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user