Compare commits

..

7 Commits

Author SHA1 Message Date
Ed_
90670b9671 feat(tier4): Integrate patch generation into GUI workflow
- Add patch_callback parameter throughout the tool execution chain
- Add _render_patch_modal() to gui_2.py with colored diff display
- Add patch modal state variables to App.__init__
- Add request_patch_from_tier4() to trigger patch generation
- Add run_tier4_patch_callback() to ai_client.py
- Update shell_runner to accept and execute patch_callback
- Diff colors: green for additions, red for deletions, cyan for headers
- 36 tests passing
2026-03-07 00:26:34 -05:00
Ed_
72a71706e3 conductor(plan): Mark Phase 5 complete - all phases done
Summary of implementation:
- Phase 1: Tier 4 patch generation (run_tier4_patch_generation)
- Phase 2: Diff parser and renderer (src/diff_viewer.py)
- Phase 3: Patch application with backup/rollback
- Phase 4: Patch modal manager for approval workflow
- Phase 5: 29 unit tests passing
2026-03-07 00:15:42 -05:00
Ed_
d58816620a feat(modal): Add patch approval modal manager
- Create src/patch_modal.py with PatchModalManager class
- Manage patch approval workflow: request, apply, reject
- Provide singleton access via get_patch_modal_manager()
- Add 8 unit tests for modal manager
2026-03-07 00:15:06 -05:00
Ed_
125cbc6dd0 feat(patch): Add patch application and backup functions
- Add create_backup() to backup files before patching
- Add apply_patch_to_file() to apply unified diff
- Add restore_from_backup() for rollback
- Add cleanup_backup() to remove backup files
- Add 15 unit tests for all patch operations
2026-03-07 00:14:23 -05:00
Ed_
99a5d7045f feat(diff): Add diff rendering helpers for GUI
- Add get_line_color() to classify diff lines
- Add render_diff_text_immediate() for immediate mode rendering
- Add tests for rendering functions
2026-03-07 00:13:10 -05:00
Ed_
130001c0ba feat(diff): Add diff parser for unified diff format
- Create src/diff_viewer.py with parse_diff function
- Parse unified diff into DiffFile and DiffHunk dataclasses
- Extract file paths, hunk headers, and line changes
- Add unit tests for diff parser
2026-03-07 00:12:06 -05:00
Ed_
da58f46e89 conductor(plan): Mark Phase 1 tasks complete 2026-03-07 00:11:17 -05:00
12 changed files with 711 additions and 38 deletions

View File

@@ -26,7 +26,7 @@ This file tracks all major tracks for the project. Each track has its own detail
3. [x] **Track: Visual DAG & Interactive Ticket Editing**
*Link: [./tracks/visual_dag_ticket_editing_20260306/](./tracks/visual_dag_ticket_editing_20260306/)*
4. [ ] **Track: Advanced Tier 4 QA Auto-Patching**
4. [~] **Track: Advanced Tier 4 QA Auto-Patching**
*Link: [./tracks/tier4_auto_patching_20260306/](./tracks/tier4_auto_patching_20260306/)*
5. [ ] **Track: Transitioning to Native Orchestrator**

View File

