diff --git a/src/rook/shell.py b/src/rook/shell.py new file mode 100644 index 0000000..691327f --- /dev/null +++ b/src/rook/shell.py @@ -0,0 +1,21 @@ +import logging +import subprocess +from datetime import datetime +from rook.policy import is_approved_dir, confirm_spawn + +logger = logging.getLogger('rook.shell') + + +class PolicyError(Exception): + pass + + +def run_shell(cmd: str, cwd: str = '.') -> str: + if not is_approved_dir(cwd): + if not confirm_spawn(f'shell: {cmd}', f'cwd={cwd}'): + raise PolicyError(f'Shell command denied: {cmd!r} in {cwd!r}') + logger.info('[%s] run_shell: %s', datetime.utcnow().isoformat(), cmd) + result = subprocess.run(cmd, shell=True, cwd=cwd, capture_output=True, text=True) + if result.returncode != 0: + raise RuntimeError(f'Command failed (rc={result.returncode}): {result.stderr}') + return result.stdout diff --git a/tests/test_shell.py b/tests/test_shell.py new file mode 100644 index 0000000..1cbd854 --- /dev/null +++ b/tests/test_shell.py @@ -0,0 +1,42 @@ +import pytest +from unittest.mock import patch, MagicMock +from rook.shell import run_shell, PolicyError + + +def test_run_shell_in_approved_dir(): + with patch('rook.shell.is_approved_dir', return_value=True): + with patch('rook.shell.subprocess.run', return_value=MagicMock(stdout='hello\n', returncode=0)): + result = run_shell('echo hello', cwd='/fake/dir') + assert result == 'hello\n' + + +def test_run_shell_denied_raises_policy_error(): + with patch('rook.shell.is_approved_dir', return_value=False): + with patch('rook.shell.confirm_spawn', return_value=False): + with pytest.raises(PolicyError): + run_shell('ls', cwd='/tmp') + + +def test_run_shell_confirmed_outside_approved(): + mock_confirm = MagicMock(return_value=True) + with patch('rook.shell.is_approved_dir', return_value=False): + with patch('rook.shell.confirm_spawn', mock_confirm): + with patch('rook.shell.subprocess.run', return_value=MagicMock(stdout='ok', returncode=0)): + result = run_shell('ls', cwd='/tmp') + assert result == 'ok' + mock_confirm.assert_called_once() + + +def test_run_shell_logs_call(): + with patch('rook.shell.is_approved_dir', return_value=True): + with patch('rook.shell.subprocess.run', return_value=MagicMock(stdout='hi\n', returncode=0)): + with patch('logging.Logger.info') as mock_log: + run_shell('echo hi', cwd='/fake') + assert mock_log.called + + +def test_run_shell_nonzero_returncode_raises(): + with patch('rook.shell.is_approved_dir', return_value=True): + with patch('rook.shell.subprocess.run', return_value=MagicMock(stdout='', stderr='err', returncode=1)): + with pytest.raises(RuntimeError): + run_shell('bad', cwd='/fake')