Harden reliability, security, and UX across core modules
- Add thread safety: _anthropic_history_lock and _send_lock in ai_client to prevent concurrent corruption - Add _send_thread_lock in gui_2 for atomic check-and-start of send thread - Add atexit fallback in session_logger to flush log files on abnormal exit - Fix file descriptor leaks: use context managers for urlopen in mcp_client - Cap unbounded tool output growth at 500KB per send() call (both Gemini and Anthropic) - Harden path traversal: resolve(strict=True) with fallback in mcp_client allowlist checks - Add SLOP_CREDENTIALS env var override for credentials.toml with helpful error message - Fix Gemini token heuristic: use _CHARS_PER_TOKEN (3.5) instead of hardcoded // 4 - Add keyboard shortcuts: Ctrl+Enter to send, Ctrl+L to clear message input - Add auto-save: flush project and config to disk every 60 seconds
This commit is contained in:
@@ -65,7 +65,10 @@ def configure(file_items: list[dict], extra_base_dirs: list[str] | None = None):
|
||||
for item in file_items:
|
||||
p = item.get("path")
|
||||
if p is not None:
|
||||
rp = Path(p).resolve()
|
||||
try:
|
||||
rp = Path(p).resolve(strict=True)
|
||||
except (OSError, ValueError):
|
||||
rp = Path(p).resolve()
|
||||
_allowed_paths.add(rp)
|
||||
_base_dirs.add(rp.parent)
|
||||
|
||||
@@ -82,8 +85,13 @@ def _is_allowed(path: Path) -> bool:
|
||||
A path is allowed if:
|
||||
- it is explicitly in _allowed_paths, OR
|
||||
- it is contained within (or equal to) one of the _base_dirs
|
||||
All paths are resolved (follows symlinks) before comparison to prevent
|
||||
symlink-based path traversal.
|
||||
"""
|
||||
rp = path.resolve()
|
||||
try:
|
||||
rp = path.resolve(strict=True)
|
||||
except (OSError, ValueError):
|
||||
rp = path.resolve()
|
||||
if rp in _allowed_paths:
|
||||
return True
|
||||
for bd in _base_dirs:
|
||||
@@ -104,7 +112,10 @@ def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
|
||||
p = Path(raw_path)
|
||||
if not p.is_absolute() and _primary_base_dir:
|
||||
p = _primary_base_dir / p
|
||||
p = p.resolve()
|
||||
try:
|
||||
p = p.resolve(strict=True)
|
||||
except (OSError, ValueError):
|
||||
p = p.resolve()
|
||||
except Exception as e:
|
||||
return None, f"ERROR: invalid path '{raw_path}': {e}"
|
||||
if not _is_allowed(p):
|
||||
@@ -269,7 +280,8 @@ def web_search(query: str) -> str:
|
||||
url = "https://html.duckduckgo.com/html/?q=" + urllib.parse.quote(query)
|
||||
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
|
||||
try:
|
||||
html = urllib.request.urlopen(req, timeout=10).read().decode('utf-8', errors='ignore')
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
html = resp.read().decode('utf-8', errors='ignore')
|
||||
parser = _DDGParser()
|
||||
parser.feed(html)
|
||||
if not parser.results:
|
||||
@@ -292,7 +304,8 @@ def fetch_url(url: str) -> str:
|
||||
|
||||
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
|
||||
try:
|
||||
html = urllib.request.urlopen(req, timeout=10).read().decode('utf-8', errors='ignore')
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
html = resp.read().decode('utf-8', errors='ignore')
|
||||
parser = _TextExtractor()
|
||||
parser.feed(html)
|
||||
full_text = " ".join(parser.text)
|
||||
|
||||
Reference in New Issue
Block a user