Compare commits
18 Commits
a84ea40d16
...
a97eb2a222
| Author | SHA1 | Date | |
|---|---|---|---|
| a97eb2a222 | |||
| 913cfee2dd | |||
| 3c7d4cd841 | |||
| a6c627a6b5 | |||
| 21157f92c3 | |||
| bee75e7b4d | |||
| 4c53ca11da | |||
| 1017a4d807 | |||
| e293c5e302 | |||
| c2c8732100 | |||
| d7a24d66ae | |||
| 528aaf1957 | |||
| f59ef247cf | |||
| 2ece9e1141 | |||
| 4c744f2c8e | |||
| 0ed01aa1c9 | |||
| 34bd61aa6c | |||
| 6aa642bc42 |
Binary file not shown.
122
aggregate.py
122
aggregate.py
@@ -17,6 +17,7 @@ import glob
|
||||
from pathlib import Path, PureWindowsPath
|
||||
import summarize
|
||||
import project_manager
|
||||
from file_cache import ASTParser
|
||||
|
||||
def find_next_increment(output_dir: Path, namespace: str) -> int:
|
||||
pattern = re.compile(rf"^{re.escape(namespace)}_(\d+)\.md$")
|
||||
@@ -63,9 +64,14 @@ def build_discussion_section(history: list[str]) -> str:
|
||||
sections.append(f"### Discussion Excerpt {i}\n\n{paste.strip()}")
|
||||
return "\n\n---\n\n".join(sections)
|
||||
|
||||
def build_files_section(base_dir: Path, files: list[str]) -> str:
|
||||
def build_files_section(base_dir: Path, files: list[str | dict]) -> str:
|
||||
sections = []
|
||||
for entry in files:
|
||||
for entry_raw in files:
|
||||
if isinstance(entry_raw, dict):
|
||||
entry = entry_raw.get("path")
|
||||
else:
|
||||
entry = entry_raw
|
||||
|
||||
paths = resolve_paths(base_dir, entry)
|
||||
if not paths:
|
||||
sections.append(f"### `{entry}`\n\n```text\nERROR: no files matched: {entry}\n```")
|
||||
@@ -99,7 +105,7 @@ def build_screenshots_section(base_dir: Path, screenshots: list[str]) -> str:
|
||||
return "\n\n---\n\n".join(sections)
|
||||
|
||||
|
||||
def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
|
||||
def build_file_items(base_dir: Path, files: list[str | dict]) -> list[dict]:
|
||||
"""
|
||||
Return a list of dicts describing each file, for use by ai_client when it
|
||||
wants to upload individual files rather than inline everything as markdown.
|
||||
@@ -110,12 +116,20 @@ def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
|
||||
content : str (file text, or error string)
|
||||
error : bool
|
||||
mtime : float (last modification time, for skip-if-unchanged optimization)
|
||||
tier : int | None (optional tier for context management)
|
||||
"""
|
||||
items = []
|
||||
for entry in files:
|
||||
for entry_raw in files:
|
||||
if isinstance(entry_raw, dict):
|
||||
entry = entry_raw.get("path")
|
||||
tier = entry_raw.get("tier")
|
||||
else:
|
||||
entry = entry_raw
|
||||
tier = None
|
||||
|
||||
paths = resolve_paths(base_dir, entry)
|
||||
if not paths:
|
||||
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0})
|
||||
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier})
|
||||
continue
|
||||
for path in paths:
|
||||
try:
|
||||
@@ -130,10 +144,10 @@ def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
|
||||
content = f"ERROR: {e}"
|
||||
mtime = 0.0
|
||||
error = True
|
||||
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime})
|
||||
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier})
|
||||
return items
|
||||
|
||||
def build_summary_section(base_dir: Path, files: list[str]) -> str:
|
||||
def build_summary_section(base_dir: Path, files: list[str | dict]) -> str:
|
||||
"""
|
||||
Build a compact summary section using summarize.py — one short block per file.
|
||||
Used as the initial <context> block instead of full file contents.
|
||||
@@ -187,7 +201,99 @@ def build_discussion_text(history: list[str]) -> str:
|
||||
return "## Discussion History\n\n" + build_discussion_section(history)
|
||||
|
||||
|
||||
def build_markdown(base_dir: Path, files: list[str], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
|
||||
def build_tier1_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
|
||||
"""
|
||||
Tier 1 Context: Strategic/Orchestration.
|
||||
Full content for core conductor files and files with tier=1, summaries for others.
|
||||
"""
|
||||
core_files = {"product.md", "tech-stack.md", "workflow.md", "tracks.md"}
|
||||
|
||||
parts = []
|
||||
|
||||
# Files section
|
||||
if file_items:
|
||||
sections = []
|
||||
for item in file_items:
|
||||
path = item.get("path")
|
||||
name = path.name if path else ""
|
||||
|
||||
if name in core_files or item.get("tier") == 1:
|
||||
# Include in full
|
||||
sections.append("### `" + (item.get("entry") or str(path)) + "`\n\n" +
|
||||
f"```{path.suffix.lstrip('.') if path.suffix else 'text'}\n{item.get('content', '')}\n```")
|
||||
else:
|
||||
# Summarize
|
||||
sections.append("### `" + (item.get("entry") or str(path)) + "`\n\n" +
|
||||
summarize.summarise_file(path, item.get("content", "")))
|
||||
|
||||
parts.append("## Files (Tier 1 - Mixed)\n\n" + "\n\n---\n\n".join(sections))
|
||||
|
||||
if screenshots:
|
||||
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
|
||||
|
||||
if history:
|
||||
parts.append("## Discussion History\n\n" + build_discussion_section(history))
|
||||
|
||||
return "\n\n---\n\n".join(parts)
|
||||
|
||||
|
||||
def build_tier2_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
|
||||
"""
|
||||
Tier 2 Context: Architectural/Tech Lead.
|
||||
Full content for all files (standard behavior).
|
||||
"""
|
||||
return build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history, summary_only=False)
|
||||
|
||||
|
||||
def build_tier3_context(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], focus_files: list[str]) -> str:
|
||||
"""
|
||||
Tier 3 Context: Execution/Worker.
|
||||
Full content for focus_files and files with tier=3, summaries/skeletons for others.
|
||||
"""
|
||||
parts = []
|
||||
|
||||
if file_items:
|
||||
sections = []
|
||||
for item in file_items:
|
||||
path = item.get("path")
|
||||
entry = item.get("entry", "")
|
||||
path_str = str(path) if path else ""
|
||||
|
||||
# Check if this file is in focus_files (by name or path)
|
||||
is_focus = False
|
||||
for focus in focus_files:
|
||||
if focus == entry or (path and focus == path.name) or focus in path_str:
|
||||
is_focus = True
|
||||
break
|
||||
|
||||
if is_focus or item.get("tier") == 3:
|
||||
sections.append("### `" + (entry or path_str) + "`\n\n" +
|
||||
f"```{path.suffix.lstrip('.') if path and path.suffix else 'text'}\n{item.get('content', '')}\n```")
|
||||
else:
|
||||
content = item.get("content", "")
|
||||
if path and path.suffix == ".py" and not item.get("error"):
|
||||
try:
|
||||
parser = ASTParser("python")
|
||||
skeleton = parser.get_skeleton(content)
|
||||
sections.append(f"### `{entry or path_str}` (AST Skeleton)\n\n```python\n{skeleton}\n```")
|
||||
except Exception as e:
|
||||
# Fallback to summary if AST parsing fails
|
||||
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
|
||||
else:
|
||||
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
|
||||
|
||||
parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
|
||||
|
||||
if screenshots:
|
||||
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
|
||||
|
||||
if history:
|
||||
parts.append("## Discussion History\n\n" + build_discussion_section(history))
|
||||
|
||||
return "\n\n---\n\n".join(parts)
|
||||
|
||||
|
||||
def build_markdown(base_dir: Path, files: list[str | dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
|
||||
parts = []
|
||||
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
|
||||
if files:
|
||||
|
||||
@@ -20,6 +20,8 @@ To serve as an expert-level utility for personal developer use on small projects
|
||||
- **Native DAG Execution Engine:** Employs a Python-based Directed Acyclic Graph (DAG) engine to manage complex task dependencies, supporting automated topological sorting and robust cycle detection.
|
||||
- **Programmable Execution State Machine:** Governing the transition between "Auto-Queue" (autonomous worker spawning) and "Step Mode" (explicit manual approval for each task transition).
|
||||
- **Role-Scoped Documentation:** Automated mapping of foundational documents to specific tiers to prevent token bloat and maintain high-signal context.
|
||||
- **Tiered Context Scoping:** Employs optimized context subsets for each tier. Tiers 1 & 2 receive strategic documents and full history, while Tier 3/4 workers receive task-specific "Focus Files" and automated AST dependency skeletons.
|
||||
- **Worker Spawn Interceptor:** A mandatory security gate that intercepts every sub-agent launch. Provides a GUI modal allowing the user to review, modify, or reject the worker's prompt and file context before it is sent to the API.
|
||||
- **Strict Memory Siloing:** Employs tree-sitter AST-based interface extraction (Skeleton View, Curated View) and "Context Amnesia" to provide workers only with the absolute minimum context required, preventing hallucination loops.
|
||||
- **Explicit Execution Control:** All AI-generated PowerShell scripts require explicit human confirmation via interactive UI dialogs before execution, supported by a global "Linear Execution Clutch" for deterministic debugging.
|
||||
- **Asynchronous Event-Driven Architecture:** Uses an `AsyncEventQueue` to link GUI actions to the backend engine, ensuring the interface remains fully responsive during multi-model generation and parallel worker execution.
|
||||
|
||||
@@ -20,9 +20,10 @@ This file tracks all major tracks for the project. Each track has its own detail
|
||||
|
||||
---
|
||||
|
||||
- [~] **Track: Tiered Context Scoping & HITL Approval**
|
||||
- [x] **Track: Tiered Context Scoping & HITL Approval**
|
||||
*Link: [./tracks/tiered_context_scoping_hitl_approval/](./tracks/tiered_context_scoping_hitl_approval/)*
|
||||
|
||||
|
||||
---
|
||||
|
||||
- [ ] **Track: MMA Dashboard Visualization Overhaul**
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
# Implementation Plan: Tiered Context Scoping & HITL Approval
|
||||
|
||||
## Phase 1: Context Subsetting
|
||||
- [~] Task: Refactor `aggregate.py` to support targeted context builds (e.g., `build_tier1_context`, `build_tier3_context`).
|
||||
- [ ] Task: Integrate AST skeleton extraction into the standard Tier 3 context build.
|
||||
- [ ] Task: Update the project state to track which files are assigned to which tier.
|
||||
## Phase 1: Context Subsetting [checkpoint: d7a24d6]
|
||||
- [x] Task: Refactor `aggregate.py` to support targeted context builds (e.g., `build_tier1_context`, `build_tier3_context`). 6aa642b
|
||||
- [x] Task: Integrate AST skeleton extraction into the standard Tier 3 context build. 0ed01aa
|
||||
- [x] Task: Update the project state to track which files are assigned to which tier. 2ece9e1
|
||||
|
||||
## Phase 2: The Spawn Interceptor
|
||||
- [ ] Task: Create a signaling mechanism in `multi_agent_conductor.py` to emit a "Worker Spawn Requested" event.
|
||||
- [ ] Task: Implement the interception logic that pauses the async dispatcher until a signal is received from the GUI.
|
||||
## Phase 2: The Spawn Interceptor [checkpoint: 4c53ca1]
|
||||
- [x] Task: Create a signaling mechanism in `multi_agent_conductor.py` to emit a "Worker Spawn Requested" event. e293c5e
|
||||
- [x] Task: Implement the interception logic that pauses the async dispatcher until a signal is received from the GUI. 4c53ca1
|
||||
|
||||
## Phase 3: Approval UX Modal
|
||||
- [ ] Task: Design the "Approve Worker Spawn" modal in DearPyGui.
|
||||
- [ ] Task: Populate the modal with the target role, the exact prompt, and a read-only view of the specific file context.
|
||||
- [ ] Task: Wire the "Approve", "Modify", and "Reject" buttons to resume or cancel the intercepted spawn.
|
||||
## Phase 3: Approval UX Modal [checkpoint: 21157f9]
|
||||
- [x] Task: Design the "Approve Worker Spawn" modal in DearPyGui. 21157f9
|
||||
- [x] Task: Populate the modal with the target role, the exact prompt, and a read-only view of the specific file context. 21157f9
|
||||
- [x] Task: Wire the "Approve", "Modify", and "Reject" buttons to resume or cancel the intercepted spawn. 21157f9
|
||||
111
gui_2.py
111
gui_2.py
@@ -131,6 +131,29 @@ class MMAApprovalDialog:
|
||||
return self._approved, self._payload
|
||||
|
||||
|
||||
class MMASpawnApprovalDialog:
|
||||
def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str):
|
||||
self._ticket_id = ticket_id
|
||||
self._role = role
|
||||
self._prompt = prompt
|
||||
self._context_md = context_md
|
||||
self._condition = threading.Condition()
|
||||
self._done = False
|
||||
self._approved = False
|
||||
self._abort = False
|
||||
|
||||
def wait(self) -> dict:
|
||||
with self._condition:
|
||||
while not self._done:
|
||||
self._condition.wait(timeout=0.1)
|
||||
return {
|
||||
'approved': self._approved,
|
||||
'abort': self._abort,
|
||||
'prompt': self._prompt,
|
||||
'context_md': self._context_md
|
||||
}
|
||||
|
||||
|
||||
class App:
|
||||
"""The main ImGui interface orchestrator for Manual Slop."""
|
||||
|
||||
@@ -246,6 +269,13 @@ class App:
|
||||
self._mma_approval_edit_mode = False
|
||||
self._mma_approval_payload = ""
|
||||
|
||||
# MMA Spawn approval state
|
||||
self._pending_mma_spawn = None
|
||||
self._mma_spawn_open = False
|
||||
self._mma_spawn_edit_mode = False
|
||||
self._mma_spawn_prompt = ''
|
||||
self._mma_spawn_context = ''
|
||||
|
||||
# Orchestration State
|
||||
self.ui_epic_input = ""
|
||||
self.proposed_tracks: list[dict] = []
|
||||
@@ -989,6 +1019,21 @@ class App:
|
||||
if "dialog_container" in task:
|
||||
task["dialog_container"][0] = dlg
|
||||
|
||||
elif action == "mma_spawn_approval":
|
||||
dlg = MMASpawnApprovalDialog(
|
||||
task.get("ticket_id"),
|
||||
task.get("role"),
|
||||
task.get("prompt"),
|
||||
task.get("context_md")
|
||||
)
|
||||
self._pending_mma_spawn = task
|
||||
self._mma_spawn_prompt = task.get("prompt", "")
|
||||
self._mma_spawn_context = task.get("context_md", "")
|
||||
self._mma_spawn_open = True
|
||||
self._mma_spawn_edit_mode = False
|
||||
if "dialog_container" in task:
|
||||
task["dialog_container"][0] = dlg
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error executing GUI task: {e}")
|
||||
|
||||
@@ -1020,7 +1065,7 @@ class App:
|
||||
else:
|
||||
print("[DEBUG] No pending dialog to reject")
|
||||
|
||||
def _handle_mma_respond(self, approved: bool, payload: str = None):
|
||||
def _handle_mma_respond(self, approved: bool, payload: str = None, abort: bool = False, prompt: str = None, context_md: str = None):
|
||||
if self._pending_mma_approval:
|
||||
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
|
||||
if dlg:
|
||||
@@ -1032,6 +1077,20 @@ class App:
|
||||
dlg._condition.notify_all()
|
||||
self._pending_mma_approval = None
|
||||
|
||||
if self._pending_mma_spawn:
|
||||
dlg = self._pending_mma_spawn.get("dialog_container", [None])[0]
|
||||
if dlg:
|
||||
with dlg._condition:
|
||||
dlg._approved = approved
|
||||
dlg._abort = abort
|
||||
if prompt is not None:
|
||||
dlg._prompt = prompt
|
||||
if context_md is not None:
|
||||
dlg._context_md = context_md
|
||||
dlg._done = True
|
||||
dlg._condition.notify_all()
|
||||
self._pending_mma_spawn = None
|
||||
|
||||
def _handle_approve_ask(self):
|
||||
"""Responds with approval for a pending /api/ask request."""
|
||||
if not self._ask_request_id: return
|
||||
@@ -1821,6 +1880,56 @@ class App:
|
||||
imgui.close_current_popup()
|
||||
imgui.end_popup()
|
||||
|
||||
# MMA Spawn Approval Modal
|
||||
if self._pending_mma_spawn:
|
||||
if not self._mma_spawn_open:
|
||||
imgui.open_popup("MMA Spawn Approval")
|
||||
self._mma_spawn_open = True
|
||||
self._mma_spawn_edit_mode = False
|
||||
self._mma_spawn_prompt = self._pending_mma_spawn.get("prompt", "")
|
||||
self._mma_spawn_context = self._pending_mma_spawn.get("context_md", "")
|
||||
else:
|
||||
self._mma_spawn_open = False
|
||||
|
||||
if imgui.begin_popup_modal("MMA Spawn Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
|
||||
if not self._pending_mma_spawn:
|
||||
imgui.close_current_popup()
|
||||
else:
|
||||
role = self._pending_mma_spawn.get("role", "??")
|
||||
ticket_id = self._pending_mma_spawn.get("ticket_id", "??")
|
||||
imgui.text(f"Spawning {role} for Ticket {ticket_id}")
|
||||
imgui.separator()
|
||||
|
||||
if self._mma_spawn_edit_mode:
|
||||
imgui.text("Edit Prompt:")
|
||||
_, self._mma_spawn_prompt = imgui.input_text_multiline("##spawn_prompt", self._mma_spawn_prompt, imgui.ImVec2(800, 200))
|
||||
imgui.text("Edit Context MD:")
|
||||
_, self._mma_spawn_context = imgui.input_text_multiline("##spawn_context", self._mma_spawn_context, imgui.ImVec2(800, 300))
|
||||
else:
|
||||
imgui.text("Proposed Prompt:")
|
||||
imgui.begin_child("spawn_prompt_preview", imgui.ImVec2(800, 150), True)
|
||||
imgui.text_unformatted(self._mma_spawn_prompt)
|
||||
imgui.end_child()
|
||||
imgui.text("Proposed Context MD:")
|
||||
imgui.begin_child("spawn_context_preview", imgui.ImVec2(800, 250), True)
|
||||
imgui.text_unformatted(self._mma_spawn_context)
|
||||
imgui.end_child()
|
||||
|
||||
imgui.separator()
|
||||
if imgui.button("Approve", imgui.ImVec2(120, 0)):
|
||||
self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context)
|
||||
imgui.close_current_popup()
|
||||
|
||||
imgui.same_line()
|
||||
if imgui.button("Edit Mode" if not self._mma_spawn_edit_mode else "Preview Mode", imgui.ImVec2(120, 0)):
|
||||
self._mma_spawn_edit_mode = not self._mma_spawn_edit_mode
|
||||
|
||||
imgui.same_line()
|
||||
if imgui.button("Abort", imgui.ImVec2(120, 0)):
|
||||
self._handle_mma_respond(approved=False, abort=True)
|
||||
imgui.close_current_popup()
|
||||
imgui.end_popup()
|
||||
|
||||
if self.show_script_output:
|
||||
if self._trigger_script_blink:
|
||||
self._trigger_script_blink = False
|
||||
|
||||
@@ -281,6 +281,60 @@ def get_code_outline(path: str) -> str:
|
||||
return f"ERROR generating outline for '{path}': {e}"
|
||||
|
||||
|
||||
def get_definition(path: str, name: str) -> str:
|
||||
"""
|
||||
Returns the source code for a specific class, function, or method definition.
|
||||
path: Path to the code file.
|
||||
name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method').
|
||||
"""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
return err
|
||||
if not p.exists():
|
||||
return f"ERROR: file not found: {path}"
|
||||
if not p.is_file():
|
||||
return f"ERROR: not a file: {path}"
|
||||
|
||||
if p.suffix != ".py":
|
||||
return f"ERROR: get_definition currently only supports .py files (unsupported: {p.suffix})"
|
||||
|
||||
try:
|
||||
import ast
|
||||
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
|
||||
lines = code.splitlines()
|
||||
tree = ast.parse(code)
|
||||
|
||||
# Split name for methods (e.g., "MyClass.my_method")
|
||||
parts = name.split(".")
|
||||
target_class = parts[0] if len(parts) > 1 else None
|
||||
target_name = parts[-1]
|
||||
|
||||
def get_source_from_node(node):
|
||||
# In Python 3.8+, ast.get_source_segment is available
|
||||
# But we can also use lineno and end_lineno
|
||||
if hasattr(node, "lineno") and hasattr(node, "end_lineno"):
|
||||
# lineno is 1-indexed
|
||||
start = node.lineno - 1
|
||||
end = node.end_lineno
|
||||
return "\n".join(lines[start:end])
|
||||
return f"ERROR: Could not extract source for node {node}"
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if target_class:
|
||||
if isinstance(node, ast.ClassDef) and node.name == target_class:
|
||||
for body_node in node.body:
|
||||
if isinstance(body_node, (ast.FunctionDef, ast.AsyncFunctionDef)) and body_node.name == target_name:
|
||||
return get_source_from_node(body_node)
|
||||
else:
|
||||
if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == target_name:
|
||||
return get_source_from_node(node)
|
||||
|
||||
return f"ERROR: could not find definition '{name}' in {path}"
|
||||
|
||||
except Exception as e:
|
||||
return f"ERROR retrieving definition '{name}' from '{path}': {e}"
|
||||
|
||||
|
||||
def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str:
|
||||
"""
|
||||
Returns the git diff for a file or directory.
|
||||
@@ -436,7 +490,7 @@ def get_ui_performance() -> str:
|
||||
# ------------------------------------------------------------------ tool dispatch
|
||||
|
||||
|
||||
TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "get_python_skeleton", "get_code_outline", "get_git_diff", "web_search", "fetch_url", "get_ui_performance"}
|
||||
TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "get_python_skeleton", "get_code_outline", "get_definition", "get_git_diff", "web_search", "fetch_url", "get_ui_performance"}
|
||||
|
||||
|
||||
def dispatch(tool_name: str, tool_input: dict) -> str:
|
||||
@@ -455,6 +509,8 @@ def dispatch(tool_name: str, tool_input: dict) -> str:
|
||||
return get_python_skeleton(tool_input.get("path", ""))
|
||||
if tool_name == "get_code_outline":
|
||||
return get_code_outline(tool_input.get("path", ""))
|
||||
if tool_name == "get_definition":
|
||||
return get_definition(tool_input.get("path", ""), tool_input.get("name", ""))
|
||||
if tool_name == "get_git_diff":
|
||||
return get_git_diff(
|
||||
tool_input.get("path", ""),
|
||||
@@ -586,6 +642,27 @@ MCP_TOOL_SPECS = [
|
||||
"required": ["path"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "get_definition",
|
||||
"description": (
|
||||
"Get the full source code of a specific class, function, or method definition. "
|
||||
"This is more efficient than reading the whole file if you know what you're looking for."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {
|
||||
"type": "string",
|
||||
"description": "Path to the .py file.",
|
||||
},
|
||||
"name": {
|
||||
"type": "string",
|
||||
"description": "The name of the class or function to retrieve. Use 'ClassName.method_name' for methods.",
|
||||
}
|
||||
},
|
||||
"required": ["path", "name"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "get_git_diff",
|
||||
"description": (
|
||||
@@ -648,3 +725,4 @@ MCP_TOOL_SPECS = [
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import ai_client
|
||||
import json
|
||||
import asyncio
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, Tuple
|
||||
from dataclasses import asdict
|
||||
import events
|
||||
from models import Ticket, Track, WorkerContext
|
||||
from file_cache import ASTParser
|
||||
from pathlib import Path
|
||||
|
||||
from dag_engine import TrackDAG, ExecutionEngine
|
||||
|
||||
@@ -181,6 +182,62 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
|
||||
|
||||
return False
|
||||
|
||||
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
|
||||
"""
|
||||
Pushes a spawn approval request to the GUI and waits for response.
|
||||
Returns (approved, modified_prompt, modified_context)
|
||||
"""
|
||||
import threading
|
||||
import time
|
||||
import asyncio
|
||||
|
||||
dialog_container = [None]
|
||||
|
||||
task = {
|
||||
"action": "mma_spawn_approval",
|
||||
"ticket_id": ticket_id,
|
||||
"role": role,
|
||||
"prompt": prompt,
|
||||
"context_md": context_md,
|
||||
"dialog_container": dialog_container
|
||||
}
|
||||
|
||||
# Push to queue
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
asyncio.run_coroutine_threadsafe(event_queue.put("mma_spawn_approval", task), loop)
|
||||
else:
|
||||
event_queue._queue.put_nowait(("mma_spawn_approval", task))
|
||||
except Exception:
|
||||
# Fallback if no loop
|
||||
event_queue._queue.put_nowait(("mma_spawn_approval", task))
|
||||
|
||||
# Wait for the GUI to create the dialog and for the user to respond
|
||||
start = time.time()
|
||||
while dialog_container[0] is None and time.time() - start < 60:
|
||||
time.sleep(0.1)
|
||||
|
||||
if dialog_container[0]:
|
||||
res = dialog_container[0].wait()
|
||||
|
||||
if isinstance(res, dict):
|
||||
approved = res.get("approved", False)
|
||||
abort = res.get("abort", False)
|
||||
modified_prompt = res.get("prompt", prompt)
|
||||
modified_context = res.get("context_md", context_md)
|
||||
return approved and not abort, modified_prompt, modified_context
|
||||
else:
|
||||
# Fallback for old tuple style if any
|
||||
approved, final_payload = res
|
||||
modified_prompt = prompt
|
||||
modified_context = context_md
|
||||
if isinstance(final_payload, dict):
|
||||
modified_prompt = final_payload.get("prompt", prompt)
|
||||
modified_context = final_payload.get("context_md", context_md)
|
||||
return approved, modified_prompt, modified_context
|
||||
|
||||
return False, prompt, context_md
|
||||
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None, engine: Optional['ConductorEngine'] = None, md_content: str = ""):
|
||||
"""
|
||||
Simulates the lifecycle of a single agent working on a ticket.
|
||||
@@ -202,10 +259,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
|
||||
for i, file_path in enumerate(context_files):
|
||||
try:
|
||||
abs_path = Path(file_path)
|
||||
if not abs_path.is_absolute() and engine:
|
||||
# Resolve relative to project base if possible
|
||||
# (This is a bit simplified, but helps)
|
||||
pass
|
||||
# (This is a bit simplified, but helps)
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
if i == 0:
|
||||
@@ -229,6 +283,22 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
|
||||
"start your response with 'BLOCKED' and explain why."
|
||||
)
|
||||
|
||||
# HITL Clutch: call confirm_spawn if event_queue is provided
|
||||
if event_queue:
|
||||
approved, modified_prompt, modified_context = confirm_spawn(
|
||||
role="Tier 3 Worker",
|
||||
prompt=user_message,
|
||||
context_md=md_content,
|
||||
event_queue=event_queue,
|
||||
ticket_id=ticket.id
|
||||
)
|
||||
if not approved:
|
||||
ticket.mark_blocked("Spawn rejected by user.")
|
||||
return "BLOCKED: Spawn rejected by user."
|
||||
|
||||
user_message = modified_prompt
|
||||
md_content = modified_context
|
||||
|
||||
# HITL Clutch: pass the queue and ticket_id to confirm_execution
|
||||
def clutch_callback(payload: str) -> bool:
|
||||
if not event_queue:
|
||||
@@ -246,9 +316,6 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
|
||||
# Update usage in engine if provided
|
||||
if engine:
|
||||
stats = {} # ai_client.get_token_stats() is not available
|
||||
# ai_client provides aggregate stats, for granular tier tracking
|
||||
# we'd need to diff before/after or have ai_client return usage per call.
|
||||
# For Phase 4, we'll use a simplified diff approach.
|
||||
engine.tier_usage["Tier 3"]["input"] += stats.get("prompt_tokens", 0)
|
||||
engine.tier_usage["Tier 3"]["output"] += stats.get("candidates_tokens", 0)
|
||||
|
||||
|
||||
101
outline_tool.py
101
outline_tool.py
@@ -1,88 +1,55 @@
|
||||
import tree_sitter
|
||||
import tree_sitter_python
|
||||
import ast
|
||||
from pathlib import Path
|
||||
|
||||
class CodeOutliner:
|
||||
def __init__(self):
|
||||
self.language = tree_sitter.Language(tree_sitter_python.language())
|
||||
self.parser = tree_sitter.Parser(self.language)
|
||||
pass
|
||||
|
||||
def outline(self, code: str) -> str:
|
||||
tree = self.parser.parse(bytes(code, "utf8"))
|
||||
lines = code.splitlines()
|
||||
code = code.lstrip(chr(0xFEFF))
|
||||
try:
|
||||
tree = ast.parse(code)
|
||||
except SyntaxError as e:
|
||||
return f"ERROR parsing code: {e}"
|
||||
|
||||
output = []
|
||||
|
||||
def get_docstring(node):
|
||||
# In Python, docstring is usually the first expression statement in a block
|
||||
body = node.child_by_field_name("body")
|
||||
if body and body.type == "block":
|
||||
for child in body.children:
|
||||
if child.type == "comment":
|
||||
continue
|
||||
if child.type == "expression_statement":
|
||||
expr = child.children[0]
|
||||
if expr.type == "string":
|
||||
doc = code[expr.start_byte:expr.end_byte].strip()
|
||||
# Strip quotes
|
||||
if doc.startswith(('"""', "'''")):
|
||||
doc = doc[3:-3]
|
||||
elif doc.startswith(('"', "'")):
|
||||
doc = doc[1:-1]
|
||||
return doc.splitlines()[0] if doc else ""
|
||||
break
|
||||
doc = ast.get_docstring(node)
|
||||
if doc:
|
||||
return doc.splitlines()[0]
|
||||
return None
|
||||
|
||||
def walk(node, indent=0):
|
||||
node_type = node.type
|
||||
name = None
|
||||
if isinstance(node, ast.ClassDef):
|
||||
start_line = node.lineno
|
||||
end_line = getattr(node, "end_lineno", start_line)
|
||||
output.append(f"{' ' * indent}[Class] {node.name} (Lines {start_line}-{end_line})")
|
||||
doc = get_docstring(node)
|
||||
if doc:
|
||||
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
|
||||
for item in node.body:
|
||||
walk(item, indent + 1)
|
||||
|
||||
if node_type == "class_definition":
|
||||
name_node = node.child_by_field_name("name")
|
||||
if name_node:
|
||||
name = code[name_node.start_byte:name_node.end_byte]
|
||||
start_line = node.start_point.row + 1
|
||||
end_line = node.end_point.row + 1
|
||||
output.append(f"{' ' * indent}[Class] {name} (Lines {start_line}-{end_line})")
|
||||
doc = get_docstring(node)
|
||||
if doc:
|
||||
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
|
||||
elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
start_line = node.lineno
|
||||
end_line = getattr(node, "end_lineno", start_line)
|
||||
prefix = "[Async Func]" if isinstance(node, ast.AsyncFunctionDef) else "[Func]"
|
||||
|
||||
elif node_type in ("function_definition", "async_function_definition"):
|
||||
name_node = node.child_by_field_name("name")
|
||||
if name_node:
|
||||
name = code[name_node.start_byte:name_node.end_byte]
|
||||
start_line = node.start_point.row + 1
|
||||
end_line = node.end_point.row + 1
|
||||
prefix = "[Async Func]" if node_type == "async_function_definition" else "[Func]"
|
||||
# Check if it's a method
|
||||
# We can check the indent or the parent, but in AST walk we know if we are inside a ClassDef
|
||||
# Let's use a simpler heuristic for the outline: if indent > 0, it's likely a method.
|
||||
if indent > 0:
|
||||
prefix = "[Method]"
|
||||
|
||||
# Check if it's a method (parent is a class body)
|
||||
parent = node.parent
|
||||
while parent and parent.type != "class_definition":
|
||||
if parent.type == "module":
|
||||
break
|
||||
parent = parent.parent
|
||||
output.append(f"{' ' * indent}{prefix} {node.name} (Lines {start_line}-{end_line})")
|
||||
doc = get_docstring(node)
|
||||
if doc:
|
||||
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
|
||||
|
||||
if parent and parent.type == "class_definition":
|
||||
prefix = "[Method]"
|
||||
for node in tree.body:
|
||||
walk(node)
|
||||
|
||||
output.append(f"{' ' * indent}{prefix} {name} (Lines {start_line}-{end_line})")
|
||||
doc = get_docstring(node)
|
||||
if doc:
|
||||
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
|
||||
|
||||
for child in node.children:
|
||||
# Don't recurse into function bodies for outlining functions,
|
||||
# but we DO want to recurse into classes to find methods.
|
||||
if node_type == "class_definition":
|
||||
if child.type == "block":
|
||||
walk(child, indent + 1)
|
||||
elif node_type == "module":
|
||||
walk(child, indent)
|
||||
elif node_type == "block":
|
||||
walk(child, indent)
|
||||
|
||||
walk(tree.root_node)
|
||||
return "\n".join(output)
|
||||
|
||||
def get_outline(path: Path, code: str) -> str:
|
||||
|
||||
@@ -82,3 +82,25 @@ def test_cb_plan_epic_launches_thread(app_instance):
|
||||
|
||||
mock_get_history.assert_called_once()
|
||||
mock_gen_tracks.assert_called_once()
|
||||
|
||||
def test_process_pending_gui_tasks_mma_spawn_approval(app_instance):
|
||||
"""Verifies that the 'mma_spawn_approval' action correctly updates the UI state."""
|
||||
task = {
|
||||
"action": "mma_spawn_approval",
|
||||
"ticket_id": "T1",
|
||||
"role": "Tier 3 Worker",
|
||||
"prompt": "Test Prompt",
|
||||
"context_md": "Test Context",
|
||||
"dialog_container": [None]
|
||||
}
|
||||
app_instance._pending_gui_tasks.append(task)
|
||||
|
||||
app_instance._process_pending_gui_tasks()
|
||||
|
||||
assert app_instance._pending_mma_spawn == task
|
||||
assert app_instance._mma_spawn_prompt == "Test Prompt"
|
||||
assert app_instance._mma_spawn_context == "Test Context"
|
||||
assert app_instance._mma_spawn_open is True
|
||||
assert app_instance._mma_spawn_edit_mode is False
|
||||
assert task["dialog_container"][0] is not None
|
||||
assert task["dialog_container"][0]._ticket_id == "T1"
|
||||
|
||||
90
tests/test_spawn_interception.py
Normal file
90
tests/test_spawn_interception.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
import multi_agent_conductor
|
||||
from models import Ticket, WorkerContext
|
||||
import events
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
|
||||
class MockDialog:
|
||||
def __init__(self, approved, final_payload=None):
|
||||
self.approved = approved
|
||||
self.final_payload = final_payload
|
||||
def wait(self):
|
||||
# Match the new return format: a dictionary
|
||||
res = {'approved': self.approved, 'abort': False}
|
||||
if self.final_payload:
|
||||
res.update(self.final_payload)
|
||||
return res
|
||||
|
||||
@pytest.fixture
|
||||
def mock_ai_client():
|
||||
with patch("ai_client.send") as mock_send:
|
||||
mock_send.return_value = "Task completed"
|
||||
yield mock_send
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_confirm_spawn_pushed_to_queue():
|
||||
event_queue = events.AsyncEventQueue()
|
||||
ticket_id = "T1"
|
||||
role = "Tier 3 Worker"
|
||||
prompt = "Original Prompt"
|
||||
context_md = "Original Context"
|
||||
|
||||
# Start confirm_spawn in a thread since it blocks with time.sleep
|
||||
def run_confirm():
|
||||
return multi_agent_conductor.confirm_spawn(role, prompt, context_md, event_queue, ticket_id)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
future = loop.run_in_executor(executor, run_confirm)
|
||||
|
||||
# Wait for the event to appear in the queue
|
||||
event_name, payload = await event_queue.get()
|
||||
assert event_name == "mma_spawn_approval"
|
||||
assert payload["ticket_id"] == ticket_id
|
||||
assert payload["role"] == role
|
||||
assert payload["prompt"] == prompt
|
||||
assert payload["context_md"] == context_md
|
||||
assert "dialog_container" in payload
|
||||
|
||||
# Simulate GUI injecting a dialog
|
||||
payload["dialog_container"][0] = MockDialog(True, {"prompt": "Modified Prompt", "context_md": "Modified Context"})
|
||||
|
||||
approved, final_prompt, final_context = await future
|
||||
assert approved is True
|
||||
assert final_prompt == "Modified Prompt"
|
||||
assert final_context == "Modified Context"
|
||||
|
||||
@patch("multi_agent_conductor.confirm_spawn")
|
||||
def test_run_worker_lifecycle_approved(mock_confirm, mock_ai_client):
|
||||
ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user")
|
||||
context = WorkerContext(ticket_id="T1", model_name="model", messages=[])
|
||||
event_queue = events.AsyncEventQueue()
|
||||
|
||||
mock_confirm.return_value = (True, "Modified Prompt", "Modified Context")
|
||||
|
||||
multi_agent_conductor.run_worker_lifecycle(ticket, context, event_queue=event_queue)
|
||||
|
||||
mock_confirm.assert_called_once()
|
||||
# Check that ai_client.send was called with modified values
|
||||
args, kwargs = mock_ai_client.call_args
|
||||
assert kwargs["user_message"] == "Modified Prompt"
|
||||
assert kwargs["md_content"] == "Modified Context"
|
||||
assert ticket.status == "completed"
|
||||
|
||||
@patch("multi_agent_conductor.confirm_spawn")
|
||||
def test_run_worker_lifecycle_rejected(mock_confirm, mock_ai_client):
|
||||
ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user")
|
||||
context = WorkerContext(ticket_id="T1", model_name="model", messages=[])
|
||||
event_queue = events.AsyncEventQueue()
|
||||
|
||||
mock_confirm.return_value = (False, "Original Prompt", "Original Context")
|
||||
|
||||
result = multi_agent_conductor.run_worker_lifecycle(ticket, context, event_queue=event_queue)
|
||||
|
||||
mock_confirm.assert_called_once()
|
||||
mock_ai_client.assert_not_called()
|
||||
assert ticket.status == "blocked"
|
||||
assert "Spawn rejected by user" in ticket.blocked_reason
|
||||
assert "BLOCKED" in result
|
||||
136
tests/test_tiered_context.py
Normal file
136
tests/test_tiered_context.py
Normal file
@@ -0,0 +1,136 @@
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from aggregate import build_tier1_context, build_tier2_context, build_tier3_context
|
||||
|
||||
def test_build_tier1_context_exists():
|
||||
# This should fail if the function is not defined
|
||||
file_items = [
|
||||
{"path": Path("conductor/product.md"), "entry": "conductor/product.md", "content": "Product content", "error": False},
|
||||
{"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False}
|
||||
]
|
||||
history = ["User: hello", "AI: hi"]
|
||||
|
||||
result = build_tier1_context(file_items, Path("."), [], history)
|
||||
|
||||
assert "Product content" in result
|
||||
# other.py should be summarized, not full content in a code block
|
||||
assert "Other content" not in result or "Summarized" in result # Assuming summary format
|
||||
|
||||
def test_build_tier2_context_exists():
|
||||
file_items = [
|
||||
{"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False}
|
||||
]
|
||||
history = ["User: hello"]
|
||||
result = build_tier2_context(file_items, Path("."), [], history)
|
||||
assert "Other content" in result
|
||||
|
||||
def test_build_tier3_context_ast_skeleton(monkeypatch):
|
||||
from unittest.mock import MagicMock
|
||||
import aggregate
|
||||
import file_cache
|
||||
|
||||
# Mock ASTParser
|
||||
mock_parser_instance = MagicMock()
|
||||
mock_parser_instance.get_skeleton.return_value = "def other():\n ..."
|
||||
mock_parser_class = MagicMock(return_value=mock_parser_instance)
|
||||
|
||||
# Mock file_cache.ASTParser in aggregate module
|
||||
monkeypatch.setattr("aggregate.ASTParser", mock_parser_class)
|
||||
|
||||
file_items = [
|
||||
{"path": Path("other.py"), "entry": "other.py", "content": "def other():\n pass", "error": False}
|
||||
]
|
||||
history = []
|
||||
|
||||
# New behavior check: it should use ASTParser for .py files not in focus
|
||||
result = build_tier3_context(file_items, Path("."), [], history, focus_files=[])
|
||||
|
||||
assert "def other():" in result
|
||||
assert "..." in result
|
||||
assert "Python" not in result # summarize.py output should not be there if AST skeleton is used
|
||||
mock_parser_class.assert_called_once_with("python")
|
||||
mock_parser_instance.get_skeleton.assert_called_once_with("def other():\n pass")
|
||||
|
||||
def test_build_tier3_context_exists():
|
||||
file_items = [
|
||||
{"path": Path("focus.py"), "entry": "focus.py", "content": "def focus():\n pass", "error": False},
|
||||
{"path": Path("other.py"), "entry": "other.py", "content": "def other():\n pass", "error": False}
|
||||
]
|
||||
history = ["User: hello"]
|
||||
result = build_tier3_context(file_items, Path("."), [], history, focus_files=["focus.py"])
|
||||
|
||||
assert "def focus():" in result
|
||||
assert "pass" in result
|
||||
# other.py should have skeletonized content, not full "pass" (if get_skeleton works)
|
||||
# However, for a simple "pass", the skeleton might be the same or similar.
|
||||
# Let's check for the header
|
||||
assert "other.py" in result
|
||||
assert "AST Skeleton" in result
|
||||
|
||||
def test_build_file_items_with_tiers(tmp_path):
|
||||
from aggregate import build_file_items
|
||||
|
||||
# Create some dummy files
|
||||
file1 = tmp_path / "file1.txt"
|
||||
file1.write_text("content1")
|
||||
file2 = tmp_path / "file2.txt"
|
||||
file2.write_text("content2")
|
||||
|
||||
files_config = [
|
||||
"file1.txt",
|
||||
{"path": "file2.txt", "tier": 3}
|
||||
]
|
||||
|
||||
items = build_file_items(tmp_path, files_config)
|
||||
|
||||
assert len(items) == 2
|
||||
|
||||
item1 = next(i for i in items if i["entry"] == "file1.txt")
|
||||
assert item1["content"] == "content1"
|
||||
assert "tier" in item1
|
||||
assert item1["tier"] is None
|
||||
|
||||
item2 = next(i for i in items if i["entry"] == "file2.txt")
|
||||
assert item2["content"] == "content2"
|
||||
assert item2["tier"] == 3
|
||||
|
||||
def test_build_files_section_with_dicts(tmp_path):
|
||||
from aggregate import build_files_section
|
||||
|
||||
file1 = tmp_path / "file1.txt"
|
||||
file1.write_text("content1")
|
||||
|
||||
files_config = [
|
||||
{"path": str(file1)}
|
||||
]
|
||||
|
||||
result = build_files_section(tmp_path, files_config)
|
||||
assert "content1" in result
|
||||
assert "file1.txt" in result
|
||||
|
||||
def test_tiered_context_by_tier_field():
|
||||
file_items = [
|
||||
{"path": Path("tier1_file.txt"), "entry": "tier1_file.txt", "content": "Full Tier 1 Content\nLine 2", "tier": 1},
|
||||
{"path": Path("tier3_file.txt"), "entry": "tier3_file.txt", "content": "Full Tier 3 Content\nLine 2\nLine 3\nLine 4\nLine 5\nLine 6\nLine 7\nLine 8\nLine 9\nLine 10", "tier": 3},
|
||||
{"path": Path("other.txt"), "entry": "other.txt", "content": "Other Content\nLine 2\nLine 3\nLine 4\nLine 5\nLine 6\nLine 7\nLine 8\nLine 9\nLine 10", "tier": None}
|
||||
]
|
||||
|
||||
# Test Tier 1 Context
|
||||
result_t1 = build_tier1_context(file_items, Path("."), [], [])
|
||||
assert "Full Tier 1 Content" in result_t1
|
||||
assert "Line 2" in result_t1 # In full
|
||||
|
||||
# tier3_file.txt should be summarized
|
||||
assert "tier3_file.txt" in result_t1
|
||||
assert "preview:" in result_t1
|
||||
assert "Line 9" not in result_t1 # Only first 8 lines in preview
|
||||
|
||||
# Test Tier 3 Context
|
||||
result_t3 = build_tier3_context(file_items, Path("."), [], [], focus_files=[])
|
||||
assert "Full Tier 3 Content" in result_t3
|
||||
assert "Line 10" in result_t3 # In full
|
||||
|
||||
# tier1_file.txt should be summarized
|
||||
assert "tier1_file.txt" in result_t3
|
||||
assert "preview:" in result_t3
|
||||
assert "Full Tier 1 Content" in result_t3 # It's short, so it's in preview
|
||||
Reference in New Issue
Block a user