@@ -6,7 +6,7 @@
Focus: Generate unified diff on test failure
- [x] Task 1.1: Initialize MMA Environment
- [~] Task 1.2: Extend Tier 4 prompt for patch generation
- [x] Task 1.2: Extend Tier 4 prompt for patch generation
- WHERE: `src/mma_prompts.py` or inline in `ai_client.py`
- WHAT: Prompt to generate unified diff
- HOW:
@@ -24,7 +24,7 @@ Focus: Generate unified diff on test failure
"""
```
- [ ] Task 1.3: Add patch generation function
- [x] Task 1.3: Add patch generation function
- WHERE: `src/ai_client.py`
- WHAT: Generate patch from error
- HOW:
@@ -119,5 +119,5 @@ Focus: Approval modal for patches
```
## Phase 5: Testing
- [ ] Task 5.1: Write unit tests
- [ ] Task 5.2: Conductor - Phase Verification
- [x] Task 5.1: Write unit tests
- [x] Task 5.2: Conductor - Phase Verification

View File

@@ -85,7 +85,7 @@ _send_lock: threading.Lock = threading.Lock()
_gemini_cli_adapter: Optional[GeminiCliAdapter] = None
# Injected by gui.py - called when AI wants to run a command.
confirm_and_run_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None
confirm_and_run_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]], Optional[Callable[[str, str], Optional[str]]]], Optional[str]]] = None
# Injected by gui.py - called whenever a comms entry is appended.
comms_log_callback: Optional[Callable[[dict[str, Any]], None]] = None
@@ -558,7 +558,8 @@ async def _execute_tool_calls_concurrently(
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]],
qa_callback: Optional[Callable[[str], str]],
r_idx: int,
provider: str
provider: str,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
) -> list[tuple[str, str, str, str]]: # tool_name, call_id, output, original_name
"""
Executes multiple tool calls concurrently using asyncio.gather.
@@ -589,7 +590,7 @@ async def _execute_tool_calls_concurrently(
else:
continue
tasks.append(_execute_single_tool_call_async(name, args, call_id, base_dir, pre_tool_callback, qa_callback, r_idx))
tasks.append(_execute_single_tool_call_async(name, args, call_id, base_dir, pre_tool_callback, qa_callback, r_idx, patch_callback))
results = await asyncio.gather(*tasks)
return results
@@ -601,7 +602,8 @@ async def _execute_single_tool_call_async(
base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]],
qa_callback: Optional[Callable[[str], str]],
r_idx: int
r_idx: int,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
) -> tuple[str, str, str, str]:
out = ""
tool_executed = False
@@ -631,16 +633,16 @@ async def _execute_single_tool_call_async(
elif name == TOOL_NAME:
scr = cast(str, args.get("script", ""))
_append_comms("OUT", "tool_call", {"name": TOOL_NAME, "id": call_id, "script": scr})
out = await asyncio.to_thread(_run_script, scr, base_dir, qa_callback)
out = await asyncio.to_thread(_run_script, scr, base_dir, qa_callback, patch_callback)
else:
out = f"ERROR: unknown tool '{name}'"
return (name, call_id, out, name)
def _run_script(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str:
def _run_script(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
if confirm_and_run_callback is None:
return "ERROR: no confirmation handler registered"
result = confirm_and_run_callback(script, base_dir, qa_callback)
result = confirm_and_run_callback(script, base_dir, qa_callback, patch_callback)
if result is None:
output = "USER REJECTED: command was not executed"
else:
@@ -799,7 +801,8 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
enable_tools: bool = True,
stream_callback: Optional[Callable[[str], None]] = None) -> str:
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
global _gemini_chat, _gemini_cache, _gemini_cache_md_hash, _gemini_cache_created_at
try:
_ensure_gemini_client(); mcp_client.configure(file_items or [], [base_dir])
@@ -979,11 +982,11 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
try:
loop = asyncio.get_running_loop()
results = asyncio.run_coroutine_threadsafe(
_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini"),
_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini", patch_callback),
loop
).result()
except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini"))
results = asyncio.run(_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini", patch_callback))
for i, (name, call_id, out, _) in enumerate(results):
# Check if this is the last tool to trigger file refresh
@@ -1079,11 +1082,11 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
try:
loop = asyncio.get_running_loop()
results = asyncio.run_coroutine_threadsafe(
_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini_cli"),
_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini_cli", patch_callback),
loop
).result()
except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini_cli"))
results = asyncio.run(_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini_cli", patch_callback))
tool_results_for_cli: list[dict[str, Any]] = []
for i, (name, call_id, out, _) in enumerate(results):
@@ -1404,7 +1407,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
loop
).result()
except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(response.content, base_dir, pre_tool_callback, qa_callback, round_idx, "anthropic"))
results = asyncio.run(_execute_tool_calls_concurrently(response.content, base_dir, pre_tool_callback, qa_callback, round_idx, "anthropic", patch_callback))
tool_results: list[dict[str, Any]] = []
for i, (name, call_id, out, _) in enumerate(results):
@@ -1675,11 +1678,11 @@ def _send_deepseek(md_content: str, user_message: str, base_dir: str,
try:
loop = asyncio.get_running_loop()
results = asyncio.run_coroutine_threadsafe(
_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "deepseek"),
_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "deepseek", patch_callback),
loop
).result()
except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "deepseek"))
results = asyncio.run(_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "deepseek", patch_callback))
tool_results_for_history: list[dict[str, Any]] = []
for i, (name, call_id, out, _) in enumerate(results):
@@ -1891,11 +1894,11 @@ def _send_minimax(md_content: str, user_message: str, base_dir: str,
try:
loop = asyncio.get_running_loop()
results = asyncio.run_coroutine_threadsafe(
_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "minimax"),
_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "minimax", patch_callback),
loop
).result()
except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "minimax"))
results = asyncio.run(_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "minimax", patch_callback))
tool_results_for_history: list[dict[str, Any]] = []
for i, (name, call_id, out, _) in enumerate(results):
@@ -1962,6 +1965,23 @@ def run_tier4_analysis(stderr: str) -> str:
return f"[QA ANALYSIS FAILED] {e}"
def run_tier4_patch_callback(stderr: str, base_dir: str) -> Optional[str]:
try:
from src import project_manager
file_items = project_manager.get_current_file_items()
file_context = ""
for item in file_items[:5]:
path = item.get("path", "")
content = item.get("content", "")[:2000]
file_context += f"\n\nFile: {path}\n```\n{content}\n```\n"
patch = run_tier4_patch_generation(stderr, file_context)
if patch and "---" in patch and "+++" in patch:
return patch
return None
except Exception as e:
return None
def run_tier4_patch_generation(error: str, file_context: str) -> str:
if not error or not error.strip():
return ""
@@ -2032,32 +2052,33 @@ def send(
qa_callback: Optional[Callable[[str], str]] = None,
enable_tools: bool = True,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None,
) -> str:
with _send_lock:
if _provider == "gemini":
return _send_gemini(
md_content, user_message, base_dir, file_items, discussion_history,
pre_tool_callback, qa_callback, enable_tools, stream_callback
pre_tool_callback, qa_callback, enable_tools, stream_callback, patch_callback
)
elif _provider == "gemini_cli":
return _send_gemini_cli(
md_content, user_message, base_dir, file_items, discussion_history,
pre_tool_callback, qa_callback, stream_callback
pre_tool_callback, qa_callback, stream_callback, patch_callback
)
elif _provider == "anthropic":
return _send_anthropic(
md_content, user_message, base_dir, file_items, discussion_history,
pre_tool_callback, qa_callback, stream_callback=stream_callback
pre_tool_callback, qa_callback, stream_callback=stream_callback, patch_callback=patch_callback
)
elif _provider == "deepseek":
return _send_deepseek(
md_content, user_message, base_dir, file_items, discussion_history,
stream, pre_tool_callback, qa_callback, stream_callback
stream, pre_tool_callback, qa_callback, stream_callback, patch_callback
)
elif _provider == "minimax":
return _send_minimax(
md_content, user_message, base_dir, file_items, discussion_history,
stream, pre_tool_callback, qa_callback, stream_callback
stream, pre_tool_callback, qa_callback, stream_callback, patch_callback
)
else:
raise ValueError(f"Unknown provider: {_provider}")
@@ -2079,16 +2100,16 @@ def _is_mutating_tool(name: str) -> bool:
"""Returns True if the tool name is considered a mutating tool."""
return name in mcp_client.MUTATING_TOOLS or name == TOOL_NAME
def _confirm_and_run(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> Optional[str]:
def _confirm_and_run(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]:
"""
Wrapper for the confirm_and_run_callback.
This is what the providers call to trigger HITL approval.
"""
if confirm_and_run_callback:
return confirm_and_run_callback(script, base_dir, qa_callback)
return confirm_and_run_callback(script, base_dir, qa_callback, patch_callback)
# Fallback to direct execution if no callback registered (headless default)
from src import shell_runner
return shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback)
return shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
def get_history_bleed_stats(md_content: Optional[str] = None) -> dict[str, Any]:
if _provider == "anthropic":

