Compare commits

...

7 Commits

Author SHA1 Message Date
Ed_
90670b9671 feat(tier4): Integrate patch generation into GUI workflow
- Add patch_callback parameter throughout the tool execution chain
- Add _render_patch_modal() to gui_2.py with colored diff display
- Add patch modal state variables to App.__init__
- Add request_patch_from_tier4() to trigger patch generation
- Add run_tier4_patch_callback() to ai_client.py
- Update shell_runner to accept and execute patch_callback
- Diff colors: green for additions, red for deletions, cyan for headers
- 36 tests passing
2026-03-07 00:26:34 -05:00
Ed_
72a71706e3 conductor(plan): Mark Phase 5 complete - all phases done
Summary of implementation:
- Phase 1: Tier 4 patch generation (run_tier4_patch_generation)
- Phase 2: Diff parser and renderer (src/diff_viewer.py)
- Phase 3: Patch application with backup/rollback
- Phase 4: Patch modal manager for approval workflow
- Phase 5: 29 unit tests passing
2026-03-07 00:15:42 -05:00
Ed_
d58816620a feat(modal): Add patch approval modal manager
- Create src/patch_modal.py with PatchModalManager class
- Manage patch approval workflow: request, apply, reject
- Provide singleton access via get_patch_modal_manager()
- Add 8 unit tests for modal manager
2026-03-07 00:15:06 -05:00
Ed_
125cbc6dd0 feat(patch): Add patch application and backup functions
- Add create_backup() to backup files before patching
- Add apply_patch_to_file() to apply unified diff
- Add restore_from_backup() for rollback
- Add cleanup_backup() to remove backup files
- Add 15 unit tests for all patch operations
2026-03-07 00:14:23 -05:00
Ed_
99a5d7045f feat(diff): Add diff rendering helpers for GUI
- Add get_line_color() to classify diff lines
- Add render_diff_text_immediate() for immediate mode rendering
- Add tests for rendering functions
2026-03-07 00:13:10 -05:00
Ed_
130001c0ba feat(diff): Add diff parser for unified diff format
- Create src/diff_viewer.py with parse_diff function
- Parse unified diff into DiffFile and DiffHunk dataclasses
- Extract file paths, hunk headers, and line changes
- Add unit tests for diff parser
2026-03-07 00:12:06 -05:00
Ed_
da58f46e89 conductor(plan): Mark Phase 1 tasks complete 2026-03-07 00:11:17 -05:00
12 changed files with 711 additions and 38 deletions

View File

@@ -26,7 +26,7 @@ This file tracks all major tracks for the project. Each track has its own detail
3. [x] **Track: Visual DAG & Interactive Ticket Editing** 3. [x] **Track: Visual DAG & Interactive Ticket Editing**
*Link: [./tracks/visual_dag_ticket_editing_20260306/](./tracks/visual_dag_ticket_editing_20260306/)* *Link: [./tracks/visual_dag_ticket_editing_20260306/](./tracks/visual_dag_ticket_editing_20260306/)*
4. [ ] **Track: Advanced Tier 4 QA Auto-Patching** 4. [~] **Track: Advanced Tier 4 QA Auto-Patching**
*Link: [./tracks/tier4_auto_patching_20260306/](./tracks/tier4_auto_patching_20260306/)* *Link: [./tracks/tier4_auto_patching_20260306/](./tracks/tier4_auto_patching_20260306/)*
5. [ ] **Track: Transitioning to Native Orchestrator** 5. [ ] **Track: Transitioning to Native Orchestrator**

View File

@@ -6,7 +6,7 @@
Focus: Generate unified diff on test failure Focus: Generate unified diff on test failure
- [x] Task 1.1: Initialize MMA Environment - [x] Task 1.1: Initialize MMA Environment
- [~] Task 1.2: Extend Tier 4 prompt for patch generation - [x] Task 1.2: Extend Tier 4 prompt for patch generation
- WHERE: `src/mma_prompts.py` or inline in `ai_client.py` - WHERE: `src/mma_prompts.py` or inline in `ai_client.py`
- WHAT: Prompt to generate unified diff - WHAT: Prompt to generate unified diff
- HOW: - HOW:
@@ -24,7 +24,7 @@ Focus: Generate unified diff on test failure
""" """
``` ```
- [ ] Task 1.3: Add patch generation function - [x] Task 1.3: Add patch generation function
- WHERE: `src/ai_client.py` - WHERE: `src/ai_client.py`
- WHAT: Generate patch from error - WHAT: Generate patch from error
- HOW: - HOW:
@@ -119,5 +119,5 @@ Focus: Approval modal for patches
``` ```
## Phase 5: Testing ## Phase 5: Testing
- [ ] Task 5.1: Write unit tests - [x] Task 5.1: Write unit tests
- [ ] Task 5.2: Conductor - Phase Verification - [x] Task 5.2: Conductor - Phase Verification

View File

