Compare commits

..

18 Commits

Author SHA1 Message Date
Ed_
a97eb2a222 chore(conductor): Mark track 'Tiered Context Scoping & HITL Approval' as complete 2026-02-27 22:32:07 -05:00
Ed_
913cfee2dd docs(conductor): Synchronize docs for track 'Tiered Context Scoping & HITL Approval' 2026-02-27 22:31:58 -05:00
Ed_
3c7d4cd841 conductor(plan): Finalize plan for track 'Tiered Context Scoping & HITL Approval' 2026-02-27 22:31:39 -05:00
Ed_
a6c627a6b5 conductor(plan): Mark phase 'Phase 3: Approval UX Modal' as complete 2026-02-27 22:31:11 -05:00
Ed_
21157f92c3 feat(mma): Finalize Approval UX Modal in GUI 2026-02-27 22:30:55 -05:00
Ed_
bee75e7b4d conductor(plan): Mark task 'Interception logic' as complete 2026-02-27 22:30:13 -05:00
Ed_
4c53ca11da feat(mma): Implement interception logic in GUI and Conductor 2026-02-27 22:29:55 -05:00
Ed_
1017a4d807 conductor(plan): Mark task 'Signaling mechanism' as complete 2026-02-27 22:27:19 -05:00
Ed_
e293c5e302 feat(mma): Implement spawn interception in multi_agent_conductor.py 2026-02-27 22:27:05 -05:00
Ed_
c2c8732100 conductor(plan): Mark phase 'Phase 1: Context Subsetting' as complete 2026-02-27 22:24:11 -05:00
Ed_
d7a24d66ae conductor(checkpoint): Checkpoint end of Phase 1 (Context Subsetting) 2026-02-27 22:23:57 -05:00
Ed_
528aaf1957 feat(mma): Finalize Phase 1 with AST-based outline and improved tiered selection 2026-02-27 22:23:50 -05:00
Ed_
f59ef247cf conductor(plan): Mark task 'Update project state' as complete 2026-02-27 22:23:31 -05:00
Ed_
2ece9e1141 feat(aggregate): support dictionary-based file entries with optional tiers 2026-02-27 22:21:18 -05:00
Ed_
4c744f2c8e conductor(plan): Mark task 'Integrate AST skeleton' as complete 2026-02-27 22:18:39 -05:00
Ed_
0ed01aa1c9 feat(mma): Integrate AST skeleton extraction into Tier 3 context build 2026-02-27 22:18:26 -05:00
Ed_
34bd61aa6c conductor(plan): Mark task 'Refactor aggregate.py' as complete 2026-02-27 22:16:55 -05:00
Ed_
6aa642bc42 feat(mma): Implement tiered context scoping and add get_definition tool 2026-02-27 22:16:43 -05:00
12 changed files with 679 additions and 101 deletions

Binary file not shown.

View File

