46 Commits

Author SHA1 Message Date
ed 51918d9bc3 chore: Checkpoint commit of unstaged changes, including new tests and debug scripts 2026-02-26 21:39:03 -05:00
ed 94a1c320a5 docs(mma): Add Phase 7 UX specification and update track plan 2026-02-26 21:37:45 -05:00
ed 8bb72e351d chore(conductor): Mark track 'MMA Core Engine Implementation' as complete and verify with Phase 6 tests 2026-02-26 21:34:28 -05:00
ed 971202e21b docs(conductor): Synchronize docs for track 'MMA Core Engine Implementation' 2026-02-26 20:47:58 -05:00
ed 1294091692 chore(conductor): Mark track 'MMA Core Engine Implementation' as complete 2026-02-26 20:47:04 -05:00
ed d4574dba41 conductor(plan): Mark Phase 5 as complete 2026-02-26 20:46:51 -05:00
ed 3982fda5f5 conductor(checkpoint): Checkpoint end of Phase 5 - Multi-Agent Dispatcher & Parallelization 2026-02-26 20:46:13 -05:00
ed dce1679a1f conductor(plan): Mark task 'UI Component Update' as complete 2026-02-26 20:45:45 -05:00
ed 68861c0744 feat(mma): Decouple UI from API calls using UserRequestEvent and AsyncEventQueue 2026-02-26 20:45:23 -05:00
ed 5206c7c569 conductor(plan): Mark task 'The Dispatcher Loop' as complete 2026-02-26 20:40:45 -05:00
ed 1dacd3613e feat(mma): Implement dynamic ticket parsing and dispatcher loop in ConductorEngine 2026-02-26 20:40:16 -05:00
ed 0acd1ea442 conductor(plan): Mark task 'Tier 1 & 2 System Prompts' as complete 2026-02-26 20:36:33 -05:00
ed a28d71b064 feat(mma): Implement structured system prompts for Tier 1 and Tier 2 2026-02-26 20:36:09 -05:00
ed 6be093cfc1 conductor(plan): Mark task 'The Event Bus' as complete 2026-02-26 20:34:15 -05:00
ed 695cb4a82e feat(mma): Implement AsyncEventQueue in events.py 2026-02-26 20:33:51 -05:00
ed 47d750ea9d conductor(plan): Mark Phase 4 as complete 2026-02-26 20:30:51 -05:00
ed 61d17ade0f conductor(checkpoint): Checkpoint end of Phase 4 - Tier 4 QA Interception 2026-02-26 20:30:29 -05:00
ed a5854b1488 conductor(plan): Mark task 'Payload Formatting' as complete 2026-02-26 20:30:04 -05:00
ed fb3da4de36 feat(mma): Integrate Tier 4 QA analysis across all providers and conductor 2026-02-26 20:29:34 -05:00
ed 80a10f4d12 conductor(plan): Mark task 'Tier 4 Instantiation' as complete 2026-02-26 20:22:29 -05:00
ed 8e4e32690c feat(mma): Implement run_tier4_analysis in ai_client.py 2026-02-26 20:22:04 -05:00
ed bb2f7a16d4 conductor(plan): Mark task 'The Interceptor Loop' as complete 2026-02-26 20:19:59 -05:00
ed bc654c2f57 feat(mma): Implement Tier 4 QA interceptor in shell_runner.py 2026-02-26 20:19:34 -05:00
ed a978562f55 conductor(plan): Mark Phase 3 as complete 2026-02-26 20:15:51 -05:00
ed e6c8d734cc conductor(checkpoint): Checkpoint end of Phase 3 - Linear Orchestrator & Execution Clutch 2026-02-26 20:15:17 -05:00
ed bc0cba4d3c conductor(plan): Mark task 'The HITL Execution Clutch' as complete 2026-02-26 20:14:52 -05:00
ed 1afd9c8c2a feat(mma): Implement HITL execution clutch and step-mode 2026-02-26 20:14:27 -05:00
ed cfd20c027d conductor(plan): Mark task 'Context Injection' as complete 2026-02-26 20:10:39 -05:00
ed 9d6d1746c6 feat(mma): Implement context injection using ASTParser in run_worker_lifecycle 2026-02-26 20:10:15 -05:00
ed 559355ce47 conductor(plan): Mark task 'The Engine Core' as complete 2026-02-26 20:08:15 -05:00
ed 7a301685c3 feat(mma): Implement ConductorEngine and run_worker_lifecycle 2026-02-26 20:07:51 -05:00
ed 4346eda88d conductor(plan): Mark Phase 2 as complete 2026-02-26 20:03:15 -05:00
ed a518a307f3 conductor(checkpoint): Checkpoint end of Phase 2 - State Machine & Data Structures 2026-02-26 20:02:56 -05:00
ed eac01c2975 conductor(plan): Mark task 'State Mutator Methods' as complete 2026-02-26 20:02:33 -05:00
ed e925b219cb feat(mma): Implement state mutator methods for Ticket and Track 2026-02-26 20:02:09 -05:00
ed d198a790c8 conductor(plan): Mark task 'Worker Context Definition' as complete 2026-02-26 20:00:15 -05:00
ed ee719296c4 feat(mma): Implement WorkerContext model 2026-02-26 19:59:51 -05:00
ed ccd286132f conductor(plan): Mark task 'The Dataclasses' as complete 2026-02-26 19:55:27 -05:00
ed f9b5a504e5 feat(mma): Implement Ticket and Track models 2026-02-26 19:55:03 -05:00
ed 0b2c0dd8d7 conductor(plan): Mark Phase 1 as complete 2026-02-26 19:53:03 -05:00
ed ac31e4112f conductor(checkpoint): Checkpoint end of Phase 1 - Memory Foundations 2026-02-26 19:48:59 -05:00
ed 449335df04 conductor(plan): Mark AST view extraction tasks as complete 2026-02-26 19:48:20 -05:00
ed b73a83e612 conductor(plan): Mark task 'Core Parser Class' as complete 2026-02-26 19:47:56 -05:00
ed 7a609cae69 feat(mma): Implement ASTParser in file_cache.py and refactor mcp_client.py 2026-02-26 19:47:33 -05:00
ed 4849ee2b8c conductor(plan): Mark task 'Dependency Setup' as complete 2026-02-26 19:29:46 -05:00
ed 8fb75cc7e2 feat(deps): Update requirements.txt with tree-sitter dependencies 2026-02-26 19:29:22 -05:00
49 changed files with 2567 additions and 205 deletions
BIN
Binary file not shown.
+36
@@ -0,0 +1,36 @@
# MMA Observability & UX Specification
## 1. Goal
Implement the visible surface area of the 4-Tier Hierarchical Multi-Model Architecture within `gui_2.py`. This ensures the user can monitor, control, and debug the multi-agent execution flow.
## 2. Core Components
### 2.1 MMA Dashboard Panel
- **Visibility:** A new dockable panel named "MMA Dashboard".
- **Track Status:** Display the current active `Track` ID and overall progress (e.g., "3/10 Tickets Complete").
- **Ticket DAG Visualization:** A list or simple graph representing the `Ticket` queue.
- Each ticket shows: `ID`, `Target`, `Status` (Pending, Running, Paused, Complete, Blocked).
- Visual indicators for dependencies (e.g., indented or linked).
### 2.2 The Execution Clutch (HITL)
- **Step Mode Toggle:** A global or per-track checkbox to enable "Step Mode".
- **Pause Points:**
- **Pre-Execution:** When a Tier 3 worker generates a tool call (e.g., `write_file`), the engine pauses.
- **UI Interaction:** The GUI displays the proposed script/change and provides:
- `[Approve]`: Proceed with execution.
- `[Edit Payload]`: Open the Memory Mutator.
- `[Abort]`: Mark the ticket as Blocked/Cancelled.
- **Visual Feedback:** Tactile/Arcade-style blinking or color changes when the engine is "Paused for HITL".
### 2.3 Memory Mutator (The "Debug" Superpower)
- **Functionality:** A modal or dedicated text area that allows the user to edit the raw JSON conversation history of a paused worker.
- **Use Case:** Fixing AI hallucinations or providing specific guidance mid-turn without restarting the context window.
- **Integration:** After editing, the "Approve" button sends the *modified* history back to the engine.
### 2.4 Tiered Metrics & Logs
- **Observability:** Show which model (Tier 1, 2, 3, or 4) is currently active.
- **Sub-Agent Logs:** Provide quick links to open the timestamped log files generated by `mma_exec.py`.
## 3. Technical Integration
- **Event Bus:** Use the existing `AsyncEventQueue` to push `StateUpdateEvents` from the `ConductorEngine` to the GUI.
- **Non-Blocking:** Ensure the UI remains responsive (FPS > 60) even when multiple tickets are processing or the engine is waiting for user input.
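The event-bus wiring described in §3 can be sketched as below. This is a minimal illustration only: the `StateUpdateEvent` fields, the `emit`/`drain` method names, and the per-frame draining pattern are assumptions for the sketch, not the project's actual `AsyncEventQueue` API.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass
class StateUpdateEvent:
    # Hypothetical payload shape; the real StateUpdateEvents may differ.
    ticket_id: str
    status: str  # e.g. "Pending", "Running", "Paused", "Complete", "Blocked"
    detail: dict[str, Any] = field(default_factory=dict)

class AsyncEventQueue:
    """Minimal sketch of an asyncio.Queue-backed engine-to-GUI event bus."""
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def emit(self, event: StateUpdateEvent) -> None:
        # Non-blocking publish from the engine side.
        self._queue.put_nowait(event)

    async def drain(self) -> list[StateUpdateEvent]:
        # Called once per GUI frame: collect all pending events without
        # blocking, so the render loop never stalls on the engine.
        events = []
        while not self._queue.empty():
            events.append(self._queue.get_nowait())
        return events

async def demo() -> list[str]:
    bus = AsyncEventQueue()
    bus.emit(StateUpdateEvent("T-1", "Running"))
    bus.emit(StateUpdateEvent("T-1", "Complete"))
    return [e.status for e in await bus.drain()]

print(asyncio.run(demo()))  # ['Running', 'Complete']
```

Draining once per frame (rather than awaiting each event) is one way to keep the Dear PyGui render loop above the 60 FPS target while the engine emits freely.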
+117 -18
@@ -21,6 +21,7 @@ import difflib
 import threading
 import requests
 from pathlib import Path
+from typing import Optional, Callable
 import os
 import project_manager
 import file_cache
@@ -522,10 +523,10 @@ def _gemini_tool_declaration():
 return types.Tool(function_declarations=declarations) if declarations else None
-def _run_script(script: str, base_dir: str) -> str:
+def _run_script(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str:
 if confirm_and_run_callback is None:
 return "ERROR: no confirmation handler registered"
-result = confirm_and_run_callback(script, base_dir)
+result = confirm_and_run_callback(script, base_dir, qa_callback)
 if result is None:
 output = "USER REJECTED: command was not executed"
 else:
@@ -668,7 +669,9 @@ def _get_gemini_history_list(chat):
 def _send_gemini(md_content: str, user_message: str, base_dir: str,
 file_items: list[dict] | None = None,
-discussion_history: str = "") -> str:
+discussion_history: str = "",
+pre_tool_callback = None,
+qa_callback: Optional[Callable[[str], str]] = None) -> str:
 global _gemini_chat, _gemini_cache, _gemini_cache_md_hash, _gemini_cache_created_at
 try:
@@ -676,7 +679,8 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
 # Only stable content (files + screenshots) goes in the cached system instruction.
 # Discussion history is sent as conversation messages so the cache isn't invalidated every turn.
 sys_instr = f"{_get_combined_system_prompt()}\n\n<context>\n{md_content}\n</context>"
-tools_decl = [_gemini_tool_declaration()]
+td = _gemini_tool_declaration()
+tools_decl = [td] if td else None
 # DYNAMIC CONTEXT: Check if files/context changed mid-session
 current_md_hash = hashlib.md5(md_content.encode()).hexdigest()
@@ -747,7 +751,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
 except Exception as e:
 _gemini_cache = None
 _gemini_cache_created_at = None
-_append_comms("OUT", "request", {"message": f"[CACHE FAILED] {type(e).__name__}: {e} falling back to inline system_instruction"})
+_append_comms("OUT", "request", {"message": f"[CACHE FAILED] {type(e).__name__}: {e} — falling back to inline system_instruction"})
 kwargs = {"model": _model, "config": chat_config}
 if old_history:
@@ -767,7 +771,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
 _cumulative_tool_bytes = 0
 # Strip stale file refreshes and truncate old tool outputs ONCE before
-# entering the tool loop (not per-round history entries don't change).
+# entering the tool loop (not per-round — history entries don't change).
 if _gemini_chat and _get_gemini_history_list(_gemini_chat):
 for msg in _get_gemini_history_list(_gemini_chat):
 if msg.role == "user" and hasattr(msg, "parts"):
@@ -830,6 +834,16 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
 f_resps, log = [], []
 for i, fc in enumerate(calls):
 name, args = fc.name, dict(fc.args)
+# Check for tool confirmation if callback is provided
+if pre_tool_callback:
+    payload_str = json.dumps({"tool": name, "args": args})
+    if not pre_tool_callback(payload_str):
+        out = "USER REJECTED: tool execution cancelled"
+        f_resps.append(types.Part.from_function_response(name=name, response={"output": out}))
+        log.append({"tool_use_id": name, "content": out})
+        continue
 events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
 if name in mcp_client.TOOL_NAMES:
 _append_comms("OUT", "tool_call", {"name": name, "args": args})
@@ -837,7 +851,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
 elif name == TOOL_NAME:
 scr = args.get("script", "")
 _append_comms("OUT", "tool_call", {"name": TOOL_NAME, "script": scr})
-out = _run_script(scr, base_dir)
+out = _run_script(scr, base_dir, qa_callback)
 else: out = f"ERROR: unknown tool '{name}'"
 if i == len(calls) - 1:
@@ -868,7 +882,9 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
 def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
 file_items: list[dict] | None = None,
-discussion_history: str = "") -> str:
+discussion_history: str = "",
+pre_tool_callback = None,
+qa_callback: Optional[Callable[[str], str]] = None) -> str:
 global _gemini_cli_adapter
 try:
 if _gemini_cli_adapter is None:
@@ -951,6 +967,20 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
 args = fc.get("args", {})
 call_id = fc.get("id")
+# Check for tool confirmation if callback is provided
+if pre_tool_callback:
+    payload_str = json.dumps({"tool": name, "args": args})
+    if not pre_tool_callback(payload_str):
+        out = "USER REJECTED: tool execution cancelled"
+        tool_results_for_cli.append({
+            "role": "tool",
+            "tool_call_id": call_id,
+            "name": name,
+            "content": out
+        })
+        _append_comms("IN", "tool_result", {"name": name, "id": call_id, "output": out})
+        continue
 events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
 if name in mcp_client.TOOL_NAMES:
 _append_comms("OUT", "tool_call", {"name": name, "id": call_id, "args": args})
@@ -958,7 +988,7 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
 elif name == TOOL_NAME:
 scr = args.get("script", "")
 _append_comms("OUT", "tool_call", {"name": TOOL_NAME, "id": call_id, "script": scr})
-out = _run_script(scr, base_dir)
+out = _run_script(scr, base_dir, qa_callback)
 else:
 out = f"ERROR: unknown tool '{name}'"
@@ -1251,13 +1281,13 @@ def _repair_anthropic_history(history: list[dict]):
 })
-def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None, discussion_history: str = "") -> str:
+def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None, discussion_history: str = "", pre_tool_callback = None, qa_callback: Optional[Callable[[str], str]] = None) -> str:
 try:
 _ensure_anthropic_client()
 mcp_client.configure(file_items or [], [base_dir])
 # Split system into two cache breakpoints:
-# 1. Stable system prompt (never changes always a cache hit)
+# 1. Stable system prompt (never changes — always a cache hit)
 # 2. Dynamic file context (invalidated only when files change)
 stable_prompt = _get_combined_system_prompt()
 stable_blocks = [{"type": "text", "text": stable_prompt, "cache_control": {"type": "ephemeral"}}]
@@ -1382,6 +1412,19 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
 b_name = getattr(block, "name", None)
 b_id = getattr(block, "id", "")
 b_input = getattr(block, "input", {})
+# Check for tool confirmation if callback is provided
+if pre_tool_callback:
+    payload_str = json.dumps({"tool": b_name, "args": b_input})
+    if not pre_tool_callback(payload_str):
+        output = "USER REJECTED: tool execution cancelled"
+        tool_results.append({
+            "type": "tool_result",
+            "tool_use_id": b_id,
+            "content": output,
+        })
+        continue
 events.emit("tool_execution", payload={"status": "started", "tool": b_name, "args": b_input, "round": round_idx})
 if b_name in mcp_client.TOOL_NAMES:
 _append_comms("OUT", "tool_call", {"name": b_name, "id": b_id, "args": b_input})
@@ -1402,7 +1445,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
 "id": b_id,
 "script": script,
 })
-output = _run_script(script, base_dir)
+output = _run_script(script, base_dir, qa_callback)
 _append_comms("IN", "tool_result", {
 "name": TOOL_NAME,
 "id": b_id,
@@ -1480,7 +1523,9 @@ def _ensure_deepseek_client():
 def _send_deepseek(md_content: str, user_message: str, base_dir: str,
 file_items: list[dict] | None = None,
 discussion_history: str = "",
-stream: bool = False) -> str:
+stream: bool = False,
+pre_tool_callback = None,
+qa_callback: Optional[Callable[[str], str]] = None) -> str:
 """
 Sends a message to the DeepSeek API, handling tool calls and history.
 Supports streaming responses.
@@ -1652,6 +1697,19 @@ def _send_deepseek(md_content: str, user_message: str, base_dir: str,
 except:
 tool_args = {}
+# Check for tool confirmation if callback is provided
+if pre_tool_callback:
+    payload_str = json.dumps({"tool": tool_name, "args": tool_args})
+    if not pre_tool_callback(payload_str):
+        tool_output = "USER REJECTED: tool execution cancelled"
+        tool_results_for_history.append({
+            "role": "tool",
+            "tool_call_id": tool_id,
+            "content": tool_output,
+        })
+        _append_comms("IN", "tool_result", {"name": tool_name, "id": tool_id, "output": tool_output})
+        continue
 events.emit("tool_execution", payload={"status": "started", "tool": tool_name, "args": tool_args, "round": round_idx})
 if tool_name in mcp_client.TOOL_NAMES:
@@ -1660,7 +1718,7 @@ def _send_deepseek(md_content: str, user_message: str, base_dir: str,
 elif tool_name == TOOL_NAME:
 script = tool_args.get("script", "")
 _append_comms("OUT", "tool_call", {"name": TOOL_NAME, "id": tool_id, "script": script})
-tool_output = _run_script(script, base_dir)
+tool_output = _run_script(script, base_dir, qa_callback)
 else:
 tool_output = f"ERROR: unknown tool '{tool_name}'"
@@ -1711,6 +1769,43 @@ def _send_deepseek(md_content: str, user_message: str, base_dir: str,
 raise _classify_deepseek_error(e) from e
+def run_tier4_analysis(stderr: str) -> str:
+    """
+    Stateless Tier 4 QA analysis of an error message.
+    Uses gemini-2.5-flash-lite to summarize the error and suggest a fix.
+    """
+    if not stderr or not stderr.strip():
+        return ""
+    try:
+        _ensure_gemini_client()
+        prompt = (
+            f"You are a Tier 4 QA Agent specializing in error analysis.\n"
+            f"Analyze the following stderr output from a PowerShell command:\n\n"
+            f"```\n{stderr}\n```\n\n"
+            f"Provide a concise summary of the failure and suggest a fix in approximately 20 words."
+        )
+        # Use flash-lite for cost-effective stateless analysis
+        model_name = "gemini-2.5-flash-lite"
+        # We don't use the chat session here to keep it stateless
+        resp = _gemini_client.models.generate_content(
+            model=model_name,
+            contents=prompt,
+            config=types.GenerateContentConfig(
+                temperature=0.0,
+                max_output_tokens=150,
+            )
+        )
+        analysis = resp.text.strip()
+        return analysis
+    except Exception as e:
+        # We don't want to crash the main loop if QA analysis fails
+        return f"[QA ANALYSIS FAILED] {e}"
 # ------------------------------------------------------------------ unified send
def send( def send(
@@ -1720,6 +1815,8 @@ def send(
 file_items: list[dict] | None = None,
 discussion_history: str = "",
 stream: bool = False,
+pre_tool_callback = None,
+qa_callback: Optional[Callable[[str], str]] = None,
 ) -> str:
 """
 Send a message to the active provider.
@@ -1733,16 +1830,18 @@ def send(
 discussion_history : discussion history text (used by Gemini to inject as
 conversation message instead of caching it)
 stream : Whether to use streaming (supported by DeepSeek)
+pre_tool_callback : Optional callback (payload: str) -> bool called before tool execution
+qa_callback : Optional callback (stderr: str) -> str called for Tier 4 error analysis
 """
 with _send_lock:
 if _provider == "gemini":
-return _send_gemini(md_content, user_message, base_dir, file_items, discussion_history)
+return _send_gemini(md_content, user_message, base_dir, file_items, discussion_history, pre_tool_callback, qa_callback)
 elif _provider == "gemini_cli":
-return _send_gemini_cli(md_content, user_message, base_dir, file_items, discussion_history)
+return _send_gemini_cli(md_content, user_message, base_dir, file_items, discussion_history, pre_tool_callback, qa_callback)
 elif _provider == "anthropic":
-return _send_anthropic(md_content, user_message, base_dir, file_items, discussion_history)
+return _send_anthropic(md_content, user_message, base_dir, file_items, discussion_history, pre_tool_callback, qa_callback)
 elif _provider == "deepseek":
-return _send_deepseek(md_content, user_message, base_dir, file_items, discussion_history, stream=stream)
+return _send_deepseek(md_content, user_message, base_dir, file_items, discussion_history, stream=stream, pre_tool_callback=pre_tool_callback, qa_callback=qa_callback)
 raise ValueError(f"unknown provider: {_provider}")
 def get_history_bleed_stats(md_content: str | None = None) -> dict:
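The two hooks threaded through `send()` share a simple contract visible in the diff above: `pre_tool_callback` receives a JSON payload describing the pending tool call and returns `True` to approve it, while `qa_callback` receives captured `stderr` and returns a short diagnostic string. A standalone sketch of callbacks a GUI might register; the callback bodies and policy here are illustrative assumptions, not the project's code:

```python
import json

def approve_reads_only(payload: str) -> bool:
    # pre_tool_callback contract: payload is JSON like {"tool": ..., "args": ...}.
    # Returning False makes the provider inject
    # "USER REJECTED: tool execution cancelled" instead of running the tool.
    # Hypothetical policy: auto-approve only read-style tools.
    call = json.loads(payload)
    return call["tool"] in {"read_file", "list_dir"}

def fake_tier4(stderr: str) -> str:
    # qa_callback contract: map stderr text to a short fix summary.
    # A real implementation would delegate to run_tier4_analysis().
    return f"[QA] {stderr.splitlines()[0][:60]}" if stderr.strip() else ""

# The callbacks then flow send() -> provider -> _run_script, e.g.:
#   ai_client.send(md, msg, base_dir,
#                  pre_tool_callback=approve_reads_only,
#                  qa_callback=fake_tier4)
print(approve_reads_only('{"tool": "write_file", "args": {}}'))  # False
print(fake_tier4("NameError: name 'x' is not defined"))
```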
+4 -2
@@ -15,10 +15,12 @@ To serve as an expert-level utility for personal developer use on small projects
 - **Tier 2 (Tech Lead):** Technical oversight and track execution (`/conductor:implement`) using `gemini-3-flash-preview`. Maintains persistent context throughout implementation.
 - **Tier 3 (Worker):** Surgical code implementation and TDD using `gemini-2.5-flash-lite` or `deepseek-v3`. Operates statelessly with tool access and dependency skeletons.
 - **Tier 4 (QA):** Error analysis and diagnostics using `gemini-2.5-flash-lite` or `deepseek-v3`. Operates statelessly with tool access.
-- **MMA Delegation Engine:** Utilizes the `mma-exec` CLI and `mma.ps1` helper to route tasks, ensuring role-scoped context and detailed observability via timestamped sub-agent logs.
+- **MMA Delegation Engine:** Utilizes the `mma-exec` CLI and `mma.ps1` helper to route tasks, ensuring role-scoped context and detailed observability via timestamped sub-agent logs. Supports dynamic ticket creation and dependency resolution via an automated Dispatcher Loop.
 - **Role-Scoped Documentation:** Automated mapping of foundational documents to specific tiers to prevent token bloat and maintain high-signal context.
-- **Strict Memory Siloing:** Employs AST-based interface extraction and "Context Amnesia" to provide workers only with the absolute minimum context required, preventing hallucination loops.
+- **Strict Memory Siloing:** Employs tree-sitter AST-based interface extraction (Skeleton View, Curated View) and "Context Amnesia" to provide workers only with the absolute minimum context required, preventing hallucination loops.
 - **Explicit Execution Control:** All AI-generated PowerShell scripts require explicit human confirmation via interactive UI dialogs before execution, supported by a global "Linear Execution Clutch" for deterministic debugging.
+- **Asynchronous Event-Driven Architecture:** Uses an `AsyncEventQueue` to link GUI actions to the backend engine, ensuring the interface remains fully responsive during multi-model generation and parallel worker execution.
+- **Automated Tier 4 QA:** Integrates real-time error interception in the shell runner, automatically forwarding technical failures to cheap sub-agents for 20-word diagnostic summaries injected back into the worker history.
 - **Detailed History Management:** Rich discussion history with branching, timestamping, and specific git commit linkage per conversation.
 - **In-Depth Toolset Access:** MCP-like file exploration, URL fetching, search, and dynamic context aggregation embedded within a multi-viewport Dear PyGui/ImGui interface.
 - **Integrated Workspace:** A consolidated Hub-based layout (Context, AI Settings, Discussion, Operations) designed for expert multi-monitor workflows.
+2
@@ -42,4 +42,6 @@
 ## Architectural Patterns
 - **Event-Driven Metrics:** Uses a custom `EventEmitter` to decouple API lifecycle events from UI rendering, improving performance and responsiveness.
+- **Asynchronous Event Bus:** Employs an `AsyncEventQueue` based on `asyncio.Queue` to manage the communication between the UI and the backend multi-agent orchestrator without blocking.
 - **Synchronous IPC Approval Flow:** A specialized bridge mechanism that allows headless AI providers (like Gemini CLI) to synchronously request and receive human approval for tool calls via the GUI's REST API hooks.
+- **Interface-Driven Development (IDD):** Enforces a "Stub-and-Resolve" pattern where cross-module dependencies are resolved by generating signatures/contracts before implementation.
+1 -1
@@ -30,7 +30,7 @@ This file tracks all major tracks for the project. Each track has its own detail
 ---
-- [ ] **Track: MMA Core Engine Implementation**
+- [~] **Track: MMA Core Engine Implementation**
 *Link: [./tracks/mma_core_engine_20260224/](./tracks/mma_core_engine_20260224/)*
 ---
@@ -1,48 +1,72 @@
 # Implementation Plan: MMA Core Engine Implementation
-## Phase 1: Track 1 - The Memory Foundations (AST Parser)
+## Phase 1: Track 1 - The Memory Foundations (AST Parser) [checkpoint: ac31e41]
-- [ ] Task: Dependency Setup
+- [x] Task: Dependency Setup (8fb75cc)
-  - [ ] Add `tree-sitter` and `tree-sitter-python` to `pyproject.toml` / `requirements.txt`
+  - [x] Add `tree-sitter` and `tree-sitter-python` to `pyproject.toml` / `requirements.txt` (8fb75cc)
-- [ ] Task: Core Parser Class
+- [x] Task: Core Parser Class (7a609ca)
-  - [ ] Create `ASTParser` in `file_cache.py`
+  - [x] Create `ASTParser` in `file_cache.py` (7a609ca)
-- [ ] Task: Skeleton View Extraction
+- [x] Task: Skeleton View Extraction (7a609ca)
-  - [ ] Write query to extract `function_definition` and `class_definition`
+  - [x] Write query to extract `function_definition` and `class_definition` (7a609ca)
-  - [ ] Replace bodies with `pass`, keep type hints and signatures
+  - [x] Replace bodies with `pass`, keep type hints and signatures (7a609ca)
-- [ ] Task: Curated View Extraction
+- [x] Task: Curated View Extraction (7a609ca)
-  - [ ] Keep class structures, module docstrings
+  - [x] Keep class structures, module docstrings (7a609ca)
-  - [ ] Preserve `@core_logic` or `# [HOT]` function bodies, hide others
+  - [x] Preserve `@core_logic` or `# [HOT]` function bodies, hide others (7a609ca)
-## Phase 2: Track 2 - State Machine & Data Structures
+## Phase 2: Track 2 - State Machine & Data Structures [checkpoint: a518a30]
-- [ ] Task: The Dataclasses
+- [x] Task: The Dataclasses (f9b5a50)
-  - [ ] Create `models.py` defining `Ticket` and `Track`
+  - [x] Create `models.py` defining `Ticket` and `Track` (f9b5a50)
-- [ ] Task: Worker Context Definition
+- [x] Task: Worker Context Definition (ee71929)
-  - [ ] Define `WorkerContext` holding `Ticket` ID, model config, and ephemeral messages
+  - [x] Define `WorkerContext` holding `Ticket` ID, model config, and ephemeral messages (ee71929)
-- [ ] Task: State Mutator Methods
+- [x] Task: State Mutator Methods (e925b21)
-  - [ ] Implement `ticket.mark_blocked()`, `ticket.mark_complete()`, `track.get_executable_tickets()`
+  - [x] Implement `ticket.mark_blocked()`, `ticket.mark_complete()`, `track.get_executable_tickets()` (e925b21)
-## Phase 3: Track 3 - The Linear Orchestrator & Execution Clutch
+## Phase 3: Track 3 - The Linear Orchestrator & Execution Clutch [checkpoint: e6c8d73]
-- [ ] Task: The Engine Core
+- [x] Task: The Engine Core (7a30168)
-  - [ ] Create `multi_agent_conductor.py` containing `ConductorEngine` and `run_worker_lifecycle`
+  - [x] Create `multi_agent_conductor.py` containing `ConductorEngine` and `run_worker_lifecycle` (7a30168)
-- [ ] Task: Context Injection
+- [x] Task: Context Injection (9d6d174)
-  - [ ] Format context strings using `file_cache.py` target AST views
+  - [x] Format context strings using `file_cache.py` target AST views (9d6d174)
-- [ ] Task: The HITL Execution Clutch
+- [x] Task: The HITL Execution Clutch (1afd9c8)
-  - [ ] Before executing `write_file`/`shell_runner.py` tools in step-mode, prompt user for confirmation
+  - [x] Before executing `write_file`/`shell_runner.py` tools in step-mode, prompt user for confirmation (1afd9c8)
-  - [ ] Provide functionality to mutate the history JSON before resuming execution
+  - [x] Provide functionality to mutate the history JSON before resuming execution (1afd9c8)
-## Phase 4: Track 4 - Tier 4 QA Interception
+## Phase 4: Track 4 - Tier 4 QA Interception [checkpoint: 61d17ad]
-- [ ] Task: The Interceptor Loop
+- [x] Task: The Interceptor Loop (bc654c2)
-  - [ ] Catch `subprocess.run()` execution errors inside `shell_runner.py`
+  - [x] Catch `subprocess.run()` execution errors inside `shell_runner.py` (bc654c2)
-- [ ] Task: Tier 4 Instantiation
+- [x] Task: Tier 4 Instantiation (8e4e326)
-  - [ ] Make a secondary API call to `default_cheap` model passing `stderr` and snippet
+  - [x] Make a secondary API call to `default_cheap` model passing `stderr` and snippet (8e4e326)
-- [ ] Task: Payload Formatting
+- [x] Task: Payload Formatting (fb3da4d)
-  - [ ] Inject the 20-word fix summary into the Tier 3 worker history
+  - [x] Inject the 20-word fix summary into the Tier 3 worker history (fb3da4d)
-## Phase 5: Track 5 - UI Decoupling & Tier 1/2 Routing (The Final Boss)
+## Phase 5: Track 5 - UI Decoupling & Tier 1/2 Routing (The Final Boss) [checkpoint: 3982fda]
-- [ ] Task: The Event Bus
+- [x] Task: The Event Bus (695cb4a)
-  - [ ] Implement an `asyncio.Queue` linking GUI actions to the backend engine
+  - [x] Implement an `asyncio.Queue` linking GUI actions to the backend engine (695cb4a)
-- [ ] Task: Tier 1 & 2 System Prompts
+- [x] Task: Tier 1 & 2 System Prompts (a28d71b)
-  - [ ] Create structured system prompts for Epic routing and Ticket creation
+  - [x] Create structured system prompts for Epic routing and Ticket creation (a28d71b)
- [ ] Task: The Dispatcher Loop - [x] Task: The Dispatcher Loop (1dacd36)
- [ ] Read Tier 2 JSON flat-lists, construct Tickets, execute Stub resolution paths - [x] Read Tier 2 JSON flat-lists, construct Tickets, execute Stub resolution paths (1dacd36)
- [ ] Task: UI Component Update - [x] Task: UI Component Update (68861c0)
- [ ] Refactor `gui_2.py` to push `UserRequestEvent` instead of blocking on API generation - [x] Refactor `gui_2.py` to push `UserRequestEvent` instead of blocking on API generation (68861c0)
## Phase 6: Live & Headless Verification
- [x] Task: Headless Engine Verification
  - [x] Run a comprehensive headless test scenario (e.g., using a mock or dedicated test script).
  - [x] Verify Ticket execution, "Context Amnesia" (statelessness), and Tier 4 error interception.
- [x] Task: Live GUI Integration Verification
  - [x] Launch `gui_2.py` and verify Event Bus responsiveness.
  - [x] Confirm UI updates and async event handling during multi-model generation.
- [x] Task: Comprehensive Regression Suite
  - [x] Run all tests in `tests/` related to MMA, Conductor, and Async Events.
  - [x] Verify that no regressions were introduced in existing functionality.
## Phase 7: MMA Observability & UX
- [~] Task: MMA Dashboard Implementation
  - [ ] Create a new dockable panel in `gui_2.py` for "MMA Dashboard".
  - [ ] Display active `Track` and `Ticket` queue status.
- [ ] Task: Execution Clutch UI
  - [ ] Implement Step Mode toggle and Pause Points logic in the GUI.
  - [ ] Add `[Approve]`, `[Edit Payload]`, and `[Abort]` buttons for tool execution.
- [ ] Task: Memory Mutator Modal
  - [ ] Create a modal for editing raw JSON conversation history of paused workers.
- [ ] Task: Tiered Metrics & Log Links
  - [ ] Add visual indicators for the active model Tier.
  - [ ] Provide clickable links to sub-agent logs.
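The Phase 5 "Event Bus" and "UI Component Update" tasks above rest on one pattern: a background `asyncio` loop owns a queue, and the GUI thread hands work across with `asyncio.run_coroutine_threadsafe` instead of blocking. A minimal, self-contained sketch of that hand-off under those assumptions (all names here are illustrative, not the project's actual API):

```python
import asyncio
import threading

# Backend loop in a daemon thread; the GUI thread never blocks on AI work.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def make_queue() -> asyncio.Queue:
    # Create the queue on the loop's own thread (safe across Python versions).
    return asyncio.Queue()

queue = asyncio.run_coroutine_threadsafe(make_queue(), loop).result(timeout=5)

# "GUI" side: push an event and return immediately.
asyncio.run_coroutine_threadsafe(
    queue.put(("user_request", {"prompt": "hi"})), loop
).result(timeout=5)

# "Engine" side: consume one event.
event_name, payload = asyncio.run_coroutine_threadsafe(queue.get(), loop).result(timeout=5)
loop.call_soon_threadsafe(loop.stop)
print(event_name, payload)
```

The `.result(timeout=...)` calls are only there to make the demo deterministic; a real GUI would fire and forget.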
+2 -2
@@ -1,6 +1,6 @@
 [ai]
-provider = "gemini_cli"
-model = "gemini-3-flash-preview"
+provider = "gemini"
+model = "gemini-2.5-flash-lite"
 temperature = 0.0
 max_tokens = 8192
 history_trunc_limit = 8000
+18
@@ -0,0 +1,18 @@
import tree_sitter
import tree_sitter_python

code = """def hot_func():
    # [HOT]
    print(1)"""

PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language())
parser = tree_sitter.Parser(PY_LANGUAGE)
tree = parser.parse(bytes(code, "utf8"))

def walk(node, indent=0):
    content = code[node.start_byte:node.end_byte].strip()
    print(f"{' ' * indent}{node.type} ({node.start_byte}-{node.end_byte}): {content[:20]}")
    for child in node.children:
        walk(child, indent + 1)

walk(tree.root_node)
+102
@@ -0,0 +1,102 @@
import tree_sitter
import tree_sitter_python

class ASTParser:
    def __init__(self, language: str):
        self.language = tree_sitter.Language(tree_sitter_python.language())
        self.parser = tree_sitter.Parser(self.language)

    def parse(self, code: str) -> tree_sitter.Tree:
        return self.parser.parse(bytes(code, "utf8"))

    def get_curated_view(self, code: str) -> str:
        tree = self.parse(code)
        edits = []

        def is_docstring(node):
            if node.type == "expression_statement" and node.child_count > 0:
                if node.children[0].type == "string":
                    return True
            return False

        def has_core_logic_decorator(node):
            parent = node.parent
            if parent and parent.type == "decorated_definition":
                for child in parent.children:
                    if child.type == "decorator":
                        if "@core_logic" in code[child.start_byte:child.end_byte]:
                            return True
            return False

        def has_hot_comment(func_node):
            print(f"Checking {code[func_node.start_byte:func_node.start_byte+20].strip()}...")
            stack = [func_node]
            while stack:
                curr = stack.pop()
                if curr.type == "comment":
                    comment_text = code[curr.start_byte:curr.end_byte]
                    print(f"  Found comment: {comment_text}")
                    if "[HOT]" in comment_text:
                        print("  [HOT] FOUND!")
                        return True
                for child in curr.children:
                    stack.append(child)
            return False

        def walk(node):
            if node.type == "function_definition":
                body = node.child_by_field_name("body")
                if body and body.type == "block":
                    preserve = has_core_logic_decorator(node) or has_hot_comment(node)
                    print(f"Function {code[node.start_byte:node.start_byte+20].strip()}, preserve={preserve}")
                    if not preserve:
                        indent = " " * body.start_point.column
                        first_stmt = None
                        for child in body.children:
                            if child.type != "comment":
                                first_stmt = child
                                break
                        if first_stmt and is_docstring(first_stmt):
                            start_byte = first_stmt.end_byte
                            end_byte = body.end_byte
                            if end_byte > start_byte:
                                edits.append((start_byte, end_byte, "\n" + indent + "..."))
                        else:
                            start_byte = body.start_byte
                            end_byte = body.end_byte
                            edits.append((start_byte, end_byte, "..."))
            for child in node.children:
                walk(child)

        walk(tree.root_node)
        edits.sort(key=lambda x: x[0], reverse=True)
        code_bytes = bytearray(code, "utf8")
        for start, end, replacement in edits:
            code_bytes[start:end] = bytes(replacement, "utf8")
        return code_bytes.decode("utf8")

parser = ASTParser("python")
code = '''
@core_logic
def core_func():
    """Core logic doc."""
    print("this should be preserved")
    return True

def hot_func():
    # [HOT]
    print("this should also be preserved")
    return 42

def normal_func():
    """Normal doc."""
    print("this should be stripped")
    return None
'''
result = parser.get_curated_view(code)
print("\n--- RESULT ---\n")
print(result)
+49 -1
@@ -1,7 +1,8 @@
""" """
Decoupled event emission system for cross-module communication. Decoupled event emission system for cross-module communication.
""" """
from typing import Callable, Any, Dict, List import asyncio
from typing import Callable, Any, Dict, List, Tuple
class EventEmitter: class EventEmitter:
""" """
@@ -35,3 +36,50 @@ class EventEmitter:
         if event_name in self._listeners:
             for callback in self._listeners[event_name]:
                 callback(*args, **kwargs)
class AsyncEventQueue:
    """
    Asynchronous event queue for decoupled communication using asyncio.Queue.
    """
    def __init__(self):
        """Initializes the AsyncEventQueue with an internal asyncio.Queue."""
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put(self, event_name: str, payload: Any = None):
        """
        Puts an event into the queue.

        Args:
            event_name: The name of the event.
            payload: Optional data associated with the event.
        """
        await self._queue.put((event_name, payload))

    async def get(self) -> Tuple[str, Any]:
        """
        Gets an event from the queue.

        Returns:
            A tuple containing (event_name, payload).
        """
        return await self._queue.get()

class UserRequestEvent:
    """
    Payload for a user request event.
    """
    def __init__(self, prompt: str, stable_md: str, file_items: List[Any], disc_text: str, base_dir: str):
        self.prompt = prompt
        self.stable_md = stable_md
        self.file_items = file_items
        self.disc_text = disc_text
        self.base_dir = base_dir

    def to_dict(self) -> Dict[str, Any]:
        return {
            "prompt": self.prompt,
            "stable_md": self.stable_md,
            "file_items": self.file_items,
            "disc_text": self.disc_text,
            "base_dir": self.base_dir
        }
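A minimal round-trip through the queue looks like this (the class is re-declared inline so the snippet runs standalone; the real definition lives in `events.py`):

```python
import asyncio
from typing import Any, Tuple

class AsyncEventQueue:
    """Inline mirror of the events.AsyncEventQueue shown above."""
    def __init__(self):
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put(self, event_name: str, payload: Any = None):
        await self._queue.put((event_name, payload))

    async def get(self) -> Tuple[str, Any]:
        return await self._queue.get()

async def main() -> Tuple[str, Any]:
    q = AsyncEventQueue()
    # Producer and consumer share only the queue, never each other.
    await q.put("user_request", {"prompt": "refactor gui_2.py"})
    return await q.get()

name, payload = asyncio.run(main())
print(name, payload["prompt"])
```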
+144
@@ -7,6 +7,150 @@ This file is kept so that any stale imports do not break.
 from pathlib import Path
 from typing import Optional
import tree_sitter
import tree_sitter_python

class ASTParser:
    """
    Parser for extracting AST-based views of source code.
    Currently supports Python.
    """
    def __init__(self, language: str):
        if language != "python":
            raise ValueError(f"Language '{language}' not supported yet.")
        self.language_name = language
        # Load the tree-sitter language grammar
        self.language = tree_sitter.Language(tree_sitter_python.language())
        self.parser = tree_sitter.Parser(self.language)

    def parse(self, code: str) -> tree_sitter.Tree:
        """Parse the given code and return the tree-sitter Tree."""
        return self.parser.parse(bytes(code, "utf8"))

    def get_skeleton(self, code: str) -> str:
        """
        Returns a skeleton of a Python file (preserving docstrings, stripping function bodies).
        """
        tree = self.parse(code)
        edits = []

        def is_docstring(node):
            if node.type == "expression_statement" and node.child_count > 0:
                if node.children[0].type == "string":
                    return True
            return False

        def walk(node):
            if node.type == "function_definition":
                body = node.child_by_field_name("body")
                if body and body.type == "block":
                    indent = " " * body.start_point.column
                    first_stmt = None
                    for child in body.children:
                        if child.type != "comment":
                            first_stmt = child
                            break
                    if first_stmt and is_docstring(first_stmt):
                        start_byte = first_stmt.end_byte
                        end_byte = body.end_byte
                        if end_byte > start_byte:
                            edits.append((start_byte, end_byte, f"\n{indent}..."))
                    else:
                        start_byte = body.start_byte
                        end_byte = body.end_byte
                        edits.append((start_byte, end_byte, "..."))
            for child in node.children:
                walk(child)

        walk(tree.root_node)
        # Apply edits in reverse to maintain byte offsets
        edits.sort(key=lambda x: x[0], reverse=True)
        code_bytes = bytearray(code, "utf8")
        for start, end, replacement in edits:
            code_bytes[start:end] = bytes(replacement, "utf8")
        return code_bytes.decode("utf8")

    def get_curated_view(self, code: str) -> str:
        """
        Returns a curated skeleton of a Python file.
        Preserves function bodies if they have @core_logic decorator or # [HOT] comment.
        Otherwise strips bodies but preserves docstrings.
        """
        tree = self.parse(code)
        edits = []

        def is_docstring(node):
            if node.type == "expression_statement" and node.child_count > 0:
                if node.children[0].type == "string":
                    return True
            return False

        def has_core_logic_decorator(node):
            # Check if parent is decorated_definition
            parent = node.parent
            if parent and parent.type == "decorated_definition":
                for child in parent.children:
                    if child.type == "decorator":
                        # decorator -> ( '@', identifier ) or ( '@', call )
                        if "@core_logic" in code[child.start_byte:child.end_byte]:
                            return True
            return False

        def has_hot_comment(func_node):
            # Check all descendants of the function_definition for a [HOT] comment
            stack = [func_node]
            while stack:
                curr = stack.pop()
                if curr.type == "comment":
                    comment_text = code[curr.start_byte:curr.end_byte]
                    if "[HOT]" in comment_text:
                        return True
                for child in curr.children:
                    stack.append(child)
            return False

        def walk(node):
            if node.type == "function_definition":
                body = node.child_by_field_name("body")
                if body and body.type == "block":
                    # Check if we should preserve it
                    preserve = has_core_logic_decorator(node) or has_hot_comment(node)
                    if not preserve:
                        indent = " " * body.start_point.column
                        first_stmt = None
                        for child in body.children:
                            if child.type != "comment":
                                first_stmt = child
                                break
                        if first_stmt and is_docstring(first_stmt):
                            start_byte = first_stmt.end_byte
                            end_byte = body.end_byte
                            if end_byte > start_byte:
                                edits.append((start_byte, end_byte, f"\n{indent}..."))
                        else:
                            start_byte = body.start_byte
                            end_byte = body.end_byte
                            edits.append((start_byte, end_byte, "..."))
            for child in node.children:
                walk(child)

        walk(tree.root_node)
        # Apply edits in reverse to maintain byte offsets
        edits.sort(key=lambda x: x[0], reverse=True)
        code_bytes = bytearray(code, "utf8")
        for start, end, replacement in edits:
            code_bytes[start:end] = bytes(replacement, "utf8")
        return code_bytes.decode("utf8")
 def reset_client():
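Both view methods above end with the same idiom: collect `(start_byte, end_byte, replacement)` spans, then apply them back-to-front so earlier byte offsets stay valid. A standalone sketch of that idiom on a toy string (offsets hand-computed for this exact input; nothing here is project API):

```python
code = "def f():\n    return 1\ndef g():\n    return 2\n"
# (start_byte, end_byte, replacement) spans covering each function body.
edits = [(9, 21, "    ..."), (31, 43, "    ...")]

# Apply in reverse byte order: replacing the later span first means the
# earlier span's offsets are never shifted by a length change.
edits.sort(key=lambda x: x[0], reverse=True)
buf = bytearray(code, "utf8")
for start, end, replacement in edits:
    buf[start:end] = bytes(replacement, "utf8")

skeleton = buf.decode("utf8")
print(skeleton)  # both bodies replaced by "    ..."
```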
+86 -37
@@ -1,6 +1,7 @@
 # gui_2.py
 import tomli_w
 import threading
+import asyncio
 import time
 import math
 import json
@@ -10,6 +11,7 @@ import uuid
 import requests
 from pathlib import Path
 from tkinter import filedialog, Tk
+from typing import Optional, Callable
 import aggregate
 import ai_client
 from ai_client import ProviderError
@@ -114,6 +116,10 @@ class App:
     def __init__(self):
         self.config = load_config()
+        self.event_queue = events.AsyncEventQueue()
+        self._loop = asyncio.new_event_loop()
+        self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
+        self._loop_thread.start()
         ai_cfg = self.config.get("ai", {})
         self._current_provider: str = ai_cfg.get("provider", "gemini")
@@ -806,6 +812,21 @@ class App:
if action == "refresh_api_metrics": if action == "refresh_api_metrics":
self._refresh_api_metrics(task.get("payload", {})) self._refresh_api_metrics(task.get("payload", {}))
elif action == "handle_ai_response":
payload = task.get("payload", {})
self.ai_response = payload.get("text", "")
self.ai_status = payload.get("status", "done")
self._trigger_blink = True
if self.ui_auto_add_history:
role = payload.get("role", "AI")
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": self.ai_response,
"collapsed": False,
"ts": project_manager.now_ts()
})
elif action == "set_value": elif action == "set_value":
item = task.get("item") item = task.get("item")
value = task.get("value") value = task.get("value")
@@ -944,12 +965,6 @@ class App:
     def _handle_generate_send(self):
         """Logic for the 'Gen + Send' action."""
-        send_busy = False
-        with self._send_thread_lock:
-            if self.send_thread and self.send_thread.is_alive():
-                send_busy = True
-        if not send_busy:
         try:
             md, path, file_items, stable_md, disc_text = self._do_generate()
             self.last_md = md
@@ -962,43 +977,77 @@ class App:
self.ai_status = "sending..." self.ai_status = "sending..."
user_msg = self.ui_ai_input user_msg = self.ui_ai_input
base_dir = self.ui_files_base_dir base_dir = self.ui_files_base_dir
# Prepare event payload
event_payload = events.UserRequestEvent(
prompt=user_msg,
stable_md=stable_md,
file_items=file_items,
disc_text=disc_text,
base_dir=base_dir
)
# Push to async queue
asyncio.run_coroutine_threadsafe(
self.event_queue.put("user_request", event_payload),
self._loop
)
def _run_event_loop(self):
"""Runs the internal asyncio event loop."""
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._process_event_queue())
self._loop.run_forever()
async def _process_event_queue(self):
"""Listens for and processes events from the AsyncEventQueue."""
while True:
event_name, payload = await self.event_queue.get()
if event_name == "user_request":
# Handle the request (simulating what was previously in do_send thread)
self._handle_request_event(payload)
elif event_name == "response":
# Handle AI response event
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "handle_ai_response",
"payload": payload
})
def _handle_request_event(self, event: events.UserRequestEvent):
"""Processes a UserRequestEvent by calling the AI client."""
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "User",
"content": event.prompt,
"collapsed": False,
"ts": project_manager.now_ts()
})
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp)) ai_client.set_custom_system_prompt("\n\n".join(csp))
ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit) ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit)
ai_client.set_agent_tools(self.ui_agent_tools) ai_client.set_agent_tools(self.ui_agent_tools)
send_md = stable_md
send_disc = disc_text
def do_send():
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "User", "content": user_msg, "collapsed": False, "ts": project_manager.now_ts()})
try: try:
resp = ai_client.send(send_md, user_msg, base_dir, self.last_file_items, send_disc) resp = ai_client.send(event.stable_md, event.prompt, event.base_dir, event.file_items, event.disc_text)
self.ai_response = resp # Emit response event
self.ai_status = "done" asyncio.run_coroutine_threadsafe(
self._trigger_blink = True self.event_queue.put("response", {"text": resp, "status": "done"}),
if self.ui_auto_add_history: self._loop
with self._pending_history_adds_lock: )
self._pending_history_adds.append({"role": "AI", "content": resp, "collapsed": False, "ts": project_manager.now_ts()})
except ProviderError as e: except ProviderError as e:
self.ai_response = e.ui_message() asyncio.run_coroutine_threadsafe(
self.ai_status = "error" self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}),
self._trigger_blink = True self._loop
if self.ui_auto_add_history: )
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "Vendor API", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
except Exception as e: except Exception as e:
self.ai_response = f"ERROR: {e}" asyncio.run_coroutine_threadsafe(
self.ai_status = "error" self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}),
self._trigger_blink = True self._loop
if self.ui_auto_add_history: )
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "System", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
with self._send_thread_lock:
self.send_thread = threading.Thread(target=do_send, daemon=True)
self.send_thread.start()
     def _test_callback_func_write_to_file(self, data: str):
         """A dummy function that a custom_callback would execute for testing."""
@@ -1066,7 +1115,7 @@ class App:
         self.is_viewing_prior_session = True
         self.ai_status = f"viewing prior session: {Path(path).name} ({len(entries)} entries)"
-    def _confirm_and_run(self, script: str, base_dir: str) -> str | None:
+    def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str | None:
         print(f"[DEBUG] _confirm_and_run triggered for script length: {len(script)}")
         dialog = ConfirmDialog(script, base_dir)
@@ -1106,7 +1155,7 @@ class App:
self.ai_status = "running powershell..." self.ai_status = "running powershell..."
print(f"[DEBUG] Running powershell in {base_dir}") print(f"[DEBUG] Running powershell in {base_dir}")
output = shell_runner.run_powershell(final_script, base_dir) output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback)
self._append_tool_log(final_script, output) self._append_tool_log(final_script, output)
self.ai_status = "powershell done, awaiting AI..." self.ai_status = "powershell done, awaiting AI..."
return output return output
+3 -2
@@ -18,6 +18,7 @@ import sys
 import os
 from pathlib import Path
 from tkinter import filedialog, Tk
+from typing import Optional, Callable
 import aggregate
 import ai_client
 from ai_client import ProviderError
@@ -910,7 +911,7 @@ class App:
     # ---------------------------------------------------------------- tool execution
-    def _confirm_and_run(self, script: str, base_dir: str) -> str | None:
+    def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str | None:
         dialog = ConfirmDialog(script, base_dir)
         with self._pending_dialog_lock:
@@ -923,7 +924,7 @@ class App:
             return None
         self._update_status("running powershell...")
-        output = shell_runner.run_powershell(final_script, base_dir)
+        output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback)
         self._append_tool_log(final_script, output)
         self._update_status("powershell done, awaiting AI...")
         return output
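The `qa_callback` threaded through `_confirm_and_run` into `shell_runner.run_powershell` is, at its core, a function from captured `stderr` to a short fix summary. A hypothetical sketch of that interception shape — `run_with_qa` and the lambda are illustrative, not the project's actual API; in the real engine the callback would be the Tier 4 cheap-model call:

```python
import subprocess
import sys
from typing import Callable, List, Optional

def run_with_qa(argv: List[str], qa_callback: Optional[Callable[[str], str]] = None) -> str:
    """Run a command; on failure, let a QA callback turn stderr into a summary."""
    proc = subprocess.run(argv, capture_output=True, text=True)
    if proc.returncode != 0 and qa_callback:
        # Tier 4 interception point: stderr in, short fix summary out.
        return f"[QA] {qa_callback(proc.stderr)}"
    return proc.stdout

summary = run_with_qa(
    [sys.executable, "-c", "raise ValueError('boom')"],
    # Toy "QA model": just report the final traceback line.
    qa_callback=lambda stderr: stderr.strip().splitlines()[-1],
)
print(summary)  # [QA] ValueError: boom
```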
+29
@@ -0,0 +1,29 @@
import tree_sitter
import tree_sitter_python

language = tree_sitter.Language(tree_sitter_python.language())
parser = tree_sitter.Parser(language)

# NOTE: the outer quotes must be ''' because the sample itself contains a """-docstring.
code = '''
@core_logic
def decorated_func():
    """Docstring."""
    print("core logic here")

def hot_func():
    # [HOT]
    print("hot logic here")

def normal_func():
    print("normal logic here")
'''

tree = parser.parse(bytes(code, "utf8"))

def print_node(node, indent=0):
    print("  " * indent + f"{node.type} [{node.start_byte}-{node.end_byte}] " + (f"'{code[node.start_byte:node.end_byte]}'" if node.type in ["decorator", "comment", "identifier"] else ""))
    for child in node.children:
        print_node(child, indent + 1)

print_node(tree.root_node)
+3 -50
@@ -242,57 +242,10 @@ def get_python_skeleton(path: str) -> str:
return f"ERROR: not a python file: {path}" return f"ERROR: not a python file: {path}"
try: try:
# Use mma_exec's generator if possible, or a local simplified version from file_cache import ASTParser
# For now, we will use a dedicated script or just inline logic here.
# Given we have tree-sitter already installed in the env...
import tree_sitter
import tree_sitter_python
code = p.read_text(encoding="utf-8") code = p.read_text(encoding="utf-8")
PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language()) parser = ASTParser("python")
parser = tree_sitter.Parser(PY_LANGUAGE) return parser.get_skeleton(code)
tree = parser.parse(bytes(code, "utf8"))
edits = []
def is_docstring(node):
if node.type == "expression_statement" and node.child_count > 0:
if node.children[0].type == "string":
return True
return False
def walk(node):
if node.type == "function_definition":
body = node.child_by_field_name("body")
if body and body.type == "block":
indent = " " * body.start_point.column
first_stmt = None
for child in body.children:
if child.type != "comment":
first_stmt = child
break
if first_stmt and is_docstring(first_stmt):
start_byte = first_stmt.end_byte
end_byte = body.end_byte
if end_byte > start_byte:
edits.append((start_byte, end_byte, f"\\n{indent}..."))
else:
start_byte = body.start_byte
end_byte = body.end_byte
edits.append((start_byte, end_byte, "..."))
for child in node.children:
walk(child)
walk(tree.root_node)
edits.sort(key=lambda x: x[0], reverse=True)
code_bytes = bytearray(code, "utf8")
for start, end, replacement in edits:
code_bytes[start:end] = bytes(replacement, "utf8")
return code_bytes.decode("utf8")
except Exception as e: except Exception as e:
return f"ERROR generating skeleton for '{path}': {e}" return f"ERROR generating skeleton for '{path}': {e}"
+153
@@ -0,0 +1,153 @@
"""
MMA Structured System Prompts for Tier 1 (PM) and Tier 2 (Tech Lead).
Contains templates and static strings for hierarchical orchestration.
"""
from typing import Dict
# --- Tier 1 (Strategic/Orchestration: PM) ---
TIER1_BASE_SYSTEM = """
You are the Tier 1 Orchestrator (Product Manager) for the Manual Slop project.
Your role is high-level strategic planning, architecture enforcement, and cross-module delegation.
You operate strictly on metadata, summaries, and executive-level directives.
NEVER request or attempt to read raw implementation code unless specifically provided in a Macro-Diff.
Maintain a "Godot ECS Flat List format" (JSON array of objects) for structural outputs.
"""
TIER1_EPIC_INIT = TIER1_BASE_SYSTEM + """
PATH: Epic Initialization (Project Planning)
GOAL: Break down a massive feature request into discrete Implementation Tracks.
CONSTRAINTS:
- IGNORE all source code, AST skeletons, and previous micro-task histories.
- FOCUS ONLY on the Repository Map and Project Meta-State.
OUTPUT REQUIREMENT:
Return a JSON array of 'Tracks'. Each track object must follow the Godot ECS Flat List format:
[
  {
    "id": "track_unique_id",
    "type": "Track",
    "module": "target_module_name",
    "persona": "required_tech_lead_persona",
    "severity": "Low|Medium|High",
    "goal": "Descriptive goal",
    "acceptance_criteria": ["criteria_1", "criteria_2"]
  },
  ...
]
"""
TIER1_TRACK_DELEGATION = TIER1_BASE_SYSTEM + """
PATH: Track Delegation (Sprint Kickoff)
GOAL: Compile a 'Track Brief' for a Tier 2 Tech Lead.
CONSTRAINTS:
- IGNORE unrelated module docs and original massive user prompt.
- USE AST Skeleton Views (class/function definitions only) for allowed modules.
OUTPUT REQUIREMENT:
Generate a comprehensive 'Track Brief' (JSON or Markdown) which includes:
1. A tailored System Prompt for the Tier 2 Tech Lead.
2. A curated list of files (the "Allowlist") they are authorized to modify.
3. Explicit architectural constraints derived from the Skeleton View.
"""
TIER1_MACRO_MERGE = TIER1_BASE_SYSTEM + """
PATH: Macro-Merge & Acceptance Review
GOAL: Review high-severity changes and merge into the project history.
CONSTRAINTS:
- IGNORE Tier 3 trial-and-error histories and Tier 4 error logs.
- FOCUS on the Macro-Diff and the Executive Summary.
OUTPUT REQUIREMENT:
Return "Approved" (commits to memory) OR "Rejected".
If Rejected, provide specific architectural feedback focusing on integration breaks or logic violations.
"""
# --- Tier 2 (Architectural/Tech Lead: Conductor) ---
TIER2_BASE_SYSTEM = """
You are the Tier 2 Track Conductor (Tech Lead) for the Manual Slop project.
Your role is module-specific planning, code review, and worker management.
You bridge high-level architecture with code syntax using AST-aware Skeleton Views.
Enforce Interface-Driven Development (IDD) and manage Topological Dependency Graphs.
"""
TIER2_SPRINT_PLANNING = TIER2_BASE_SYSTEM + """
PATH: Sprint Planning (Task Delegation)
GOAL: Break down a Track Brief into discrete Tier 3 Tickets.
CONSTRAINTS:
- IGNORE the PM's overarching project-planning logic.
- USE Curated Implementation View (AST-extracted class structures + [HOT] function bodies) for target modules.
- USE Skeleton View (signatures only) for foreign modules.
OUTPUT REQUIREMENT:
Return a JSON array of 'Tickets' in Godot ECS Flat List format.
Include 'depends_on' pointers to construct an execution DAG (Directed Acyclic Graph).
[
  {
    "id": "ticket_id",
    "type": "Ticket",
    "goal": "Surgical implementation task",
    "target_file": "path/to/file",
    "depends_on": ["other_ticket_id"],
    "context_requirements": ["list_of_needed_skeletons"]
  },
  ...
]
"""
TIER2_CODE_REVIEW = TIER2_BASE_SYSTEM + """
PATH: Code Review (Local Integration)
GOAL: Review Tier 3 diffs and ensure they meet the Ticket's goals.
CONSTRAINTS:
- IGNORE the Contributor's internal trial-and-error chat history.
- FOCUS on the Proposed Diff and Tier 4 (QA) logs.
OUTPUT REQUIREMENT:
Return "Approve" (merges diff) OR "Reject" (sends technical critique back to Tier 3).
"""
TIER2_TRACK_FINALIZATION = TIER2_BASE_SYSTEM + """
PATH: Track Finalization (Upward Reporting)
GOAL: Summarize the completed Track for the Tier 1 PM.
CONSTRAINTS:
- IGNORE back-and-forth review cycles.
- FOCUS on the Aggregated Track Diff and Dependency Delta.
OUTPUT REQUIREMENT:
Provide an Executive Summary (~200 words) and the final Macro-Diff.
"""
TIER2_CONTRACT_FIRST = TIER2_BASE_SYSTEM + """
PATH: Contract-First Delegation (Stub-and-Resolve)
GOAL: Resolve cross-module dependencies via Interface-Driven Development (IDD).
TASK:
You have detected a dependency on an undefined signature.
EXECUTION PLAN:
1. Define the Interface Contract.
2. Generate a 'Stub Ticket' (signature, types, docstring).
3. Generate a 'Consumer Ticket' (codes against skeleton).
4. Generate an 'Implementation Ticket' (fills logic).
OUTPUT REQUIREMENT:
Return the Ticket set in Godot ECS Flat List format (JSON array).
"""
PROMPTS: Dict[str, str] = {
    "tier1_epic_init": TIER1_EPIC_INIT,
    "tier1_track_delegation": TIER1_TRACK_DELEGATION,
    "tier1_macro_merge": TIER1_MACRO_MERGE,
    "tier2_sprint_planning": TIER2_SPRINT_PLANNING,
    "tier2_code_review": TIER2_CODE_REVIEW,
    "tier2_track_finalization": TIER2_TRACK_FINALIZATION,
    "tier2_contract_first": TIER2_CONTRACT_FIRST,
}
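Every Tier 1/2 output above shares the same "Godot ECS Flat List" shape: a JSON array of typed objects. A small validation pass before dispatch is cheap insurance; this sketch is illustrative and not part of the codebase (the required field set is taken from the Tier 1 Epic Initialization prompt):

```python
import json
from typing import List

REQUIRED_TRACK_FIELDS = {
    "id", "type", "module", "persona", "severity", "goal", "acceptance_criteria",
}

def validate_tracks(raw: str) -> List[dict]:
    """Parse a flat-list payload and reject any track missing required fields."""
    data = json.loads(raw)
    if not isinstance(data, list):
        raise ValueError("flat list must be a JSON array")
    for obj in data:
        missing = REQUIRED_TRACK_FIELDS - obj.keys()
        if missing:
            raise ValueError(f"track {obj.get('id', '?')} missing: {sorted(missing)}")
    return data

sample = json.dumps([{
    "id": "track_1", "type": "Track", "module": "events",
    "persona": "backend_lead", "severity": "Medium",
    "goal": "Decouple UI", "acceptance_criteria": ["queue works"],
}])
tracks = validate_tracks(sample)
print(tracks[0]["id"])  # track_1
```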
+67
@@ -0,0 +1,67 @@
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Ticket:
    """
    Represents a discrete unit of work within a track.
    """
    id: str
    description: str
    status: str
    assigned_to: str
    depends_on: List[str] = field(default_factory=list)
    blocked_reason: Optional[str] = None
    step_mode: bool = False

    def mark_blocked(self, reason: str):
        """Sets the ticket status to 'blocked' and records the reason."""
        self.status = "blocked"
        self.blocked_reason = reason

    def mark_complete(self):
        """Sets the ticket status to 'completed'."""
        self.status = "completed"

@dataclass
class Track:
    """
    Represents a collection of tickets that together form an architectural track or epic.
    """
    id: str
    description: str
    tickets: List[Ticket] = field(default_factory=list)

    def get_executable_tickets(self) -> List[Ticket]:
        """
        Returns all 'todo' tickets whose dependencies are all 'completed'.
        """
        # Map ticket IDs to their current status for efficient lookup
        status_map = {t.id: t.status for t in self.tickets}
        executable = []
        for ticket in self.tickets:
            if ticket.status != "todo":
                continue
            # Check if all dependencies are completed
            all_deps_completed = True
            for dep_id in ticket.depends_on:
                # If a dependency is missing from the track, we treat it as not completed (or we could raise an error)
                if status_map.get(dep_id) != "completed":
                    all_deps_completed = False
                    break
            if all_deps_completed:
                executable.append(ticket)
        return executable

@dataclass
class WorkerContext:
    """
    Represents the context provided to a Tier 3 Worker for a specific ticket.
    """
    ticket_id: str
    model_name: str
    messages: List[dict]
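The dependency gating implemented by `get_executable_tickets()` can be exercised in a few lines (dataclasses re-declared inline, trimmed to the fields the demo needs, so the snippet runs standalone):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Ticket:  # trimmed mirror of models.Ticket
    id: str
    description: str
    status: str = "todo"
    depends_on: List[str] = field(default_factory=list)

@dataclass
class Track:  # trimmed mirror of models.Track
    id: str
    description: str
    tickets: List[Ticket] = field(default_factory=list)

    def get_executable_tickets(self) -> List[Ticket]:
        status = {t.id: t.status for t in self.tickets}
        return [t for t in self.tickets
                if t.status == "todo"
                and all(status.get(d) == "completed" for d in t.depends_on)]

track = Track("t1", "demo", [
    Ticket("a", "define stub"),
    Ticket("b", "implement against stub", depends_on=["a"]),
])
print([t.id for t in track.get_executable_tickets()])  # ['a'] — b is gated on a
track.tickets[0].status = "completed"
print([t.id for t in track.get_executable_tickets()])  # ['b'] — gate released
```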
+138
View File
@@ -0,0 +1,138 @@
import ai_client
import json
from typing import List, Optional

from models import Ticket, Track, WorkerContext
from file_cache import ASTParser


class ConductorEngine:
    """
    Orchestrates the execution of tickets within a track.
    """
    def __init__(self, track: Track):
        self.track = track

    def parse_json_tickets(self, json_str: str):
        """
        Parses a JSON string of ticket definitions (Godot ECS Flat List format)
        and populates the Track's ticket list.
        """
        try:
            data = json.loads(json_str)
            if not isinstance(data, list):
                print("Error: JSON input must be a list of ticket definitions.")
                return
            for ticket_data in data:
                # Construct Ticket object, using defaults for optional fields
                ticket = Ticket(
                    id=ticket_data["id"],
                    description=ticket_data["description"],
                    status=ticket_data.get("status", "todo"),
                    assigned_to=ticket_data.get("assigned_to", "unassigned"),
                    depends_on=ticket_data.get("depends_on", []),
                    step_mode=ticket_data.get("step_mode", False)
                )
                self.track.tickets.append(ticket)
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON tickets: {e}")
        except KeyError as e:
            print(f"Missing required field in ticket definition: {e}")

    def run_linear(self):
        """
        Executes tickets sequentially according to their dependencies.
        Iterates through the track's executable tickets until no more can be run.
        Supports dynamic execution: tickets added during runtime are picked up
        on the next iteration of the main loop.
        """
        while True:
            executable = self.track.get_executable_tickets()
            if not executable:
                incomplete = [t for t in self.track.tickets if t.status != "completed"]
                if not incomplete:
                    print("Track completed successfully.")
                else:
                    # No executable tickets but work remains: every remaining
                    # ticket is blocked or waiting on an unmet dependency.
                    print(f"No more executable tickets. {len(incomplete)} tickets remain incomplete.")
                break
            for ticket in executable:
                # Re-check status in case it was modified by a parallel/dynamic
                # process (though run_linear is currently single-threaded)
                if ticket.status != "todo":
                    continue
                print(f"Executing ticket {ticket.id}: {ticket.description}")
                # For now, use a default model name (could come from config)
                context = WorkerContext(
                    ticket_id=ticket.id,
                    model_name="gemini-2.5-flash-lite",
                    messages=[]
                )
                run_worker_lifecycle(ticket, context)


def confirm_execution(payload: str) -> bool:
    """
    Placeholder for external confirmation function.
    In a real scenario, this might trigger a UI prompt.
    """
    return True


def run_worker_lifecycle(ticket: Ticket, context: WorkerContext,
                         context_files: Optional[List[str]] = None):
    """
    Simulates the lifecycle of a single agent working on a ticket.
    Calls the AI client and updates the ticket status based on the response.
    """
    # Enforce Context Amnesia: each ticket starts with a clean slate.
    ai_client.reset_session()
    context_injection = ""
    if context_files:
        parser = ASTParser(language="python")
        for i, file_path in enumerate(context_files):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                if i == 0:
                    view = parser.get_curated_view(content)
                else:
                    view = parser.get_skeleton(content)
                context_injection += f"\nFile: {file_path}\n{view}\n"
            except Exception as e:
                context_injection += f"\nError reading {file_path}: {e}\n"
    # Build a prompt for the worker
    user_message = (
        f"You are assigned to Ticket {ticket.id}.\n"
        f"Task Description: {ticket.description}\n"
    )
    if context_injection:
        user_message += f"\nContext Files:\n{context_injection}\n"
    user_message += (
        "Please complete this task. If you are blocked and cannot proceed, "
        "start your response with 'BLOCKED' and explain why."
    )
    # In a real scenario, we would pass md_content from the aggregator
    # and manage the conversation history in the context.
    response = ai_client.send(
        md_content="",
        user_message=user_message,
        base_dir=".",
        pre_tool_callback=confirm_execution if ticket.step_mode else None,
        qa_callback=ai_client.run_tier4_analysis
    )
    if "BLOCKED" in response.upper():
        ticket.mark_blocked(response)
    else:
        ticket.mark_complete()
    return response
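`parse_json_tickets` expects a JSON array of ticket objects, where only `id` and `description` are required and the remaining fields fall back to defaults. A minimal sketch of the payload shape and the same defaulting rules (the exact "Godot ECS Flat List" field set beyond what the diff shows is an assumption):

```python
import json

payload = """
[
  {"id": "T1", "description": "Define Ticket model"},
  {"id": "T2", "description": "Dispatcher loop", "depends_on": ["T1"], "step_mode": true}
]
"""

# Apply the same defaults as ConductorEngine.parse_json_tickets
tickets = []
for raw in json.loads(payload):
    tickets.append({
        "id": raw["id"],
        "description": raw["description"],
        "status": raw.get("status", "todo"),
        "assigned_to": raw.get("assigned_to", "unassigned"),
        "depends_on": raw.get("depends_on", []),
        "step_mode": raw.get("step_mode", False),
    })

print(tickets[1]["depends_on"])  # → ['T1']
```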
+3
View File
@@ -18,6 +18,9 @@ paths = []
[gemini_cli]
binary_path = "gemini"
[deepseek]
reasoning_effort = "medium"
[agent.tools]
run_powershell = true
read_file = true
+1 -1
View File
@@ -8,5 +8,5 @@ active = "main"
[discussions.main]
git_commit = ""
-last_updated = "2026-02-25T21:53:52"
+last_updated = "2026-02-26T21:32:42"
history = []
+1
View File
@@ -20,6 +20,7 @@ dependencies = [
dev = [
    "pytest>=9.0.2",
    "pytest-cov>=7.0.0",
    "pytest-asyncio>=0.25.3",
]
[tool.pytest.ini_options]
+198 -1
View File
@@ -267,6 +267,100 @@ colorama==0.4.6 ; sys_platform == 'win32' \
# via
# click
# pytest
coverage==7.13.4 \
--hash=sha256:01d4cbc3c283a17fc1e42d614a119f7f438eabb593391283adca8dc86eff1246 \
--hash=sha256:02231499b08dabbe2b96612993e5fc34217cdae907a51b906ac7fca8027a4459 \
--hash=sha256:0dd7ab8278f0d58a0128ba2fca25824321f05d059c1441800e934ff2efa52129 \
--hash=sha256:0e086334e8537ddd17e5f16a344777c1ab8194986ec533711cbe6c41cde841b6 \
--hash=sha256:14375934243ee05f56c45393fe2ce81fe5cc503c07cee2bdf1725fb8bef3ffaf \
--hash=sha256:1731dc33dc276dafc410a885cbf5992f1ff171393e48a21453b78727d090de80 \
--hash=sha256:19bc3c88078789f8ef36acb014d7241961dbf883fd2533d18cb1e7a5b4e28b11 \
--hash=sha256:1af1641e57cf7ba1bd67d677c9abdbcd6cc2ab7da3bca7fa1e2b7e50e65f2ad0 \
--hash=sha256:1d4be36a5114c499f9f1f9195e95ebf979460dbe2d88e6816ea202010ba1c34b \
--hash=sha256:200dea7d1e8095cc6e98cdabe3fd1d21ab17d3cee6dab00cadbb2fe35d9c15b9 \
--hash=sha256:23e3f687cf945070d1c90f85db66d11e3025665d8dafa831301a0e0038f3db9b \
--hash=sha256:2421d591f8ca05b308cf0092807308b2facbefe54af7c02ac22548b88b95c98f \
--hash=sha256:245e37f664d89861cf2329c9afa2c1fe9e6d4e1a09d872c947e70718aeeac505 \
--hash=sha256:25381386e80ae727608e662474db537d4df1ecd42379b5ba33c84633a2b36d47 \
--hash=sha256:25a41c3104d08edb094d9db0d905ca54d0cd41c928bb6be3c4c799a54753af55 \
--hash=sha256:29e3220258d682b6226a9b0925bc563ed9a1ebcff3cad30f043eceea7eaf2689 \
--hash=sha256:2b0f6ccf3dbe577170bebfce1318707d0e8c3650003cb4b3a9dd744575daa8b5 \
--hash=sha256:2c048ea43875fbf8b45d476ad79f179809c590ec7b79e2035c662e7afa3192e3 \
--hash=sha256:2fa8d5f8de70688a28240de9e139fa16b153cc3cbb01c5f16d88d6505ebdadf9 \
--hash=sha256:300deaee342f90696ed186e3a00c71b5b3d27bffe9e827677954f4ee56969601 \
--hash=sha256:30b8d0512f2dc8c8747557e8fb459d6176a2c9e5731e2b74d311c03b78451997 \
--hash=sha256:3599eb3992d814d23b35c536c28df1a882caa950f8f507cef23d1cbf334995ac \
--hash=sha256:391ee8f19bef69210978363ca930f7328081c6a0152f1166c91f0b5fdd2a773c \
--hash=sha256:3998e5a32e62fdf410c0dbd3115df86297995d6e3429af80b8798aad894ca7aa \
--hash=sha256:3c06f0f1337c667b971ca2f975523347e63ec5e500b9aa5882d91931cd3ef750 \
--hash=sha256:40aa8808140e55dc022b15d8aa7f651b6b3d68b365ea0398f1441e0b04d859c3 \
--hash=sha256:40d74da8e6c4b9ac18b15331c4b5ebc35a17069410cad462ad4f40dcd2d50c0d \
--hash=sha256:4223b4230a376138939a9173f1bdd6521994f2aff8047fae100d6d94d50c5a12 \
--hash=sha256:48685fee12c2eb3b27c62f2658e7ea21e9c3239cba5a8a242801a0a3f6a8c62a \
--hash=sha256:4c7d3cc01e7350f2f0f6f7036caaf5673fb56b6998889ccfe9e1c1fe75a9c932 \
--hash=sha256:4e83efc079eb39480e6346a15a1bcb3e9b04759c5202d157e1dd4303cd619356 \
--hash=sha256:53d133df809c743eb8bce33b24bcababb371f4441340578cd406e084d94a6148 \
--hash=sha256:590c0ed4bf8e85f745e6b805b2e1c457b2e33d5255dd9729743165253bc9ad39 \
--hash=sha256:5b856a8ccf749480024ff3bd7310adaef57bf31fd17e1bfc404b7940b6986634 \
--hash=sha256:65dfcbe305c3dfe658492df2d85259e0d79ead4177f9ae724b6fb245198f55d6 \
--hash=sha256:6f01afcff62bf9a08fb32b2c1d6e924236c0383c02c790732b6537269e466a72 \
--hash=sha256:6fdef321fdfbb30a197efa02d48fcd9981f0d8ad2ae8903ac318adc653f5df98 \
--hash=sha256:71ca20079dd8f27fcf808817e281e90220475cd75115162218d0e27549f95fef \
--hash=sha256:725d985c5ab621268b2edb8e50dfe57633dc69bda071abc470fed55a14935fd3 \
--hash=sha256:75eab1ebe4f2f64d9509b984f9314d4aa788540368218b858dad56dc8f3e5eb9 \
--hash=sha256:75fcd519f2a5765db3f0e391eb3b7d150cce1a771bf4c9f861aeab86c767a3c0 \
--hash=sha256:76451d1978b95ba6507a039090ba076105c87cc76fc3efd5d35d72093964d49a \
--hash=sha256:784fc3cf8be001197b652d51d3fd259b1e2262888693a4636e18879f613a62a9 \
--hash=sha256:78cdf0d578b15148b009ccf18c686aa4f719d887e76e6b40c38ffb61d264a552 \
--hash=sha256:79be69cf7f3bf9b0deeeb062eab7ac7f36cd4cc4c4dd694bd28921ba4d8596cc \
--hash=sha256:79e73a76b854d9c6088fe5d8b2ebe745f8681c55f7397c3c0a016192d681045f \
--hash=sha256:7b322db1284a2ed3aa28ffd8ebe3db91c929b7a333c0820abec3d838ef5b3525 \
--hash=sha256:7d41eead3cc673cbd38a4417deb7fd0b4ca26954ff7dc6078e33f6ff97bed940 \
--hash=sha256:7eda778067ad7ffccd23ecffce537dface96212576a07924cbf0d8799d2ded5a \
--hash=sha256:7f57b33491e281e962021de110b451ab8a24182589be17e12a22c79047935e23 \
--hash=sha256:8248977c2e33aecb2ced42fef99f2d319e9904a36e55a8a68b69207fb7e43edc \
--hash=sha256:845f352911777a8e722bfce168958214951e07e47e5d5d9744109fa5fe77f79b \
--hash=sha256:85480adfb35ffc32d40918aad81b89c69c9cc5661a9b8a81476d3e645321a056 \
--hash=sha256:8e264226ec98e01a8e1054314af91ee6cde0eacac4f465cc93b03dbe0bce2fd7 \
--hash=sha256:8e798c266c378da2bd819b0677df41ab46d78065fb2a399558f3f6cae78b2fbb \
--hash=sha256:9181a3ccead280b828fae232df12b16652702b49d41e99d657f46cc7b1f6ec7a \
--hash=sha256:9351229c8c8407645840edcc277f4a2d44814d1bc34a2128c11c2a031d45a5dd \
--hash=sha256:93550784d9281e374fb5a12bf1324cc8a963fd63b2d2f223503ef0fd4aa339ea \
--hash=sha256:9401ebc7ef522f01d01d45532c68c5ac40fb27113019b6b7d8b208f6e9baa126 \
--hash=sha256:94eb63f9b363180aff17de3e7c8760c3ba94664ea2695c52f10111244d16a299 \
--hash=sha256:a3aa4e7b9e416774b21797365b358a6e827ffadaaca81b69ee02946852449f00 \
--hash=sha256:ad27098a189e5838900ce4c2a99f2fe42a0bf0c2093c17c69b45a71579e8d4a2 \
--hash=sha256:ae4578f8528569d3cf303fef2ea569c7f4c4059a38c8667ccef15c6e1f118aa5 \
--hash=sha256:b1ec7b6b6e93255f952e27ab58fbc68dcc468844b16ecbee881aeb29b6ab4d8d \
--hash=sha256:b507778ae8a4c915436ed5c2e05b4a6cecfa70f734e19c22a005152a11c7b6a9 \
--hash=sha256:b66a2da594b6068b48b2692f043f35d4d3693fb639d5ea8b39533c2ad9ac3ab9 \
--hash=sha256:b720ce6a88a2755f7c697c23268ddc47a571b88052e6b155224347389fdf6a3b \
--hash=sha256:b7b38448866e83176e28086674fe7368ab8590e4610fb662b44e345b86d63ffa \
--hash=sha256:b8eb931ee8e6d8243e253e5ed7336deea6904369d2fd8ae6e43f68abbf167092 \
--hash=sha256:bd60d4fe2f6fa7dff9223ca1bbc9f05d2b6697bc5961072e5d3b952d46e1b1ea \
--hash=sha256:c35eb28c1d085eb7d8c9b3296567a1bebe03ce72962e932431b9a61f28facf26 \
--hash=sha256:c4240e7eded42d131a2d2c4dec70374b781b043ddc79a9de4d55ca71f8e98aea \
--hash=sha256:caa421e2684e382c5d8973ac55e4f36bed6821a9bad5c953494de960c74595c9 \
--hash=sha256:d490ba50c3f35dd7c17953c68f3270e7ccd1c6642e2d2afe2d8e720b98f5a053 \
--hash=sha256:d65b2d373032411e86960604dc4edac91fdfb5dca539461cf2cbe78327d1e64f \
--hash=sha256:dae88bc0fc77edaa65c14be099bd57ee140cf507e6bfdeea7938457ab387efb0 \
--hash=sha256:de6defc1c9badbf8b9e67ae90fd00519186d6ab64e5cc5f3d21359c2a9b2c1d3 \
--hash=sha256:e2f25215f1a359ab17320b47bcdaca3e6e6356652e8256f2441e4ef972052903 \
--hash=sha256:e5c8f6ed1e61a8b2dcdf31eb0b9bbf0130750ca79c1c49eb898e2ad86f5ccc91 \
--hash=sha256:e6f70dec1cc557e52df5306d051ef56003f74d56e9c4dd7ddb07e07ef32a84dd \
--hash=sha256:e856bf6616714c3a9fbc270ab54103f4e685ba236fa98c054e8f87f266c93505 \
--hash=sha256:e87f6c587c3f34356c3759f0420693e35e7eb0e2e41e4c011cb6ec6ecbbf1db7 \
--hash=sha256:eb30bf180de3f632cd043322dad5751390e5385108b2807368997d1a92a509d0 \
--hash=sha256:eb88b316ec33760714a4720feb2816a3a59180fd58c1985012054fa7aebee4c2 \
--hash=sha256:eb9078108fbf0bcdde37c3f4779303673c2fa1fe8f7956e68d447d0dd426d38a \
--hash=sha256:ecae9737b72408d6a950f7e525f30aca12d4bd8dd95e37342e5beb3a2a8c4f71 \
--hash=sha256:ee756f00726693e5ba94d6df2bdfd64d4852d23b09bb0bc700e3b30e6f333985 \
--hash=sha256:f4594c67d8a7c89cf922d9df0438c7c7bb022ad506eddb0fdb2863359ff78242 \
--hash=sha256:f53d492307962561ac7de4cd1de3e363589b000ab69617c6156a16ba7237998d \
--hash=sha256:fb07dc5da7e849e2ad31a5d74e9bece81f30ecf5a42909d0a695f8bd1874d6af \
--hash=sha256:fb26a934946a6afe0e326aebe0730cdff393a8bc0bbb65a2f41e30feddca399c \
--hash=sha256:fdfc1e28e7c7cdce44985b3043bc13bbd9c747520f94a4d7164af8260b3d91f0
# via pytest-cov
cryptography==46.0.5 \
--hash=sha256:02f547fce831f5096c9a567fd41bc12ca8f11df260959ecc7c3202555cc47a72 \
--hash=sha256:039917b0dc418bb9f6edce8a906572d69e74bd330b0b3fea4f79dab7f8ddd235 \
@@ -782,7 +876,9 @@ packaging==26.0 \
pluggy==1.6.0 \
--hash=sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3 \
--hash=sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746
-# via pytest
+# via
# pytest
# pytest-cov
propcache==0.4.1 \
--hash=sha256:0013cb6f8dde4b2a2f66903b8ba740bdfe378c943c4377a200551ceb27f379e4 \
--hash=sha256:005f08e6a0529984491e37d8dbc3dd86f84bd78a8ceb5fa9a021f4c48d4984be \
@@ -1019,6 +1115,15 @@ pygments==2.19.2 \
pytest==9.0.2 \
--hash=sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b \
--hash=sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11
# via
# pytest-asyncio
# pytest-cov
pytest-asyncio==1.3.0 \
--hash=sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5 \
--hash=sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5
pytest-cov==7.0.0 \
--hash=sha256:33c97eda2e049a0c5298e91f519302a1334c26ac65c1a483d6206fd458361af1 \
--hash=sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861
requests==2.32.5 \
--hash=sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6 \
--hash=sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf
@@ -1043,10 +1148,101 @@ tenacity==9.1.4 \
--hash=sha256:6095a360c919085f28c6527de529e76a06ad89b23659fa881ae0649b867a9d55 \
--hash=sha256:adb31d4c263f2bd041081ab33b498309a57c77f9acf2db65aadf0898179cf93a
# via google-genai
tomli==2.4.0 ; python_full_version <= '3.11' \
--hash=sha256:0408e3de5ec77cc7f81960c362543cbbd91ef883e3138e81b729fc3eea5b9729 \
--hash=sha256:0dc56fef0e2c1c470aeac5b6ca8cc7b640bb93e92d9803ddaf9ea03e198f5b0b \
--hash=sha256:0e0fe8a0b8312acf3a88077a0802565cb09ee34107813bba1c7cd591fa6cfc8d \
--hash=sha256:0f2e3955efea4d1cfbcb87bc321e00dc08d2bcb737fd1d5e398af111d86db5df \
--hash=sha256:133e93646ec4300d651839d382d63edff11d8978be23da4cc106f5a18b7d0576 \
--hash=sha256:1b168f2731796b045128c45982d3a4874057626da0e2ef1fdd722848b741361d \
--hash=sha256:1c8a885b370751837c029ef9bc014f27d80840e48bac415f3412e6593bbc18c1 \
--hash=sha256:1f776e7d669ebceb01dee46484485f43a4048746235e683bcdffacdf1fb4785a \
--hash=sha256:1fb2945cbe303b1419e2706e711b7113da57b7db31ee378d08712d678a34e51e \
--hash=sha256:20cedb4ee43278bc4f2fee6cb50daec836959aadaf948db5172e776dd3d993fc \
--hash=sha256:20ffd184fb1df76a66e34bd1b36b4a4641bd2b82954befa32fe8163e79f1a702 \
--hash=sha256:26ab906a1eb794cd4e103691daa23d95c6919cc2fa9160000ac02370cc9dd3f6 \
--hash=sha256:2add28aacc7425117ff6364fe9e06a183bb0251b03f986df0e78e974047571fd \
--hash=sha256:2b1e3b80e1d5e52e40e9b924ec43d81570f0e7d09d11081b797bc4692765a3d4 \
--hash=sha256:31d556d079d72db7c584c0627ff3a24c5d3fb4f730221d3444f3efb1b2514776 \
--hash=sha256:36b9d05b51e65b254ea6c2585b59d2c4cb91c8a3d91d0ed0f17591a29aaea54a \
--hash=sha256:39b0b5d1b6dd03684b3fb276407ebed7090bbec989fa55838c98560c01113b66 \
--hash=sha256:3cf226acb51d8f1c394c1b310e0e0e61fecdd7adcb78d01e294ac297dd2e7f87 \
--hash=sha256:3d895d56bd3f82ddd6faaff993c275efc2ff38e52322ea264122d72729dca2b2 \
--hash=sha256:413540dce94673591859c4c6f794dfeaa845e98bf35d72ed59636f869ef9f86f \
--hash=sha256:43e685b9b2341681907759cf3a04e14d7104b3580f808cfde1dfdb60ada85475 \
--hash=sha256:4cbcb367d44a1f0c2be408758b43e1ffb5308abe0ea222897d6bfc8e8281ef2f \
--hash=sha256:551e321c6ba03b55676970b47cb1b73f14a0a4dce6a3e1a9458fd6d921d72e95 \
--hash=sha256:5572e41282d5268eb09a697c89a7bee84fae66511f87533a6f88bd2f7b652da9 \
--hash=sha256:5aa48d7c2356055feef06a43611fc401a07337d5b006be13a30f6c58f869e3c3 \
--hash=sha256:5b5807f3999fb66776dbce568cc9a828544244a8eb84b84b9bafc080c99597b9 \
--hash=sha256:5e3f639a7a8f10069d0e15408c0b96a2a828cfdec6fca05296ebcdcc28ca7c76 \
--hash=sha256:685306e2cc7da35be4ee914fd34ab801a6acacb061b6a7abca922aaf9ad368da \
--hash=sha256:75c2f8bbddf170e8effc98f5e9084a8751f8174ea6ccf4fca5398436e0320bc8 \
--hash=sha256:7b438885858efd5be02a9a133caf5812b8776ee0c969fea02c45e8e3f296ba51 \
--hash=sha256:7d49c66a7d5e56ac959cb6fc583aff0651094ec071ba9ad43df785abc2320d86 \
--hash=sha256:7d6d9a4aee98fac3eab4952ad1d73aee87359452d1c086b5ceb43ed02ddb16b8 \
--hash=sha256:84d081fbc252d1b6a982e1870660e7330fb8f90f676f6e78b052ad4e64714bf0 \
--hash=sha256:8768715ffc41f0008abe25d808c20c3d990f42b6e2e58305d5da280ae7d1fa3b \
--hash=sha256:920b1de295e72887bafa3ad9f7a792f811847d57ea6b1215154030cf131f16b1 \
--hash=sha256:9a08144fa4cba33db5255f9b74f0b89888622109bd2776148f2597447f92a94e \
--hash=sha256:a26d7ff68dfdb9f87a016ecfd1e1c2bacbe3108f4e0f8bcd2228ef9a766c787d \
--hash=sha256:aa89c3f6c277dd275d8e243ad24f3b5e701491a860d5121f2cdd399fbb31fc9c \
--hash=sha256:b5ef256a3fd497d4973c11bf142e9ed78b150d36f5773f1ca6088c230ffc5867 \
--hash=sha256:b6c78bdf37764092d369722d9946cb65b8767bfa4110f902a1b2542d8d173c8a \
--hash=sha256:bbb1b10aa643d973366dc2cb1ad94f99c1726a02343d43cbc011edbfac579e7c \
--hash=sha256:c084ad935abe686bd9c898e62a02a19abfc9760b5a79bc29644463eaf2840cb0 \
--hash=sha256:c73add4bb52a206fd0c0723432db123c0c75c280cbd67174dd9d2db228ebb1b4 \
--hash=sha256:cae9c19ed12d4e8f3ebf46d1a75090e4c0dc16271c5bce1c833ac168f08fb614 \
--hash=sha256:d20b797a5c1ad80c516e41bc1fb0443ddb5006e9aaa7bda2d71978346aeb9132 \
--hash=sha256:d3d1654e11d724760cdb37a3d7691f0be9db5fbdaef59c9f532aabf87006dbaa \
--hash=sha256:d878f2a6707cc9d53a1be1414bbb419e629c3d6e67f69230217bb663e76b5087
# via coverage
tomli-w==1.2.0 \
--hash=sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90 \
--hash=sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021
# via manual-slop
tree-sitter==0.25.2 \
--hash=sha256:0628671f0de69bb279558ef6b640bcfc97864fe0026d840f872728a86cd6b6cd \
--hash=sha256:0c8b6682cac77e37cfe5cf7ec388844957f48b7bd8d6321d0ca2d852994e10d5 \
--hash=sha256:1799609636c0193e16c38f366bda5af15b1ce476df79ddaae7dd274df9e44266 \
--hash=sha256:260586381b23be33b6191a07cea3d44ecbd6c01aa4c6b027a0439145fcbc3358 \
--hash=sha256:3e65ae456ad0d210ee71a89ee112ac7e72e6c2e5aac1b95846ecc7afa68a194c \
--hash=sha256:463c032bd02052d934daa5f45d183e0521ceb783c2548501cf034b0beba92c9b \
--hash=sha256:4973b718fcadfb04e59e746abfbb0288694159c6aeecd2add59320c03368c721 \
--hash=sha256:49ee3c348caa459244ec437ccc7ff3831f35977d143f65311572b8ba0a5f265f \
--hash=sha256:56ac6602c7d09c2c507c55e58dc7026b8988e0475bd0002f8a386cce5e8e8adc \
--hash=sha256:65d3c931013ea798b502782acab986bbf47ba2c452610ab0776cf4a8ef150fc0 \
--hash=sha256:6d0302550bbe4620a5dc7649517c4409d74ef18558276ce758419cf09e578897 \
--hash=sha256:7d2ee1acbacebe50ba0f85fff1bc05e65d877958f00880f49f9b2af38dce1af0 \
--hash=sha256:b3d11a3a3ac89bb8a2543d75597f905a9926f9c806f40fcca8242922d1cc6ad5 \
--hash=sha256:b3f63a1796886249bd22c559a5944d64d05d43f2be72961624278eff0dcc5cb8 \
--hash=sha256:b43a9e4c89d4d0839de27cd4d6902d33396de700e9ff4c5ab7631f277a85ead9 \
--hash=sha256:b878e296e63661c8e124177cc3084b041ba3f5936b43076d57c487822426f614 \
--hash=sha256:b8ca72d841215b6573ed0655b3a5cd1133f9b69a6fa561aecad40dca9029d75b \
--hash=sha256:b8d4429954a3beb3e844e2872610d2a4800ba4eb42bb1990c6a4b1949b18459f \
--hash=sha256:bd88fbb0f6c3a0f28f0a68d72df88e9755cf5215bae146f5a1bdc8362b772053 \
--hash=sha256:bda059af9d621918efb813b22fb06b3fe00c3e94079c6143fcb2c565eb44cb87 \
--hash=sha256:c0c0ab5f94938a23fe81928a21cc0fac44143133ccc4eb7eeb1b92f84748331c \
--hash=sha256:cc0351cfe5022cec5a77645f647f92a936b38850346ed3f6d6babfbeeeca4d26 \
--hash=sha256:d77605e0d353ba3fe5627e5490f0fbfe44141bafa4478d88ef7954a61a848dae \
--hash=sha256:dd12d80d91d4114ca097626eb82714618dcdfacd6a5e0955216c6485c350ef99 \
--hash=sha256:ddabfff809ffc983fc9963455ba1cecc90295803e06e140a4c83e94c1fa3d960 \
--hash=sha256:eac4e8e4c7060c75f395feec46421eb61212cb73998dbe004b7384724f3682ab \
--hash=sha256:f5ddcd3e291a749b62521f71fc953f66f5fd9743973fd6dd962b092773569601 \
--hash=sha256:fbb1706407c0e451c4f8cc016fec27d72d4b211fdd3173320b1ada7a6c74c3ac \
--hash=sha256:fe43c158555da46723b28b52e058ad444195afd1db3ca7720c59a254544e9c20
# via manual-slop
tree-sitter-python==0.25.0 \
--hash=sha256:0fbf6a3774ad7e89ee891851204c2e2c47e12b63a5edbe2e9156997731c128bb \
--hash=sha256:14a79a47ddef72f987d5a2c122d148a812169d7484ff5c75a3db9609d419f361 \
--hash=sha256:480c21dbd995b7fe44813e741d71fed10ba695e7caab627fb034e3828469d762 \
--hash=sha256:71959832fc5d9642e52c11f2f7d79ae520b461e63334927e93ca46cd61cd9683 \
--hash=sha256:86f118e5eecad616ecdb81d171a36dde9bef5a0b21ed71ea9c3e390813c3baf5 \
--hash=sha256:9bcde33f18792de54ee579b00e1b4fe186b7926825444766f849bf7181793a76 \
--hash=sha256:b13e090f725f5b9c86aa455a268553c65cadf325471ad5b65cd29cac8a1a68ac \
--hash=sha256:be71650ca2b93b6e9649e5d65c6811aad87a7614c8c1003246b303f6b150f61b \
--hash=sha256:e6d5b5799628cc0f24691ab2a172a8e676f668fe90dc60468bee14084a35c16d
# via manual-slop
typing-extensions==4.15.0 \
--hash=sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466 \
--hash=sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548
@@ -1058,6 +1254,7 @@ typing-extensions==4.15.0 \
# google-genai
# pydantic
# pydantic-core
# pytest-asyncio
# starlette
# typing-inspection
typing-inspection==0.4.2 \
+4
View File
@@ -69,6 +69,10 @@ def get_model_for_role(role: str) -> str:
        return 'gemini-3.1-pro-preview'
    elif role == 'tier2-tech-lead' or role == 'tier2':
        return 'gemini-3-flash-preview'
    elif role == 'tier3-worker' or role == 'tier3':
        return 'gemini-2.5-flash-lite'
    elif role == 'tier4-qa' or role == 'tier4':
        return 'gemini-2.5-flash-lite'
    else:
        return 'gemini-3-flash-preview'
+11 -2
View File
@@ -1,14 +1,16 @@
# shell_runner.py
import subprocess, shutil
from pathlib import Path
from typing import Callable, Optional
TIMEOUT_SECONDS = 60
-def run_powershell(script: str, base_dir: str) -> str:
+def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str:
    """
    Run a PowerShell script with working directory set to base_dir.
    Returns a string combining stdout, stderr, and exit code.
-    Raises nothing - all errors are captured into the return string.
+    If qa_callback is provided and the command fails or has stderr,
+    the callback is called with the stderr content and its result is appended.
    """
    safe_dir = str(base_dir).replace("'", "''")
    full_script = f"Set-Location -LiteralPath '{safe_dir}'\n{script}"
@@ -25,6 +27,13 @@ def run_powershell(script: str, base_dir: str) -> str:
        if r.stdout.strip(): parts.append(f"STDOUT:\n{r.stdout.strip()}")
        if r.stderr.strip(): parts.append(f"STDERR:\n{r.stderr.strip()}")
        parts.append(f"EXIT CODE: {r.returncode}")
        # QA Interceptor logic
        if (r.returncode != 0 or r.stderr.strip()) and qa_callback:
            qa_analysis = qa_callback(r.stderr.strip())
            if qa_analysis:
                parts.append(f"\nQA ANALYSIS:\n{qa_analysis}")
        return "\n".join(parts)
    except subprocess.TimeoutExpired: return f"ERROR: timed out after {TIMEOUT_SECONDS}s"
    except Exception as e: return f"ERROR: {e}"
+2 -1
View File
@@ -18,7 +18,8 @@ files = [
    "tests/test_headless_service.py",
    "tests/test_performance_monitor.py",
    "tests/test_token_usage.py",
-    "tests/test_layout_reorganization.py"
+    "tests/test_layout_reorganization.py",
+    "tests/test_async_events.py"
]
[categories.conductor]
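The new `tests/test_async_events.py` entry covers the `AsyncEventQueue` introduced in `events.py`. The diff does not show that class's API, so the sketch below is an assumption about its shape, using plain `asyncio` so it runs without pytest:

```python
import asyncio

class AsyncEventQueue:
    """Hypothetical minimal event bus: publishers put events, consumers await them."""
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def publish(self, event: dict) -> None:
        await self._queue.put(event)

    async def next_event(self) -> dict:
        return await self._queue.get()

async def main() -> dict:
    bus = AsyncEventQueue()
    # UI side publishes; the engine side awaits, decoupling UI from API calls
    await bus.publish({"type": "UserRequestEvent", "text": "run track"})
    return await bus.next_event()

event = asyncio.run(main())
print(event["type"])  # → UserRequestEvent
```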
+1
View File
@@ -79,6 +79,7 @@ def live_gui():
    print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
    # Reset the GUI state before shutting down
    try:
        client = ApiHookClient()
        client.reset_session()
        time.sleep(0.5)
    except: pass
+3
View File
@@ -22,6 +22,9 @@ paths = []
[gemini_cli]
binary_path = "gemini"
[deepseek]
reasoning_effort = "medium"
[agent.tools]
run_powershell = true
read_file = true
+2 -1
View File
@@ -3,11 +3,12 @@ roles = [
    "AI",
    "Vendor API",
    "System",
    "Reasoning",
]
active = "main"
auto_add = true
[discussions.main]
git_commit = ""
-last_updated = "2026-02-25T21:54:43"
+last_updated = "2026-02-26T21:33:34"
history = []
+4 -3
View File
@@ -3,12 +3,13 @@ roles = [
    "AI",
    "Vendor API",
    "System",
    "Reasoning",
]
history = []
-active = "TestDisc_1772074463"
+active = "TestDisc_1772159592"
auto_add = true
-[discussions.TestDisc_1772074463]
+[discussions.TestDisc_1772159592]
git_commit = ""
-last_updated = "2026-02-25T21:54:37"
+last_updated = "2026-02-26T21:33:27"
history = []
+3
View File
@@ -22,6 +22,9 @@ paths = []
[gemini_cli]
binary_path = "gemini"
[deepseek]
reasoning_effort = "medium"
[agent.tools]
run_powershell = true
read_file = true
+2 -1
View File
@@ -3,11 +3,12 @@ roles = [
    "AI",
    "Vendor API",
    "System",
    "Reasoning",
]
active = "main"
auto_add = true
[discussions.main]
git_commit = ""
-last_updated = "2026-02-25T21:55:13"
+last_updated = "2026-02-26T21:34:05"
history = []
+3
View File
@@ -22,6 +22,9 @@ paths = []
[gemini_cli]
binary_path = "gemini"
[deepseek]
reasoning_effort = "medium"
[agent.tools]
run_powershell = true
read_file = true
+2 -1
View File
@@ -3,11 +3,12 @@ roles = [
    "AI",
    "Vendor API",
    "System",
    "Reasoning",
]
active = "main"
auto_add = true
[discussions.main]
git_commit = ""
-last_updated = "2026-02-25T21:55:00"
+last_updated = "2026-02-26T21:33:51"
history = []
+4 -1
View File
@@ -20,7 +20,10 @@ base_dir = "."
paths = []
[gemini_cli]
-binary_path = "\"C:\\projects\\manual_slop\\.venv\\Scripts\\python.exe\" \"C:\\projects\\manual_slop\\tests\\mock_gemini_cli.py\""
+binary_path = "gemini"
[deepseek]
reasoning_effort = "medium"
[agent.tools]
run_powershell = true
+3 -17
View File
@@ -3,26 +3,12 @@ roles = [
    "AI",
    "Vendor API",
    "System",
    "Reasoning",
]
-active = "testing gemini-cli"
+active = "main"
auto_add = true
[discussions.main]
git_commit = ""
-last_updated = "2026-02-25T23:33:11"
+last_updated = "2026-02-26T21:34:07"
-history = [
-    "@2026-02-25T22:34:11\nSystem:\n[PERFORMANCE ALERT] CPU usage high: 92.2%. Please consider optimizing recent changes or reducing load.",
-]
-[discussions."testing deepseek"]
-git_commit = ""
-last_updated = "2026-02-26T12:38:38"
-history = [
-    "@2026-02-26T12:38:30\nUser:\nwasap deepseek?",
-    "@2026-02-26T12:38:34\nAI:\nHey there! 👋 I'm DeepSeek, an AI assistant created by DeepSeek Company. I'm here to help you with various tasks like coding, writing, analysis, problem-solving, and more!\n\nI can:\n- Write and debug code in multiple programming languages\n- Help with technical questions and explanations\n- Assist with writing and content creation\n- Analyze documents and data\n- Search the web for current information (when you enable it)\n- Work with files and directories\n\nWhat can I help you with today? Whether it's coding, learning something new, or just having a conversation, I'm ready to assist! 😊",
-]
-[discussions."testing gemini-cli"]
-git_commit = ""
-last_updated = "2026-02-26T13:08:53"
history = []
+105
View File
@@ -0,0 +1,105 @@
import pytest
import tree_sitter

from file_cache import ASTParser


def test_ast_parser_initialization():
    """Verify that ASTParser can be initialized with a language string."""
    parser = ASTParser("python")
    assert parser.language_name == "python"


def test_ast_parser_parse():
    """Verify that the parse method returns a tree_sitter.Tree."""
    parser = ASTParser("python")
    code = """def example_func():
    return 42"""
    tree = parser.parse(code)
    assert isinstance(tree, tree_sitter.Tree)
    # Basic check that it parsed something
    assert tree.root_node.type == "module"


def test_ast_parser_get_skeleton_python():
    """Verify that get_skeleton replaces function bodies with '...' while preserving docstrings."""
    parser = ASTParser("python")
    code = '''
def complex_function(a, b):
    """
    This is a docstring.
    It should be preserved.
    """
    result = a + b
    if result > 0:
        return result
    return 0

class MyClass:
    def method_without_docstring(self):
        print("doing something")
        return None
'''
    skeleton = parser.get_skeleton(code)
    # Check that signatures are preserved
    assert "def complex_function(a, b):" in skeleton
    assert "class MyClass:" in skeleton
    assert "def method_without_docstring(self):" in skeleton
    # Check that docstring is preserved
    assert '"""' in skeleton
    assert "This is a docstring." in skeleton
    assert "It should be preserved." in skeleton
    # Check that bodies are replaced with '...'
    assert "..." in skeleton
    assert "result = a + b" not in skeleton
    assert "return result" not in skeleton
    assert 'print("doing something")' not in skeleton


def test_ast_parser_invalid_language():
    """Verify handling of unsupported or invalid languages."""
    # This might raise an error or return a default, depending on implementation.
    # For now, we expect it to either fail gracefully or raise an exception we can catch.
    with pytest.raises(Exception):
        ASTParser("not-a-language")


def test_ast_parser_get_curated_view():
    """Verify that get_curated_view preserves function bodies with @core_logic or # [HOT]."""
    parser = ASTParser("python")
    code = '''
@core_logic
def core_func():
    """Core logic doc."""
    print("this should be preserved")
    return True

def hot_func():
    # [HOT]
    print("this should also be preserved")
    return 42

def normal_func():
    """Normal doc."""
    print("this should be stripped")
    return None

class MyClass:
    @core_logic
    def core_method(self, x):
        print("method preserved", x)
'''
    curated = parser.get_curated_view(code)
    # Check that core_func is preserved
    assert 'print("this should be preserved")' in curated
    assert 'return True' in curated
    # Check that hot_func is preserved
    assert '# [HOT]' in curated
    assert 'print("this should also be preserved")' in curated
    # Check that normal_func is stripped but docstring is preserved
    assert '"""Normal doc."""' in curated
    assert 'print("this should be stripped")' not in curated
    assert '...' in curated
    # Check that core_method is preserved
    assert 'print("method preserved", x)' in curated
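These tests pin down the ASTParser contract without dictating an implementation. As a rough sketch of the skeleton idea only — collapse bodies to `...` while keeping signatures and docstrings — the same transform can be approximated with the stdlib `ast` module. `skeleton` here is a hypothetical helper, not the project's tree-sitter-backed `get_skeleton`:

```python
import ast


def skeleton(source: str) -> str:
    """Collapse function bodies to '...', keeping signatures and docstrings."""
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            kept = []
            first = node.body[0] if node.body else None
            # Keep a leading docstring expression if one exists.
            if (isinstance(first, ast.Expr)
                    and isinstance(first.value, ast.Constant)
                    and isinstance(first.value.value, str)):
                kept.append(first)
            # Replace the rest of the body with a bare Ellipsis expression.
            kept.append(ast.Expr(value=ast.Constant(value=...)))
            node.body = kept
    return ast.unparse(tree)


out = skeleton('def f(a, b):\n    """Doc."""\n    return a + b')
print(out)
```

Unlike tree-sitter, `ast` only handles syntactically valid Python and discards comments, so it could not support the `# [HOT]` marker from `get_curated_view`; it is shown purely to make the skeleton transform concrete.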
+44
@@ -0,0 +1,44 @@
import pytest

from file_cache import ASTParser


def test_ast_parser_get_curated_view():
    parser = ASTParser("python")
    code = '''
@core_logic
def core_func():
    """Core logic doc."""
    print("this should be preserved")
    return True

def hot_func():
    # [HOT]
    print("this should also be preserved")
    return 42

def normal_func():
    """Normal doc."""
    print("this should be stripped")
    return None

class MyClass:
    @core_logic
    def core_method(self):
        print("method preserved")
'''
    curated = parser.get_curated_view(code)
    # Check that core_func is preserved
    assert 'print("this should be preserved")' in curated
    assert 'return True' in curated
    # Check that hot_func is preserved
    assert '# [HOT]' in curated
    assert 'print("this should also be preserved")' in curated
    # Check that normal_func is stripped but docstring is preserved
    assert '"""Normal doc."""' in curated
    assert 'print("this should be stripped")' not in curated
    assert '...' in curated
    # Check that core_method is preserved
    assert 'print("method preserved")' in curated
+47
@@ -0,0 +1,47 @@
import asyncio

import pytest

from events import AsyncEventQueue


def test_async_event_queue_put_get():
    """Verify that an event can be asynchronously put and retrieved from the queue."""
    async def run_test():
        queue = AsyncEventQueue()
        event_name = "test_event"
        payload = {"data": "hello"}
        await queue.put(event_name, payload)
        ret_name, ret_payload = await queue.get()
        assert ret_name == event_name
        assert ret_payload == payload
    asyncio.run(run_test())


def test_async_event_queue_multiple():
    """Verify that multiple events can be asynchronously put and retrieved in order."""
    async def run_test():
        queue = AsyncEventQueue()
        await queue.put("event1", 1)
        await queue.put("event2", 2)
        name1, val1 = await queue.get()
        name2, val2 = await queue.get()
        assert name1 == "event1"
        assert val1 == 1
        assert name2 == "event2"
        assert val2 == 2
    asyncio.run(run_test())


def test_async_event_queue_none_payload():
    """Verify that an event with None payload works correctly."""
    async def run_test():
        queue = AsyncEventQueue()
        await queue.put("no_payload")
        name, payload = await queue.get()
        assert name == "no_payload"
        assert payload is None
    asyncio.run(run_test())
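The queue these tests describe — named events with an optional payload defaulting to `None` — can be approximated by a thin wrapper over `asyncio.Queue` that stores `(name, payload)` pairs. This is a sketch of the implied contract; the real `events.AsyncEventQueue` may differ in details:

```python
import asyncio


class AsyncEventQueue:
    """Sketch: FIFO of (name, payload) pairs over asyncio.Queue."""

    def __init__(self):
        self._queue = asyncio.Queue()

    async def put(self, name, payload=None):
        # A missing payload is stored as None, matching the tests above.
        await self._queue.put((name, payload))

    async def get(self):
        return await self._queue.get()


async def demo():
    queue = AsyncEventQueue()
    await queue.put("user_request", {"prompt": "hi"})
    await queue.put("shutdown")
    return [await queue.get(), await queue.get()]


events = asyncio.run(demo())
print(events)
```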
+258
@@ -0,0 +1,258 @@
import pytest
from unittest.mock import MagicMock, patch
from models import Ticket, Track, WorkerContext
# These tests define the expected interface for multi_agent_conductor.py
# which will be implemented in the next phase of TDD.
def test_conductor_engine_initialization():
"""
Test that ConductorEngine can be initialized with a Track.
"""
track = Track(id="test_track", description="Test Track")
from multi_agent_conductor import ConductorEngine
engine = ConductorEngine(track=track)
assert engine.track == track
def test_conductor_engine_run_linear_executes_tickets_in_order():
"""
Test that run_linear iterates through executable tickets and calls the worker lifecycle.
"""
ticket1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
ticket2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker2", depends_on=["T1"])
track = Track(id="track1", description="Track 1", tickets=[ticket1, ticket2])
from multi_agent_conductor import ConductorEngine
engine = ConductorEngine(track=track)
# We mock run_worker_lifecycle as it is expected to be in the same module
with patch("multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
# Mocking lifecycle to mark ticket as complete so dependencies can be resolved
def side_effect(ticket, context):
ticket.mark_complete()
return "Success"
mock_lifecycle.side_effect = side_effect
engine.run_linear()
# Track.get_executable_tickets() should be called repeatedly until all are done
# T1 should run first, then T2.
assert mock_lifecycle.call_count == 2
assert ticket1.status == "completed"
assert ticket2.status == "completed"
# Verify sequence: T1 before T2
calls = mock_lifecycle.call_args_list
assert calls[0][0][0].id == "T1"
assert calls[1][0][0].id == "T2"
def test_run_worker_lifecycle_calls_ai_client_send():
"""
Test that run_worker_lifecycle triggers the AI client and updates ticket status on success.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
from multi_agent_conductor import run_worker_lifecycle
with patch("ai_client.send") as mock_send:
mock_send.return_value = "Task complete. I have updated the file."
result = run_worker_lifecycle(ticket, context)
assert result == "Task complete. I have updated the file."
assert ticket.status == "completed"
mock_send.assert_called_once()
# Check if description was passed to send()
args, kwargs = mock_send.call_args
# user_message is passed as a keyword argument
assert ticket.description in kwargs["user_message"]
def test_run_worker_lifecycle_context_injection():
"""
Test that run_worker_lifecycle can take a context_files list and injects AST views into the prompt.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
context_files = ["primary.py", "secondary.py"]
from multi_agent_conductor import run_worker_lifecycle
# We mock ASTParser which is expected to be imported in multi_agent_conductor
with patch("ai_client.send") as mock_send, \
patch("multi_agent_conductor.ASTParser") as mock_ast_parser_class, \
patch("builtins.open", new_callable=MagicMock) as mock_open:
# Setup open mock to return different content for different files
file_contents = {
"primary.py": "def primary(): pass",
"secondary.py": "def secondary(): pass"
}
def mock_open_side_effect(file, *args, **kwargs):
content = file_contents.get(file, "")
mock_file = MagicMock()
mock_file.read.return_value = content
mock_file.__enter__.return_value = mock_file
return mock_file
mock_open.side_effect = mock_open_side_effect
# Setup ASTParser mock
mock_ast_parser = mock_ast_parser_class.return_value
mock_ast_parser.get_curated_view.return_value = "CURATED VIEW"
mock_ast_parser.get_skeleton.return_value = "SKELETON VIEW"
mock_send.return_value = "Success"
run_worker_lifecycle(ticket, context, context_files=context_files)
# Verify ASTParser calls:
# First file (primary) should get curated view, others (secondary) get skeleton
mock_ast_parser.get_curated_view.assert_called_once_with("def primary(): pass")
mock_ast_parser.get_skeleton.assert_called_once_with("def secondary(): pass")
# Verify user_message contains the views
_, kwargs = mock_send.call_args
user_message = kwargs["user_message"]
assert "CURATED VIEW" in user_message
assert "SKELETON VIEW" in user_message
assert "primary.py" in user_message
assert "secondary.py" in user_message
def test_run_worker_lifecycle_handles_blocked_response():
"""
Test that run_worker_lifecycle marks the ticket as blocked if the AI indicates it cannot proceed.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
from multi_agent_conductor import run_worker_lifecycle
with patch("ai_client.send") as mock_send:
# Simulate a response indicating a block
mock_send.return_value = "I am BLOCKED because I don't have enough information."
run_worker_lifecycle(ticket, context)
assert ticket.status == "blocked"
assert "BLOCKED" in ticket.blocked_reason
def test_run_worker_lifecycle_step_mode_confirmation():
"""
Test that run_worker_lifecycle passes confirm_execution to ai_client.send when step_mode is True.
Verify that if confirm_execution is called (simulated by mocking ai_client.send to call its callback),
the flow works as expected.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
from multi_agent_conductor import run_worker_lifecycle, confirm_execution
with patch("ai_client.send") as mock_send, \
patch("multi_agent_conductor.confirm_execution") as mock_confirm:
# We simulate ai_client.send by making it call the pre_tool_callback it received
def mock_send_side_effect(*args, **kwargs):
callback = kwargs.get("pre_tool_callback")
if callback:
# Simulate calling it with some payload
callback('{"tool": "read_file", "args": {"path": "test.txt"}}')
return "Success"
mock_send.side_effect = mock_send_side_effect
mock_confirm.return_value = True
run_worker_lifecycle(ticket, context)
# Verify confirm_execution was called
mock_confirm.assert_called_once()
assert ticket.status == "completed"
def test_run_worker_lifecycle_step_mode_rejection():
"""
Verify that if confirm_execution returns False, the logic (in ai_client, which we simulate here)
would prevent execution. In run_worker_lifecycle, we just check if it's passed.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
from multi_agent_conductor import run_worker_lifecycle
with patch("ai_client.send") as mock_send, \
patch("multi_agent_conductor.confirm_execution") as mock_confirm:
mock_confirm.return_value = False
mock_send.return_value = "Task failed because tool execution was rejected."
run_worker_lifecycle(ticket, context)
# Verify it was passed to send
args, kwargs = mock_send.call_args
assert kwargs["pre_tool_callback"] is not None
# ai_client's own tests cover how pre_tool_callback gates execution;
# here we only verify that the callback is wired through to send().
def test_conductor_engine_dynamic_parsing_and_execution():
"""
Test that parse_json_tickets correctly populates the track and run_linear executes them in dependency order.
"""
import json
from multi_agent_conductor import ConductorEngine
track = Track(id="dynamic_track", description="Dynamic Track")
engine = ConductorEngine(track=track)
tickets_json = json.dumps([
{
"id": "T1",
"description": "Initial task",
"status": "todo",
"assigned_to": "worker1",
"depends_on": []
},
{
"id": "T2",
"description": "Dependent task",
"status": "todo",
"assigned_to": "worker2",
"depends_on": ["T1"]
},
{
"id": "T3",
"description": "Another initial task",
"status": "todo",
"assigned_to": "worker3",
"depends_on": []
}
])
engine.parse_json_tickets(tickets_json)
assert len(engine.track.tickets) == 3
assert engine.track.tickets[0].id == "T1"
assert engine.track.tickets[1].id == "T2"
assert engine.track.tickets[2].id == "T3"
# Mock run_worker_lifecycle to mark tickets as complete
with patch("multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
def side_effect(ticket, context):
ticket.mark_complete()
return "Success"
mock_lifecycle.side_effect = side_effect
engine.run_linear()
assert mock_lifecycle.call_count == 3
# Verify dependency order: T1 must be called before T2
calls = [call[0][0].id for call in mock_lifecycle.call_args_list]
t1_idx = calls.index("T1")
t2_idx = calls.index("T2")
assert t1_idx < t2_idx
# T3 can be anywhere relative to T1 and T2, but T1 < T2 is mandatory
assert "T3" in calls
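The dynamic-parsing test above implies a `parse_json_tickets` that turns a planner's JSON array into ticket objects. A minimal sketch of that shape, using an illustrative `Ticket` dataclass rather than the real `models.Ticket`:

```python
import json
from dataclasses import dataclass, field


@dataclass
class Ticket:
    id: str
    description: str
    status: str = "todo"
    assigned_to: str = ""
    depends_on: list = field(default_factory=list)


def parse_json_tickets(payload: str):
    """Turn a JSON array of ticket dicts into Ticket objects."""
    return [
        Ticket(
            id=item["id"],
            description=item["description"],
            status=item.get("status", "todo"),
            assigned_to=item.get("assigned_to", ""),
            depends_on=item.get("depends_on", []),
        )
        for item in json.loads(payload)
    ]


tickets = parse_json_tickets(
    '[{"id": "T1", "description": "Initial task"},'
    ' {"id": "T2", "description": "Dependent task", "depends_on": ["T1"]}]'
)
print([t.id for t in tickets])
```

The tolerant `item.get(...)` defaults are an assumption; the real engine may instead reject tickets missing `status` or `assigned_to`.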
+80
@@ -0,0 +1,80 @@
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
import asyncio

from gui_2 import App
from events import UserRequestEvent


@pytest.fixture
def mock_gui():
    with (
        patch('gui_2.load_config', return_value={
            "ai": {"provider": "gemini", "model": "model-1"},
            "projects": {"paths": [], "active": ""},
            "gui": {"show_windows": {}}
        }),
        patch('gui_2.project_manager.load_project', return_value={}),
        patch('gui_2.project_manager.migrate_from_legacy_config', return_value={}),
        patch('gui_2.project_manager.save_project'),
        patch('gui_2.session_logger.open_session'),
        patch('gui_2.App._init_ai_and_hooks'),
        patch('gui_2.App._fetch_models')
    ):
        gui = App()
        return gui


def test_handle_generate_send_pushes_event(mock_gui):
    # Mock _do_generate to return sample data
    mock_gui._do_generate = MagicMock(return_value=(
        "full_md", "path", [], "stable_md", "disc_text"
    ))
    mock_gui.ui_ai_input = "test prompt"
    mock_gui.ui_files_base_dir = "."
    # Mock event_queue.put
    mock_gui.event_queue.put = MagicMock()
    # We need to mock asyncio.run_coroutine_threadsafe to immediately execute
    with patch('asyncio.run_coroutine_threadsafe') as mock_run:
        mock_gui._handle_generate_send()
        # Verify run_coroutine_threadsafe was called
        assert mock_run.called
    # Verify the call to event_queue.put was correct.
    # This is a bit tricky since the first arg to run_coroutine_threadsafe
    # is the coroutine returned by event_queue.put().
    # Let's verify that the call to put occurred.
    mock_gui.event_queue.put.assert_called_once()
    args, kwargs = mock_gui.event_queue.put.call_args
    assert args[0] == "user_request"
    event = args[1]
    assert isinstance(event, UserRequestEvent)
    assert event.prompt == "test prompt"
    assert event.stable_md == "stable_md"
    assert event.disc_text == "disc_text"
    assert event.base_dir == "."


def test_user_request_event_payload():
    payload = UserRequestEvent(
        prompt="hello",
        stable_md="md",
        file_items=[],
        disc_text="disc",
        base_dir="."
    )
    d = payload.to_dict()
    assert d["prompt"] == "hello"
    assert d["stable_md"] == "md"
    assert d["file_items"] == []
    assert d["disc_text"] == "disc"
    assert d["base_dir"] == "."


@pytest.mark.asyncio
async def test_async_event_queue():
    from events import AsyncEventQueue
    q = AsyncEventQueue()
    await q.put("test_event", {"data": 123})
    name, payload = await q.get()
    assert name == "test_event"
    assert payload["data"] == 123
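The payload shape these tests assert on maps naturally to a dataclass whose `to_dict` is just `dataclasses.asdict`. A sketch of that shape — the real `events.UserRequestEvent` may carry extra fields or a hand-written `to_dict`:

```python
from dataclasses import dataclass, asdict, field


@dataclass
class UserRequestEvent:
    """Sketch of the user-request payload pushed onto the event queue."""
    prompt: str
    stable_md: str
    file_items: list = field(default_factory=list)
    disc_text: str = ""
    base_dir: str = "."

    def to_dict(self):
        return asdict(self)


event = UserRequestEvent(prompt="hello", stable_md="md",
                         file_items=[], disc_text="disc", base_dir=".")
print(event.to_dict())
```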
+136
@@ -0,0 +1,136 @@
import pytest
from unittest.mock import MagicMock, patch, call
from models import Ticket, Track, WorkerContext
from multi_agent_conductor import ConductorEngine
import ai_client
import json
def test_headless_verification_full_run():
"""
1. Initialize a ConductorEngine with a Track containing multiple dependent Tickets.
2. Simulate a full execution run using engine.run_linear().
3. Mock ai_client.send to simulate successful tool calls and final responses.
4. Specifically verify that 'Context Amnesia' is maintained.
"""
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker1", depends_on=["T1"])
track = Track(id="track_verify", description="Verification Track", tickets=[t1, t2])
engine = ConductorEngine(track=track)
with patch("ai_client.send") as mock_send, \
patch("ai_client.reset_session") as mock_reset:
# We need mock_send to return something that doesn't contain "BLOCKED"
mock_send.return_value = "Task completed successfully."
engine.run_linear()
# Verify both tickets are completed
assert t1.status == "completed"
assert t2.status == "completed"
# Verify that ai_client.send was called twice (once for each ticket)
assert mock_send.call_count == 2
# Verify Context Amnesia: reset_session should be called for each ticket
# This confirms that each worker call starts with a clean slate.
assert mock_reset.call_count == 2
def test_headless_verification_error_and_qa_interceptor():
"""
5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered
and its summary is injected into the worker's history for the next retry.
"""
t1 = Ticket(id="T1", description="Task with error", status="todo", assigned_to="worker1")
track = Track(id="track_error", description="Error Track", tickets=[t1])
engine = ConductorEngine(track=track)
# We need to simulate the tool loop inside ai_client._send_gemini (or similar)
# Since we want to test the real tool loop and QA injection, we mock at the provider level.
with patch("ai_client._provider", "gemini"), \
patch("ai_client._gemini_client") as mock_genai_client, \
patch("ai_client.confirm_and_run_callback") as mock_run, \
patch("ai_client.run_tier4_analysis") as mock_qa, \
patch("ai_client._ensure_gemini_client") as mock_ensure, \
patch("ai_client._gemini_tool_declaration", return_value=None):
# Ensure _gemini_client is restored by the mock ensure function
import ai_client
def restore_client():
ai_client._gemini_client = mock_genai_client
mock_ensure.side_effect = restore_client
ai_client._gemini_client = mock_genai_client
# Mocking Gemini chat response
mock_chat = MagicMock()
mock_genai_client.chats.create.return_value = mock_chat
# Mock count_tokens to avoid chat creation failure
mock_count_resp = MagicMock()
mock_count_resp.total_tokens = 100
mock_genai_client.models.count_tokens.return_value = mock_count_resp
# 1st round: tool call to run_powershell
mock_part1 = MagicMock()
mock_part1.text = "I will run a command."
mock_part1.function_call = MagicMock()
mock_part1.function_call.name = "run_powershell"
mock_part1.function_call.args = {"script": "dir"}
mock_resp1 = MagicMock()
mock_resp1.candidates = [MagicMock(content=MagicMock(parts=[mock_part1]), finish_reason=MagicMock(name="STOP"))]
mock_resp1.usage_metadata.prompt_token_count = 10
mock_resp1.usage_metadata.candidates_token_count = 5
# 2nd round: Final text after tool result
mock_part2 = MagicMock()
mock_part2.text = "The command failed but I understand why. Task done."
mock_part2.function_call = None
mock_resp2 = MagicMock()
mock_resp2.candidates = [MagicMock(content=MagicMock(parts=[mock_part2]), finish_reason=MagicMock(name="STOP"))]
mock_resp2.usage_metadata.prompt_token_count = 20
mock_resp2.usage_metadata.candidates_token_count = 10
mock_chat.send_message.side_effect = [mock_resp1, mock_resp2]
# Mock run_powershell behavior: it should call the qa_callback on error
def run_side_effect(script, base_dir, qa_callback):
if qa_callback:
analysis = qa_callback("Error: file not found")
return f"""STDERR: Error: file not found
QA ANALYSIS:
{analysis}"""
return "Error: file not found"
mock_run.side_effect = run_side_effect
mock_qa.return_value = "FIX: Check if path exists."
engine.run_linear()
# Verify QA analysis was triggered
mock_qa.assert_called_once_with("Error: file not found")
# Verify the 2nd send_message call includes the QA ANALYSIS in its payload (f_resps)
# The first call is the user message, the second is the tool response.
assert mock_chat.send_message.call_count == 2
args, kwargs = mock_chat.send_message.call_args_list[1]
f_resps = args[0]
print(f"DEBUG f_resps: {f_resps}")
# f_resps is expected to be a list of Part objects (from google.genai.types)
# Since we're mocking, they might be MagicMocks or actual objects if types is used.
# In our case, ai_client.Part.from_function_response is used.
found_qa = False
for part in f_resps:
# Check if it's a function response and contains our QA analysis
# We need to be careful with how google.genai.types.Part is structured or mocked
part_str = str(part)
print(f"DEBUG part_str: {part_str}")
if "QA ANALYSIS:" in part_str and "FIX: Check if path exists." in part_str:
found_qa = True
assert found_qa, "QA Analysis was not injected into the next round"
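The interception contract being verified — shell failure triggers Tier 4 analysis, and the analysis is appended to the tool result that rides back to the model on the next round — can be sketched independently of the Gemini plumbing. `run_shell`, `runner`, and `qa_callback` are illustrative names, not the real `shell_runner` API:

```python
def run_shell(script, runner, qa_callback=None):
    """Run a command; on failure, append the Tier 4 QA summary to the output."""
    returncode, stderr = runner(script)
    if returncode == 0:
        return "OK"
    output = f"STDERR: {stderr}"
    if qa_callback is not None:
        # The QA tier sees only the error text and returns a short analysis,
        # which travels back to the worker inside the tool response.
        output += f"\nQA ANALYSIS:\n{qa_callback(stderr)}"
    return output


result = run_shell(
    "dir missing",
    runner=lambda script: (1, "Error: file not found"),
    qa_callback=lambda err: "FIX: Check if path exists.",
)
print(result)
```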
+127
@@ -0,0 +1,127 @@
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
import asyncio
import time
from gui_2 import App
from events import UserRequestEvent
import ai_client
@pytest.fixture
def mock_app():
with (
patch('gui_2.load_config', return_value={
"ai": {"provider": "gemini", "model": "model-1", "temperature": 0.0, "max_tokens": 100, "history_trunc_limit": 1000},
"projects": {"paths": [], "active": ""},
"gui": {"show_windows": {}}
}),
patch('gui_2.project_manager.load_project', return_value={
"project": {"name": "test_proj"},
"discussion": {"active": "main", "discussions": {"main": {"history": []}}},
"files": {"paths": [], "base_dir": "."},
"screenshots": {"paths": [], "base_dir": "."},
"agent": {"tools": {}}
}),
patch('gui_2.project_manager.migrate_from_legacy_config', return_value={}),
patch('gui_2.project_manager.save_project'),
patch('gui_2.session_logger.open_session'),
patch('gui_2.App._init_ai_and_hooks'),
patch('gui_2.App._fetch_models')
):
app = App()
yield app
# We don't have a clean way to stop the loop thread in gui_2.py App
# so we just let it daemon-exit.
@pytest.mark.timeout(10)
def test_user_request_integration_flow(mock_app):
"""
Verifies that pushing a UserRequestEvent to the event_queue:
1. Triggers ai_client.send
2. Results in a 'response' event back to the queue
3. Eventually updates the UI state (ai_response, ai_status) after processing GUI tasks.
"""
app = mock_app
# Mock all ai_client methods called during _handle_request_event
mock_response = "This is a test AI response"
with (
patch('ai_client.send', return_value=mock_response) as mock_send,
patch('ai_client.set_custom_system_prompt'),
patch('ai_client.set_model_params'),
patch('ai_client.set_agent_tools')
):
# 1. Create and push a UserRequestEvent
event = UserRequestEvent(
prompt="Hello AI",
stable_md="Context",
file_items=[],
disc_text="History",
base_dir="."
)
# 2. Push event to the app's internal loop
asyncio.run_coroutine_threadsafe(
app.event_queue.put("user_request", event),
app._loop
)
# 3. Wait for ai_client.send to be called (polling background thread)
start_time = time.time()
while not mock_send.called and time.time() - start_time < 5:
time.sleep(0.1)
assert mock_send.called, "ai_client.send was not called within timeout"
mock_send.assert_called_once_with("Context", "Hello AI", ".", [], "History")
# 4. Wait for the response to propagate to _pending_gui_tasks and update UI
# We call _process_pending_gui_tasks manually to simulate a GUI frame update.
start_time = time.time()
success = False
while time.time() - start_time < 3:
app._process_pending_gui_tasks()
if app.ai_response == mock_response and app.ai_status == "done":
success = True
break
time.sleep(0.1)
assert success, f"UI state was not updated. ai_response: '{app.ai_response}', status: '{app.ai_status}'"
assert app.ai_response == mock_response
assert app.ai_status == "done"
@pytest.mark.timeout(10)
def test_user_request_error_handling(mock_app):
"""
Verifies that if ai_client.send raises an exception, the UI is updated with the error state.
"""
app = mock_app
with (
patch('ai_client.send', side_effect=Exception("API Failure")) as mock_send,
patch('ai_client.set_custom_system_prompt'),
patch('ai_client.set_model_params'),
patch('ai_client.set_agent_tools')
):
event = UserRequestEvent(
prompt="Trigger Error",
stable_md="",
file_items=[],
disc_text="",
base_dir="."
)
asyncio.run_coroutine_threadsafe(
app.event_queue.put("user_request", event),
app._loop
)
# Poll for error state by processing GUI tasks
start_time = time.time()
success = False
while time.time() - start_time < 5:
app._process_pending_gui_tasks()
if app.ai_status == "error" and "ERROR: API Failure" in app.ai_response:
success = True
break
time.sleep(0.1)
assert success, f"Error state was not reflected in UI. status: {app.ai_status}, response: {app.ai_response}"
+4 -4
@@ -24,26 +24,26 @@ from gui_2 import App
 @pytest.fixture
 def mock_config(tmp_path):
     config_path = tmp_path / "config.toml"
-    config_path.write_text("[projects]
+    config_path.write_text("""[projects]
 paths = []
 active = ""
 [ai]
 provider = "gemini"
 model = "model"
-", encoding="utf-8")
+""", encoding="utf-8")
     return config_path
 @pytest.fixture
 def mock_project(tmp_path):
     project_path = tmp_path / "project.toml"
-    project_path.write_text("[project]
+    project_path.write_text("""[project]
 name = "test"
 [discussion]
 roles = ["User", "AI"]
 active = "main"
 [discussion.discussions.main]
 history = []
-", encoding="utf-8")
+""", encoding="utf-8")
     return project_path
 def test_log_management_init(mock_config, mock_project, monkeypatch):
+176
@@ -0,0 +1,176 @@
import pytest
from models import Ticket, Track, WorkerContext
def test_ticket_instantiation():
"""
Verifies that a Ticket can be instantiated with its required fields:
id, description, status, assigned_to.
"""
ticket_id = "T1"
description = "Implement surgical code changes"
status = "todo"
assigned_to = "tier3-worker"
ticket = Ticket(
id=ticket_id,
description=description,
status=status,
assigned_to=assigned_to
)
assert ticket.id == ticket_id
assert ticket.description == description
assert ticket.status == status
assert ticket.assigned_to == assigned_to
assert ticket.depends_on == []
def test_ticket_with_dependencies():
"""
Verifies that a Ticket can store dependencies.
"""
ticket = Ticket(
id="T2",
description="Write code",
status="todo",
assigned_to="worker-1",
depends_on=["T1"]
)
assert ticket.depends_on == ["T1"]
def test_track_instantiation():
"""
Verifies that a Track can be instantiated with its required fields:
id, description, and a list of Tickets.
"""
ticket1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="a")
ticket2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="b")
track_id = "TRACK-1"
track_desc = "Implement MMA Models"
tickets = [ticket1, ticket2]
track = Track(
id=track_id,
description=track_desc,
tickets=tickets
)
assert track.id == track_id
assert track.description == track_desc
assert len(track.tickets) == 2
assert track.tickets[0].id == "T1"
assert track.tickets[1].id == "T2"
def test_track_can_handle_empty_tickets():
"""
Verifies that a Track can be instantiated with an empty list of tickets.
"""
track = Track(id="TRACK-2", description="Empty Track", tickets=[])
assert track.tickets == []
def test_worker_context_instantiation():
"""
Verifies that a WorkerContext can be instantiated with ticket_id,
model_name, and messages.
"""
ticket_id = "T1"
model_name = "gemini-2.0-flash-lite"
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"}
]
context = WorkerContext(
ticket_id=ticket_id,
model_name=model_name,
messages=messages
)
assert context.ticket_id == ticket_id
assert context.model_name == model_name
assert context.messages == messages
def test_ticket_mark_blocked():
"""
Verifies that ticket.mark_blocked(reason) sets the status to 'blocked'.
Note: The reason field might need to be added to the Ticket class.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="a")
ticket.mark_blocked("Waiting for API key")
assert ticket.status == "blocked"
def test_ticket_mark_complete():
"""
Verifies that ticket.mark_complete() sets the status to 'completed'.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="a")
ticket.mark_complete()
assert ticket.status == "completed"
def test_track_get_executable_tickets():
"""
Verifies that track.get_executable_tickets() returns only 'todo' tickets
whose dependencies are all 'completed'.
"""
# T1: todo, no deps -> executable
t1 = Ticket(id="T1", description="T1", status="todo", assigned_to="a")
# T2: todo, deps [T1] -> not executable (T1 is todo)
t2 = Ticket(id="T2", description="T2", status="todo", assigned_to="a", depends_on=["T1"])
# T3: todo, deps [T4] -> not executable (T4 is blocked)
t3 = Ticket(id="T3", description="T3", status="todo", assigned_to="a", depends_on=["T4"])
# T4: blocked, no deps -> not executable (not 'todo')
t4 = Ticket(id="T4", description="T4", status="blocked", assigned_to="a")
# T5: completed, no deps -> not executable (not 'todo')
t5 = Ticket(id="T5", description="T5", status="completed", assigned_to="a")
# T6: todo, deps [T5] -> executable (T5 is completed)
t6 = Ticket(id="T6", description="T6", status="todo", assigned_to="a", depends_on=["T5"])
track = Track(id="TR1", description="Track 1", tickets=[t1, t2, t3, t4, t5, t6])
executable = track.get_executable_tickets()
executable_ids = [t.id for t in executable]
assert "T1" in executable_ids
assert "T6" in executable_ids
assert len(executable_ids) == 2
def test_track_get_executable_tickets_complex():
"""
Verifies executable tickets with complex dependency chains.
Chain: T1 (comp) -> T2 (todo) -> T3 (todo)
T4 (comp) -> T3
T5 (todo) -> T3
"""
t1 = Ticket(id="T1", description="T1", status="completed", assigned_to="a")
t2 = Ticket(id="T2", description="T2", status="todo", assigned_to="a", depends_on=["T1"])
t3 = Ticket(id="T3", description="T3", status="todo", assigned_to="a", depends_on=["T2", "T4", "T5"])
t4 = Ticket(id="T4", description="T4", status="completed", assigned_to="a")
t5 = Ticket(id="T5", description="T5", status="todo", assigned_to="a")
track = Track(id="TR1", description="Track 1", tickets=[t1, t2, t3, t4, t5])
# At this point:
# T1 is completed
# T4 is completed
# T2 is todo, depends on T1 (completed) -> Executable
# T5 is todo, no deps -> Executable
# T3 is todo, depends on T2 (todo), T4 (completed), T5 (todo) -> Not executable
executable = track.get_executable_tickets()
executable_ids = sorted([t.id for t in executable])
assert executable_ids == ["T2", "T5"]
# Mark T2 complete
t2.mark_complete()
# T3 still depends on T5
executable = track.get_executable_tickets()
executable_ids = sorted([t.id for t in executable])
assert executable_ids == ["T5"]
# Mark T5 complete
t5.mark_complete()
# Now T3 should be executable
executable = track.get_executable_tickets()
executable_ids = sorted([t.id for t in executable])
assert executable_ids == ["T3"]
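The rule these tests encode — a ticket is executable iff its status is `todo` and every dependency is `completed` — reduces to one set comprehension. A sketch with illustrative dataclasses, not the real `models` module:

```python
from dataclasses import dataclass, field


@dataclass
class Ticket:
    id: str
    status: str = "todo"
    depends_on: list = field(default_factory=list)

    def mark_complete(self):
        self.status = "completed"


@dataclass
class Track:
    tickets: list = field(default_factory=list)

    def get_executable_tickets(self):
        done = {t.id for t in self.tickets if t.status == "completed"}
        return [t for t in self.tickets
                if t.status == "todo" and all(d in done for d in t.depends_on)]


track = Track(tickets=[Ticket("T1", status="completed"),
                       Ticket("T2", depends_on=["T1"]),
                       Ticket("T3", depends_on=["T2"])])
print([t.id for t in track.get_executable_tickets()])
```

Because blocked tickets are neither `todo` nor `completed`, they drop out of both sides of the rule, which is exactly the T3/T4 case above.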
+52
@@ -0,0 +1,52 @@
import pytest

from mma_prompts import PROMPTS


def test_tier1_epic_init_constraints():
    prompt = PROMPTS["tier1_epic_init"]
    assert "Godot ECS Flat List format" in prompt
    assert "JSON array" in prompt
    assert "Tracks" in prompt
    assert "severity" in prompt
    assert "IGNORE all source code" in prompt


def test_tier1_track_delegation_constraints():
    prompt = PROMPTS["tier1_track_delegation"]
    assert "Track Brief" in prompt
    assert "AST Skeleton View" in prompt
    assert "IGNORE unrelated module docs" in prompt


def test_tier1_macro_merge_constraints():
    prompt = PROMPTS["tier1_macro_merge"]
    assert "Macro-Merge" in prompt
    assert "Macro-Diff" in prompt
    assert "IGNORE Tier 3 trial-and-error" in prompt


def test_tier2_sprint_planning_constraints():
    prompt = PROMPTS["tier2_sprint_planning"]
    assert "Tickets" in prompt
    assert "Godot ECS Flat List format" in prompt
    assert "depends_on" in prompt
    assert "DAG" in prompt
    assert "Skeleton View" in prompt
    assert "Curated Implementation View" in prompt


def test_tier2_code_review_constraints():
    prompt = PROMPTS["tier2_code_review"]
    assert "Code Review" in prompt
    assert "IGNORE the Contributor's internal trial-and-error" in prompt
    assert "Tier 4 (QA) logs" in prompt


def test_tier2_track_finalization_constraints():
    prompt = PROMPTS["tier2_track_finalization"]
    assert "Track Finalization" in prompt
    assert "Executive Summary" in prompt
    assert "Macro-Diff" in prompt
    assert "Dependency Delta" in prompt


def test_tier2_contract_first_constraints():
    prompt = PROMPTS["tier2_contract_first"]
    assert "Stub Ticket" in prompt
    assert "Consumer Ticket" in prompt
    assert "Implementation Ticket" in prompt
    assert "Interface-Driven Development" in prompt
    assert "Godot ECS Flat List format" in prompt
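These tests imply that `mma_prompts.PROMPTS` is a flat dict mapping prompt keys to system-prompt strings that embed exact constraint phrases. The sketch below is hypothetical: only one key is shown, and the surrounding wording is invented for illustration; only the asserted phrases come from the tests.

```python
# Hypothetical shape of mma_prompts.PROMPTS; the phrase fragments the
# tests assert on must appear verbatim inside each prompt string.
PROMPTS = {
    "tier1_epic_init": (
        "You are the Tier 1 planner. Emit Tracks as a JSON array in the "
        "Godot ECS Flat List format, each entry carrying a severity field. "
        "IGNORE all source code; plan from the epic description alone."
    ),
    # The remaining keys (tier1_track_delegation, tier1_macro_merge,
    # tier2_sprint_planning, ...) follow the same pattern, each embedding
    # the phrases its corresponding test asserts.
}
```

Asserting on literal substrings keeps the tests cheap while still pinning down the contract each tier's prompt must state.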
+1 -1
@@ -27,7 +27,7 @@ def test_redundant_calls_in_process_pending_gui_tasks(app_instance):
         {'action': 'set_value', 'item': 'current_provider', 'value': 'anthropic'}
     ]
-    with patch('ai_client.set_provider') as mock_set_provider,
+    with patch('ai_client.set_provider') as mock_set_provider, \
         patch('ai_client.reset_session') as mock_reset_session:
         # We need to make sure the property setter's internal calls are also tracked or mocked.
+224
@@ -0,0 +1,224 @@
import pytest
from unittest.mock import MagicMock, patch
import subprocess

from shell_runner import run_powershell


def test_run_powershell_qa_callback_on_failure():
    """
    Test that qa_callback is called when a powershell command fails (non-zero exit code).
    The result of the callback should be appended to the output.
    """
    script = "Write-Error 'something went wrong'; exit 1"
    base_dir = "."

    # Mocking subprocess.run to simulate failure
    mock_result = MagicMock()
    mock_result.stdout = ""
    mock_result.stderr = "something went wrong"
    mock_result.returncode = 1

    qa_callback = MagicMock(return_value="QA ANALYSIS: This looks like a syntax error.")

    with patch("subprocess.run", return_value=mock_result), \
         patch("shutil.which", return_value="powershell.exe"):
        # We expect run_powershell to accept qa_callback
        output = run_powershell(script, base_dir, qa_callback=qa_callback)

    # Verify callback was called with stderr
    qa_callback.assert_called_once_with("something went wrong")
    # Verify output contains the callback result
    assert "QA ANALYSIS: This looks like a syntax error." in output
    assert "STDERR:\nsomething went wrong" in output
    assert "EXIT CODE: 1" in output


def test_run_powershell_qa_callback_on_stderr_only():
    """
    Test that qa_callback is called when a command has stderr even if exit code is 0.
    """
    script = "Write-Error 'non-fatal error'"
    base_dir = "."

    mock_result = MagicMock()
    mock_result.stdout = "Success"
    mock_result.stderr = "non-fatal error"
    mock_result.returncode = 0

    qa_callback = MagicMock(return_value="QA ANALYSIS: Ignorable warning.")

    with patch("subprocess.run", return_value=mock_result), \
         patch("shutil.which", return_value="powershell.exe"):
        output = run_powershell(script, base_dir, qa_callback=qa_callback)

    qa_callback.assert_called_once_with("non-fatal error")
    assert "QA ANALYSIS: Ignorable warning." in output
    assert "STDOUT:\nSuccess" in output


def test_run_powershell_no_qa_callback_on_success():
    """
    Test that qa_callback is NOT called when the command succeeds without stderr.
    """
    script = "Write-Output 'All good'"
    base_dir = "."

    mock_result = MagicMock()
    mock_result.stdout = "All good"
    mock_result.stderr = ""
    mock_result.returncode = 0

    qa_callback = MagicMock()

    with patch("subprocess.run", return_value=mock_result), \
         patch("shutil.which", return_value="powershell.exe"):
        output = run_powershell(script, base_dir, qa_callback=qa_callback)

    qa_callback.assert_not_called()
    assert "STDOUT:\nAll good" in output
    assert "EXIT CODE: 0" in output
    assert "QA ANALYSIS" not in output


def test_run_powershell_optional_qa_callback():
    """
    Test that run_powershell still works without providing a qa_callback.
    """
    script = "Write-Error 'error'"
    base_dir = "."

    mock_result = MagicMock()
    mock_result.stdout = ""
    mock_result.stderr = "error"
    mock_result.returncode = 1

    with patch("subprocess.run", return_value=mock_result), \
         patch("shutil.which", return_value="powershell.exe"):
        # Should not raise TypeError even if qa_callback is not provided
        output = run_powershell(script, base_dir)

    assert "STDERR:\nerror" in output
    assert "EXIT CODE: 1" in output


def test_end_to_end_tier4_integration():
    """
    Verifies that shell_runner.run_powershell correctly uses ai_client.run_tier4_analysis.
    """
    import ai_client

    script = "Invoke-Item non_existent_file"
    base_dir = "."
    stderr_content = "Invoke-Item : Cannot find path 'C:\\non_existent_file' because it does not exist."

    mock_result = MagicMock()
    mock_result.stdout = ""
    mock_result.stderr = stderr_content
    mock_result.returncode = 1

    expected_analysis = "Path does not exist. Verify the file path and ensure the file is present before invoking."

    with patch("subprocess.run", return_value=mock_result), \
         patch("shutil.which", return_value="powershell.exe"), \
         patch("ai_client.run_tier4_analysis", return_value=expected_analysis) as mock_analysis:
        output = run_powershell(script, base_dir, qa_callback=ai_client.run_tier4_analysis)

    mock_analysis.assert_called_once_with(stderr_content)
    assert f"QA ANALYSIS:\n{expected_analysis}" in output


def test_ai_client_passes_qa_callback():
    """
    Verifies that ai_client.send passes the qa_callback down to the provider function.
    """
    import ai_client

    # Mocking a provider function to avoid actual API calls
    mock_send_gemini = MagicMock(return_value="AI Response")
    qa_callback = MagicMock(return_value="QA Analysis")

    # Force provider to gemini and mock its send function
    with patch("ai_client._provider", "gemini"), \
         patch("ai_client._send_gemini", mock_send_gemini):
        ai_client.send(
            md_content="Context",
            user_message="Hello",
            qa_callback=qa_callback
        )

    # Verify provider received the qa_callback
    mock_send_gemini.assert_called_once()
    args, kwargs = mock_send_gemini.call_args
    # qa_callback is the 7th positional argument in _send_gemini
    assert args[6] == qa_callback


def test_gemini_provider_passes_qa_callback_to_run_script():
    """
    Verifies that _send_gemini passes the qa_callback to _run_script.
    """
    import ai_client

    # Mock Gemini chat and client
    mock_client = MagicMock()
    mock_chat = MagicMock()

    # Simulate a tool call response
    mock_part = MagicMock()
    mock_part.text = ""
    mock_part.function_call = MagicMock()
    mock_part.function_call.name = "run_powershell"
    mock_part.function_call.args = {"script": "dir"}

    mock_candidate = MagicMock()
    mock_candidate.content.parts = [mock_part]
    mock_candidate.finish_reason.name = "STOP"

    mock_response = MagicMock()
    mock_response.candidates = [mock_candidate]
    mock_response.usage_metadata.prompt_token_count = 10
    mock_response.usage_metadata.candidates_token_count = 5

    # Second call returns a stop response to break the loop
    mock_stop_part = MagicMock()
    mock_stop_part.text = "Done"
    mock_stop_part.function_call = None

    mock_stop_candidate = MagicMock()
    mock_stop_candidate.content.parts = [mock_stop_part]
    mock_stop_candidate.finish_reason.name = "STOP"

    mock_stop_response = MagicMock()
    mock_stop_response.candidates = [mock_stop_candidate]
    mock_stop_response.usage_metadata.prompt_token_count = 5
    mock_stop_response.usage_metadata.candidates_token_count = 2

    mock_chat.send_message.side_effect = [mock_response, mock_stop_response]

    # Mock count_tokens to avoid chat creation failure
    mock_count_resp = MagicMock()
    mock_count_resp.total_tokens = 100
    mock_client.models.count_tokens.return_value = mock_count_resp

    qa_callback = MagicMock()

    # Set global state for the test
    with patch("ai_client._gemini_client", mock_client), \
         patch("ai_client._gemini_chat", None), \
         patch("ai_client._ensure_gemini_client"), \
         patch("ai_client._run_script", return_value="output") as mock_run_script, \
         patch("ai_client._get_gemini_history_list", return_value=[]):
        # Ensure chats.create returns our mock_chat
        mock_client.chats.create.return_value = mock_chat

        ai_client._send_gemini(
            md_content="Context",
            user_message="Run dir",
            base_dir=".",
            qa_callback=qa_callback
        )

    # Verify _run_script received the qa_callback
    mock_run_script.assert_called_once_with("dir", ".", qa_callback)
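A `run_powershell` shaped to satisfy the tests above might look like the sketch below. This is not the project's actual `shell_runner` implementation: the executable lookup, argument list, and the `_run` injection hook (added here purely so the sketch can be exercised without PowerShell) are assumptions. The behavioral contract the tests pin down is that `qa_callback` is optional, fires exactly when stderr is non-empty, and has its result appended under a `QA ANALYSIS:` heading.

```python
import shutil
import subprocess
from typing import Callable, Optional


def run_powershell(script: str, base_dir: str,
                   qa_callback: Optional[Callable[[str], str]] = None,
                   _run=subprocess.run) -> str:
    # _run is an illustration-only injection point for testing this sketch;
    # the real code presumably calls subprocess.run directly.
    exe = shutil.which("powershell.exe") or shutil.which("pwsh")
    result = _run([exe, "-NoProfile", "-Command", script],
                  cwd=base_dir, capture_output=True, text=True)

    parts = []
    if result.stdout:
        parts.append(f"STDOUT:\n{result.stdout}")
    if result.stderr:
        parts.append(f"STDERR:\n{result.stderr}")
    parts.append(f"EXIT CODE: {result.returncode}")

    # Tier 4 QA interception: run the callback on any stderr, regardless
    # of exit code, and append its analysis to the returned transcript.
    if qa_callback is not None and result.stderr:
        parts.append(f"QA ANALYSIS:\n{qa_callback(result.stderr)}")

    return "\n".join(parts)
```

Triggering on stderr rather than only on a non-zero exit code matches the `qa_callback_on_stderr_only` test, where a warning with exit code 0 still gets analyzed.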
+30
@@ -0,0 +1,30 @@
import tree_sitter_python as tspython
from tree_sitter import Language, Parser


def test_tree_sitter_python_setup():
    """
    Verifies that tree-sitter and tree-sitter-python are correctly installed
    and can parse a simple Python function string.
    """
    # Initialize the Python language and parser
    PY_LANGUAGE = Language(tspython.language())
    parser = Parser(PY_LANGUAGE)

    # Simple Python code to parse
    code = """def hello():
    print('world')"""

    # Parse the code
    tree = parser.parse(bytes(code, "utf8"))

    # Assert that the root node is a 'module'
    assert tree.root_node.type == "module"

    # Verify we can find a function definition
    found_function = False
    for child in tree.root_node.children:
        if child.type == "function_definition":
            found_function = True
            break
    assert found_function, "Should have found a function_definition node"
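For environments without the tree-sitter packages, the same structural check (parse a snippet, confirm a function definition exists at module level) can be sketched with Python's stdlib `ast` module; note this is a stand-in, not equivalent in general, since tree-sitter is language-agnostic and error-tolerant while `ast` only parses valid Python.

```python
import ast

code = """def hello():
    print('world')"""

# ast.parse returns an ast.Module, analogous to tree-sitter's "module" root.
tree = ast.parse(code)
assert isinstance(tree, ast.Module)

# Look for a top-level function definition, mirroring the
# "function_definition" node search in the tree-sitter test.
found_function = any(isinstance(node, ast.FunctionDef) for node in tree.body)
assert found_function, "Should have found a FunctionDef node"
```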