feat(mma): Implement Tier 4 QA interceptor in shell_runner.py
This commit is contained in:
@@ -1,14 +1,16 @@
|
||||
# shell_runner.py
|
||||
import subprocess, shutil
|
||||
from pathlib import Path
|
||||
from typing import Callable, Optional
|
||||
|
||||
TIMEOUT_SECONDS = 60
|
||||
|
||||
def run_powershell(script: str, base_dir: str) -> str:
|
||||
def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str:
|
||||
"""
|
||||
Run a PowerShell script with working directory set to base_dir.
|
||||
Returns a string combining stdout, stderr, and exit code.
|
||||
Raises nothing - all errors are captured into the return string.
|
||||
If qa_callback is provided and the command fails or has stderr,
|
||||
the callback is called with the stderr content and its result is appended.
|
||||
"""
|
||||
safe_dir = str(base_dir).replace("'", "''")
|
||||
full_script = f"Set-Location -LiteralPath '{safe_dir}'\n{script}"
|
||||
@@ -25,6 +27,13 @@ def run_powershell(script: str, base_dir: str) -> str:
|
||||
if r.stdout.strip(): parts.append(f"STDOUT:\n{r.stdout.strip()}")
|
||||
if r.stderr.strip(): parts.append(f"STDERR:\n{r.stderr.strip()}")
|
||||
parts.append(f"EXIT CODE: {r.returncode}")
|
||||
|
||||
# QA Interceptor logic
|
||||
if (r.returncode != 0 or r.stderr.strip()) and qa_callback:
|
||||
qa_analysis = qa_callback(r.stderr.strip())
|
||||
if qa_analysis:
|
||||
parts.append(f"\nQA ANALYSIS:\n{qa_analysis}")
|
||||
|
||||
return "\n".join(parts)
|
||||
except subprocess.TimeoutExpired: return f"ERROR: timed out after {TIMEOUT_SECONDS}s"
|
||||
except Exception as e: return f"ERROR: {e}"
|
||||
|
||||
Reference in New Issue
Block a user