@@ -85,7 +85,7 @@ _send_lock: threading.Lock = threading.Lock()
_gemini_cli_adapter: Optional[GeminiCliAdapter] = None _gemini_cli_adapter: Optional[GeminiCliAdapter] = None
# Injected by gui.py - called when AI wants to run a command. # Injected by gui.py - called when AI wants to run a command.
confirm_and_run_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None confirm_and_run_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]], Optional[Callable[[str, str], Optional[str]]]], Optional[str]]] = None
# Injected by gui.py - called whenever a comms entry is appended. # Injected by gui.py - called whenever a comms entry is appended.
comms_log_callback: Optional[Callable[[dict[str, Any]], None]] = None comms_log_callback: Optional[Callable[[dict[str, Any]], None]] = None
@@ -558,7 +558,8 @@ async def _execute_tool_calls_concurrently(
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]], pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]],
qa_callback: Optional[Callable[[str], str]], qa_callback: Optional[Callable[[str], str]],
r_idx: int, r_idx: int,
provider: str provider: str,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
) -> list[tuple[str, str, str, str]]: # tool_name, call_id, output, original_name ) -> list[tuple[str, str, str, str]]: # tool_name, call_id, output, original_name
""" """
Executes multiple tool calls concurrently using asyncio.gather. Executes multiple tool calls concurrently using asyncio.gather.
@@ -589,7 +590,7 @@ async def _execute_tool_calls_concurrently(
else: else:
continue continue
tasks.append(_execute_single_tool_call_async(name, args, call_id, base_dir, pre_tool_callback, qa_callback, r_idx)) tasks.append(_execute_single_tool_call_async(name, args, call_id, base_dir, pre_tool_callback, qa_callback, r_idx, patch_callback))
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
return results return results
@@ -601,7 +602,8 @@ async def _execute_single_tool_call_async(
base_dir: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]], pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]],
qa_callback: Optional[Callable[[str], str]], qa_callback: Optional[Callable[[str], str]],
r_idx: int r_idx: int,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
) -> tuple[str, str, str, str]: ) -> tuple[str, str, str, str]:
out = "" out = ""
tool_executed = False tool_executed = False
@@ -631,16 +633,16 @@ async def _execute_single_tool_call_async(
elif name == TOOL_NAME: elif name == TOOL_NAME:
scr = cast(str, args.get("script", "")) scr = cast(str, args.get("script", ""))
_append_comms("OUT", "tool_call", {"name": TOOL_NAME, "id": call_id, "script": scr}) _append_comms("OUT", "tool_call", {"name": TOOL_NAME, "id": call_id, "script": scr})
out = await asyncio.to_thread(_run_script, scr, base_dir, qa_callback) out = await asyncio.to_thread(_run_script, scr, base_dir, qa_callback, patch_callback)
else: else:
out = f"ERROR: unknown tool '{name}'" out = f"ERROR: unknown tool '{name}'"
return (name, call_id, out, name) return (name, call_id, out, name)
def _run_script(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str: def _run_script(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
if confirm_and_run_callback is None: if confirm_and_run_callback is None:
return "ERROR: no confirmation handler registered" return "ERROR: no confirmation handler registered"
result = confirm_and_run_callback(script, base_dir, qa_callback) result = confirm_and_run_callback(script, base_dir, qa_callback, patch_callback)
if result is None: if result is None:
output = "USER REJECTED: command was not executed" output = "USER REJECTED: command was not executed"
else: else:
@@ -799,7 +801,8 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None, pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None, qa_callback: Optional[Callable[[str], str]] = None,
enable_tools: bool = True, enable_tools: bool = True,
stream_callback: Optional[Callable[[str], None]] = None) -> str: stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
global _gemini_chat, _gemini_cache, _gemini_cache_md_hash, _gemini_cache_created_at global _gemini_chat, _gemini_cache, _gemini_cache_md_hash, _gemini_cache_created_at
try: try:
_ensure_gemini_client(); mcp_client.configure(file_items or [], [base_dir]) _ensure_gemini_client(); mcp_client.configure(file_items or [], [base_dir])
@@ -979,11 +982,11 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
try: try:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
results = asyncio.run_coroutine_threadsafe( results = asyncio.run_coroutine_threadsafe(
_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini"), _execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini", patch_callback),
loop loop
).result() ).result()
except RuntimeError: except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini")) results = asyncio.run(_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini", patch_callback))
for i, (name, call_id, out, _) in enumerate(results): for i, (name, call_id, out, _) in enumerate(results):
# Check if this is the last tool to trigger file refresh # Check if this is the last tool to trigger file refresh
@@ -1079,11 +1082,11 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
try: try:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
results = asyncio.run_coroutine_threadsafe( results = asyncio.run_coroutine_threadsafe(
_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini_cli"), _execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini_cli", patch_callback),
loop loop
).result() ).result()
except RuntimeError: except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini_cli")) results = asyncio.run(_execute_tool_calls_concurrently(calls, base_dir, pre_tool_callback, qa_callback, r_idx, "gemini_cli", patch_callback))
tool_results_for_cli: list[dict[str, Any]] = [] tool_results_for_cli: list[dict[str, Any]] = []
for i, (name, call_id, out, _) in enumerate(results): for i, (name, call_id, out, _) in enumerate(results):
@@ -1404,7 +1407,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
loop loop
).result() ).result()
except RuntimeError: except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(response.content, base_dir, pre_tool_callback, qa_callback, round_idx, "anthropic")) results = asyncio.run(_execute_tool_calls_concurrently(response.content, base_dir, pre_tool_callback, qa_callback, round_idx, "anthropic", patch_callback))
tool_results: list[dict[str, Any]] = [] tool_results: list[dict[str, Any]] = []
for i, (name, call_id, out, _) in enumerate(results): for i, (name, call_id, out, _) in enumerate(results):
@@ -1675,11 +1678,11 @@ def _send_deepseek(md_content: str, user_message: str, base_dir: str,
try: try:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
results = asyncio.run_coroutine_threadsafe( results = asyncio.run_coroutine_threadsafe(
_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "deepseek"), _execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "deepseek", patch_callback),
loop loop
).result() ).result()
except RuntimeError: except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "deepseek")) results = asyncio.run(_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "deepseek", patch_callback))
tool_results_for_history: list[dict[str, Any]] = [] tool_results_for_history: list[dict[str, Any]] = []
for i, (name, call_id, out, _) in enumerate(results): for i, (name, call_id, out, _) in enumerate(results):
@@ -1891,11 +1894,11 @@ def _send_minimax(md_content: str, user_message: str, base_dir: str,
try: try:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
results = asyncio.run_coroutine_threadsafe( results = asyncio.run_coroutine_threadsafe(
_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "minimax"), _execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "minimax", patch_callback),
loop loop
).result() ).result()
except RuntimeError: except RuntimeError:
results = asyncio.run(_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "minimax")) results = asyncio.run(_execute_tool_calls_concurrently(tool_calls_raw, base_dir, pre_tool_callback, qa_callback, round_idx, "minimax", patch_callback))
tool_results_for_history: list[dict[str, Any]] = [] tool_results_for_history: list[dict[str, Any]] = []
for i, (name, call_id, out, _) in enumerate(results): for i, (name, call_id, out, _) in enumerate(results):
@@ -1962,6 +1965,23 @@ def run_tier4_analysis(stderr: str) -> str:
return f"[QA ANALYSIS FAILED] {e}" return f"[QA ANALYSIS FAILED] {e}"
def run_tier4_patch_callback(stderr: str, base_dir: str) -> Optional[str]:
try:
from src import project_manager
file_items = project_manager.get_current_file_items()
file_context = ""
for item in file_items[:5]:
path = item.get("path", "")
content = item.get("content", "")[:2000]
file_context += f"\n\nFile: {path}\n```\n{content}\n```\n"
patch = run_tier4_patch_generation(stderr, file_context)
if patch and "---" in patch and "+++" in patch:
return patch
return None
except Exception as e:
return None
def run_tier4_patch_generation(error: str, file_context: str) -> str: def run_tier4_patch_generation(error: str, file_context: str) -> str:
if not error or not error.strip(): if not error or not error.strip():
return "" return ""
@@ -2032,32 +2052,33 @@ def send(
qa_callback: Optional[Callable[[str], str]] = None, qa_callback: Optional[Callable[[str], str]] = None,
enable_tools: bool = True, enable_tools: bool = True,
stream_callback: Optional[Callable[[str], None]] = None, stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None,
) -> str: ) -> str:
with _send_lock: with _send_lock:
if _provider == "gemini": if _provider == "gemini":
return _send_gemini( return _send_gemini(
md_content, user_message, base_dir, file_items, discussion_history, md_content, user_message, base_dir, file_items, discussion_history,
pre_tool_callback, qa_callback, enable_tools, stream_callback pre_tool_callback, qa_callback, enable_tools, stream_callback, patch_callback
) )
elif _provider == "gemini_cli": elif _provider == "gemini_cli":
return _send_gemini_cli( return _send_gemini_cli(
md_content, user_message, base_dir, file_items, discussion_history, md_content, user_message, base_dir, file_items, discussion_history,
pre_tool_callback, qa_callback, stream_callback pre_tool_callback, qa_callback, stream_callback, patch_callback
) )
elif _provider == "anthropic": elif _provider == "anthropic":
return _send_anthropic( return _send_anthropic(
md_content, user_message, base_dir, file_items, discussion_history, md_content, user_message, base_dir, file_items, discussion_history,
pre_tool_callback, qa_callback, stream_callback=stream_callback pre_tool_callback, qa_callback, stream_callback=stream_callback, patch_callback=patch_callback
) )
elif _provider == "deepseek": elif _provider == "deepseek":
return _send_deepseek( return _send_deepseek(
md_content, user_message, base_dir, file_items, discussion_history, md_content, user_message, base_dir, file_items, discussion_history,
stream, pre_tool_callback, qa_callback, stream_callback stream, pre_tool_callback, qa_callback, stream_callback, patch_callback
) )
elif _provider == "minimax": elif _provider == "minimax":
return _send_minimax( return _send_minimax(
md_content, user_message, base_dir, file_items, discussion_history, md_content, user_message, base_dir, file_items, discussion_history,
stream, pre_tool_callback, qa_callback, stream_callback stream, pre_tool_callback, qa_callback, stream_callback, patch_callback
) )
else: else:
raise ValueError(f"Unknown provider: {_provider}") raise ValueError(f"Unknown provider: {_provider}")
@@ -2079,16 +2100,16 @@ def _is_mutating_tool(name: str) -> bool:
"""Returns True if the tool name is considered a mutating tool.""" """Returns True if the tool name is considered a mutating tool."""
return name in mcp_client.MUTATING_TOOLS or name == TOOL_NAME return name in mcp_client.MUTATING_TOOLS or name == TOOL_NAME
def _confirm_and_run(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> Optional[str]: def _confirm_and_run(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]:
""" """
Wrapper for the confirm_and_run_callback. Wrapper for the confirm_and_run_callback.
This is what the providers call to trigger HITL approval. This is what the providers call to trigger HITL approval.
""" """
if confirm_and_run_callback: if confirm_and_run_callback:
return confirm_and_run_callback(script, base_dir, qa_callback) return confirm_and_run_callback(script, base_dir, qa_callback, patch_callback)
# Fallback to direct execution if no callback registered (headless default) # Fallback to direct execution if no callback registered (headless default)
from src import shell_runner from src import shell_runner
return shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback) return shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
def get_history_bleed_stats(md_content: Optional[str] = None) -> dict[str, Any]: def get_history_bleed_stats(md_content: Optional[str] = None) -> dict[str, Any]:
if _provider == "anthropic": if _provider == "anthropic":

