Files
manual_slop/shell_runner.py

82 lines
3.2 KiB
Python

# shell_runner.py
import os, subprocess, shutil
from pathlib import Path
from typing import Callable, Optional
try:
import tomllib
except ImportError:
import tomli as tomllib # type: ignore[no-redef]
# Upper bound, in seconds, that run_powershell() waits for a script to finish.
TIMEOUT_SECONDS: int = 60
# Parsed mcp_env.toml contents; populated by _build_subprocess_env() while it
# is still empty, then reused by later calls.
_ENV_CONFIG: dict = {}
def _load_env_config() -> dict:
"""Load mcp_env.toml from project root (sibling of this file or parent dir)."""
candidates = [
Path(__file__).parent / "mcp_env.toml",
Path(__file__).parent.parent / "mcp_env.toml",
]
for p in candidates:
if p.exists():
with open(p, "rb") as f:
return tomllib.load(f)
return {}
def _build_subprocess_env() -> dict[str, str]:
    """Build the env dict for a subprocess: current env + mcp_env.toml overrides.

    The TOML config is loaded through _load_env_config() whenever the
    module-level cache ``_ENV_CONFIG`` is still empty, and is reused otherwise.

    Two config tables are honoured:

    * ``[path].prepend`` - a directory, or a list of directories, placed in
      front of the inherited ``PATH``.
    * ``[env]`` - key/value pairs copied into the environment. Values are
      converted to ``str`` and passed through ``os.path.expandvars``, which
      resolves ``$VAR`` / ``${VAR}`` against the *current process*
      environment (not against the dict being built here).

    Returns:
        A fresh dict; ``os.environ`` itself is not modified.
    """
    global _ENV_CONFIG
    if not _ENV_CONFIG:
        _ENV_CONFIG = _load_env_config()
    env = os.environ.copy()

    prepend = _ENV_CONFIG.get("path", {}).get("prepend", [])
    if isinstance(prepend, str):
        # A scalar `prepend = "dir"` would otherwise be joined char by char.
        prepend = [prepend]
    # Coerce to str so a stray non-string TOML value cannot break join(),
    # and drop empty entries.
    prepend_dirs = [str(entry) for entry in prepend if str(entry)]
    if prepend_dirs:
        inherited = env.get("PATH", "")
        # Only append the inherited PATH when it is non-empty: a trailing
        # separator is an empty component, which POSIX treats as the
        # current directory.
        segments = prepend_dirs + ([inherited] if inherited else [])
        env["PATH"] = os.pathsep.join(segments)

    for key, val in _ENV_CONFIG.get("env", {}).items():
        env[key] = os.path.expandvars(str(val))
    return env
def _terminate_process_tree(process: subprocess.Popen) -> None:
    """Best-effort kill of a still-running child process, then reap it."""
    if process.poll() is not None:
        # Already exited: its PID may have been reused, so never signal it.
        return
    try:
        if os.name == "nt" and shutil.which("taskkill"):
            # /T also ends descendants that Popen.kill() would leave behind.
            subprocess.run(
                ["taskkill", "/F", "/T", "/PID", str(process.pid)],
                capture_output=True,
            )
        if process.poll() is None:
            # Non-Windows platform, or taskkill was unavailable/unsuccessful.
            process.kill()
    except OSError:
        # The process can exit between poll() and the kill attempt.
        pass
    try:
        # Finish communication so the pipes are closed and the child is
        # reaped. Bounded, because a surviving grandchild may hold a pipe open.
        process.communicate(timeout=5)
    except (subprocess.TimeoutExpired, OSError, ValueError):
        pass


def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str:
    """
    Run a PowerShell script with working directory set to base_dir.

    The first of ``powershell.exe``, ``pwsh.exe``, ``powershell``, ``pwsh``
    found on PATH is used. The child environment comes from
    _build_subprocess_env() (current env + mcp_env.toml overrides).

    Args:
        script: PowerShell source to execute.
        base_dir: Working directory; applied both as the process cwd and via
            a leading ``Set-Location -LiteralPath``.
        qa_callback: Optional callable. When the command exits non-zero or
            writes to stderr, it is called with the stripped stderr text and
            a truthy result is appended under ``QA ANALYSIS:``.

    Returns:
        A string combining ``STDOUT:``, ``STDERR:`` (each only when
        non-empty) and ``EXIT CODE:``. Failures are reported as a string
        starting with ``ERROR:`` (no PowerShell found, timeout after
        TIMEOUT_SECONDS, or any other exception).

    Raises:
        KeyboardInterrupt: re-raised after the child has been terminated.
    """
    exe: Optional[str] = next(
        (name for name in ("powershell.exe", "pwsh.exe", "powershell", "pwsh") if shutil.which(name)),
        None,
    )
    if not exe:
        return "ERROR: Neither powershell nor pwsh found in PATH"

    # Single quotes are doubled to stay inside the PowerShell literal string.
    safe_dir: str = str(base_dir).replace("'", "''")
    full_script: str = f"Set-Location -LiteralPath '{safe_dir}'\n{script}"

    process: Optional[subprocess.Popen] = None
    try:
        process = subprocess.Popen(
            [exe, "-NoProfile", "-NonInteractive", "-Command", full_script],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Undecodable bytes must not discard the whole result.
            errors="replace",
            cwd=base_dir,
            env=_build_subprocess_env(),
        )
        stdout, stderr = process.communicate(timeout=TIMEOUT_SECONDS)
        out_text = stdout.strip()
        err_text = stderr.strip()

        parts: list[str] = []
        if out_text:
            parts.append(f"STDOUT:\n{out_text}")
        if err_text:
            parts.append(f"STDERR:\n{err_text}")
        parts.append(f"EXIT CODE: {process.returncode}")

        if qa_callback and (process.returncode != 0 or err_text):
            qa_analysis: Optional[str] = qa_callback(err_text)
            if qa_analysis:
                parts.append(f"\nQA ANALYSIS:\n{qa_analysis}")
        return "\n".join(parts)
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout; do it explicitly.
        if process is not None:
            _terminate_process_tree(process)
        return f"ERROR: timed out after {TIMEOUT_SECONDS}s"
    except KeyboardInterrupt:
        if process is not None:
            _terminate_process_tree(process)
        raise
    except Exception as e:
        if process is not None:
            _terminate_process_tree(process)
        return f"ERROR: {e}"