Compare commits

...

18 Commits

Author SHA1 Message Date
Ed_
a97eb2a222 chore(conductor): Mark track 'Tiered Context Scoping & HITL Approval' as complete 2026-02-27 22:32:07 -05:00
Ed_
913cfee2dd docs(conductor): Synchronize docs for track 'Tiered Context Scoping & HITL Approval' 2026-02-27 22:31:58 -05:00
Ed_
3c7d4cd841 conductor(plan): Finalize plan for track 'Tiered Context Scoping & HITL Approval' 2026-02-27 22:31:39 -05:00
Ed_
a6c627a6b5 conductor(plan): Mark phase 'Phase 3: Approval UX Modal' as complete 2026-02-27 22:31:11 -05:00
Ed_
21157f92c3 feat(mma): Finalize Approval UX Modal in GUI 2026-02-27 22:30:55 -05:00
Ed_
bee75e7b4d conductor(plan): Mark task 'Interception logic' as complete 2026-02-27 22:30:13 -05:00
Ed_
4c53ca11da feat(mma): Implement interception logic in GUI and Conductor 2026-02-27 22:29:55 -05:00
Ed_
1017a4d807 conductor(plan): Mark task 'Signaling mechanism' as complete 2026-02-27 22:27:19 -05:00
Ed_
e293c5e302 feat(mma): Implement spawn interception in multi_agent_conductor.py 2026-02-27 22:27:05 -05:00
Ed_
c2c8732100 conductor(plan): Mark phase 'Phase 1: Context Subsetting' as complete 2026-02-27 22:24:11 -05:00
Ed_
d7a24d66ae conductor(checkpoint): Checkpoint end of Phase 1 (Context Subsetting) 2026-02-27 22:23:57 -05:00
Ed_
528aaf1957 feat(mma): Finalize Phase 1 with AST-based outline and improved tiered selection 2026-02-27 22:23:50 -05:00
Ed_
f59ef247cf conductor(plan): Mark task 'Update project state' as complete 2026-02-27 22:23:31 -05:00
Ed_
2ece9e1141 feat(aggregate): support dictionary-based file entries with optional tiers 2026-02-27 22:21:18 -05:00
Ed_
4c744f2c8e conductor(plan): Mark task 'Integrate AST skeleton' as complete 2026-02-27 22:18:39 -05:00
Ed_
0ed01aa1c9 feat(mma): Integrate AST skeleton extraction into Tier 3 context build 2026-02-27 22:18:26 -05:00
Ed_
34bd61aa6c conductor(plan): Mark task 'Refactor aggregate.py' as complete 2026-02-27 22:16:55 -05:00
Ed_
6aa642bc42 feat(mma): Implement tiered context scoping and add get_definition tool 2026-02-27 22:16:43 -05:00
12 changed files with 679 additions and 101 deletions

Binary file not shown.

View File