View File

@@ -907,7 +907,8 @@ class AppController:
stream=True, stream=True,
stream_callback=lambda text: self._on_ai_stream(text), stream_callback=lambda text: self._on_ai_stream(text),
pre_tool_callback=self._confirm_and_run, pre_tool_callback=self._confirm_and_run,
qa_callback=ai_client.run_tier4_analysis qa_callback=ai_client.run_tier4_analysis,
patch_callback=ai_client.run_tier4_patch_callback
) )
self.event_queue.put("response", {"text": resp, "status": "done", "role": "AI"}) self.event_queue.put("response", {"text": resp, "status": "done", "role": "AI"})
except ai_client.ProviderError as e: except ai_client.ProviderError as e:
@@ -988,14 +989,14 @@ class AppController:
"ts": project_manager.now_ts() "ts": project_manager.now_ts()
}) })
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> Optional[str]: def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]:
sys.stderr.write(f"[DEBUG] _confirm_and_run called. test_hooks={self.test_hooks_enabled}, manual_approve={getattr(self, 'ui_manual_approve', False)}\n") sys.stderr.write(f"[DEBUG] _confirm_and_run called. test_hooks={self.test_hooks_enabled}, manual_approve={getattr(self, 'ui_manual_approve', False)}\n")
sys.stderr.flush() sys.stderr.flush()
if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False): if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False):
sys.stderr.write("[DEBUG] Auto-approving script.\n") sys.stderr.write("[DEBUG] Auto-approving script.\n")
sys.stderr.flush() sys.stderr.flush()
self._set_status("running powershell...") self._set_status("running powershell...")
output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback) output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self._append_tool_log(script, output) self._append_tool_log(script, output)
self._set_status("powershell done, awaiting AI...") self._set_status("powershell done, awaiting AI...")
return output return output
@@ -1033,7 +1034,7 @@ class AppController:
self._append_tool_log(final_script, "REJECTED by user") self._append_tool_log(final_script, "REJECTED by user")
return None return None
self._set_status("running powershell...") self._set_status("running powershell...")
output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback) output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self._append_tool_log(final_script, output) self._append_tool_log(final_script, output)
self._set_status("powershell done, awaiting AI...") self._set_status("powershell done, awaiting AI...")
return output return output