View File

@@ -907,7 +907,8 @@ class AppController:
stream=True,
stream_callback=lambda text: self._on_ai_stream(text),
pre_tool_callback=self._confirm_and_run,
qa_callback=ai_client.run_tier4_analysis
qa_callback=ai_client.run_tier4_analysis,
patch_callback=ai_client.run_tier4_patch_callback
)
self.event_queue.put("response", {"text": resp, "status": "done", "role": "AI"})
except ai_client.ProviderError as e:
@@ -988,14 +989,14 @@ class AppController:
"ts": project_manager.now_ts()
})
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> Optional[str]:
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]:
sys.stderr.write(f"[DEBUG] _confirm_and_run called. test_hooks={self.test_hooks_enabled}, manual_approve={getattr(self, 'ui_manual_approve', False)}\n")
sys.stderr.flush()
if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False):
sys.stderr.write("[DEBUG] Auto-approving script.\n")
sys.stderr.flush()
self._set_status("running powershell...")
output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback)
output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self._append_tool_log(script, output)
self._set_status("powershell done, awaiting AI...")
return output
@@ -1033,7 +1034,7 @@ class AppController:
self._append_tool_log(final_script, "REJECTED by user")
return None
self._set_status("running powershell...")
output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback)
output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self._append_tool_log(final_script, output)
self._set_status("powershell done, awaiting AI...")
return output