@@ -17,6 +17,7 @@ import glob
from pathlib import Path, PureWindowsPath from pathlib import Path, PureWindowsPath
import summarize import summarize
import project_manager import project_manager
from file_cache import ASTParser
def find_next_increment(output_dir: Path, namespace: str) -> int: def find_next_increment(output_dir: Path, namespace: str) -> int:
pattern = re.compile(rf"^{re.escape(namespace)}_(\d+)\.md$") pattern = re.compile(rf"^{re.escape(namespace)}_(\d+)\.md$")
@@ -63,9 +64,14 @@ def build_discussion_section(history: list[str]) -> str:
sections.append(f"### Discussion Excerpt {i}\n\n{paste.strip()}") sections.append(f"### Discussion Excerpt {i}\n\n{paste.strip()}")
return "\n\n---\n\n".join(sections) return "\n\n---\n\n".join(sections)
def build_files_section(base_dir: Path, files: list[str]) -> str: def build_files_section(base_dir: Path, files: list[str | dict]) -> str:
sections = [] sections = []
for entry in files: for entry_raw in files:
if isinstance(entry_raw, dict):
entry = entry_raw.get("path")
else:
entry = entry_raw
paths = resolve_paths(base_dir, entry) paths = resolve_paths(base_dir, entry)
if not paths: if not paths:
sections.append(f"### `{entry}`\n\n```text\nERROR: no files matched: {entry}\n```") sections.append(f"### `{entry}`\n\n```text\nERROR: no files matched: {entry}\n```")
@@ -99,7 +105,7 @@ def build_screenshots_section(base_dir: Path, screenshots: list[str]) -> str:
return "\n\n---\n\n".join(sections) return "\n\n---\n\n".join(sections)
def build_file_items(base_dir: Path, files: list[str]) -> list[dict]: def build_file_items(base_dir: Path, files: list[str | dict]) -> list[dict]:
""" """
Return a list of dicts describing each file, for use by ai_client when it Return a list of dicts describing each file, for use by ai_client when it
wants to upload individual files rather than inline everything as markdown. wants to upload individual files rather than inline everything as markdown.
@@ -110,12 +116,20 @@ def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
content : str (file text, or error string) content : str (file text, or error string)
error : bool error : bool
mtime : float (last modification time, for skip-if-unchanged optimization) mtime : float (last modification time, for skip-if-unchanged optimization)
tier : int | None (optional tier for context management)
""" """
items = [] items = []
for entry in files: for entry_raw in files:
if isinstance(entry_raw, dict):
entry = entry_raw.get("path")
tier = entry_raw.get("tier")
else:
entry = entry_raw
tier = None
paths = resolve_paths(base_dir, entry) paths = resolve_paths(base_dir, entry)
if not paths: if not paths:
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0}) items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier})
continue continue
for path in paths: for path in paths:
try: try:
@@ -130,10 +144,10 @@ def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
content = f"ERROR: {e}" content = f"ERROR: {e}"
mtime = 0.0 mtime = 0.0
error = True error = True
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime}) items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier})
return items return items
def build_summary_section(base_dir: Path, files: list[str]) -> str: def build_summary_section(base_dir: Path, files: list[str | dict]) -> str:
""" """
Build a compact summary section using summarize.py — one short block per file. Build a compact summary section using summarize.py — one short block per file.
Used as the initial <context> block instead of full file contents. Used as the initial <context> block instead of full file contents.
@@ -187,7 +201,99 @@ def build_discussion_text(history: list[str]) -> str:
return "## Discussion History\n\n" + build_discussion_section(history) return "## Discussion History\n\n" + build_discussion_section(history)
def build_markdown(base_dir: Path, files: list[str], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str: def build_tier1_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
"""
Tier 1 Context: Strategic/Orchestration.
Full content for core conductor files and files with tier=1, summaries for others.
"""
core_files = {"product.md", "tech-stack.md", "workflow.md", "tracks.md"}
parts = []
# Files section
if file_items:
sections = []
for item in file_items:
path = item.get("path")
name = path.name if path else ""
if name in core_files or item.get("tier") == 1:
# Include in full
sections.append("### `" + (item.get("entry") or str(path)) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
# Summarize
sections.append("### `" + (item.get("entry") or str(path)) + "`\n\n" +
summarize.summarise_file(path, item.get("content", "")))
parts.append("## Files (Tier 1 - Mixed)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_tier2_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
"""
Tier 2 Context: Architectural/Tech Lead.
Full content for all files (standard behavior).
"""
return build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history, summary_only=False)
def build_tier3_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], focus_files: list[str]) -> str:
"""
Tier 3 Context: Execution/Worker.
Full content for focus_files and files with tier=3, summaries/skeletons for others.
"""
parts = []
if file_items:
sections = []
for item in file_items:
path = item.get("path")
entry = item.get("entry", "")
path_str = str(path) if path else ""
# Check if this file is in focus_files (by name or path)
is_focus = False
for focus in focus_files:
if focus == entry or (path and focus == path.name) or focus in path_str:
is_focus = True
break
if is_focus or item.get("tier") == 3:
sections.append("### `" + (entry or path_str) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path and path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
content = item.get("content", "")
if path and path.suffix == ".py" and not item.get("error"):
try:
parser = ASTParser("python")
skeleton = parser.get_skeleton(content)
sections.append(f"### `{entry or path_str}` (AST Skeleton)\n\n```python\n{skeleton}\n```")
except Exception as e:
# Fallback to summary if AST parsing fails
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
else:
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_markdown(base_dir: Path, files: list[str | dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
parts = [] parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits # STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
if files: if files:

View File

@@ -20,6 +20,8 @@ To serve as an expert-level utility for personal developer use on small projects
- **Native DAG Execution Engine:** Employs a Python-based Directed Acyclic Graph (DAG) engine to manage complex task dependencies, supporting automated topological sorting and robust cycle detection. - **Native DAG Execution Engine:** Employs a Python-based Directed Acyclic Graph (DAG) engine to manage complex task dependencies, supporting automated topological sorting and robust cycle detection.
- **Programmable Execution State Machine:** Governing the transition between "Auto-Queue" (autonomous worker spawning) and "Step Mode" (explicit manual approval for each task transition). - **Programmable Execution State Machine:** Governing the transition between "Auto-Queue" (autonomous worker spawning) and "Step Mode" (explicit manual approval for each task transition).
- **Role-Scoped Documentation:** Automated mapping of foundational documents to specific tiers to prevent token bloat and maintain high-signal context. - **Role-Scoped Documentation:** Automated mapping of foundational documents to specific tiers to prevent token bloat and maintain high-signal context.
- **Tiered Context Scoping:** Employs optimized context subsets for each tier. Tiers 1 & 2 receive strategic documents and full history, while Tier 3/4 workers receive task-specific "Focus Files" and automated AST dependency skeletons.
- **Worker Spawn Interceptor:** A mandatory security gate that intercepts every sub-agent launch. Provides a GUI modal allowing the user to review, modify, or reject the worker's prompt and file context before it is sent to the API.
- **Strict Memory Siloing:** Employs tree-sitter AST-based interface extraction (Skeleton View, Curated View) and "Context Amnesia" to provide workers only with the absolute minimum context required, preventing hallucination loops. - **Strict Memory Siloing:** Employs tree-sitter AST-based interface extraction (Skeleton View, Curated View) and "Context Amnesia" to provide workers only with the absolute minimum context required, preventing hallucination loops.
- **Explicit Execution Control:** All AI-generated PowerShell scripts require explicit human confirmation via interactive UI dialogs before execution, supported by a global "Linear Execution Clutch" for deterministic debugging. - **Explicit Execution Control:** All AI-generated PowerShell scripts require explicit human confirmation via interactive UI dialogs before execution, supported by a global "Linear Execution Clutch" for deterministic debugging.
- **Asynchronous Event-Driven Architecture:** Uses an `AsyncEventQueue` to link GUI actions to the backend engine, ensuring the interface remains fully responsive during multi-model generation and parallel worker execution. - **Asynchronous Event-Driven Architecture:** Uses an `AsyncEventQueue` to link GUI actions to the backend engine, ensuring the interface remains fully responsive during multi-model generation and parallel worker execution.

View File

@@ -20,9 +20,10 @@ This file tracks all major tracks for the project. Each track has its own detail
--- ---
- [~] **Track: Tiered Context Scoping & HITL Approval** - [x] **Track: Tiered Context Scoping & HITL Approval**
*Link: [./tracks/tiered_context_scoping_hitl_approval/](./tracks/tiered_context_scoping_hitl_approval/)* *Link: [./tracks/tiered_context_scoping_hitl_approval/](./tracks/tiered_context_scoping_hitl_approval/)*
--- ---
- [ ] **Track: MMA Dashboard Visualization Overhaul** - [ ] **Track: MMA Dashboard Visualization Overhaul**

View File

@@ -1,15 +1,15 @@
# Implementation Plan: Tiered Context Scoping & HITL Approval # Implementation Plan: Tiered Context Scoping & HITL Approval
## Phase 1: Context Subsetting ## Phase 1: Context Subsetting [checkpoint: d7a24d6]
- [~] Task: Refactor `aggregate.py` to support targeted context builds (e.g., `build_tier1_context`, `build_tier3_context`). - [x] Task: Refactor `aggregate.py` to support targeted context builds (e.g., `build_tier1_context`, `build_tier3_context`). 6aa642b
- [ ] Task: Integrate AST skeleton extraction into the standard Tier 3 context build. - [x] Task: Integrate AST skeleton extraction into the standard Tier 3 context build. 0ed01aa
- [ ] Task: Update the project state to track which files are assigned to which tier. - [x] Task: Update the project state to track which files are assigned to which tier. 2ece9e1
## Phase 2: The Spawn Interceptor ## Phase 2: The Spawn Interceptor [checkpoint: 4c53ca1]
- [ ] Task: Create a signaling mechanism in `multi_agent_conductor.py` to emit a "Worker Spawn Requested" event. - [x] Task: Create a signaling mechanism in `multi_agent_conductor.py` to emit a "Worker Spawn Requested" event. e293c5e
- [ ] Task: Implement the interception logic that pauses the async dispatcher until a signal is received from the GUI. - [x] Task: Implement the interception logic that pauses the async dispatcher until a signal is received from the GUI. 4c53ca1
## Phase 3: Approval UX Modal ## Phase 3: Approval UX Modal [checkpoint: 21157f9]
- [ ] Task: Design the "Approve Worker Spawn" modal in DearPyGui. - [x] Task: Design the "Approve Worker Spawn" modal in DearPyGui. 21157f9
- [ ] Task: Populate the modal with the target role, the exact prompt, and a read-only view of the specific file context. - [x] Task: Populate the modal with the target role, the exact prompt, and a read-only view of the specific file context. 21157f9
- [ ] Task: Wire the "Approve", "Modify", and "Reject" buttons to resume or cancel the intercepted spawn. - [x] Task: Wire the "Approve", "Modify", and "Reject" buttons to resume or cancel the intercepted spawn. 21157f9

111
gui_2.py
View File

@@ -131,6 +131,29 @@ class MMAApprovalDialog:
return self._approved, self._payload return self._approved, self._payload
class MMASpawnApprovalDialog:
def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str):
self._ticket_id = ticket_id
self._role = role
self._prompt = prompt
self._context_md = context_md
self._condition = threading.Condition()
self._done = False
self._approved = False
self._abort = False
def wait(self) -> dict:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1)
return {
'approved': self._approved,
'abort': self._abort,
'prompt': self._prompt,
'context_md': self._context_md
}
class App: class App:
"""The main ImGui interface orchestrator for Manual Slop.""" """The main ImGui interface orchestrator for Manual Slop."""
@@ -246,6 +269,13 @@ class App:
self._mma_approval_edit_mode = False self._mma_approval_edit_mode = False
self._mma_approval_payload = "" self._mma_approval_payload = ""
# MMA Spawn approval state
self._pending_mma_spawn = None
self._mma_spawn_open = False
self._mma_spawn_edit_mode = False
self._mma_spawn_prompt = ''
self._mma_spawn_context = ''
# Orchestration State # Orchestration State
self.ui_epic_input = "" self.ui_epic_input = ""
self.proposed_tracks: list[dict] = [] self.proposed_tracks: list[dict] = []
@@ -989,6 +1019,21 @@ class App:
if "dialog_container" in task: if "dialog_container" in task:
task["dialog_container"][0] = dlg task["dialog_container"][0] = dlg
elif action == "mma_spawn_approval":
dlg = MMASpawnApprovalDialog(
task.get("ticket_id"),
task.get("role"),
task.get("prompt"),
task.get("context_md")
)
self._pending_mma_spawn = task
self._mma_spawn_prompt = task.get("prompt", "")
self._mma_spawn_context = task.get("context_md", "")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
if "dialog_container" in task:
task["dialog_container"][0] = dlg
except Exception as e: except Exception as e:
print(f"Error executing GUI task: {e}") print(f"Error executing GUI task: {e}")
@@ -1020,7 +1065,7 @@ class App:
else: else:
print("[DEBUG] No pending dialog to reject") print("[DEBUG] No pending dialog to reject")
def _handle_mma_respond(self, approved: bool, payload: str = None): def _handle_mma_respond(self, approved: bool, payload: str = None, abort: bool = False, prompt: str = None, context_md: str = None):
if self._pending_mma_approval: if self._pending_mma_approval:
dlg = self._pending_mma_approval.get("dialog_container", [None])[0] dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
if dlg: if dlg:
@@ -1032,6 +1077,20 @@ class App:
dlg._condition.notify_all() dlg._condition.notify_all()
self._pending_mma_approval = None self._pending_mma_approval = None
if self._pending_mma_spawn:
dlg = self._pending_mma_spawn.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
dlg._abort = abort
if prompt is not None:
dlg._prompt = prompt
if context_md is not None:
dlg._context_md = context_md
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_spawn = None
def _handle_approve_ask(self): def _handle_approve_ask(self):
"""Responds with approval for a pending /api/ask request.""" """Responds with approval for a pending /api/ask request."""
if not self._ask_request_id: return if not self._ask_request_id: return
@@ -1821,6 +1880,56 @@ class App:
imgui.close_current_popup() imgui.close_current_popup()
imgui.end_popup() imgui.end_popup()
# MMA Spawn Approval Modal
if self._pending_mma_spawn:
if not self._mma_spawn_open:
imgui.open_popup("MMA Spawn Approval")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
self._mma_spawn_prompt = self._pending_mma_spawn.get("prompt", "")
self._mma_spawn_context = self._pending_mma_spawn.get("context_md", "")
else:
self._mma_spawn_open = False
if imgui.begin_popup_modal("MMA Spawn Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_spawn:
imgui.close_current_popup()
else:
role = self._pending_mma_spawn.get("role", "??")
ticket_id = self._pending_mma_spawn.get("ticket_id", "??")
imgui.text(f"Spawning {role} for Ticket {ticket_id}")
imgui.separator()
if self._mma_spawn_edit_mode:
imgui.text("Edit Prompt:")
_, self._mma_spawn_prompt = imgui.input_text_multiline("##spawn_prompt", self._mma_spawn_prompt, imgui.ImVec2(800, 200))
imgui.text("Edit Context MD:")
_, self._mma_spawn_context = imgui.input_text_multiline("##spawn_context", self._mma_spawn_context, imgui.ImVec2(800, 300))
else:
imgui.text("Proposed Prompt:")
imgui.begin_child("spawn_prompt_preview", imgui.ImVec2(800, 150), True)
imgui.text_unformatted(self._mma_spawn_prompt)
imgui.end_child()
imgui.text("Proposed Context MD:")
imgui.begin_child("spawn_context_preview", imgui.ImVec2(800, 250), True)
imgui.text_unformatted(self._mma_spawn_context)
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Mode" if not self._mma_spawn_edit_mode else "Preview Mode", imgui.ImVec2(120, 0)):
self._mma_spawn_edit_mode = not self._mma_spawn_edit_mode
imgui.same_line()
if imgui.button("Abort", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False, abort=True)
imgui.close_current_popup()
imgui.end_popup()
if self.show_script_output: if self.show_script_output:
if self._trigger_script_blink: if self._trigger_script_blink:
self._trigger_script_blink = False self._trigger_script_blink = False

View File

@@ -281,6 +281,60 @@ def get_code_outline(path: str) -> str:
return f"ERROR generating outline for '{path}': {e}" return f"ERROR generating outline for '{path}': {e}"
def get_definition(path: str, name: str) -> str:
"""
Returns the source code for a specific class, function, or method definition.
path: Path to the code file.
name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method').
"""
p, err = _resolve_and_check(path)
if err:
return err
if not p.exists():
return f"ERROR: file not found: {path}"
if not p.is_file():
return f"ERROR: not a file: {path}"
if p.suffix != ".py":
return f"ERROR: get_definition currently only supports .py files (unsupported: {p.suffix})"
try:
import ast
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
lines = code.splitlines()
tree = ast.parse(code)
# Split name for methods (e.g., "MyClass.my_method")
parts = name.split(".")
target_class = parts[0] if len(parts) > 1 else None
target_name = parts[-1]
def get_source_from_node(node):
# In Python 3.8+, ast.get_source_segment is available
# But we can also use lineno and end_lineno
if hasattr(node, "lineno") and hasattr(node, "end_lineno"):
# lineno is 1-indexed
start = node.lineno - 1
end = node.end_lineno
return "\n".join(lines[start:end])
return f"ERROR: Could not extract source for node {node}"
for node in ast.walk(tree):
if target_class:
if isinstance(node, ast.ClassDef) and node.name == target_class:
for body_node in node.body:
if isinstance(body_node, (ast.FunctionDef, ast.AsyncFunctionDef)) and body_node.name == target_name:
return get_source_from_node(body_node)
else:
if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == target_name:
return get_source_from_node(node)
return f"ERROR: could not find definition '{name}' in {path}"
except Exception as e:
return f"ERROR retrieving definition '{name}' from '{path}': {e}"
def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str: def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str:
""" """
Returns the git diff for a file or directory. Returns the git diff for a file or directory.
@@ -436,7 +490,7 @@ def get_ui_performance() -> str:
# ------------------------------------------------------------------ tool dispatch # ------------------------------------------------------------------ tool dispatch
TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "get_python_skeleton", "get_code_outline", "get_git_diff", "web_search", "fetch_url", "get_ui_performance"} TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "get_python_skeleton", "get_code_outline", "get_definition", "get_git_diff", "web_search", "fetch_url", "get_ui_performance"}
def dispatch(tool_name: str, tool_input: dict) -> str: def dispatch(tool_name: str, tool_input: dict) -> str:
@@ -455,6 +509,8 @@ def dispatch(tool_name: str, tool_input: dict) -> str:
return get_python_skeleton(tool_input.get("path", "")) return get_python_skeleton(tool_input.get("path", ""))
if tool_name == "get_code_outline": if tool_name == "get_code_outline":
return get_code_outline(tool_input.get("path", "")) return get_code_outline(tool_input.get("path", ""))
if tool_name == "get_definition":
return get_definition(tool_input.get("path", ""), tool_input.get("name", ""))
if tool_name == "get_git_diff": if tool_name == "get_git_diff":
return get_git_diff( return get_git_diff(
tool_input.get("path", ""), tool_input.get("path", ""),
@@ -586,6 +642,27 @@ MCP_TOOL_SPECS = [
"required": ["path"], "required": ["path"],
}, },
}, },
{
"name": "get_definition",
"description": (
"Get the full source code of a specific class, function, or method definition. "
"This is more efficient than reading the whole file if you know what you're looking for."
),
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the .py file.",
},
"name": {
"type": "string",
"description": "The name of the class or function to retrieve. Use 'ClassName.method_name' for methods.",
}
},
"required": ["path", "name"],
},
},
{ {
"name": "get_git_diff", "name": "get_git_diff",
"description": ( "description": (
@@ -648,3 +725,4 @@ MCP_TOOL_SPECS = [
} }
} }
] ]