220
src/diff_viewer.py Normal file
View File

@@ -0,0 +1,220 @@
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
import shutil
import os
from pathlib import Path
@dataclass
class DiffHunk:
header: str
lines: List[str]
old_start: int
old_count: int
new_start: int
new_count: int
@dataclass
class DiffFile:
old_path: str
new_path: str
hunks: List[DiffHunk]
def parse_diff_header(line: str) -> tuple[Optional[str], Optional[str], Optional[tuple[int, int, int, int]]]:
if not line.startswith(("--- ", "+++ ")):
return None, None, None
if line.startswith("--- "):
path = line[4:]
if path.startswith("a/"):
path = path[2:]
return path, None, None
elif line.startswith("+++ "):
path = line[4:]
if path.startswith("b/"):
path = path[2:]
return None, path, None
return None, None, None
def parse_hunk_header(line: str) -> Optional[tuple[int, int, int, int]]:
if not line.startswith("@@"):
return None
parts = line.split()
if len(parts) < 2:
return None
old_part = parts[1][1:]
new_part = parts[2][1:]
old_parts = old_part.split(",")
new_parts = new_part.split(",")
old_start = int(old_parts[0])
old_count = int(old_parts[1]) if len(old_parts) > 1 else 1
new_start = int(new_parts[0])
new_count = int(new_parts[1]) if len(new_parts) > 1 else 1
return (old_start, old_count, new_start, new_count)
def parse_diff(diff_text: str) -> List[DiffFile]:
if not diff_text or not diff_text.strip():
return []
files: List[DiffFile] = []
current_file: Optional[DiffFile] = None
current_hunk: Optional[DiffHunk] = None
for line in diff_text.split("\n"):
if line.startswith("--- "):
if current_file:
if current_hunk:
current_file.hunks.append(current_hunk)
current_hunk = None
files.append(current_file)
path = line[4:]
if path.startswith("a/"):
path = path[2:]
current_file = DiffFile(old_path=path, new_path="", hunks=[])
elif line.startswith("+++ ") and current_file:
path = line[4:]
if path.startswith("b/"):
path = path[2:]
current_file.new_path = path
elif line.startswith("@@") and current_file:
if current_hunk:
current_file.hunks.append(current_hunk)
hunk_info = parse_hunk_header(line)
if hunk_info:
old_start, old_count, new_start, new_count = hunk_info
current_hunk = DiffHunk(
header=line,
lines=[],
old_start=old_start,
old_count=old_count,
new_start=new_start,
new_count=new_count
)
else:
current_hunk = DiffHunk(
header=line,
lines=[],
old_start=0,
old_count=0,
new_start=0,
new_count=0
)
elif current_hunk is not None:
current_hunk.lines.append(line)
elif line and not line.startswith("diff ") and not line.startswith("index "):
pass
if current_file:
if current_hunk:
current_file.hunks.append(current_hunk)
files.append(current_file)
return files
def format_diff_for_display(diff_files: List[DiffFile]) -> str:
output = []
for df in diff_files:
output.append(f"File: {df.old_path}")
for hunk in df.hunks:
output.append(f" {hunk.header}")
for line in hunk.lines:
output.append(f" {line}")
return "\n".join(output)
def get_line_color(line: str) -> Optional[str]:
if line.startswith("+"):
return "green"
elif line.startswith("-"):
return "red"
elif line.startswith("@@"):
return "cyan"
return None
def render_diff_text_immediate(diff_files: List[DiffFile]) -> List[tuple[str, Optional[str]]]:
output: List[tuple[str, Optional[str]]] = []
for df in diff_files:
output.append((f"File: {df.old_path}", "white"))
for hunk in df.hunks:
output.append((hunk.header, "cyan"))
for line in hunk.lines:
color = get_line_color(line)
output.append((line, color))
return output
def create_backup(file_path: str) -> Optional[str]:
path = Path(file_path)
if not path.exists():
return None
backup_path = path.with_suffix(path.suffix + ".backup")
shutil.copy2(path, backup_path)
return str(backup_path)
def apply_patch_to_file(patch_text: str, base_dir: str = ".") -> Tuple[bool, str]:
import difflib
diff_files = parse_diff(patch_text)
if not diff_files:
return False, "No valid diff found"
results = []
for df in diff_files:
file_path = Path(base_dir) / df.old_path
if not file_path.exists():
results.append(f"File not found: {file_path}")
continue
try:
with open(file_path, "r", encoding="utf-8") as f:
original_lines = f.read().splitlines(keepends=True)
new_lines = original_lines.copy()
offset = 0
for hunk in df.hunks:
hunk_old_start = hunk.old_start - 1
hunk_old_count = hunk.old_count
replace_start = hunk_old_start + offset
replace_count = hunk_old_count
hunk_new_content: List[str] = []
for line in hunk.lines:
if line.startswith("+") and not line.startswith("+++"):
hunk_new_content.append(line[1:] + "\n")
elif line.startswith(" ") or (line and not line.startswith(("-", "+", "@@"))):
hunk_new_content.append(line + "\n")
new_lines = new_lines[:replace_start] + hunk_new_content + new_lines[replace_start + replace_count:]
offset += len(hunk_new_content) - replace_count
with open(file_path, "w", encoding="utf-8", newline="") as f:
f.writelines(new_lines)
results.append(f"Patched: {file_path}")
except Exception as e:
return False, f"Error patching {file_path}: {e}"
return True, "\n".join(results)
def restore_from_backup(file_path: str) -> bool:
backup_path = Path(str(file_path) + ".backup")
if not backup_path.exists():
return False
shutil.copy2(backup_path, file_path)
return True
def cleanup_backup(file_path: str) -> None:
backup_path = Path(str(file_path) + ".backup")
if backup_path.exists():
backup_path.unlink()