220
src/diff_viewer.py Normal file
View File

@@ -0,0 +1,220 @@
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
import shutil
import os
from pathlib import Path
@dataclass
class DiffHunk:
header: str
lines: List[str]
old_start: int
old_count: int
new_start: int
new_count: int
@dataclass
class DiffFile:
old_path: str
new_path: str
hunks: List[DiffHunk]
def parse_diff_header(line: str) -> tuple[Optional[str], Optional[str], Optional[tuple[int, int, int, int]]]:
if not line.startswith(("--- ", "+++ ")):
return None, None, None
if line.startswith("--- "):
path = line[4:]
if path.startswith("a/"):
path = path[2:]
return path, None, None
elif line.startswith("+++ "):
path = line[4:]
if path.startswith("b/"):
path = path[2:]
return None, path, None
return None, None, None
def parse_hunk_header(line: str) -> Optional[tuple[int, int, int, int]]:
if not line.startswith("@@"):
return None
parts = line.split()
if len(parts) < 2:
return None
old_part = parts[1][1:]
new_part = parts[2][1:]
old_parts = old_part.split(",")
new_parts = new_part.split(",")
old_start = int(old_parts[0])
old_count = int(old_parts[1]) if len(old_parts) > 1 else 1
new_start = int(new_parts[0])
new_count = int(new_parts[1]) if len(new_parts) > 1 else 1
return (old_start, old_count, new_start, new_count)
def parse_diff(diff_text: str) -> List[DiffFile]:
if not diff_text or not diff_text.strip():
return []
files: List[DiffFile] = []
current_file: Optional[DiffFile] = None
current_hunk: Optional[DiffHunk] = None
for line in diff_text.split("\n"):
if line.startswith("--- "):
if current_file:
if current_hunk:
current_file.hunks.append(current_hunk)
current_hunk = None
files.append(current_file)
path = line[4:]
if path.startswith("a/"):
path = path[2:]
current_file = DiffFile(old_path=path, new_path="", hunks=[])
elif line.startswith("+++ ") and current_file:
path = line[4:]
if path.startswith("b/"):
path = path[2:]
current_file.new_path = path
elif line.startswith("@@") and current_file:
if current_hunk:
current_file.hunks.append(current_hunk)
hunk_info = parse_hunk_header(line)
if hunk_info:
old_start, old_count, new_start, new_count = hunk_info
current_hunk = DiffHunk(
header=line,
lines=[],
old_start=old_start,
old_count=old_count,
new_start=new_start,
new_count=new_count
)
else:
current_hunk = DiffHunk(
header=line,
lines=[],
old_start=0,
old_count=0,
new_start=0,
new_count=0
)
elif current_hunk is not None:
current_hunk.lines.append(line)
elif line and not line.startswith("diff ") and not line.startswith("index "):
pass
if current_file:
if current_hunk:
current_file.hunks.append(current_hunk)
files.append(current_file)
return files
def format_diff_for_display(diff_files: List[DiffFile]) -> str:
output = []
for df in diff_files:
output.append(f"File: {df.old_path}")
for hunk in df.hunks:
output.append(f" {hunk.header}")
for line in hunk.lines:
output.append(f" {line}")
return "\n".join(output)
def get_line_color(line: str) -> Optional[str]:
if line.startswith("+"):
return "green"
elif line.startswith("-"):
return "red"
elif line.startswith("@@"):
return "cyan"
return None
def render_diff_text_immediate(diff_files: List[DiffFile]) -> List[tuple[str, Optional[str]]]:
output: List[tuple[str, Optional[str]]] = []
for df in diff_files:
output.append((f"File: {df.old_path}", "white"))
for hunk in df.hunks:
output.append((hunk.header, "cyan"))
for line in hunk.lines:
color = get_line_color(line)
output.append((line, color))
return output
def create_backup(file_path: str) -> Optional[str]:
path = Path(file_path)
if not path.exists():
return None
backup_path = path.with_suffix(path.suffix + ".backup")
shutil.copy2(path, backup_path)
return str(backup_path)
def apply_patch_to_file(patch_text: str, base_dir: str = ".") -> Tuple[bool, str]:
import difflib
diff_files = parse_diff(patch_text)
if not diff_files:
return False, "No valid diff found"
results = []
for df in diff_files:
file_path = Path(base_dir) / df.old_path
if not file_path.exists():
results.append(f"File not found: {file_path}")
continue
try:
with open(file_path, "r", encoding="utf-8") as f:
original_lines = f.read().splitlines(keepends=True)
new_lines = original_lines.copy()
offset = 0
for hunk in df.hunks:
hunk_old_start = hunk.old_start - 1
hunk_old_count = hunk.old_count
replace_start = hunk_old_start + offset
replace_count = hunk_old_count
hunk_new_content: List[str] = []
for line in hunk.lines:
if line.startswith("+") and not line.startswith("+++"):
hunk_new_content.append(line[1:] + "\n")
elif line.startswith(" ") or (line and not line.startswith(("-", "+", "@@"))):
hunk_new_content.append(line + "\n")
new_lines = new_lines[:replace_start] + hunk_new_content + new_lines[replace_start + replace_count:]
offset += len(hunk_new_content) - replace_count
with open(file_path, "w", encoding="utf-8", newline="") as f:
f.writelines(new_lines)
results.append(f"Patched: {file_path}")
except Exception as e:
return False, f"Error patching {file_path}: {e}"
return True, "\n".join(results)
def restore_from_backup(file_path: str) -> bool:
backup_path = Path(str(file_path) + ".backup")
if not backup_path.exists():
return False
shutil.copy2(backup_path, file_path)
return True
def cleanup_backup(file_path: str) -> None:
backup_path = Path(str(file_path) + ".backup")
if backup_path.exists():
backup_path.unlink()