View File

@@ -1,11 +1,12 @@
import ai_client import ai_client
import json import json
import asyncio import asyncio
from typing import List, Optional from typing import List, Optional, Tuple
from dataclasses import asdict from dataclasses import asdict
import events import events
from models import Ticket, Track, WorkerContext from models import Ticket, Track, WorkerContext
from file_cache import ASTParser from file_cache import ASTParser
from pathlib import Path
from dag_engine import TrackDAG, ExecutionEngine from dag_engine import TrackDAG, ExecutionEngine
@@ -181,6 +182,62 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
return False return False
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
"""
Pushes a spawn approval request to the GUI and waits for response.
Returns (approved, modified_prompt, modified_context)
"""
import threading
import time
import asyncio
dialog_container = [None]
task = {
"action": "mma_spawn_approval",
"ticket_id": ticket_id,
"role": role,
"prompt": prompt,
"context_md": context_md,
"dialog_container": dialog_container
}
# Push to queue
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.run_coroutine_threadsafe(event_queue.put("mma_spawn_approval", task), loop)
else:
event_queue._queue.put_nowait(("mma_spawn_approval", task))
except Exception:
# Fallback if no loop
event_queue._queue.put_nowait(("mma_spawn_approval", task))
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
while dialog_container[0] is None and time.time() - start < 60:
time.sleep(0.1)
if dialog_container[0]:
res = dialog_container[0].wait()
if isinstance(res, dict):
approved = res.get("approved", False)
abort = res.get("abort", False)
modified_prompt = res.get("prompt", prompt)
modified_context = res.get("context_md", context_md)
return approved and not abort, modified_prompt, modified_context
else:
# Fallback for old tuple style if any
approved, final_payload = res
modified_prompt = prompt
modified_context = context_md
if isinstance(final_payload, dict):
modified_prompt = final_payload.get("prompt", prompt)
modified_context = final_payload.get("context_md", context_md)
return approved, modified_prompt, modified_context
return False, prompt, context_md
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None, engine: Optional['ConductorEngine'] = None, md_content: str = ""): def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None, engine: Optional['ConductorEngine'] = None, md_content: str = ""):
""" """
Simulates the lifecycle of a single agent working on a ticket. Simulates the lifecycle of a single agent working on a ticket.
@@ -202,10 +259,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
for i, file_path in enumerate(context_files): for i, file_path in enumerate(context_files):
try: try:
abs_path = Path(file_path) abs_path = Path(file_path)
if not abs_path.is_absolute() and engine:
# Resolve relative to project base if possible
# (This is a bit simplified, but helps) # (This is a bit simplified, but helps)
pass
with open(file_path, 'r', encoding='utf-8') as f: with open(file_path, 'r', encoding='utf-8') as f:
content = f.read() content = f.read()
if i == 0: if i == 0:
@@ -229,6 +283,22 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
"start your response with 'BLOCKED' and explain why." "start your response with 'BLOCKED' and explain why."
) )
# HITL Clutch: call confirm_spawn if event_queue is provided
if event_queue:
approved, modified_prompt, modified_context = confirm_spawn(
role="Tier 3 Worker",
prompt=user_message,
context_md=md_content,
event_queue=event_queue,
ticket_id=ticket.id
)
if not approved:
ticket.mark_blocked("Spawn rejected by user.")
return "BLOCKED: Spawn rejected by user."
user_message = modified_prompt
md_content = modified_context
# HITL Clutch: pass the queue and ticket_id to confirm_execution # HITL Clutch: pass the queue and ticket_id to confirm_execution
def clutch_callback(payload: str) -> bool: def clutch_callback(payload: str) -> bool:
if not event_queue: if not event_queue:
@@ -246,9 +316,6 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
# Update usage in engine if provided # Update usage in engine if provided
if engine: if engine:
stats = {} # ai_client.get_token_stats() is not available stats = {} # ai_client.get_token_stats() is not available
# ai_client provides aggregate stats, for granular tier tracking
# we'd need to diff before/after or have ai_client return usage per call.
# For Phase 4, we'll use a simplified diff approach.
engine.tier_usage["Tier 3"]["input"] += stats.get("prompt_tokens", 0) engine.tier_usage["Tier 3"]["input"] += stats.get("prompt_tokens", 0)
engine.tier_usage["Tier 3"]["output"] += stats.get("candidates_tokens", 0) engine.tier_usage["Tier 3"]["output"] += stats.get("candidates_tokens", 0)