View File

@@ -1,4 +1,4 @@
# gui_2.py # gui_2.py
from __future__ import annotations from __future__ import annotations
import tomli_w import tomli_w
import time import time
@@ -114,6 +114,11 @@ class App:
self._tool_log_dirty: bool = True self._tool_log_dirty: bool = True
self._last_ui_focus_agent: Optional[str] = None self._last_ui_focus_agent: Optional[str] = None
self._log_registry: Optional[log_registry.LogRegistry] = None self._log_registry: Optional[log_registry.LogRegistry] = None
# Patch viewer state for Tier 4 auto-patching
self._pending_patch_text: Optional[str] = None
self._pending_patch_files: list[str] = []
self._show_patch_modal: bool = False
self._patch_error_message: Optional[str] = None
def _handle_approve_tool(self, user_data=None) -> None: def _handle_approve_tool(self, user_data=None) -> None:
"""UI-level wrapper for approving a pending tool execution ask.""" """UI-level wrapper for approving a pending tool execution ask."""
@@ -254,6 +259,7 @@ class App:
self._process_pending_gui_tasks() self._process_pending_gui_tasks()
self._process_pending_history_adds() self._process_pending_history_adds()
self._render_track_proposal_modal() self._render_track_proposal_modal()
self._render_patch_modal()
# Auto-save (every 60s) # Auto-save (every 60s)
now = time.time() now = time.time()
if now - self._last_autosave >= self._autosave_interval: if now - self._last_autosave >= self._autosave_interval:
@@ -873,6 +879,82 @@ class App:
imgui.close_current_popup() imgui.close_current_popup()
imgui.end_popup() imgui.end_popup()
def _render_patch_modal(self) -> None:
if not self._show_patch_modal:
return
imgui.open_popup("Apply Patch?")
if imgui.begin_popup_modal("Apply Patch?", True, imgui.ImVec2(600, 500))[0]:
imgui.text_colored(imgui.ImVec4(1, 0.9, 0.3, 1), "Tier 4 QA Generated a Patch")
imgui.separator()
if self._pending_patch_files:
imgui.text("Files to modify:")
for f in self._pending_patch_files:
imgui.text(f" - {f}")
imgui.separator()
if self._patch_error_message:
imgui.text_colored(imgui.ImVec4(1, 0.3, 0.3, 1), f"Error: {self._patch_error_message}")
imgui.separator()
imgui.text("Diff Preview:")
imgui.begin_child("patch_diff_scroll", imgui.ImVec2(-1, 280), True)
if self._pending_patch_text:
diff_lines = self._pending_patch_text.split("\n")
for line in diff_lines:
if line.startswith("+++") or line.startswith("---") or line.startswith("@@"):
imgui.text_colored(imgui.ImVec4(0.3, 0.7, 1, 1), line)
elif line.startswith("+"):
imgui.text_colored(imgui.ImVec4(0.2, 0.9, 0.2, 1), line)
elif line.startswith("-"):
imgui.text_colored(imgui.ImVec4(0.9, 0.2, 0.2, 1), line)
else:
imgui.text(line)
imgui.end_child()
imgui.separator()
if imgui.button("Apply Patch", imgui.ImVec2(150, 0)):
self._apply_pending_patch()
imgui.same_line()
if imgui.button("Reject", imgui.ImVec2(150, 0)):
self._show_patch_modal = False
self._pending_patch_text = None
self._pending_patch_files = []
self._patch_error_message = None
imgui.close_current_popup()
imgui.end_popup()
def _apply_pending_patch(self) -> None:
if not self._pending_patch_text:
self._patch_error_message = "No patch to apply"
return
try:
from src.diff_viewer import apply_patch_to_file
base_dir = str(self.controller.current_project_dir) if hasattr(self.controller, 'current_project_dir') else "."
success, msg = apply_patch_to_file(self._pending_patch_text, base_dir)
if success:
self._show_patch_modal = False
self._pending_patch_text = None
self._pending_patch_files = []
self._patch_error_message = None
imgui.close_current_popup()
else:
self._patch_error_message = msg
except Exception as e:
self._patch_error_message = str(e)
def request_patch_from_tier4(self, error: str, file_context: str) -> None:
try:
from src import ai_client
from src.diff_viewer import parse_diff
patch_text = ai_client.run_tier4_patch_generation(error, file_context)
if patch_text and "---" in patch_text and "+++" in patch_text:
diff_files = parse_diff(patch_text)
file_paths = [df.old_path for df in diff_files]
self._pending_patch_text = patch_text
self._pending_patch_files = file_paths
self._show_patch_modal = True
else:
self._patch_error_message = patch_text or "No patch generated"
except Exception as e:
self._patch_error_message = str(e)
def _render_log_management(self) -> None: def _render_log_management(self) -> None:
exp, opened = imgui.begin("Log Management", self.show_windows["Log Management"]) exp, opened = imgui.begin("Log Management", self.show_windows["Log Management"])
self.show_windows["Log Management"] = bool(opened) self.show_windows["Log Management"] = bool(opened)