View File

@@ -1,4 +1,4 @@
# gui_2.py
# gui_2.py
from __future__ import annotations
import tomli_w
import time
@@ -114,6 +114,11 @@ class App:
self._tool_log_dirty: bool = True
self._last_ui_focus_agent: Optional[str] = None
self._log_registry: Optional[log_registry.LogRegistry] = None
# Patch viewer state for Tier 4 auto-patching
self._pending_patch_text: Optional[str] = None
self._pending_patch_files: list[str] = []
self._show_patch_modal: bool = False
self._patch_error_message: Optional[str] = None
def _handle_approve_tool(self, user_data=None) -> None:
"""UI-level wrapper for approving a pending tool execution ask."""
@@ -254,6 +259,7 @@ class App:
self._process_pending_gui_tasks()
self._process_pending_history_adds()
self._render_track_proposal_modal()
self._render_patch_modal()
# Auto-save (every 60s)
now = time.time()
if now - self._last_autosave >= self._autosave_interval:
@@ -873,6 +879,82 @@ class App:
imgui.close_current_popup()
imgui.end_popup()
def _render_patch_modal(self) -> None:
if not self._show_patch_modal:
return
imgui.open_popup("Apply Patch?")
if imgui.begin_popup_modal("Apply Patch?", True, imgui.ImVec2(600, 500))[0]:
imgui.text_colored(imgui.ImVec4(1, 0.9, 0.3, 1), "Tier 4 QA Generated a Patch")
imgui.separator()
if self._pending_patch_files:
imgui.text("Files to modify:")
for f in self._pending_patch_files:
imgui.text(f" - {f}")
imgui.separator()
if self._patch_error_message:
imgui.text_colored(imgui.ImVec4(1, 0.3, 0.3, 1), f"Error: {self._patch_error_message}")
imgui.separator()
imgui.text("Diff Preview:")
imgui.begin_child("patch_diff_scroll", imgui.ImVec2(-1, 280), True)
if self._pending_patch_text:
diff_lines = self._pending_patch_text.split("\n")
for line in diff_lines:
if line.startswith("+++") or line.startswith("---") or line.startswith("@@"):
imgui.text_colored(imgui.ImVec4(0.3, 0.7, 1, 1), line)
elif line.startswith("+"):
imgui.text_colored(imgui.ImVec4(0.2, 0.9, 0.2, 1), line)
elif line.startswith("-"):
imgui.text_colored(imgui.ImVec4(0.9, 0.2, 0.2, 1), line)
else:
imgui.text(line)
imgui.end_child()
imgui.separator()
if imgui.button("Apply Patch", imgui.ImVec2(150, 0)):
self._apply_pending_patch()
imgui.same_line()
if imgui.button("Reject", imgui.ImVec2(150, 0)):
self._show_patch_modal = False
self._pending_patch_text = None
self._pending_patch_files = []
self._patch_error_message = None
imgui.close_current_popup()
imgui.end_popup()
def _apply_pending_patch(self) -> None:
if not self._pending_patch_text:
self._patch_error_message = "No patch to apply"
return
try:
from src.diff_viewer import apply_patch_to_file
base_dir = str(self.controller.current_project_dir) if hasattr(self.controller, 'current_project_dir') else "."
success, msg = apply_patch_to_file(self._pending_patch_text, base_dir)
if success:
self._show_patch_modal = False
self._pending_patch_text = None
self._pending_patch_files = []
self._patch_error_message = None
imgui.close_current_popup()
else:
self._patch_error_message = msg
except Exception as e:
self._patch_error_message = str(e)
def request_patch_from_tier4(self, error: str, file_context: str) -> None:
try:
from src import ai_client
from src.diff_viewer import parse_diff
patch_text = ai_client.run_tier4_patch_generation(error, file_context)
if patch_text and "---" in patch_text and "+++" in patch_text:
diff_files = parse_diff(patch_text)
file_paths = [df.old_path for df in diff_files]
self._pending_patch_text = patch_text
self._pending_patch_files = file_paths
self._show_patch_modal = True
else:
self._patch_error_message = patch_text or "No patch generated"
except Exception as e:
self._patch_error_message = str(e)
def _render_log_management(self) -> None:
exp, opened = imgui.begin("Log Management", self.show_windows["Log Management"])
self.show_windows["Log Management"] = bool(opened)

