feat(files): read/write/delete with backup-before-edit and policy gates
This commit is contained in:
32
src/rook/files.py
Normal file
32
src/rook/files.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import os
|
||||
from rook.policy import is_approved_dir, confirm_spawn, backup_before_edit
|
||||
|
||||
|
||||
class PolicyError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def read_file(path: str) -> str:
|
||||
if not is_approved_dir(path):
|
||||
if not confirm_spawn("read file", path):
|
||||
raise PolicyError(f"Access denied: {path}")
|
||||
with open(path) as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
def write_file(path: str, content: str) -> None:
|
||||
if not is_approved_dir(path):
|
||||
if not confirm_spawn("write file", path):
|
||||
raise PolicyError(f"Access denied: {path}")
|
||||
if os.path.exists(path):
|
||||
backup_before_edit(path)
|
||||
with open(path, "w") as f:
|
||||
f.write(content)
|
||||
|
||||
|
||||
def delete_file(path: str) -> None:
|
||||
if not is_approved_dir(path):
|
||||
pass
|
||||
if not confirm_spawn("delete file", path):
|
||||
raise PolicyError(f"Delete denied: {path}")
|
||||
os.remove(path)
|
||||
62
tests/test_files.py
Normal file
62
tests/test_files.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import os
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from rook.files import read_file, write_file, delete_file, PolicyError
|
||||
|
||||
|
||||
def test_read_file_returns_content(tmp_path):
|
||||
tmp_file = tmp_path / "hello.txt"
|
||||
tmp_file.write_text("hello world")
|
||||
with patch("rook.files.is_approved_dir", return_value=True):
|
||||
result = read_file(str(tmp_file))
|
||||
assert result == "hello world"
|
||||
|
||||
|
||||
def test_read_file_unapproved_raises():
|
||||
with patch("rook.files.is_approved_dir", return_value=False), \
|
||||
patch("rook.files.confirm_spawn", return_value=False):
|
||||
with pytest.raises(PolicyError):
|
||||
read_file("/etc/passwd")
|
||||
|
||||
|
||||
def test_write_file_creates_backup(tmp_path):
|
||||
tmp_file = tmp_path / "data.txt"
|
||||
tmp_file.write_text("old")
|
||||
with patch("rook.files.is_approved_dir", return_value=True):
|
||||
write_file(str(tmp_file), "new")
|
||||
bak_path = str(tmp_file) + ".bak"
|
||||
assert os.path.exists(bak_path)
|
||||
with open(bak_path) as f:
|
||||
assert f.read() == "old"
|
||||
assert tmp_file.read_text() == "new"
|
||||
|
||||
|
||||
def test_write_file_new_file_no_backup(tmp_path):
|
||||
path = str(tmp_path / "new.txt")
|
||||
with patch("rook.files.is_approved_dir", return_value=True), \
|
||||
patch("rook.files.backup_before_edit"):
|
||||
write_file(path, "content")
|
||||
assert os.path.exists(path)
|
||||
with open(path) as f:
|
||||
assert f.read() == "content"
|
||||
assert not os.path.exists(path + ".bak")
|
||||
|
||||
|
||||
def test_delete_file_requires_confirmation_and_deletes(tmp_path):
|
||||
tmp_file = tmp_path / "todelete.txt"
|
||||
tmp_file.write_text("bye")
|
||||
with patch("rook.files.is_approved_dir", return_value=True), \
|
||||
patch("rook.files.confirm_spawn", return_value=True):
|
||||
delete_file(str(tmp_file))
|
||||
assert not tmp_file.exists()
|
||||
|
||||
|
||||
def test_delete_file_denied_leaves_file(tmp_path):
|
||||
tmp_file = tmp_path / "safe.txt"
|
||||
tmp_file.write_text("keep me")
|
||||
with patch("rook.files.is_approved_dir", return_value=True), \
|
||||
patch("rook.files.confirm_spawn", return_value=False):
|
||||
with pytest.raises(PolicyError):
|
||||
delete_file(str(tmp_file))
|
||||
assert tmp_file.exists()
|
||||
Reference in New Issue
Block a user