feat(security): Enforce blacklist for discussion history files
This commit is contained in:
18
aggregate.py
18
aggregate.py
@@ -37,14 +37,24 @@ def is_absolute_with_drive(entry: str) -> bool:
|
|||||||
def resolve_paths(base_dir: Path, entry: str) -> list[Path]:
|
def resolve_paths(base_dir: Path, entry: str) -> list[Path]:
|
||||||
has_drive = is_absolute_with_drive(entry)
|
has_drive = is_absolute_with_drive(entry)
|
||||||
is_wildcard = "*" in entry
|
is_wildcard = "*" in entry
|
||||||
|
|
||||||
|
matches = []
|
||||||
if is_wildcard:
|
if is_wildcard:
|
||||||
root = Path(entry) if has_drive else base_dir / entry
|
root = Path(entry) if has_drive else base_dir / entry
|
||||||
matches = [Path(p) for p in glob.glob(str(root), recursive=True) if Path(p).is_file()]
|
matches = [Path(p) for p in glob.glob(str(root), recursive=True) if Path(p).is_file()]
|
||||||
return sorted(matches)
|
|
||||||
else:
|
else:
|
||||||
if has_drive:
|
p = Path(entry) if has_drive else (base_dir / entry).resolve()
|
||||||
return [Path(entry)]
|
matches = [p]
|
||||||
return [(base_dir / entry).resolve()]
|
|
||||||
|
# Blacklist filter
|
||||||
|
filtered = []
|
||||||
|
for p in matches:
|
||||||
|
name = p.name.lower()
|
||||||
|
if name == "history.toml" or name.endswith("_history.toml"):
|
||||||
|
continue
|
||||||
|
filtered.append(p)
|
||||||
|
|
||||||
|
return sorted(filtered)
|
||||||
|
|
||||||
def build_discussion_section(history: list[str]) -> str:
|
def build_discussion_section(history: list[str]) -> str:
|
||||||
sections = []
|
sections = []
|
||||||
|
|||||||
@@ -87,7 +87,14 @@ def _is_allowed(path: Path) -> bool:
|
|||||||
- it is contained within (or equal to) one of the _base_dirs
|
- it is contained within (or equal to) one of the _base_dirs
|
||||||
All paths are resolved (follows symlinks) before comparison to prevent
|
All paths are resolved (follows symlinks) before comparison to prevent
|
||||||
symlink-based path traversal.
|
symlink-based path traversal.
|
||||||
|
|
||||||
|
CRITICAL: Blacklisted files (history) are NEVER allowed.
|
||||||
"""
|
"""
|
||||||
|
# Blacklist check
|
||||||
|
name = path.name.lower()
|
||||||
|
if name == "history.toml" or name.endswith("_history.toml"):
|
||||||
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
rp = path.resolve(strict=True)
|
rp = path.resolve(strict=True)
|
||||||
except (OSError, ValueError):
|
except (OSError, ValueError):
|
||||||
@@ -153,11 +160,18 @@ def list_directory(path: str) -> str:
|
|||||||
try:
|
try:
|
||||||
entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
|
entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
|
||||||
lines = [f"Directory: {p}", ""]
|
lines = [f"Directory: {p}", ""]
|
||||||
|
count = 0
|
||||||
for entry in entries:
|
for entry in entries:
|
||||||
|
# Blacklist check
|
||||||
|
name = entry.name.lower()
|
||||||
|
if name == "history.toml" or name.endswith("_history.toml"):
|
||||||
|
continue
|
||||||
|
|
||||||
kind = "file" if entry.is_file() else "dir "
|
kind = "file" if entry.is_file() else "dir "
|
||||||
size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else ""
|
size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else ""
|
||||||
lines.append(f" [{kind}] {entry.name:<40} {size}")
|
lines.append(f" [{kind}] {entry.name:<40} {size}")
|
||||||
lines.append(f" ({len(entries)} entries)")
|
count += 1
|
||||||
|
lines.append(f" ({count} entries)")
|
||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return f"ERROR listing '{path}': {e}"
|
return f"ERROR listing '{path}': {e}"
|
||||||
@@ -178,11 +192,18 @@ def search_files(path: str, pattern: str) -> str:
|
|||||||
if not matches:
|
if not matches:
|
||||||
return f"No files matched '{pattern}' in {path}"
|
return f"No files matched '{pattern}' in {path}"
|
||||||
lines = [f"Search '{pattern}' in {p}:", ""]
|
lines = [f"Search '{pattern}' in {p}:", ""]
|
||||||
|
count = 0
|
||||||
for m in matches:
|
for m in matches:
|
||||||
|
# Blacklist check
|
||||||
|
name = m.name.lower()
|
||||||
|
if name == "history.toml" or name.endswith("_history.toml"):
|
||||||
|
continue
|
||||||
|
|
||||||
rel = m.relative_to(p)
|
rel = m.relative_to(p)
|
||||||
kind = "file" if m.is_file() else "dir "
|
kind = "file" if m.is_file() else "dir "
|
||||||
lines.append(f" [{kind}] {rel}")
|
lines.append(f" [{kind}] {rel}")
|
||||||
lines.append(f" ({len(matches)} match(es))")
|
count += 1
|
||||||
|
lines.append(f" ({count} match(es))")
|
||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return f"ERROR searching '{path}': {e}"
|
return f"ERROR searching '{path}': {e}"
|
||||||
|
|||||||
32
tests/test_history_blacklist.py
Normal file
32
tests/test_history_blacklist.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
import pytest
|
||||||
|
from pathlib import Path
|
||||||
|
import mcp_client
|
||||||
|
import aggregate
|
||||||
|
|
||||||
|
def test_mcp_blacklist(tmp_path):
|
||||||
|
# Setup a "history" file
|
||||||
|
hist_file = tmp_path / "my_project_history.toml"
|
||||||
|
hist_file.write_text("secret history", encoding="utf-8")
|
||||||
|
|
||||||
|
# Configure MCP client with the tmp_path as allowed
|
||||||
|
mcp_client.configure([{"path": str(hist_file)}], extra_base_dirs=[str(tmp_path)])
|
||||||
|
|
||||||
|
# Try to read it - should fail
|
||||||
|
result = mcp_client.read_file(str(hist_file))
|
||||||
|
assert "ACCESS DENIED" in result or "BLACKLISTED" in result
|
||||||
|
|
||||||
|
# Try to list it
|
||||||
|
result = mcp_client.list_directory(str(tmp_path))
|
||||||
|
assert "my_project_history.toml" not in result
|
||||||
|
|
||||||
|
def test_aggregate_blacklist(tmp_path):
|
||||||
|
# Setup a "history" file
|
||||||
|
hist_file = tmp_path / "my_project_history.toml"
|
||||||
|
hist_file.write_text("secret history", encoding="utf-8")
|
||||||
|
|
||||||
|
# Try to resolve paths including the history file
|
||||||
|
paths = aggregate.resolve_paths(tmp_path, "*_history.toml")
|
||||||
|
assert hist_file not in paths
|
||||||
|
|
||||||
|
paths = aggregate.resolve_paths(tmp_path, "*")
|
||||||
|
assert hist_file not in paths
|
||||||
Reference in New Issue
Block a user