View File

@@ -410,6 +410,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
base_dir=".",
pre_tool_callback=clutch_callback if ticket.step_mode else None,
qa_callback=ai_client.run_tier4_analysis,
patch_callback=ai_client.run_tier4_patch_callback,
stream_callback=stream_callback
)
finally:

73
src/patch_modal.py Normal file
View File

@@ -0,0 +1,73 @@
from typing import Optional, Callable, List
from dataclasses import dataclass, field
@dataclass
class PendingPatch:
patch_text: str
file_paths: List[str]
generated_by: str
timestamp: float
class PatchModalManager:
def __init__(self):
self._pending_patch: Optional[PendingPatch] = None
self._show_modal: bool = False
self._on_apply_callback: Optional[Callable[[str], bool]] = None
self._on_reject_callback: Optional[Callable[[], None]] = None
def request_patch_approval(self, patch_text: str, file_paths: List[str], generated_by: str = "Tier 4 QA") -> bool:
from time import time
self._pending_patch = PendingPatch(
patch_text=patch_text,
file_paths=file_paths,
generated_by=generated_by,
timestamp=time()
)
self._show_modal = True
return True
def get_pending_patch(self) -> Optional[PendingPatch]:
return self._pending_patch
def is_modal_shown(self) -> bool:
return self._show_modal
def set_apply_callback(self, callback: Callable[[str], bool]) -> None:
self._on_apply_callback = callback
def set_reject_callback(self, callback: Callable[[], None]) -> None:
self._on_reject_callback = callback
def apply_patch(self, patch_text: str) -> bool:
if self._on_apply_callback:
return self._on_apply_callback(patch_text)
return False
def reject_patch(self) -> None:
self._pending_patch = None
self._show_modal = False
if self._on_reject_callback:
self._on_reject_callback()
def close_modal(self) -> None:
self._show_modal = False
def reset(self) -> None:
self._pending_patch = None
self._show_modal = False
self._on_apply_callback = None
self._on_reject_callback = None
_patch_modal_manager: Optional[PatchModalManager] = None
def get_patch_modal_manager() -> PatchModalManager:
global _patch_modal_manager
if _patch_modal_manager is None:
_patch_modal_manager = PatchModalManager()
return _patch_modal_manager
def reset_patch_modal_manager() -> None:
global _patch_modal_manager
if _patch_modal_manager:
_patch_modal_manager.reset()
_patch_modal_manager = None

View File