View File

@@ -1,88 +1,55 @@
import tree_sitter import ast
import tree_sitter_python
from pathlib import Path from pathlib import Path
class CodeOutliner: class CodeOutliner:
def __init__(self): def __init__(self):
self.language = tree_sitter.Language(tree_sitter_python.language()) pass
self.parser = tree_sitter.Parser(self.language)
def outline(self, code: str) -> str: def outline(self, code: str) -> str:
tree = self.parser.parse(bytes(code, "utf8")) code = code.lstrip(chr(0xFEFF))
lines = code.splitlines() try:
tree = ast.parse(code)
except SyntaxError as e:
return f"ERROR parsing code: {e}"
output = [] output = []
def get_docstring(node): def get_docstring(node):
# In Python, docstring is usually the first expression statement in a block doc = ast.get_docstring(node)
body = node.child_by_field_name("body") if doc:
if body and body.type == "block": return doc.splitlines()[0]
for child in body.children:
if child.type == "comment":
continue
if child.type == "expression_statement":
expr = child.children[0]
if expr.type == "string":
doc = code[expr.start_byte:expr.end_byte].strip()
# Strip quotes
if doc.startswith(('"""', "'''")):
doc = doc[3:-3]
elif doc.startswith(('"', "'")):
doc = doc[1:-1]
return doc.splitlines()[0] if doc else ""
break
return None return None
def walk(node, indent=0): def walk(node, indent=0):
node_type = node.type if isinstance(node, ast.ClassDef):
name = None start_line = node.lineno
end_line = getattr(node, "end_lineno", start_line)
if node_type == "class_definition": output.append(f"{' ' * indent}[Class] {node.name} (Lines {start_line}-{end_line})")
name_node = node.child_by_field_name("name")
if name_node:
name = code[name_node.start_byte:name_node.end_byte]
start_line = node.start_point.row + 1
end_line = node.end_point.row + 1
output.append(f"{' ' * indent}[Class] {name} (Lines {start_line}-{end_line})")
doc = get_docstring(node) doc = get_docstring(node)
if doc: if doc:
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"") output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
for item in node.body:
walk(item, indent + 1)
elif node_type in ("function_definition", "async_function_definition"): elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
name_node = node.child_by_field_name("name") start_line = node.lineno
if name_node: end_line = getattr(node, "end_lineno", start_line)
name = code[name_node.start_byte:name_node.end_byte] prefix = "[Async Func]" if isinstance(node, ast.AsyncFunctionDef) else "[Func]"
start_line = node.start_point.row + 1
end_line = node.end_point.row + 1
prefix = "[Async Func]" if node_type == "async_function_definition" else "[Func]"
# Check if it's a method (parent is a class body) # Check if it's a method
parent = node.parent # We can check the indent or the parent, but in AST walk we know if we are inside a ClassDef
while parent and parent.type != "class_definition": # Let's use a simpler heuristic for the outline: if indent > 0, it's likely a method.
if parent.type == "module": if indent > 0:
break
parent = parent.parent
if parent and parent.type == "class_definition":
prefix = "[Method]" prefix = "[Method]"
output.append(f"{' ' * indent}{prefix} {name} (Lines {start_line}-{end_line})") output.append(f"{' ' * indent}{prefix} {node.name} (Lines {start_line}-{end_line})")
doc = get_docstring(node) doc = get_docstring(node)
if doc: if doc:
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"") output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
for child in node.children: for node in tree.body:
# Don't recurse into function bodies for outlining functions, walk(node)
# but we DO want to recurse into classes to find methods.
if node_type == "class_definition":
if child.type == "block":
walk(child, indent + 1)
elif node_type == "module":
walk(child, indent)
elif node_type == "block":
walk(child, indent)
walk(tree.root_node)
return "\n".join(output) return "\n".join(output)
def get_outline(path: Path, code: str) -> str: def get_outline(path: Path, code: str) -> str:

View File

@@ -82,3 +82,25 @@ def test_cb_plan_epic_launches_thread(app_instance):
mock_get_history.assert_called_once() mock_get_history.assert_called_once()
mock_gen_tracks.assert_called_once() mock_gen_tracks.assert_called_once()
def test_process_pending_gui_tasks_mma_spawn_approval(app_instance):
    """Verifies that the 'mma_spawn_approval' action correctly updates the UI state."""
    spawn_task = {
        "action": "mma_spawn_approval",
        "ticket_id": "T1",
        "role": "Tier 3 Worker",
        "prompt": "Test Prompt",
        "context_md": "Test Context",
        "dialog_container": [None],
    }
    app_instance._pending_gui_tasks.append(spawn_task)

    app_instance._process_pending_gui_tasks()

    # Processing the task should stage it as the pending spawn and open the modal.
    assert app_instance._pending_mma_spawn == spawn_task
    assert app_instance._mma_spawn_prompt == "Test Prompt"
    assert app_instance._mma_spawn_context == "Test Context"
    assert app_instance._mma_spawn_open is True
    assert app_instance._mma_spawn_edit_mode is False
    # A dialog object must be injected into the container for the conductor side.
    injected_dialog = spawn_task["dialog_container"][0]
    assert injected_dialog is not None
    assert injected_dialog._ticket_id == "T1"

