- Add patch_callback parameter throughout the tool execution chain
- Add _render_patch_modal() to gui_2.py with colored diff display
- Add patch modal state variables to App.__init__
- Add request_patch_from_tier4() to trigger patch generation
- Add run_tier4_patch_callback() to ai_client.py
- Update shell_runner to accept and execute patch_callback
- Diff colors: green for additions, red for deletions, cyan for headers
- 36 tests passing
93 lines
3.7 KiB
Python
93 lines
3.7 KiB
Python
# shell_runner.py
|
|
import os
|
|
import subprocess
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Callable, Optional
|
|
|
|
try:
|
|
import tomllib
|
|
except ImportError:
|
|
import tomli as tomllib # type: ignore[no-redef]
|
|
|
|
# Maximum number of seconds run_powershell() waits for the script to finish
# before giving up and reporting a timeout error.
TIMEOUT_SECONDS: int = 60
|
|
# Cached parsed contents of mcp_env.toml, filled in lazily by
# _build_subprocess_env(). While this is empty (not loaded yet, or no config
# was found) the loader is invoked again on the next call.
_ENV_CONFIG: dict = {}
|
|
|
|
def _load_env_config() -> dict:
|
|
"""Load mcp_env.toml from project root (sibling of this file or parent dir)."""
|
|
env_path = os.environ.get("SLOP_MCP_ENV")
|
|
if env_path and Path(env_path).exists():
|
|
with open(env_path, "rb") as f:
|
|
return tomllib.load(f)
|
|
candidates = [
|
|
Path(__file__).parent / "mcp_env.toml",
|
|
Path(__file__).parent.parent / "mcp_env.toml",
|
|
]
|
|
for p in candidates:
|
|
if p.exists():
|
|
with open(p, "rb") as f:
|
|
return tomllib.load(f)
|
|
return {}
|
|
|
|
def _build_subprocess_env() -> dict[str, str]:
    """Return the environment mapping to hand to a child process.

    Starts from a copy of the current process environment and layers the
    mcp_env.toml settings on top:

      * ``[path].prepend`` entries are placed in front of the existing PATH.
      * ``[env]`` entries are set as variables, with each value converted to
        a string and passed through ``os.path.expandvars``.

    The parsed config is cached in the module-level ``_ENV_CONFIG``; it is
    (re)loaded whenever that cache is empty.
    """
    global _ENV_CONFIG
    if not _ENV_CONFIG:
        _ENV_CONFIG = _load_env_config()
    config = _ENV_CONFIG

    child_env = os.environ.copy()

    # [path].prepend: extra directories go ahead of whatever PATH already holds.
    extra_dirs = config.get("path", {}).get("prepend", [])
    if extra_dirs:
        current_path = child_env.get("PATH", "")
        child_env["PATH"] = os.pathsep.join([*extra_dirs, current_path])

    # [env]: plain key/value overrides, with variable references expanded.
    child_env.update(
        {
            name: os.path.expandvars(str(raw))
            for name, raw in config.get("env", {}).items()
        }
    )
    return child_env
|
|
|
|
def _terminate_process_tree(process: "subprocess.Popen[str]") -> None:
    """Best-effort: stop *process* if it is still running, then reap it."""
    if process.poll() is None:
        if os.name == "nt":
            # taskkill /T also terminates children spawned by the script.
            try:
                subprocess.run(
                    ["taskkill", "/F", "/T", "/PID", str(process.pid)],
                    capture_output=True,
                )
            except OSError:
                pass  # taskkill unavailable; fall back to kill() below
        try:
            # On non-Windows this is the only kill; on Windows it is a
            # fallback in case taskkill did not stop the process.
            process.kill()
        except OSError:
            pass  # already gone
    try:
        # Drain the pipes and collect the exit status so the child is not
        # left as a zombie. Bounded, because a surviving grandchild could
        # keep the pipes open indefinitely.
        process.communicate(timeout=5)
    except (subprocess.TimeoutExpired, OSError, ValueError):
        pass  # deliberate best-effort cleanup


def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
    """
    Run a PowerShell script with working directory set to base_dir.

    The script is prefixed with a ``Set-Location -LiteralPath`` to base_dir
    (single quotes in the path are doubled) and executed with the first of
    powershell.exe / pwsh.exe / powershell / pwsh found on PATH, using
    ``-NoProfile -NonInteractive`` and no stdin.
    Environment is configured via mcp_env.toml (project root).

    Args:
        script: PowerShell source to execute; passed to the shell verbatim.
        base_dir: Directory used as the working directory of the script.
        qa_callback: If provided and the command exits non-zero or writes to
            stderr, called with the stripped stderr; a truthy result is
            appended under "QA ANALYSIS:".
        patch_callback: If provided and the command exits non-zero or writes
            to stderr, called with (stripped stderr, base_dir); a truthy
            result is appended under "AUTO_PATCH:".

    Returns:
        A string combining "STDOUT:" and "STDERR:" sections (each only when
        non-empty) and an "EXIT CODE:" line, plus any callback output.
        Failures are reported as a string starting with "ERROR:" (no
        PowerShell executable found, timeout after TIMEOUT_SECONDS, or any
        other exception, including one raised by a callback).

    Raises:
        KeyboardInterrupt: Re-raised after the child process is cleaned up.
    """
    safe_dir: str = str(base_dir).replace("'", "''")
    full_script: str = f"Set-Location -LiteralPath '{safe_dir}'\n{script}"
    exe: Optional[str] = next((x for x in ["powershell.exe", "pwsh.exe", "powershell", "pwsh"] if shutil.which(x)), None)
    if not exe:
        return "ERROR: Neither powershell nor pwsh found in PATH"

    # Explicit sentinel so the handlers know whether a child was ever started.
    process: Optional["subprocess.Popen[str]"] = None
    try:
        process = subprocess.Popen(
            [exe, "-NoProfile", "-NonInteractive", "-Command", full_script],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
            cwd=base_dir, env=_build_subprocess_env(),
        )
        stdout, stderr = process.communicate(timeout=TIMEOUT_SECONDS)
        out_text: str = stdout.strip()
        err_text: str = stderr.strip()

        parts: list[str] = []
        if out_text:
            parts.append(f"STDOUT:\n{out_text}")
        if err_text:
            parts.append(f"STDERR:\n{err_text}")
        parts.append(f"EXIT CODE: {process.returncode}")

        # Any stderr output counts as a failure, even with exit code 0.
        failed: bool = process.returncode != 0 or bool(err_text)
        if failed and qa_callback:
            qa_analysis: Optional[str] = qa_callback(err_text)
            if qa_analysis:
                parts.append(f"\nQA ANALYSIS:\n{qa_analysis}")
        if failed and patch_callback:
            patch_text = patch_callback(err_text, base_dir)
            if patch_text:
                parts.append(f"\nAUTO_PATCH:\n{patch_text}")
        return "\n".join(parts)
    except subprocess.TimeoutExpired:
        if process is not None:
            _terminate_process_tree(process)
        return f"ERROR: timed out after {TIMEOUT_SECONDS}s"
    except KeyboardInterrupt:
        if process is not None:
            _terminate_process_tree(process)
        raise
    except Exception as e:
        if process is not None:
            _terminate_process_tree(process)
        return f"ERROR: {e}"
|