@@ -44,13 +44,14 @@ def _build_subprocess_env() -> dict[str, str]:
env[key] = os.path.expandvars(str(val))
return env
def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str:
def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
"""
Run a PowerShell script with working directory set to base_dir.
Returns a string combining stdout, stderr, and exit code.
Environment is configured via mcp_env.toml (project root).
If qa_callback is provided and the command fails or has stderr,
the callback is called with the stderr content and its result is appended.
If patch_callback is provided, it receives (error, file_context) and returns patch text.
"""
safe_dir: str = str(base_dir).replace("'", "''")
full_script: str = f"Set-Location -LiteralPath '{safe_dir}'\n{script}"
@@ -72,6 +73,10 @@ def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[s
qa_analysis: Optional[str] = qa_callback(stderr.strip())
if qa_analysis:
parts.append(f"\nQA ANALYSIS:\n{qa_analysis}")
if patch_callback and (process.returncode != 0 or stderr.strip()):
patch_text = patch_callback(stderr.strip(), base_dir)
if patch_text:
parts.append(f"\nAUTO_PATCH:\n{patch_text}")
return "\n".join(parts)
except subprocess.TimeoutExpired:
if 'process' in locals() and process:

181
tests/test_diff_viewer.py Normal file
View File

@@ -0,0 +1,181 @@
import pytest
import tempfile
import os
from pathlib import Path
from src.diff_viewer import (
parse_diff, DiffFile, DiffHunk, parse_hunk_header,
get_line_color, render_diff_text_immediate,
create_backup, apply_patch_to_file, restore_from_backup, cleanup_backup
)
def test_parse_diff_empty() -> None:
result = parse_diff("")
assert result == []
def test_parse_diff_none() -> None:
result = parse_diff(None) # type: ignore
assert result == []
def test_parse_simple_diff() -> None:
diff_text = """--- a/src/test.py
+++ b/src/test.py
@@ -1 +1 @@
-old
+new"""
result = parse_diff(diff_text)
assert len(result) == 1
assert result[0].old_path == "src/test.py"
assert result[0].new_path == "src/test.py"
assert len(result[0].hunks) == 1
assert result[0].hunks[0].header == "@@ -1 +1 @@"
def test_parse_diff_with_context() -> None:
diff_text = """--- a/src/example.py
+++ b/src/example.py
@@ -10,5 +10,6 @@
def existing_function():
pass
- old_line
+ old_line
+ new_line
more_code"""
result = parse_diff(diff_text)
assert len(result) == 1
assert result[0].old_path == "src/example.py"
assert len(result[0].hunks) == 1
hunk = result[0].hunks[0]
assert hunk.old_start == 10
assert hunk.old_count == 5
assert hunk.new_start == 10
assert hunk.new_count == 6
assert "- old_line" in hunk.lines
assert "+ new_line" in hunk.lines
def test_parse_multiple_files() -> None:
diff_text = """--- a/file1.py
+++ b/file1.py
@@ -1 +1 @@
-a
+b
--- a/file2.py
+++ b/file2.py
@@ -1 +1 @@
-c
+d"""
result = parse_diff(diff_text)
assert len(result) == 2
assert result[0].old_path == "file1.py"
assert result[1].old_path == "file2.py"
def test_parse_hunk_header() -> None:
result = parse_hunk_header("@@ -10,5 +10,6 @@")
assert result == (10, 5, 10, 6)
result = parse_hunk_header("@@ -1 +1 @@")
assert result == (1, 1, 1, 1)
def test_diff_line_classification() -> None:
diff_text = """--- a/test.py
+++ b/test.py
@@ -1,3 +1,4 @@
context line
-removed line
+removed line
+added line
another context"""
result = parse_diff(diff_text)
hunk = result[0].hunks[0]
assert any(line.startswith("-") for line in hunk.lines)
assert any(line.startswith("+") for line in hunk.lines)
assert any(line.startswith(" ") or not line.startswith(("-", "+")) for line in hunk.lines)
def test_get_line_color() -> None:
assert get_line_color("+added") == "green"
assert get_line_color("-removed") == "red"
assert get_line_color("@@ -1,3 +1,4 @@") == "cyan"
assert get_line_color(" context") == None
def test_render_diff_text_immediate() -> None:
diff_text = """--- a/test.py
+++ b/test.py
@@ -1 +1 @@
-old
+new"""
diff_files = parse_diff(diff_text)
output = render_diff_text_immediate(diff_files)
assert len(output) > 0
assert ("File: test.py", "white") in output
assert ("@@ -1 +1 @@", "cyan") in output
assert ("-old", "red") in output
assert ("+new", "green") in output
def test_create_backup() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("original content\n")
backup_path = create_backup(str(test_file))
assert backup_path is not None
assert Path(backup_path).exists()
assert Path(backup_path).read_text() == "original content\n"
def test_create_backup_nonexistent() -> None:
result = create_backup("/nonexistent/file.py")
assert result is None
def test_apply_patch_simple() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("old\n")
patch = f"""--- a/{test_file.name}
+++ b/{test_file.name}
@@ -1 +1 @@
-old
+new"""
success, msg = apply_patch_to_file(patch, tmpdir)
assert success
assert test_file.read_text() == "new\n"
def test_apply_patch_with_context() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "example.py"
test_file.write_text("line 1\nline 2\nline 3\n")
patch = f"""--- a/{test_file.name}
+++ b/{test_file.name}
@@ -1,3 +1,3 @@
-line 1
-line 2
+line one
+line two
line 3"""
success, msg = apply_patch_to_file(patch, tmpdir)
assert success
content = test_file.read_text()
assert "line one" in content
assert "line two" in content
def test_restore_from_backup() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("modified\n")
backup_file = test_file.with_suffix(".py.backup")
backup_file.write_text("original\n")
success = restore_from_backup(str(test_file))
assert success
assert test_file.read_text() == "original\n"
def test_cleanup_backup() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("content\n")
backup_file = test_file.with_suffix(".py.backup")
backup_file.write_text("backup\n")
cleanup_backup(str(test_file))
assert not backup_file.exists()

89
tests/test_patch_modal.py Normal file
View File

@@ -0,0 +1,89 @@
import pytest
from src.patch_modal import (
PatchModalManager, PendingPatch,
get_patch_modal_manager, reset_patch_modal_manager
)
def test_patch_modal_manager_init():
manager = PatchModalManager()
assert manager.get_pending_patch() is None
assert manager.is_modal_shown() is False
def test_request_patch_approval():
manager = PatchModalManager()
patch_text = "--- a/test.py\n+++ b/test.py\n@@ -1 +1 @@\n-old\n+new"
file_paths = ["test.py"]
result = manager.request_patch_approval(patch_text, file_paths)
assert result is True
assert manager.is_modal_shown() is True
pending = manager.get_pending_patch()
assert pending is not None
assert pending.patch_text == patch_text
assert pending.file_paths == file_paths
def test_reject_patch():
manager = PatchModalManager()
manager.request_patch_approval("diff", ["file.py"])
manager.reject_patch()
assert manager.get_pending_patch() is None
assert manager.is_modal_shown() is False
def test_close_modal():
manager = PatchModalManager()
manager.request_patch_approval("diff", ["file.py"])
manager.close_modal()
assert manager.is_modal_shown() is False
def test_apply_callback():
manager = PatchModalManager()
called = []
def apply_fn(patch: str) -> bool:
called.append(patch)
return True
manager.set_apply_callback(apply_fn)
patch_text = "--- a/test.py\n+++ b/test.py\n"
result = manager.apply_patch(patch_text)
assert result is True
assert len(called) == 1
assert called[0] == patch_text
def test_reject_callback():
manager = PatchModalManager()
called = []
def reject_fn() -> None:
called.append(True)
manager.set_reject_callback(reject_fn)
manager.reject_patch()
assert len(called) == 1
def test_reset():
manager = PatchModalManager()
manager.request_patch_approval("diff", ["file.py"])
manager.set_apply_callback(lambda x: True)
manager.set_reject_callback(lambda: None)
manager.reset()
assert manager.get_pending_patch() is None
assert manager.is_modal_shown() is False
def test_get_patch_modal_manager_singleton():
reset_patch_modal_manager()
mgr1 = get_patch_modal_manager()
mgr2 = get_patch_modal_manager()
assert mgr1 is mgr2
reset_patch_modal_manager()

View File

@@ -130,5 +130,5 @@ def test_gemini_provider_passes_qa_callback_to_run_script() -> None:
base_dir=".",
qa_callback=qa_callback
)
# Verify _run_script received the qa_callback
mock_run_script.assert_called_with("dir", ".", qa_callback)
# Verify _run_script received the qa_callback and patch_callback
mock_run_script.assert_called_with("dir", ".", qa_callback, None)