View File

@@ -0,0 +1,90 @@
import pytest
from unittest.mock import MagicMock, patch
import multi_agent_conductor
from models import Ticket, WorkerContext
import events
import asyncio
import concurrent.futures
class MockDialog:
    """Stand-in for the GUI approval dialog used by conductor tests.

    wait() mimics the real dialog's blocking call: it reports approval
    status and, when a payload was supplied, the user's edited fields.
    """

    def __init__(self, approved, final_payload=None):
        self.approved = approved
        self.final_payload = final_payload

    def wait(self):
        # Match the new return format: a dictionary.
        base = {'approved': self.approved, 'abort': False}
        if self.final_payload:
            # Merge user edits (e.g. modified prompt/context) over the base result.
            base = {**base, **self.final_payload}
        return base
@pytest.fixture
def mock_ai_client():
    """Patch ai_client.send with a mock that returns a canned completion."""
    # return_value is forwarded to the replacement MagicMock by patch().
    with patch("ai_client.send", return_value="Task completed") as mock_send:
        yield mock_send
@pytest.mark.asyncio
async def test_confirm_spawn_pushed_to_queue():
    """confirm_spawn must publish an 'mma_spawn_approval' event carrying the
    spawn details plus a dialog_container slot, then block until a dialog is
    injected and return (approved, final_prompt, final_context)."""
    event_queue = events.AsyncEventQueue()
    ticket_id = "T1"
    role = "Tier 3 Worker"
    prompt = "Original Prompt"
    context_md = "Original Context"
    # Start confirm_spawn in a thread since it blocks with time.sleep
    def run_confirm():
        return multi_agent_conductor.confirm_spawn(role, prompt, context_md, event_queue, ticket_id)
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = loop.run_in_executor(executor, run_confirm)
        # Wait for the event to appear in the queue
        event_name, payload = await event_queue.get()
        # The payload must carry everything the GUI needs to render the modal.
        assert event_name == "mma_spawn_approval"
        assert payload["ticket_id"] == ticket_id
        assert payload["role"] == role
        assert payload["prompt"] == prompt
        assert payload["context_md"] == context_md
        assert "dialog_container" in payload
        # Simulate GUI injecting a dialog
        # NOTE(review): confirm_spawn presumably polls the container (it "blocks
        # with time.sleep" per the comment above) — confirm against conductor code.
        payload["dialog_container"][0] = MockDialog(True, {"prompt": "Modified Prompt", "context_md": "Modified Context"})
        approved, final_prompt, final_context = await future
        # The user's edited values must flow back to the caller.
        assert approved is True
        assert final_prompt == "Modified Prompt"
        assert final_context == "Modified Context"