View File

@@ -410,6 +410,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
base_dir=".", base_dir=".",
pre_tool_callback=clutch_callback if ticket.step_mode else None, pre_tool_callback=clutch_callback if ticket.step_mode else None,
qa_callback=ai_client.run_tier4_analysis, qa_callback=ai_client.run_tier4_analysis,
patch_callback=ai_client.run_tier4_patch_callback,
stream_callback=stream_callback stream_callback=stream_callback
) )
finally: finally:

73
src/patch_modal.py Normal file
View File

@@ -0,0 +1,73 @@
from typing import Optional, Callable, List
from dataclasses import dataclass, field
@dataclass
class PendingPatch:
patch_text: str
file_paths: List[str]
generated_by: str
timestamp: float
class PatchModalManager:
def __init__(self):
self._pending_patch: Optional[PendingPatch] = None
self._show_modal: bool = False
self._on_apply_callback: Optional[Callable[[str], bool]] = None
self._on_reject_callback: Optional[Callable[[], None]] = None
def request_patch_approval(self, patch_text: str, file_paths: List[str], generated_by: str = "Tier 4 QA") -> bool:
from time import time
self._pending_patch = PendingPatch(
patch_text=patch_text,
file_paths=file_paths,
generated_by=generated_by,
timestamp=time()
)
self._show_modal = True
return True
def get_pending_patch(self) -> Optional[PendingPatch]:
return self._pending_patch
def is_modal_shown(self) -> bool:
return self._show_modal
def set_apply_callback(self, callback: Callable[[str], bool]) -> None:
self._on_apply_callback = callback
def set_reject_callback(self, callback: Callable[[], None]) -> None:
self._on_reject_callback = callback
def apply_patch(self, patch_text: str) -> bool:
if self._on_apply_callback:
return self._on_apply_callback(patch_text)
return False
def reject_patch(self) -> None:
self._pending_patch = None
self._show_modal = False
if self._on_reject_callback:
self._on_reject_callback()
def close_modal(self) -> None:
self._show_modal = False
def reset(self) -> None:
self._pending_patch = None
self._show_modal = False
self._on_apply_callback = None
self._on_reject_callback = None
_patch_modal_manager: Optional[PatchModalManager] = None
def get_patch_modal_manager() -> PatchModalManager:
global _patch_modal_manager
if _patch_modal_manager is None:
_patch_modal_manager = PatchModalManager()
return _patch_modal_manager
def reset_patch_modal_manager() -> None:
global _patch_modal_manager
if _patch_modal_manager:
_patch_modal_manager.reset()
_patch_modal_manager = None

