feat(shell): safe subprocess execution with policy checks and logging
This commit is contained in:
21
src/rook/shell.py
Normal file
21
src/rook/shell.py
Normal file
@@ -0,0 +1,21 @@
|
||||
import logging
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from rook.policy import is_approved_dir, confirm_spawn
|
||||
|
||||
logger = logging.getLogger('rook.shell')
|
||||
|
||||
|
||||
class PolicyError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def run_shell(cmd: str, cwd: str = '.') -> str:
|
||||
if not is_approved_dir(cwd):
|
||||
if not confirm_spawn(f'shell: {cmd}', f'cwd={cwd}'):
|
||||
raise PolicyError(f'Shell command denied: {cmd!r} in {cwd!r}')
|
||||
logger.info('[%s] run_shell: %s', datetime.utcnow().isoformat(), cmd)
|
||||
result = subprocess.run(cmd, shell=True, cwd=cwd, capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f'Command failed (rc={result.returncode}): {result.stderr}')
|
||||
return result.stdout
|
||||
42
tests/test_shell.py
Normal file
42
tests/test_shell.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from rook.shell import run_shell, PolicyError
|
||||
|
||||
|
||||
def test_run_shell_in_approved_dir():
|
||||
with patch('rook.shell.is_approved_dir', return_value=True):
|
||||
with patch('rook.shell.subprocess.run', return_value=MagicMock(stdout='hello\n', returncode=0)):
|
||||
result = run_shell('echo hello', cwd='/fake/dir')
|
||||
assert result == 'hello\n'
|
||||
|
||||
|
||||
def test_run_shell_denied_raises_policy_error():
|
||||
with patch('rook.shell.is_approved_dir', return_value=False):
|
||||
with patch('rook.shell.confirm_spawn', return_value=False):
|
||||
with pytest.raises(PolicyError):
|
||||
run_shell('ls', cwd='/tmp')
|
||||
|
||||
|
||||
def test_run_shell_confirmed_outside_approved():
|
||||
mock_confirm = MagicMock(return_value=True)
|
||||
with patch('rook.shell.is_approved_dir', return_value=False):
|
||||
with patch('rook.shell.confirm_spawn', mock_confirm):
|
||||
with patch('rook.shell.subprocess.run', return_value=MagicMock(stdout='ok', returncode=0)):
|
||||
result = run_shell('ls', cwd='/tmp')
|
||||
assert result == 'ok'
|
||||
mock_confirm.assert_called_once()
|
||||
|
||||
|
||||
def test_run_shell_logs_call():
|
||||
with patch('rook.shell.is_approved_dir', return_value=True):
|
||||
with patch('rook.shell.subprocess.run', return_value=MagicMock(stdout='hi\n', returncode=0)):
|
||||
with patch('logging.Logger.info') as mock_log:
|
||||
run_shell('echo hi', cwd='/fake')
|
||||
assert mock_log.called
|
||||
|
||||
|
||||
def test_run_shell_nonzero_returncode_raises():
|
||||
with patch('rook.shell.is_approved_dir', return_value=True):
|
||||
with patch('rook.shell.subprocess.run', return_value=MagicMock(stdout='', stderr='err', returncode=1)):
|
||||
with pytest.raises(RuntimeError):
|
||||
run_shell('bad', cwd='/fake')
|
||||
Reference in New Issue
Block a user