@patch("multi_agent_conductor.confirm_spawn")
def test_run_worker_lifecycle_approved(mock_confirm, mock_ai_client):
    """An approved spawn runs the worker using the user-edited prompt/context."""
    approved_ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user")
    worker_ctx = WorkerContext(ticket_id="T1", model_name="model", messages=[])
    queue = events.AsyncEventQueue()
    mock_confirm.return_value = (True, "Modified Prompt", "Modified Context")

    multi_agent_conductor.run_worker_lifecycle(approved_ticket, worker_ctx, event_queue=queue)

    mock_confirm.assert_called_once()
    # The AI call must receive the values returned by the approval dialog.
    call_kwargs = mock_ai_client.call_args.kwargs
    assert call_kwargs["user_message"] == "Modified Prompt"
    assert call_kwargs["md_content"] == "Modified Context"
    assert approved_ticket.status == "completed"
@patch("multi_agent_conductor.confirm_spawn")
def test_run_worker_lifecycle_rejected(mock_confirm, mock_ai_client):
    """A rejected spawn blocks the ticket and never invokes the AI client."""
    rejected_ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user")
    worker_ctx = WorkerContext(ticket_id="T1", model_name="model", messages=[])
    queue = events.AsyncEventQueue()
    mock_confirm.return_value = (False, "Original Prompt", "Original Context")

    result = multi_agent_conductor.run_worker_lifecycle(rejected_ticket, worker_ctx, event_queue=queue)

    mock_confirm.assert_called_once()
    mock_ai_client.assert_not_called()
    # Rejection must surface both on the ticket and in the returned summary.
    assert rejected_ticket.status == "blocked"
    assert "Spawn rejected by user" in rejected_ticket.blocked_reason
    assert "BLOCKED" in result

View File

@@ -0,0 +1,136 @@
import pytest
from pathlib import Path
from aggregate import build_tier1_context, build_tier2_context, build_tier3_context
def test_build_tier1_context_exists():
    """Tier 1 keeps conductor docs verbatim and summarizes other files."""
    # This should fail if the function is not defined.
    items = [
        {"path": Path("conductor/product.md"), "entry": "conductor/product.md",
         "content": "Product content", "error": False},
        {"path": Path("other.py"), "entry": "other.py",
         "content": "Other content", "error": False},
    ]
    chat_history = ["User: hello", "AI: hi"]

    result = build_tier1_context(items, Path("."), [], chat_history)

    assert "Product content" in result
    # other.py should be summarized, not full content in a code block
    # (assuming the summary format used by the builder).
    assert ("Other content" not in result) or ("Summarized" in result)