@@ -17,6 +17,7 @@ import glob
from pathlib import Path, PureWindowsPath
import summarize
import project_manager
from file_cache import ASTParser
def find_next_increment(output_dir: Path, namespace: str) -> int:
pattern = re.compile(rf"^{re.escape(namespace)}_(\d+)\.md$")
@@ -63,9 +64,14 @@ def build_discussion_section(history: list[str]) -> str:
sections.append(f"### Discussion Excerpt {i}\n\n{paste.strip()}")
return "\n\n---\n\n".join(sections)
def build_files_section(base_dir: Path, files: list[str]) -> str:
def build_files_section(base_dir: Path, files: list[str | dict]) -> str:
sections = []
for entry in files:
for entry_raw in files:
if isinstance(entry_raw, dict):
entry = entry_raw.get("path")
else:
entry = entry_raw
paths = resolve_paths(base_dir, entry)
if not paths:
sections.append(f"### `{entry}`\n\n```text\nERROR: no files matched: {entry}\n```")
@@ -99,7 +105,7 @@ def build_screenshots_section(base_dir: Path, screenshots: list[str]) -> str:
return "\n\n---\n\n".join(sections)
def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
def build_file_items(base_dir: Path, files: list[str | dict]) -> list[dict]:
"""
Return a list of dicts describing each file, for use by ai_client when it
wants to upload individual files rather than inline everything as markdown.
@@ -110,12 +116,20 @@ def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
content : str (file text, or error string)
error : bool
mtime : float (last modification time, for skip-if-unchanged optimization)
tier : int | None (optional tier for context management)
"""
items = []
for entry in files:
for entry_raw in files:
if isinstance(entry_raw, dict):
entry = entry_raw.get("path")
tier = entry_raw.get("tier")
else:
entry = entry_raw
tier = None
paths = resolve_paths(base_dir, entry)
if not paths:
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0})
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier})
continue
for path in paths:
try:
@@ -130,10 +144,10 @@ def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
content = f"ERROR: {e}"
mtime = 0.0
error = True
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime})
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier})
return items
def build_summary_section(base_dir: Path, files: list[str]) -> str:
def build_summary_section(base_dir: Path, files: list[str | dict]) -> str:
"""
Build a compact summary section using summarize.py — one short block per file.
Used as the initial <context> block instead of full file contents.
@@ -187,7 +201,99 @@ def build_discussion_text(history: list[str]) -> str:
return "## Discussion History\n\n" + build_discussion_section(history)
def build_markdown(base_dir: Path, files: list[str], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
def build_tier1_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
"""
Tier 1 Context: Strategic/Orchestration.
Full content for core conductor files and files with tier=1, summaries for others.
"""
core_files = {"product.md", "tech-stack.md", "workflow.md", "tracks.md"}
parts = []
# Files section
if file_items:
sections = []
for item in file_items:
path = item.get("path")
name = path.name if path else ""
if name in core_files or item.get("tier") == 1:
# Include in full
sections.append("### `" + (item.get("entry") or str(path)) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
# Summarize
sections.append("### `" + (item.get("entry") or str(path)) + "`\n\n" +
summarize.summarise_file(path, item.get("content", "")))
parts.append("## Files (Tier 1 - Mixed)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_tier2_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
"""
Tier 2 Context: Architectural/Tech Lead.
Full content for all files (standard behavior).
"""
return build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history, summary_only=False)
def build_tier3_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], focus_files: list[str]) -> str:
"""
Tier 3 Context: Execution/Worker.
Full content for focus_files and files with tier=3, summaries/skeletons for others.
"""
parts = []
if file_items:
sections = []
for item in file_items:
path = item.get("path")
entry = item.get("entry", "")
path_str = str(path) if path else ""
# Check if this file is in focus_files (by name or path)
is_focus = False
for focus in focus_files:
if focus == entry or (path and focus == path.name) or focus in path_str:
is_focus = True
break
if is_focus or item.get("tier") == 3:
sections.append("### `" + (entry or path_str) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path and path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
content = item.get("content", "")
if path and path.suffix == ".py" and not item.get("error"):
try:
parser = ASTParser("python")
skeleton = parser.get_skeleton(content)
sections.append(f"### `{entry or path_str}` (AST Skeleton)\n\n```python\n{skeleton}\n```")
except Exception as e:
# Fallback to summary if AST parsing fails
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
else:
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_markdown(base_dir: Path, files: list[str | dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
if files:

View File

@@ -20,6 +20,8 @@ To serve as an expert-level utility for personal developer use on small projects
- **Native DAG Execution Engine:** Employs a Python-based Directed Acyclic Graph (DAG) engine to manage complex task dependencies, supporting automated topological sorting and robust cycle detection.
- **Programmable Execution State Machine:** Governing the transition between "Auto-Queue" (autonomous worker spawning) and "Step Mode" (explicit manual approval for each task transition).
- **Role-Scoped Documentation:** Automated mapping of foundational documents to specific tiers to prevent token bloat and maintain high-signal context.
- **Tiered Context Scoping:** Employs optimized context subsets for each tier. Tiers 1 & 2 receive strategic documents and full history, while Tier 3/4 workers receive task-specific "Focus Files" and automated AST dependency skeletons.
- **Worker Spawn Interceptor:** A mandatory security gate that intercepts every sub-agent launch. Provides a GUI modal allowing the user to review, modify, or reject the worker's prompt and file context before it is sent to the API.
- **Strict Memory Siloing:** Employs tree-sitter AST-based interface extraction (Skeleton View, Curated View) and "Context Amnesia" to provide workers only with the absolute minimum context required, preventing hallucination loops.
- **Explicit Execution Control:** All AI-generated PowerShell scripts require explicit human confirmation via interactive UI dialogs before execution, supported by a global "Linear Execution Clutch" for deterministic debugging.
- **Asynchronous Event-Driven Architecture:** Uses an `AsyncEventQueue` to link GUI actions to the backend engine, ensuring the interface remains fully responsive during multi-model generation and parallel worker execution.

View File

@@ -20,9 +20,10 @@ This file tracks all major tracks for the project. Each track has its own detail
---
- [~] **Track: Tiered Context Scoping & HITL Approval**
- [x] **Track: Tiered Context Scoping & HITL Approval**
*Link: [./tracks/tiered_context_scoping_hitl_approval/](./tracks/tiered_context_scoping_hitl_approval/)*
---
- [ ] **Track: MMA Dashboard Visualization Overhaul**

View File

@@ -1,15 +1,15 @@
# Implementation Plan: Tiered Context Scoping & HITL Approval
## Phase 1: Context Subsetting
- [~] Task: Refactor `aggregate.py` to support targeted context builds (e.g., `build_tier1_context`, `build_tier3_context`).
- [ ] Task: Integrate AST skeleton extraction into the standard Tier 3 context build.
- [ ] Task: Update the project state to track which files are assigned to which tier.
## Phase 1: Context Subsetting [checkpoint: d7a24d6]
- [x] Task: Refactor `aggregate.py` to support targeted context builds (e.g., `build_tier1_context`, `build_tier3_context`). 6aa642b
- [x] Task: Integrate AST skeleton extraction into the standard Tier 3 context build. 0ed01aa
- [x] Task: Update the project state to track which files are assigned to which tier. 2ece9e1
## Phase 2: The Spawn Interceptor
- [ ] Task: Create a signaling mechanism in `multi_agent_conductor.py` to emit a "Worker Spawn Requested" event.
- [ ] Task: Implement the interception logic that pauses the async dispatcher until a signal is received from the GUI.
## Phase 2: The Spawn Interceptor [checkpoint: 4c53ca1]
- [x] Task: Create a signaling mechanism in `multi_agent_conductor.py` to emit a "Worker Spawn Requested" event. e293c5e
- [x] Task: Implement the interception logic that pauses the async dispatcher until a signal is received from the GUI. 4c53ca1
## Phase 3: Approval UX Modal
- [ ] Task: Design the "Approve Worker Spawn" modal in DearPyGui.
- [ ] Task: Populate the modal with the target role, the exact prompt, and a read-only view of the specific file context.
- [ ] Task: Wire the "Approve", "Modify", and "Reject" buttons to resume or cancel the intercepted spawn.
## Phase 3: Approval UX Modal [checkpoint: 21157f9]
- [x] Task: Design the "Approve Worker Spawn" modal in DearPyGui. 21157f9
- [x] Task: Populate the modal with the target role, the exact prompt, and a read-only view of the specific file context. 21157f9
- [x] Task: Wire the "Approve", "Modify", and "Reject" buttons to resume or cancel the intercepted spawn. 21157f9

111
gui_2.py
View File

@@ -131,6 +131,29 @@ class MMAApprovalDialog:
return self._approved, self._payload
class MMASpawnApprovalDialog:
def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str):
self._ticket_id = ticket_id
self._role = role
self._prompt = prompt
self._context_md = context_md
self._condition = threading.Condition()
self._done = False
self._approved = False
self._abort = False
def wait(self) -> dict:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1)
return {
'approved': self._approved,
'abort': self._abort,
'prompt': self._prompt,
'context_md': self._context_md
}
class App:
"""The main ImGui interface orchestrator for Manual Slop."""
@@ -246,6 +269,13 @@ class App:
self._mma_approval_edit_mode = False
self._mma_approval_payload = ""
# MMA Spawn approval state
self._pending_mma_spawn = None
self._mma_spawn_open = False
self._mma_spawn_edit_mode = False
self._mma_spawn_prompt = ''
self._mma_spawn_context = ''
# Orchestration State
self.ui_epic_input = ""
self.proposed_tracks: list[dict] = []
@@ -989,6 +1019,21 @@ class App:
if "dialog_container" in task:
task["dialog_container"][0] = dlg
elif action == "mma_spawn_approval":
dlg = MMASpawnApprovalDialog(
task.get("ticket_id"),
task.get("role"),
task.get("prompt"),
task.get("context_md")
)
self._pending_mma_spawn = task
self._mma_spawn_prompt = task.get("prompt", "")
self._mma_spawn_context = task.get("context_md", "")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
if "dialog_container" in task:
task["dialog_container"][0] = dlg
except Exception as e:
print(f"Error executing GUI task: {e}")
@@ -1020,7 +1065,7 @@ class App:
else:
print("[DEBUG] No pending dialog to reject")
def _handle_mma_respond(self, approved: bool, payload: str = None):
def _handle_mma_respond(self, approved: bool, payload: str = None, abort: bool = False, prompt: str = None, context_md: str = None):
if self._pending_mma_approval:
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
if dlg:
@@ -1031,6 +1076,20 @@ class App:
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_approval = None
if self._pending_mma_spawn:
dlg = self._pending_mma_spawn.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
dlg._abort = abort
if prompt is not None:
dlg._prompt = prompt
if context_md is not None:
dlg._context_md = context_md
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_spawn = None
def _handle_approve_ask(self):
"""Responds with approval for a pending /api/ask request."""
@@ -1821,6 +1880,56 @@ class App:
imgui.close_current_popup()
imgui.end_popup()
# MMA Spawn Approval Modal
if self._pending_mma_spawn:
if not self._mma_spawn_open:
imgui.open_popup("MMA Spawn Approval")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
self._mma_spawn_prompt = self._pending_mma_spawn.get("prompt", "")
self._mma_spawn_context = self._pending_mma_spawn.get("context_md", "")
else:
self._mma_spawn_open = False
if imgui.begin_popup_modal("MMA Spawn Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_spawn:
imgui.close_current_popup()
else:
role = self._pending_mma_spawn.get("role", "??")
ticket_id = self._pending_mma_spawn.get("ticket_id", "??")
imgui.text(f"Spawning {role} for Ticket {ticket_id}")
imgui.separator()
if self._mma_spawn_edit_mode:
imgui.text("Edit Prompt:")
_, self._mma_spawn_prompt = imgui.input_text_multiline("##spawn_prompt", self._mma_spawn_prompt, imgui.ImVec2(800, 200))
imgui.text("Edit Context MD:")
_, self._mma_spawn_context = imgui.input_text_multiline("##spawn_context", self._mma_spawn_context, imgui.ImVec2(800, 300))
else:
imgui.text("Proposed Prompt:")
imgui.begin_child("spawn_prompt_preview", imgui.ImVec2(800, 150), True)
imgui.text_unformatted(self._mma_spawn_prompt)
imgui.end_child()
imgui.text("Proposed Context MD:")
imgui.begin_child("spawn_context_preview", imgui.ImVec2(800, 250), True)
imgui.text_unformatted(self._mma_spawn_context)
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Mode" if not self._mma_spawn_edit_mode else "Preview Mode", imgui.ImVec2(120, 0)):
self._mma_spawn_edit_mode = not self._mma_spawn_edit_mode
imgui.same_line()
if imgui.button("Abort", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False, abort=True)
imgui.close_current_popup()
imgui.end_popup()
if self.show_script_output:
if self._trigger_script_blink:
self._trigger_script_blink = False

View File

@@ -281,6 +281,60 @@ def get_code_outline(path: str) -> str:
return f"ERROR generating outline for '{path}': {e}"
def get_definition(path: str, name: str) -> str:
"""
Returns the source code for a specific class, function, or method definition.
path: Path to the code file.
name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method').
"""
p, err = _resolve_and_check(path)
if err:
return err
if not p.exists():
return f"ERROR: file not found: {path}"
if not p.is_file():
return f"ERROR: not a file: {path}"
if p.suffix != ".py":
return f"ERROR: get_definition currently only supports .py files (unsupported: {p.suffix})"
try:
import ast
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
lines = code.splitlines()
tree = ast.parse(code)
# Split name for methods (e.g., "MyClass.my_method")
parts = name.split(".")
target_class = parts[0] if len(parts) > 1 else None
target_name = parts[-1]
def get_source_from_node(node):
# In Python 3.8+, ast.get_source_segment is available
# But we can also use lineno and end_lineno
if hasattr(node, "lineno") and hasattr(node, "end_lineno"):
# lineno is 1-indexed
start = node.lineno - 1
end = node.end_lineno
return "\n".join(lines[start:end])
return f"ERROR: Could not extract source for node {node}"
for node in ast.walk(tree):
if target_class:
if isinstance(node, ast.ClassDef) and node.name == target_class:
for body_node in node.body:
if isinstance(body_node, (ast.FunctionDef, ast.AsyncFunctionDef)) and body_node.name == target_name:
return get_source_from_node(body_node)
else:
if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == target_name:
return get_source_from_node(node)
return f"ERROR: could not find definition '{name}' in {path}"
except Exception as e:
return f"ERROR retrieving definition '{name}' from '{path}': {e}"
def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str:
"""
Returns the git diff for a file or directory.
@@ -436,7 +490,7 @@ def get_ui_performance() -> str:
# ------------------------------------------------------------------ tool dispatch
TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "get_python_skeleton", "get_code_outline", "get_git_diff", "web_search", "fetch_url", "get_ui_performance"}
TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "get_python_skeleton", "get_code_outline", "get_definition", "get_git_diff", "web_search", "fetch_url", "get_ui_performance"}
def dispatch(tool_name: str, tool_input: dict) -> str:
@@ -455,6 +509,8 @@ def dispatch(tool_name: str, tool_input: dict) -> str:
return get_python_skeleton(tool_input.get("path", ""))
if tool_name == "get_code_outline":
return get_code_outline(tool_input.get("path", ""))
if tool_name == "get_definition":
return get_definition(tool_input.get("path", ""), tool_input.get("name", ""))
if tool_name == "get_git_diff":
return get_git_diff(
tool_input.get("path", ""),
@@ -586,6 +642,27 @@ MCP_TOOL_SPECS = [
"required": ["path"],
},
},
{
"name": "get_definition",
"description": (
"Get the full source code of a specific class, function, or method definition. "
"This is more efficient than reading the whole file if you know what you're looking for."
),
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the .py file.",
},
"name": {
"type": "string",
"description": "The name of the class or function to retrieve. Use 'ClassName.method_name' for methods.",
}
},
"required": ["path", "name"],
},
},
{
"name": "get_git_diff",
"description": (
@@ -648,3 +725,4 @@ MCP_TOOL_SPECS = [
}
}
]

