feat(mma): Implement tiered context scoping and add get_definition tool

This commit is contained in:
2026-02-27 22:16:43 -05:00
parent a84ea40d16
commit 6aa642bc42
4 changed files with 196 additions and 1 deletions

Binary file not shown.

View File

@@ -187,6 +187,87 @@ def build_discussion_text(history: list[str]) -> str:
return "## Discussion History\n\n" + build_discussion_section(history)
def build_tier1_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
"""
Tier 1 Context: Strategic/Orchestration.
Full content for core conductor files, summaries for others.
"""
core_files = {"product.md", "tech-stack.md", "workflow.md", "tracks.md"}
parts = []
# Files section
if file_items:
sections = []
for item in file_items:
path = item.get("path")
name = path.name if path else ""
if name in core_files:
# Include in full
sections.append("### `" + (item.get("entry") or str(path)) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
# Summarize
sections.append(summarize.summarise_file(path, item.get("content", "")))
parts.append("## Files (Tier 1 - Mixed)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_tier2_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
"""
Tier 2 Context: Architectural/Tech Lead.
Full content for all files (standard behavior).
"""
return build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history, summary_only=False)
def build_tier3_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], focus_files: list[str]) -> str:
"""
Tier 3 Context: Execution/Worker.
Full content for focus_files, summaries for others.
"""
parts = []
if file_items:
sections = []
for item in file_items:
path = item.get("path")
entry = item.get("entry", "")
path_str = str(path) if path else ""
# Check if this file is in focus_files (by name or path)
is_focus = False
for focus in focus_files:
if focus == entry or (path and focus == path.name) or focus in path_str:
is_focus = True
break
if is_focus:
sections.append("### `" + (entry or path_str) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path and path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
sections.append(summarize.summarise_file(path, item.get("content", "")))
parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_markdown(base_dir: Path, files: list[str], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits

View File

@@ -281,6 +281,60 @@ def get_code_outline(path: str) -> str:
return f"ERROR generating outline for '{path}': {e}"
def get_definition(path: str, name: str) -> str:
"""
Returns the source code for a specific class, function, or method definition.
path: Path to the code file.
name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method').
"""
p, err = _resolve_and_check(path)
if err:
return err
if not p.exists():
return f"ERROR: file not found: {path}"
if not p.is_file():
return f"ERROR: not a file: {path}"
if p.suffix != ".py":
return f"ERROR: get_definition currently only supports .py files (unsupported: {p.suffix})"
try:
import ast
code = p.read_text(encoding="utf-8")
lines = code.splitlines()
tree = ast.parse(code)
# Split name for methods (e.g., "MyClass.my_method")
parts = name.split(".")
target_class = parts[0] if len(parts) > 1 else None
target_name = parts[-1]
def get_source_from_node(node):
# In Python 3.8+, ast.get_source_segment is available
# But we can also use lineno and end_lineno
if hasattr(node, "lineno") and hasattr(node, "end_lineno"):
# lineno is 1-indexed
start = node.lineno - 1
end = node.end_lineno
return "\n".join(lines[start:end])
return f"ERROR: Could not extract source for node {node}"
for node in ast.walk(tree):
if target_class:
if isinstance(node, ast.ClassDef) and node.name == target_class:
for body_node in node.body:
if isinstance(body_node, (ast.FunctionDef, ast.AsyncFunctionDef)) and body_node.name == target_name:
return get_source_from_node(body_node)
else:
if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == target_name:
return get_source_from_node(node)
return f"ERROR: could not find definition '{name}' in {path}"
except Exception as e:
return f"ERROR retrieving definition '{name}' from '{path}': {e}"
def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str:
"""
Returns the git diff for a file or directory.
@@ -436,7 +490,7 @@ def get_ui_performance() -> str:
# ------------------------------------------------------------------ tool dispatch
TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "get_python_skeleton", "get_code_outline", "get_git_diff", "web_search", "fetch_url", "get_ui_performance"}
TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "get_python_skeleton", "get_code_outline", "get_definition", "get_git_diff", "web_search", "fetch_url", "get_ui_performance"}
def dispatch(tool_name: str, tool_input: dict) -> str:
@@ -455,6 +509,8 @@ def dispatch(tool_name: str, tool_input: dict) -> str:
return get_python_skeleton(tool_input.get("path", ""))
if tool_name == "get_code_outline":
return get_code_outline(tool_input.get("path", ""))
if tool_name == "get_definition":
return get_definition(tool_input.get("path", ""), tool_input.get("name", ""))
if tool_name == "get_git_diff":
return get_git_diff(
tool_input.get("path", ""),
@@ -586,6 +642,27 @@ MCP_TOOL_SPECS = [
"required": ["path"],
},
},
{
"name": "get_definition",
"description": (
"Get the full source code of a specific class, function, or method definition. "
"This is more efficient than reading the whole file if you know what you're looking for."
),
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the .py file.",
},
"name": {
"type": "string",
"description": "The name of the class or function to retrieve. Use 'ClassName.method_name' for methods.",
}
},
"required": ["path", "name"],
},
},
{
"name": "get_git_diff",
"description": (
@@ -648,3 +725,4 @@ MCP_TOOL_SPECS = [
}
}
]

View File

@@ -0,0 +1,36 @@
import pytest
from pathlib import Path
from aggregate import build_tier1_context, build_tier2_context, build_tier3_context
def test_build_tier1_context_exists():
# This should fail if the function is not defined
file_items = [
{"path": Path("conductor/product.md"), "entry": "conductor/product.md", "content": "Product content", "error": False},
{"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False}
]
history = ["User: hello", "AI: hi"]
result = build_tier1_context(file_items, Path("."), [], history)
assert "Product content" in result
# other.py should be summarized, not full content in a code block
assert "Other content" not in result or "Summarized" in result # Assuming summary format
def test_build_tier2_context_exists():
file_items = [
{"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False}
]
history = ["User: hello"]
result = build_tier2_context(file_items, Path("."), [], history)
assert "Other content" in result
def test_build_tier3_context_exists():
file_items = [
{"path": Path("focus.py"), "entry": "focus.py", "content": "Focus content", "error": False},
{"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False}
]
history = ["User: hello"]
result = build_tier3_context(file_items, Path("."), [], history, focus_files=["focus.py"])
assert "Focus content" in result
assert "Other content" not in result