def test_build_tier2_context_exists():
    """Tier 2 includes file content in full."""
    items = [
        {"path": Path("other.py"), "entry": "other.py",
         "content": "Other content", "error": False},
    ]
    result = build_tier2_context(items, Path("."), [], ["User: hello"])
    assert "Other content" in result
def test_build_tier3_context_ast_skeleton(monkeypatch):
    """Tier 3 must skeletonize non-focus .py files via aggregate.ASTParser.

    Fix: dropped the dead `import file_cache` — the monkeypatch targets
    `aggregate.ASTParser` by string, and file_cache was never referenced.
    """
    from unittest.mock import MagicMock
    import aggregate

    # Stub the parser so the test does not depend on a real AST backend.
    mock_parser_instance = MagicMock()
    mock_parser_instance.get_skeleton.return_value = "def other():\n ..."
    mock_parser_class = MagicMock(return_value=mock_parser_instance)
    # Mock ASTParser as seen from inside the aggregate module.
    monkeypatch.setattr("aggregate.ASTParser", mock_parser_class)

    file_items = [
        {"path": Path("other.py"), "entry": "other.py",
         "content": "def other():\n pass", "error": False}
    ]
    history = []

    # New behavior check: it should use ASTParser for .py files not in focus.
    result = build_tier3_context(file_items, Path("."), [], history, focus_files=[])

    assert "def other():" in result
    assert "..." in result
    # summarize.py output should not be there if the AST skeleton is used.
    assert "Python" not in result
    mock_parser_class.assert_called_once_with("python")
    mock_parser_instance.get_skeleton.assert_called_once_with("def other():\n pass")
def test_build_tier3_context_exists():
    """Focus files appear in full; non-focus files get an AST-skeleton section."""
    items = [
        {"path": Path("focus.py"), "entry": "focus.py",
         "content": "def focus():\n pass", "error": False},
        {"path": Path("other.py"), "entry": "other.py",
         "content": "def other():\n pass", "error": False},
    ]

    result = build_tier3_context(items, Path("."), [], ["User: hello"],
                                 focus_files=["focus.py"])

    # Focus file is included verbatim.
    assert "def focus():" in result
    assert "pass" in result
    # other.py is skeletonized; for a trivial "pass" body the skeleton may look
    # identical, so only the section header is checked here.
    assert "other.py" in result
    assert "AST Skeleton" in result
def test_build_file_items_with_tiers(tmp_path):
    """build_file_items accepts plain strings and {'path', 'tier'} dict entries."""
    from aggregate import build_file_items

    # Create some dummy files.
    (tmp_path / "file1.txt").write_text("content1")
    (tmp_path / "file2.txt").write_text("content2")

    items = build_file_items(tmp_path, ["file1.txt", {"path": "file2.txt", "tier": 3}])
    assert len(items) == 2

    by_entry = {item["entry"]: item for item in items}
    # Plain string entry: content loaded, tier present but unset.
    plain = by_entry["file1.txt"]
    assert plain["content"] == "content1"
    assert "tier" in plain
    assert plain["tier"] is None
    # Dict entry: explicit tier is carried through.
    tiered = by_entry["file2.txt"]
    assert tiered["content"] == "content2"
    assert tiered["tier"] == 3
def test_build_files_section_with_dicts(tmp_path):
    """build_files_section accepts dict entries carrying only a 'path' key."""
    from aggregate import build_files_section

    target = tmp_path / "file1.txt"
    target.write_text("content1")

    result = build_files_section(tmp_path, [{"path": str(target)}])

    assert "content1" in result
    assert "file1.txt" in result
def test_tiered_context_by_tier_field():
    """An explicit 'tier' field controls full-vs-preview inclusion per builder."""
    # Ten-line bodies so the preview cutoff (first 8 lines) is observable.
    long_tail = "\n".join(f"Line {i}" for i in range(2, 11))
    items = [
        {"path": Path("tier1_file.txt"), "entry": "tier1_file.txt",
         "content": "Full Tier 1 Content\nLine 2", "tier": 1},
        {"path": Path("tier3_file.txt"), "entry": "tier3_file.txt",
         "content": "Full Tier 3 Content\n" + long_tail, "tier": 3},
        {"path": Path("other.txt"), "entry": "other.txt",
         "content": "Other Content\n" + long_tail, "tier": None},
    ]

    # Tier 1 context: tier-1 files in full, tier-3 files only previewed.
    result_t1 = build_tier1_context(items, Path("."), [], [])
    assert "Full Tier 1 Content" in result_t1
    assert "Line 2" in result_t1  # in full
    assert "tier3_file.txt" in result_t1  # summarized entry still listed
    assert "preview:" in result_t1
    assert "Line 9" not in result_t1  # only first 8 lines in preview

    # Tier 3 context: tier-3 files in full, tier-1 files previewed.
    result_t3 = build_tier3_context(items, Path("."), [], [], focus_files=[])
    assert "Full Tier 3 Content" in result_t3
    assert "Line 10" in result_t3  # in full
    assert "tier1_file.txt" in result_t3
    assert "preview:" in result_t3
    assert "Full Tier 1 Content" in result_t3  # short file fits in its preview