View File

@@ -1,11 +1,12 @@
import ai_client
import json
import asyncio
from typing import List, Optional
from typing import List, Optional, Tuple
from dataclasses import asdict
import events
from models import Ticket, Track, WorkerContext
from file_cache import ASTParser
from pathlib import Path
from dag_engine import TrackDAG, ExecutionEngine
@@ -181,6 +182,62 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
return False
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
"""
Pushes a spawn approval request to the GUI and waits for response.
Returns (approved, modified_prompt, modified_context)
"""
import threading
import time
import asyncio
dialog_container = [None]
task = {
"action": "mma_spawn_approval",
"ticket_id": ticket_id,
"role": role,
"prompt": prompt,
"context_md": context_md,
"dialog_container": dialog_container
}
# Push to queue
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.run_coroutine_threadsafe(event_queue.put("mma_spawn_approval", task), loop)
else:
event_queue._queue.put_nowait(("mma_spawn_approval", task))
except Exception:
# Fallback if no loop
event_queue._queue.put_nowait(("mma_spawn_approval", task))
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
while dialog_container[0] is None and time.time() - start < 60:
time.sleep(0.1)
if dialog_container[0]:
res = dialog_container[0].wait()
if isinstance(res, dict):
approved = res.get("approved", False)
abort = res.get("abort", False)
modified_prompt = res.get("prompt", prompt)
modified_context = res.get("context_md", context_md)
return approved and not abort, modified_prompt, modified_context
else:
# Fallback for old tuple style if any
approved, final_payload = res
modified_prompt = prompt
modified_context = context_md
if isinstance(final_payload, dict):
modified_prompt = final_payload.get("prompt", prompt)
modified_context = final_payload.get("context_md", context_md)
return approved, modified_prompt, modified_context
return False, prompt, context_md
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None, engine: Optional['ConductorEngine'] = None, md_content: str = ""):
"""
Simulates the lifecycle of a single agent working on a ticket.
@@ -202,10 +259,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
for i, file_path in enumerate(context_files):
try:
abs_path = Path(file_path)
if not abs_path.is_absolute() and engine:
# Resolve relative to project base if possible
# (This is a bit simplified, but helps)
pass
# (This is a bit simplified, but helps)
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if i == 0:
@@ -229,6 +283,22 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
"start your response with 'BLOCKED' and explain why."
)
# HITL Clutch: call confirm_spawn if event_queue is provided
if event_queue:
approved, modified_prompt, modified_context = confirm_spawn(
role="Tier 3 Worker",
prompt=user_message,
context_md=md_content,
event_queue=event_queue,
ticket_id=ticket.id
)
if not approved:
ticket.mark_blocked("Spawn rejected by user.")
return "BLOCKED: Spawn rejected by user."
user_message = modified_prompt
md_content = modified_context
# HITL Clutch: pass the queue and ticket_id to confirm_execution
def clutch_callback(payload: str) -> bool:
if not event_queue:
@@ -246,9 +316,6 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
# Update usage in engine if provided
if engine:
stats = {} # ai_client.get_token_stats() is not available
# ai_client provides aggregate stats, for granular tier tracking
# we'd need to diff before/after or have ai_client return usage per call.
# For Phase 4, we'll use a simplified diff approach.
engine.tier_usage["Tier 3"]["input"] += stats.get("prompt_tokens", 0)
engine.tier_usage["Tier 3"]["output"] += stats.get("candidates_tokens", 0)

View File

@@ -1,88 +1,55 @@
import tree_sitter
import tree_sitter_python
import ast
from pathlib import Path
class CodeOutliner:
def __init__(self):
self.language = tree_sitter.Language(tree_sitter_python.language())
self.parser = tree_sitter.Parser(self.language)
pass
def outline(self, code: str) -> str:
tree = self.parser.parse(bytes(code, "utf8"))
lines = code.splitlines()
code = code.lstrip(chr(0xFEFF))
try:
tree = ast.parse(code)
except SyntaxError as e:
return f"ERROR parsing code: {e}"
output = []
def get_docstring(node):
# In Python, docstring is usually the first expression statement in a block
body = node.child_by_field_name("body")
if body and body.type == "block":
for child in body.children:
if child.type == "comment":
continue
if child.type == "expression_statement":
expr = child.children[0]
if expr.type == "string":
doc = code[expr.start_byte:expr.end_byte].strip()
# Strip quotes
if doc.startswith(('"""', "'''")):
doc = doc[3:-3]
elif doc.startswith(('"', "'")):
doc = doc[1:-1]
return doc.splitlines()[0] if doc else ""
break
doc = ast.get_docstring(node)
if doc:
return doc.splitlines()[0]
return None
def walk(node, indent=0):
node_type = node.type
name = None
if isinstance(node, ast.ClassDef):
start_line = node.lineno
end_line = getattr(node, "end_lineno", start_line)
output.append(f"{' ' * indent}[Class] {node.name} (Lines {start_line}-{end_line})")
doc = get_docstring(node)
if doc:
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
for item in node.body:
walk(item, indent + 1)
if node_type == "class_definition":
name_node = node.child_by_field_name("name")
if name_node:
name = code[name_node.start_byte:name_node.end_byte]
start_line = node.start_point.row + 1
end_line = node.end_point.row + 1
output.append(f"{' ' * indent}[Class] {name} (Lines {start_line}-{end_line})")
doc = get_docstring(node)
if doc:
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
start_line = node.lineno
end_line = getattr(node, "end_lineno", start_line)
prefix = "[Async Func]" if isinstance(node, ast.AsyncFunctionDef) else "[Func]"
# Check if it's a method
# We can check the indent or the parent, but in AST walk we know if we are inside a ClassDef
# Let's use a simpler heuristic for the outline: if indent > 0, it's likely a method.
if indent > 0:
prefix = "[Method]"
output.append(f"{' ' * indent}{prefix} {node.name} (Lines {start_line}-{end_line})")
doc = get_docstring(node)
if doc:
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
for node in tree.body:
walk(node)
elif node_type in ("function_definition", "async_function_definition"):
name_node = node.child_by_field_name("name")
if name_node:
name = code[name_node.start_byte:name_node.end_byte]
start_line = node.start_point.row + 1
end_line = node.end_point.row + 1
prefix = "[Async Func]" if node_type == "async_function_definition" else "[Func]"
# Check if it's a method (parent is a class body)
parent = node.parent
while parent and parent.type != "class_definition":
if parent.type == "module":
break
parent = parent.parent
if parent and parent.type == "class_definition":
prefix = "[Method]"
output.append(f"{' ' * indent}{prefix} {name} (Lines {start_line}-{end_line})")
doc = get_docstring(node)
if doc:
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
for child in node.children:
# Don't recurse into function bodies for outlining functions,
# but we DO want to recurse into classes to find methods.
if node_type == "class_definition":
if child.type == "block":
walk(child, indent + 1)
elif node_type == "module":
walk(child, indent)
elif node_type == "block":
walk(child, indent)
walk(tree.root_node)
return "\n".join(output)
def get_outline(path: Path, code: str) -> str:

View File

@@ -82,3 +82,25 @@ def test_cb_plan_epic_launches_thread(app_instance):
mock_get_history.assert_called_once()
mock_gen_tracks.assert_called_once()
def test_process_pending_gui_tasks_mma_spawn_approval(app_instance):
"""Verifies that the 'mma_spawn_approval' action correctly updates the UI state."""
task = {
"action": "mma_spawn_approval",
"ticket_id": "T1",
"role": "Tier 3 Worker",
"prompt": "Test Prompt",
"context_md": "Test Context",
"dialog_container": [None]
}
app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks()
assert app_instance._pending_mma_spawn == task
assert app_instance._mma_spawn_prompt == "Test Prompt"
assert app_instance._mma_spawn_context == "Test Context"
assert app_instance._mma_spawn_open is True
assert app_instance._mma_spawn_edit_mode is False
assert task["dialog_container"][0] is not None
assert task["dialog_container"][0]._ticket_id == "T1"

View File

@@ -0,0 +1,90 @@
import pytest
from unittest.mock import MagicMock, patch
import multi_agent_conductor
from models import Ticket, WorkerContext
import events
import asyncio
import concurrent.futures
class MockDialog:
def __init__(self, approved, final_payload=None):
self.approved = approved
self.final_payload = final_payload
def wait(self):
# Match the new return format: a dictionary
res = {'approved': self.approved, 'abort': False}
if self.final_payload:
res.update(self.final_payload)
return res
@pytest.fixture
def mock_ai_client():
with patch("ai_client.send") as mock_send:
mock_send.return_value = "Task completed"
yield mock_send
@pytest.mark.asyncio
async def test_confirm_spawn_pushed_to_queue():
event_queue = events.AsyncEventQueue()
ticket_id = "T1"
role = "Tier 3 Worker"
prompt = "Original Prompt"
context_md = "Original Context"
# Start confirm_spawn in a thread since it blocks with time.sleep
def run_confirm():
return multi_agent_conductor.confirm_spawn(role, prompt, context_md, event_queue, ticket_id)
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
future = loop.run_in_executor(executor, run_confirm)
# Wait for the event to appear in the queue
event_name, payload = await event_queue.get()
assert event_name == "mma_spawn_approval"
assert payload["ticket_id"] == ticket_id
assert payload["role"] == role
assert payload["prompt"] == prompt
assert payload["context_md"] == context_md
assert "dialog_container" in payload
# Simulate GUI injecting a dialog
payload["dialog_container"][0] = MockDialog(True, {"prompt": "Modified Prompt", "context_md": "Modified Context"})
approved, final_prompt, final_context = await future
assert approved is True
assert final_prompt == "Modified Prompt"
assert final_context == "Modified Context"
@patch("multi_agent_conductor.confirm_spawn")
def test_run_worker_lifecycle_approved(mock_confirm, mock_ai_client):
ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user")
context = WorkerContext(ticket_id="T1", model_name="model", messages=[])
event_queue = events.AsyncEventQueue()
mock_confirm.return_value = (True, "Modified Prompt", "Modified Context")
multi_agent_conductor.run_worker_lifecycle(ticket, context, event_queue=event_queue)
mock_confirm.assert_called_once()
# Check that ai_client.send was called with modified values
args, kwargs = mock_ai_client.call_args
assert kwargs["user_message"] == "Modified Prompt"
assert kwargs["md_content"] == "Modified Context"
assert ticket.status == "completed"
@patch("multi_agent_conductor.confirm_spawn")
def test_run_worker_lifecycle_rejected(mock_confirm, mock_ai_client):
ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user")
context = WorkerContext(ticket_id="T1", model_name="model", messages=[])
event_queue = events.AsyncEventQueue()
mock_confirm.return_value = (False, "Original Prompt", "Original Context")
result = multi_agent_conductor.run_worker_lifecycle(ticket, context, event_queue=event_queue)
mock_confirm.assert_called_once()
mock_ai_client.assert_not_called()
assert ticket.status == "blocked"
assert "Spawn rejected by user" in ticket.blocked_reason
assert "BLOCKED" in result