View File

@@ -44,13 +44,14 @@ def _build_subprocess_env() -> dict[str, str]:
env[key] = os.path.expandvars(str(val)) env[key] = os.path.expandvars(str(val))
return env return env
def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str: def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
""" """
Run a PowerShell script with working directory set to base_dir. Run a PowerShell script with working directory set to base_dir.
Returns a string combining stdout, stderr, and exit code. Returns a string combining stdout, stderr, and exit code.
Environment is configured via mcp_env.toml (project root). Environment is configured via mcp_env.toml (project root).
If qa_callback is provided and the command fails or has stderr, If qa_callback is provided and the command fails or has stderr,
the callback is called with the stderr content and its result is appended. the callback is called with the stderr content and its result is appended.
If patch_callback is provided, it receives (error, file_context) and returns patch text.
""" """
safe_dir: str = str(base_dir).replace("'", "''") safe_dir: str = str(base_dir).replace("'", "''")
full_script: str = f"Set-Location -LiteralPath '{safe_dir}'\n{script}" full_script: str = f"Set-Location -LiteralPath '{safe_dir}'\n{script}"
@@ -72,6 +73,10 @@ def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[s
qa_analysis: Optional[str] = qa_callback(stderr.strip()) qa_analysis: Optional[str] = qa_callback(stderr.strip())
if qa_analysis: if qa_analysis:
parts.append(f"\nQA ANALYSIS:\n{qa_analysis}") parts.append(f"\nQA ANALYSIS:\n{qa_analysis}")
if patch_callback and (process.returncode != 0 or stderr.strip()):
patch_text = patch_callback(stderr.strip(), base_dir)
if patch_text:
parts.append(f"\nAUTO_PATCH:\n{patch_text}")
return "\n".join(parts) return "\n".join(parts)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
if 'process' in locals() and process: if 'process' in locals() and process:

181
tests/test_diff_viewer.py Normal file
View File

@@ -0,0 +1,181 @@
import pytest
import tempfile
import os
from pathlib import Path
from src.diff_viewer import (
parse_diff, DiffFile, DiffHunk, parse_hunk_header,
get_line_color, render_diff_text_immediate,
create_backup, apply_patch_to_file, restore_from_backup, cleanup_backup
)
def test_parse_diff_empty() -> None:
result = parse_diff("")
assert result == []
def test_parse_diff_none() -> None:
result = parse_diff(None) # type: ignore
assert result == []
def test_parse_simple_diff() -> None:
diff_text = """--- a/src/test.py
+++ b/src/test.py
@@ -1 +1 @@
-old
+new"""
result = parse_diff(diff_text)
assert len(result) == 1
assert result[0].old_path == "src/test.py"
assert result[0].new_path == "src/test.py"
assert len(result[0].hunks) == 1
assert result[0].hunks[0].header == "@@ -1 +1 @@"
def test_parse_diff_with_context() -> None:
diff_text = """--- a/src/example.py
+++ b/src/example.py
@@ -10,5 +10,6 @@
def existing_function():
pass
- old_line
+ old_line
+ new_line
more_code"""
result = parse_diff(diff_text)
assert len(result) == 1
assert result[0].old_path == "src/example.py"
assert len(result[0].hunks) == 1
hunk = result[0].hunks[0]
assert hunk.old_start == 10
assert hunk.old_count == 5
assert hunk.new_start == 10
assert hunk.new_count == 6
assert "- old_line" in hunk.lines
assert "+ new_line" in hunk.lines
def test_parse_multiple_files() -> None:
diff_text = """--- a/file1.py
+++ b/file1.py
@@ -1 +1 @@
-a
+b
--- a/file2.py
+++ b/file2.py
@@ -1 +1 @@
-c
+d"""
result = parse_diff(diff_text)
assert len(result) == 2
assert result[0].old_path == "file1.py"
assert result[1].old_path == "file2.py"
def test_parse_hunk_header() -> None:
result = parse_hunk_header("@@ -10,5 +10,6 @@")
assert result == (10, 5, 10, 6)
result = parse_hunk_header("@@ -1 +1 @@")
assert result == (1, 1, 1, 1)
def test_diff_line_classification() -> None:
diff_text = """--- a/test.py
+++ b/test.py
@@ -1,3 +1,4 @@
context line
-removed line
+removed line
+added line
another context"""
result = parse_diff(diff_text)
hunk = result[0].hunks[0]
assert any(line.startswith("-") for line in hunk.lines)
assert any(line.startswith("+") for line in hunk.lines)
assert any(line.startswith(" ") or not line.startswith(("-", "+")) for line in hunk.lines)
def test_get_line_color() -> None:
assert get_line_color("+added") == "green"
assert get_line_color("-removed") == "red"
assert get_line_color("@@ -1,3 +1,4 @@") == "cyan"
assert get_line_color(" context") == None
def test_render_diff_text_immediate() -> None:
diff_text = """--- a/test.py
+++ b/test.py
@@ -1 +1 @@
-old
+new"""
diff_files = parse_diff(diff_text)
output = render_diff_text_immediate(diff_files)
assert len(output) > 0
assert ("File: test.py", "white") in output
assert ("@@ -1 +1 @@", "cyan") in output
assert ("-old", "red") in output
assert ("+new", "green") in output
def test_create_backup() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("original content\n")
backup_path = create_backup(str(test_file))
assert backup_path is not None
assert Path(backup_path).exists()
assert Path(backup_path).read_text() == "original content\n"
def test_create_backup_nonexistent() -> None:
result = create_backup("/nonexistent/file.py")
assert result is None
def test_apply_patch_simple() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("old\n")
patch = f"""--- a/{test_file.name}
+++ b/{test_file.name}
@@ -1 +1 @@
-old
+new"""
success, msg = apply_patch_to_file(patch, tmpdir)
assert success
assert test_file.read_text() == "new\n"
def test_apply_patch_with_context() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "example.py"
test_file.write_text("line 1\nline 2\nline 3\n")
patch = f"""--- a/{test_file.name}
+++ b/{test_file.name}
@@ -1,3 +1,3 @@
-line 1
-line 2
+line one
+line two
line 3"""
success, msg = apply_patch_to_file(patch, tmpdir)
assert success
content = test_file.read_text()
assert "line one" in content
assert "line two" in content
def test_restore_from_backup() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("modified\n")
backup_file = test_file.with_suffix(".py.backup")
backup_file.write_text("original\n")
success = restore_from_backup(str(test_file))
assert success
assert test_file.read_text() == "original\n"
def test_cleanup_backup() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("content\n")
backup_file = test_file.with_suffix(".py.backup")
backup_file.write_text("backup\n")
cleanup_backup(str(test_file))
assert not backup_file.exists()

89
tests/test_patch_modal.py Normal file
View File

@@ -0,0 +1,89 @@
import pytest
from src.patch_modal import (
PatchModalManager, PendingPatch,
get_patch_modal_manager, reset_patch_modal_manager
)
def test_patch_modal_manager_init():
manager = PatchModalManager()
assert manager.get_pending_patch() is None
assert manager.is_modal_shown() is False
def test_request_patch_approval():
manager = PatchModalManager()
patch_text = "--- a/test.py\n+++ b/test.py\n@@ -1 +1 @@\n-old\n+new"
file_paths = ["test.py"]
result = manager.request_patch_approval(patch_text, file_paths)
assert result is True
assert manager.is_modal_shown() is True
pending = manager.get_pending_patch()
assert pending is not None
assert pending.patch_text == patch_text
assert pending.file_paths == file_paths
def test_reject_patch():
manager = PatchModalManager()
manager.request_patch_approval("diff", ["file.py"])
manager.reject_patch()
assert manager.get_pending_patch() is None
assert manager.is_modal_shown() is False
def test_close_modal():
manager = PatchModalManager()
manager.request_patch_approval("diff", ["file.py"])
manager.close_modal()
assert manager.is_modal_shown() is False
def test_apply_callback():
manager = PatchModalManager()
called = []
def apply_fn(patch: str) -> bool:
called.append(patch)
return True
manager.set_apply_callback(apply_fn)
patch_text = "--- a/test.py\n+++ b/test.py\n"
result = manager.apply_patch(patch_text)
assert result is True
assert len(called) == 1
assert called[0] == patch_text
def test_reject_callback():
manager = PatchModalManager()
called = []
def reject_fn() -> None:
called.append(True)
manager.set_reject_callback(reject_fn)
manager.reject_patch()
assert len(called) == 1
def test_reset():
manager = PatchModalManager()
manager.request_patch_approval("diff", ["file.py"])
manager.set_apply_callback(lambda x: True)
manager.set_reject_callback(lambda: None)
manager.reset()
assert manager.get_pending_patch() is None
assert manager.is_modal_shown() is False
def test_get_patch_modal_manager_singleton():
reset_patch_modal_manager()
mgr1 = get_patch_modal_manager()
mgr2 = get_patch_modal_manager()
assert mgr1 is mgr2
reset_patch_modal_manager()

View File

@@ -130,5 +130,5 @@ def test_gemini_provider_passes_qa_callback_to_run_script() -> None:
base_dir=".", base_dir=".",
qa_callback=qa_callback qa_callback=qa_callback
) )
# Verify _run_script received the qa_callback # Verify _run_script received the qa_callback and patch_callback
mock_run_script.assert_called_with("dir", ".", qa_callback) mock_run_script.assert_called_with("dir", ".", qa_callback, None)