View File

@@ -0,0 +1,136 @@
import pytest
from pathlib import Path
from aggregate import build_tier1_context, build_tier2_context, build_tier3_context
def test_build_tier1_context_exists():
# This should fail if the function is not defined
file_items = [
{"path": Path("conductor/product.md"), "entry": "conductor/product.md", "content": "Product content", "error": False},
{"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False}
]
history = ["User: hello", "AI: hi"]
result = build_tier1_context(file_items, Path("."), [], history)
assert "Product content" in result
# other.py should be summarized, not full content in a code block
assert "Other content" not in result or "Summarized" in result # Assuming summary format
def test_build_tier2_context_exists():
file_items = [
{"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False}
]
history = ["User: hello"]
result = build_tier2_context(file_items, Path("."), [], history)
assert "Other content" in result
def test_build_tier3_context_ast_skeleton(monkeypatch):
from unittest.mock import MagicMock
import aggregate
import file_cache
# Mock ASTParser
mock_parser_instance = MagicMock()
mock_parser_instance.get_skeleton.return_value = "def other():\n ..."
mock_parser_class = MagicMock(return_value=mock_parser_instance)
# Mock file_cache.ASTParser in aggregate module
monkeypatch.setattr("aggregate.ASTParser", mock_parser_class)
file_items = [
{"path": Path("other.py"), "entry": "other.py", "content": "def other():\n pass", "error": False}
]
history = []
# New behavior check: it should use ASTParser for .py files not in focus
result = build_tier3_context(file_items, Path("."), [], history, focus_files=[])
assert "def other():" in result
assert "..." in result
assert "Python" not in result # summarize.py output should not be there if AST skeleton is used
mock_parser_class.assert_called_once_with("python")
mock_parser_instance.get_skeleton.assert_called_once_with("def other():\n pass")
def test_build_tier3_context_exists():
file_items = [
{"path": Path("focus.py"), "entry": "focus.py", "content": "def focus():\n pass", "error": False},
{"path": Path("other.py"), "entry": "other.py", "content": "def other():\n pass", "error": False}
]
history = ["User: hello"]
result = build_tier3_context(file_items, Path("."), [], history, focus_files=["focus.py"])
assert "def focus():" in result
assert "pass" in result
# other.py should have skeletonized content, not full "pass" (if get_skeleton works)
# However, for a simple "pass", the skeleton might be the same or similar.
# Let's check for the header
assert "other.py" in result
assert "AST Skeleton" in result
def test_build_file_items_with_tiers(tmp_path):
from aggregate import build_file_items
# Create some dummy files
file1 = tmp_path / "file1.txt"
file1.write_text("content1")
file2 = tmp_path / "file2.txt"
file2.write_text("content2")
files_config = [
"file1.txt",
{"path": "file2.txt", "tier": 3}
]
items = build_file_items(tmp_path, files_config)
assert len(items) == 2
item1 = next(i for i in items if i["entry"] == "file1.txt")
assert item1["content"] == "content1"
assert "tier" in item1
assert item1["tier"] is None
item2 = next(i for i in items if i["entry"] == "file2.txt")
assert item2["content"] == "content2"
assert item2["tier"] == 3
def test_build_files_section_with_dicts(tmp_path):
from aggregate import build_files_section
file1 = tmp_path / "file1.txt"
file1.write_text("content1")
files_config = [
{"path": str(file1)}
]
result = build_files_section(tmp_path, files_config)
assert "content1" in result
assert "file1.txt" in result
def test_tiered_context_by_tier_field():
file_items = [
{"path": Path("tier1_file.txt"), "entry": "tier1_file.txt", "content": "Full Tier 1 Content\nLine 2", "tier": 1},
{"path": Path("tier3_file.txt"), "entry": "tier3_file.txt", "content": "Full Tier 3 Content\nLine 2\nLine 3\nLine 4\nLine 5\nLine 6\nLine 7\nLine 8\nLine 9\nLine 10", "tier": 3},
{"path": Path("other.txt"), "entry": "other.txt", "content": "Other Content\nLine 2\nLine 3\nLine 4\nLine 5\nLine 6\nLine 7\nLine 8\nLine 9\nLine 10", "tier": None}
]
# Test Tier 1 Context
result_t1 = build_tier1_context(file_items, Path("."), [], [])
assert "Full Tier 1 Content" in result_t1
assert "Line 2" in result_t1 # In full
# tier3_file.txt should be summarized
assert "tier3_file.txt" in result_t1
assert "preview:" in result_t1
assert "Line 9" not in result_t1 # Only first 8 lines in preview
# Test Tier 3 Context
result_t3 = build_tier3_context(file_items, Path("."), [], [], focus_files=[])
assert "Full Tier 3 Content" in result_t3
assert "Line 10" in result_t3 # In full
# tier1_file.txt should be summarized
assert "tier1_file.txt" in result_t3
assert "preview:" in result_t3
assert "Full Tier 1 Content" in result_t3 # It's short, so it's in preview