Private
Public Access
0
0

Merge branch 'profiling-stuff'

# Conflicts:
#	config.toml
#	manual_slop_history.toml
This commit is contained in:
2026-06-07 02:15:50 -04:00
9 changed files with 1600 additions and 1279 deletions
+1 -1
View File
@@ -56,7 +56,7 @@ if __name__ == "__main__":
runner_params = hello_imgui.RunnerParams()
runner_params.app_window_params.window_title = "Manual Slop (Web)"
runner_params.app_window_params.borderless = True
runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space
runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_docker_space
runner_params.app_window_params.restore_previous_window_size = True
with startup_profiler.phase("hello_imgui_run"):
+912 -903
View File
File diff suppressed because it is too large Load Diff
+24 -1
View File
@@ -60,7 +60,14 @@ class _LazyModule:
if self._attr_name is None:
self._cached = mod
else:
self._cached = getattr(mod, self._attr_name)
try:
self._cached = getattr(mod, self._attr_name)
except AttributeError:
sub_mod_name = f"{self._module_name}.{self._attr_name}"
try:
self._cached = _importlib.import_module(sub_mod_name)
except (ImportError, ModuleNotFoundError):
self._cached = _FiledialogStub()
return self._cached
def __getattr__(self, name: str) -> _Any:
@@ -69,6 +76,22 @@ class _LazyModule:
def __call__(self, *args: _Any, **kwargs: _Any) -> _Any:
return self._resolve()(*args, **kwargs)
class _FiledialogStub:
"""No-op replacement for tkinter.filedialog on Python installs where
the Tcl/Tk runtime is missing (e.g. embedded Python, slim Docker images).
All dialog functions return safe empty sentinels so call sites that do
`if p and p not in app.x: app.x.append(p)` treat a missing dialog as a
no-op. Exposes a `available` flag so the UI can detect the stub and
offer an ImGui-based path input as an alternative.
[C: src/gui_2.py:_LazyModule._resolve]
"""
available: bool = False
def askopenfilename(self, *args: _Any, **kwargs: _Any) -> str: return ""
def askopenfilenames(self, *args: _Any, **kwargs: _Any) -> tuple: return ()
def askdirectory(self, *args: _Any, **kwargs: _Any) -> str: return ""
def asksaveasfilename(self, *args: _Any, **kwargs: _Any) -> str: return ""
# Heavy modules that were previously top-level imports (now lazy):
np = _LazyModule("numpy") # was: import numpy as np
filedialog = _LazyModule("tkinter", "filedialog") # was: from tkinter import filedialog
+16
View File
@@ -1,3 +1,19 @@
"""Shared AppController I/O pool factory.
Historical note: an earlier revision of this module registered an
``atexit.register(pool.shutdown, wait=False)`` handler here, mirroring
the conftest fix at commit 8957c9a5. That approach was reverted because
it does not solve the Ctrl+C hang in ``sloppy.py`` when a worker is
mid-task (e.g. a long-running Gemini/Anthropic HTTP request): atexit
handlers do not fire at all in that scenario, so the process still hangs
in ``ThreadPoolExecutor.__del__`` -> ``shutdown(wait=True)`` during
finalization.
The production fix lives in ``AppController.__init__`` as a SIGINT
handler that drains the pool and calls ``os._exit(0)``, sidestepping
the broken finalization chain. See commit log for details.
"""
from concurrent.futures import ThreadPoolExecutor
+338 -373
View File
@@ -134,16 +134,14 @@ def configure(file_items: list[dict[str, Any]], extra_base_dirs: list[str] | Non
def _is_allowed(path: Path) -> bool:
"""
Return True if `path` is within the allowlist.
A path is allowed if:
- it is explicitly in _allowed_paths, OR
- it is contained within (or equal to) one of the _base_dirs
All paths are resolved (follows symlinks) before comparison to prevent
symlink-based path traversal.
Return True if `path` is within the allowlist.
A path is allowed if:
- it is explicitly in _allowed_paths, OR
- it is contained within (or equal to) one of the _base_dirs
All paths are resolved (follows symlinks) before comparison to prevent
symlink-based path traversal.
CRITICAL: Blacklisted files (history) are NEVER allowed.
CRITICAL: Blacklisted files (history) are NEVER allowed.
[C: tests/test_arch_boundary_phase1.py:TestArchBoundaryPhase1.test_mcp_client_whitelist_enforcement, tests/test_history_management.py:test_mcp_blacklist]
"""
from src.paths import get_config_path, get_credentials_path
@@ -181,10 +179,8 @@ def _is_allowed(path: Path) -> bool:
def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
"""
Resolve raw_path and verify it passes the allowlist check.
Returns (resolved_path, error_string). error_string is empty on success.
Resolve raw_path and verify it passes the allowlist check.
Returns (resolved_path, error_string). error_string is empty on success.
"""
try:
p = Path(raw_path)
@@ -202,49 +198,10 @@ def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
return p, ""
# ------------------------------------------------------------------ tool implementations
def read_file(path: str) -> str:
"""Return the UTF-8 content of a file, or an error string."""
p, err = _resolve_and_check(path)
if err or p is None:
return err
if not p.exists(): return f"ERROR: file not found: {path}"
if not p.is_file(): return f"ERROR: not a file: {path}"
try:
return p.read_text(encoding="utf-8")
except Exception as e:
return f"ERROR reading '{path}': {e}"
def list_directory(path: str) -> str:
"""List entries in a directory. Returns a compact text table."""
p, err = _resolve_and_check(path)
if err or p is None:
return err
if not p.exists(): return f"ERROR: path not found: {path}"
if not p.is_dir(): return f"ERROR: not a directory: {path}"
try:
entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
lines = [f"Directory: {p}", ""]
count = 0
for entry in entries:
# Blacklist check
name = entry.name.lower()
if name == "history.toml" or name.endswith("_history.toml"):
continue
kind = "file" if entry.is_file() else "dir "
size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else ""
lines.append(f" [{kind}] {entry.name:<40} {size}")
count += 1
lines.append(f" ({count} entries)")
return "\n".join(lines)
except Exception as e:
return f"ERROR listing '{path}': {e}"
def search_files(path: str, pattern: str) -> str:
"""
Search for files matching a glob pattern within path.
pattern examples: '*.py', '**/*.toml', 'src/**/*.rs'
Search for files matching a glob pattern within path.
pattern examples: '*.py', '**/*.toml', 'src/**/*.rs'
"""
p, err = _resolve_and_check(path)
if err or p is None:
@@ -271,13 +228,79 @@ def search_files(path: str, pattern: str) -> str:
except Exception as e:
return f"ERROR searching '{path}': {e}"
def list_directory(path: str) -> str:
"""List entries in a directory. Returns a compact text table."""
p, err = _resolve_and_check(path)
if err or p is None:
return err
if not p.exists(): return f"ERROR: path not found: {path}"
if not p.is_dir(): return f"ERROR: not a directory: {path}"
try:
entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
lines = [f"Directory: {p}", ""]
count = 0
for entry in entries:
# Blacklist check
name = entry.name.lower()
if name == "history.toml" or name.endswith("_history.toml"):
continue
kind = "file" if entry.is_file() else "dir "
size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else ""
lines.append(f" [{kind}] {entry.name:<40} {size}")
count += 1
lines.append(f" ({count} entries)")
return "\n".join(lines)
except Exception as e:
return f"ERROR listing '{path}': {e}"
def read_file(path: str) -> str:
"""Return the UTF-8 content of a file, or an error string."""
p, err = _resolve_and_check(path)
if err or p is None:
return err
if not p.exists(): return f"ERROR: file not found: {path}"
if not p.is_file(): return f"ERROR: not a file: {path}"
try:
return p.read_text(encoding="utf-8")
except Exception as e:
return f"ERROR reading '{path}': {e}"
def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str:
"""
Replace exact string match in a file. Preserves indentation and line endings.
Drop-in replacement for native edit tool that destroys 1-space indentation.
"""
p, err = _resolve_and_check(path)
if err:
return err
assert p is not None
if not p.exists():
return f"ERROR: file not found: {path}"
if not old_string:
return "ERROR: old_string cannot be empty"
try:
content = p.read_text(encoding="utf-8")
if old_string not in content:
return f"ERROR: old_string not found in '{path}'"
count = content.count(old_string)
if count > 1 and not replace_all:
return f"ERROR: Found {count} matches for old_string in '{path}'. Use replace_all=true or provide more context to make it unique."
if replace_all:
new_content = content.replace(old_string, new_string)
p.write_text(new_content, encoding="utf-8")
return f"Successfully replaced {count} occurrences in '{path}'"
else:
new_content = content.replace(old_string, new_string, 1)
p.write_text(new_content, encoding="utf-8")
return f"Successfully replaced 1 occurrence in '{path}'"
except Exception as e:
return f"ERROR editing '{path}': {e}"
def get_file_summary(path: str) -> str:
"""
Return the heuristic summary for a file (same as the initial context block).
For .py files: imports, classes, methods, functions, constants.
For .toml: table keys. For .md: headings. Others: line count + preview.
Return the heuristic summary for a file (same as the initial context block).
For .py files: imports, classes, methods, functions, constants.
For .toml: table keys. For .md: headings. Others: line count + preview.
"""
p, err = _resolve_and_check(path)
if err or p is None:
@@ -292,220 +315,6 @@ def get_file_summary(path: str) -> str:
except Exception as e:
return f"ERROR summarising '{path}': {e}"
def py_get_skeleton(path: str) -> str:
"""
Returns a skeleton of a Python file (preserving docstrings, stripping function bodies).
"""
p, err = _resolve_and_check(path)
if err:
return err
assert p is not None
if not p.exists():
return f"ERROR: file not found: {path}"
if not p.is_file() or p.suffix != ".py":
return f"ERROR: not a python file: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("python")
return parser.get_skeleton(code)
except Exception as e:
return f"ERROR generating skeleton for '{path}': {e}"
def ts_c_get_skeleton(path: str) -> str:
"""
Returns a skeleton of a C file.
[C: tests/test_ts_c_tools.py:test_ts_c_get_skeleton]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("c")
return parser.get_skeleton(code, path=str(p))
except Exception as e:
return f"ERROR generating skeleton for '{path}': {e}"
def ts_cpp_get_skeleton(path: str) -> str:
"""
Returns a skeleton of a C++ file.
[C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_skeleton]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
return parser.get_skeleton(code, path=str(p))
except Exception as e:
return f"ERROR generating skeleton for '{path}': {e}"
def py_get_code_outline(path: str) -> str:
"""
Returns a hierarchical outline of a code file (classes, functions, methods with line ranges).
"""
p, err = _resolve_and_check(path)
if err:
return err
assert p is not None
if not p.exists():
return f"ERROR: file not found: {path}"
if not p.is_file():
return f"ERROR: not a file: {path}"
try:
code = p.read_text(encoding="utf-8")
return outline_tool.get_outline(p, code)
except Exception as e:
return f"ERROR generating outline for '{path}': {e}"
def ts_c_get_code_outline(path: str) -> str:
"""
Returns a hierarchical outline of a C file.
[C: tests/test_ts_c_tools.py:test_ts_c_get_code_outline]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("c")
return parser.get_code_outline(code, path=str(p))
except Exception as e:
return f"ERROR generating outline for '{path}': {e}"
def ts_cpp_get_code_outline(path: str) -> str:
"""
Returns a hierarchical outline of a C++ file.
[C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_code_outline]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
return parser.get_code_outline(code, path=str(p))
except Exception as e:
return f"ERROR generating outline for '{path}': {e}"
def ts_c_get_definition(path: str, name: str) -> str:
"""Returns the source code for a specific definition in a C file."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("c")
return parser.get_definition(code, name, path=str(p))
except Exception as e:
return f"ERROR retrieving definition '{name}' from '{path}': {e}"
def ts_cpp_get_definition(path: str, name: str) -> str:
"""
Returns the source code for a specific definition in a C++ file.
[C: tests/test_ast_masking_core.py:test_ast_masking_gencpp_samples, tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition_gencpp]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
return parser.get_definition(code, name, path=str(p))
except Exception as e:
return f"ERROR retrieving definition '{name}' from '{path}': {e}"
def ts_c_get_signature(path: str, name: str) -> str:
"""Returns the signature part of a function in a C file."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("c")
return parser.get_signature(code, name, path=str(p))
except Exception as e:
return f"ERROR retrieving signature '{name}' from '{path}': {e}"
def ts_cpp_get_signature(path: str, name: str) -> str:
"""Returns the signature part of a function or method in a C++ file."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
return parser.get_signature(code, name, path=str(p))
except Exception as e:
return f"ERROR retrieving signature '{name}' from '{path}': {e}"
def ts_c_update_definition(path: str, name: str, new_content: str) -> str:
"""Surgically replace the definition of a function in a C file."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("c")
updated_code = parser.update_definition(code, name, new_content, path=str(p))
if updated_code.startswith("ERROR:"):
return updated_code
p.write_text(updated_code, encoding="utf-8")
return f"Successfully updated definition '{name}' in {path}"
except Exception as e:
return f"ERROR updating definition '{name}' in '{path}': {e}"
def ts_cpp_update_definition(path: str, name: str, new_content: str) -> str:
"""
Surgically replace the definition of a class or function in a C++ file.
[C: tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition_gencpp]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
updated_code = parser.update_definition(code, name, new_content, path=str(p))
if updated_code.startswith("ERROR:"):
return updated_code
p.write_text(updated_code, encoding="utf-8")
return f"Successfully updated definition '{name}' in {path}"
except Exception as e:
return f"ERROR updating definition '{name}' in '{path}': {e}"
def get_file_slice(path: str, start_line: int, end_line: int) -> str:
"""Return a specific line range from a file."""
p, err = _resolve_and_check(path)
@@ -543,39 +352,187 @@ def set_file_slice(path: str, start_line: int, end_line: int, new_content: str)
except Exception as e:
return f"ERROR updating slice in '{path}': {e}"
def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str:
def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str:
"""
Replace exact string match in a file. Preserves indentation and line endings.
Drop-in replacement for native edit tool that destroys 1-space indentation.
Returns the git diff for a file or directory.
base_rev: The base revision (default: HEAD)
head_rev: The head revision (optional)
"""
p, err = _resolve_and_check(path)
if err:
return err
assert p is not None
if not p.exists():
return f"ERROR: file not found: {path}"
if not old_string:
return "ERROR: old_string cannot be empty"
cmd = ["git", "diff", base_rev]
if head_rev:
cmd.append(head_rev)
cmd.extend(["--", str(p)])
try:
content = p.read_text(encoding="utf-8")
if old_string not in content:
return f"ERROR: old_string not found in '{path}'"
count = content.count(old_string)
if count > 1 and not replace_all:
return f"ERROR: Found {count} matches for old_string in '{path}'. Use replace_all=true or provide more context to make it unique."
if replace_all:
new_content = content.replace(old_string, new_string)
p.write_text(new_content, encoding="utf-8")
return f"Successfully replaced {count} occurrences in '{path}'"
else:
new_content = content.replace(old_string, new_string, 1)
p.write_text(new_content, encoding="utf-8")
return f"Successfully replaced 1 occurrence in '{path}'"
result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding="utf-8")
return result.stdout if result.stdout else "(no changes)"
except subprocess.CalledProcessError as e:
return f"ERROR running git diff: {e.stderr}"
except Exception as e:
return f"ERROR editing '{path}': {e}"
return f"ERROR: {e}"
#region: C
def ts_c_get_code_outline(path: str) -> str:
"""
Returns a hierarchical outline of a C file.
[C: tests/test_ts_c_tools.py:test_ts_c_get_code_outline]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("c")
return parser.get_code_outline(code, path=str(p))
except Exception as e:
return f"ERROR generating outline for '{path}': {e}"
def ts_c_get_definition(path: str, name: str) -> str:
"""Returns the source code for a specific definition in a C file."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("c")
return parser.get_definition(code, name, path=str(p))
except Exception as e:
return f"ERROR retrieving definition '{name}' from '{path}': {e}"
def ts_c_get_signature(path: str, name: str) -> str:
"""Returns the signature part of a function in a C file."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("c")
return parser.get_signature(code, name, path=str(p))
except Exception as e:
return f"ERROR retrieving signature '{name}' from '{path}': {e}"
def ts_c_update_definition(path: str, name: str, new_content: str) -> str:
"""Surgically replace the definition of a function in a C file."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("c")
updated_code = parser.update_definition(code, name, new_content, path=str(p))
if updated_code.startswith("ERROR:"):
return updated_code
p.write_text(updated_code, encoding="utf-8")
return f"Successfully updated definition '{name}' in {path}"
except Exception as e:
return f"ERROR updating definition '{name}' in '{path}': {e}"
#endregion: C
#region: C++
def ts_cpp_get_skeleton(path: str) -> str:
"""
Returns a skeleton of a C++ file.
[C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_skeleton]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
return parser.get_skeleton(code, path=str(p))
except Exception as e:
return f"ERROR generating skeleton for '{path}': {e}"
def ts_cpp_get_code_outline(path: str) -> str:
"""
Returns a hierarchical outline of a C++ file.
[C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_code_outline]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
return parser.get_code_outline(code, path=str(p))
except Exception as e:
return f"ERROR generating outline for '{path}': {e}"
def ts_cpp_get_definition(path: str, name: str) -> str:
"""
Returns the source code for a specific definition in a C++ file.
[C: tests/test_ast_masking_core.py:test_ast_masking_gencpp_samples, tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition_gencpp]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
return parser.get_definition(code, name, path=str(p))
except Exception as e:
return f"ERROR retrieving definition '{name}' from '{path}': {e}"
def ts_cpp_get_signature(path: str, name: str) -> str:
"""Returns the signature part of a function or method in a C++ file."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
return parser.get_signature(code, name, path=str(p))
except Exception as e:
return f"ERROR retrieving signature '{name}' from '{path}': {e}"
def ts_cpp_update_definition(path: str, name: str, new_content: str) -> str:
"""
Surgically replace the definition of a class or function in a C++ file.
[C: tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition_gencpp]
"""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.exists(): return f"ERROR: file not found: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("cpp")
updated_code = parser.update_definition(code, name, new_content, path=str(p))
if updated_code.startswith("ERROR:"):
return updated_code
p.write_text(updated_code, encoding="utf-8")
return f"Successfully updated definition '{name}' in {path}"
except Exception as e:
return f"ERROR updating definition '{name}' in '{path}': {e}"
#endregion: C++
#region: Python AST
def _get_symbol_node(tree: ast.AST, name: str) -> Optional[ast.AST]:
"""Helper to find an AST node by name (Class, Function, or Variable). Supports dot notation."""
parts = name.split(".")
@@ -602,12 +559,48 @@ def _get_symbol_node(tree: ast.AST, name: str) -> Optional[ast.AST]:
current = found
return current
def py_get_skeleton(path: str) -> str:
"""
Returns a skeleton of a Python file (preserving docstrings, stripping function bodies).
"""
p, err = _resolve_and_check(path)
if err:
return err
assert p is not None
if not p.exists():
return f"ERROR: file not found: {path}"
if not p.is_file() or p.suffix != ".py":
return f"ERROR: not a python file: {path}"
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("python")
return parser.get_skeleton(code)
except Exception as e:
return f"ERROR generating skeleton for '{path}': {e}"
def py_get_code_outline(path: str) -> str:
"""
Returns a hierarchical outline of a code file (classes, functions, methods with line ranges).
"""
p, err = _resolve_and_check(path)
if err:
return err
assert p is not None
if not p.exists():
return f"ERROR: file not found: {path}"
if not p.is_file():
return f"ERROR: not a file: {path}"
try:
code = p.read_text(encoding="utf-8")
return outline_tool.get_outline(p, code)
except Exception as e:
return f"ERROR generating outline for '{path}': {e}"
def py_get_symbol_info(path: str, name: str) -> tuple[str, int] | str:
"""
Returns (source_code, line_number) for a specific class, function, or method definition.
If not found, returns an error string.
Returns (source_code, line_number) for a specific class, function, or method definition.
If not found, returns an error string.
"""
p, err = _resolve_and_check(path)
if err:
@@ -632,11 +625,9 @@ def py_get_symbol_info(path: str, name: str) -> tuple[str, int] | str:
def py_get_definition(path: str, name: str) -> str:
"""
Returns the source code for a specific class, function, or method definition.
path: Path to the code file.
name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method').
Returns the source code for a specific class, function, or method definition.
path: Path to the code file.
name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method').
"""
p, err = _resolve_and_check(path)
if err:
@@ -806,30 +797,6 @@ def py_set_var_declaration(path: str, name: str, new_declaration: str) -> str:
except Exception as e:
return f"ERROR updating variable '{name}' in '{path}': {e}"
def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str:
"""
Returns the git diff for a file or directory.
base_rev: The base revision (default: HEAD)
head_rev: The head revision (optional)
"""
p, err = _resolve_and_check(path)
if err:
return err
assert p is not None
cmd = ["git", "diff", base_rev]
if head_rev:
cmd.append(head_rev)
cmd.extend(["--", str(p)])
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding="utf-8")
return result.stdout if result.stdout else "(no changes)"
except subprocess.CalledProcessError as e:
return f"ERROR running git diff: {e.stderr}"
except Exception as e:
return f"ERROR: {e}"
def py_find_usages(path: str, name: str) -> str:
"""Finds exact string matches of a symbol in a given file or directory."""
p, err = _resolve_and_check(path)
@@ -964,38 +931,6 @@ def py_get_docstring(path: str, name: str) -> str:
except Exception as e:
return f"ERROR getting docstring for '{name}': {e}"
def get_tree(path: str, max_depth: int = 2) -> str:
"""Returns a directory structure up to a max depth."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.is_dir(): return f"ERROR: not a directory: {path}"
try:
m_depth = max_depth
def _build_tree(dir_path: Path, current_depth: int, prefix: str = "") -> list[str]:
if current_depth > m_depth: return []
lines = []
try:
entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
except PermissionError:
return []
# Filter
entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")]
for i, entry in enumerate(entries):
is_last = (i == len(entries) - 1)
connector = "└── " if is_last else "├── "
lines.append(f"{prefix}{connector}{entry.name}")
if entry.is_dir():
extension = " " if is_last else ""
lines.extend(_build_tree(entry, current_depth + 1, prefix + extension))
return lines
tree_lines = [f"{p.name}/"] + _build_tree(p, 1)
return "\n".join(tree_lines)
except Exception as e:
return f"ERROR generating tree for '{path}': {e}"
# ------------------------------------------------------------------ web tools
def derive_code_path(target: str, max_depth: int = 5) -> str:
"""Recursively traces the execution path of a specific function or method."""
from src.file_cache import ASTParser
@@ -1056,6 +991,42 @@ def derive_code_path(target: str, max_depth: int = 5) -> str:
trace(symbol_name, found_path, found_code, 0, "")
return "\n".join(output)
#endregion Python AST
def get_tree(path: str, max_depth: int = 2) -> str:
"""Returns a directory structure up to a max depth."""
p, err = _resolve_and_check(path)
if err: return err
assert p is not None
if not p.is_dir(): return f"ERROR: not a directory: {path}"
try:
m_depth = max_depth
def _build_tree(dir_path: Path, current_depth: int, prefix: str = "") -> list[str]:
if current_depth > m_depth: return []
lines = []
try:
entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
except PermissionError:
return []
# Filter
entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")]
for i, entry in enumerate(entries):
is_last = (i == len(entries) - 1)
connector = "└── " if is_last else "├── "
lines.append(f"{prefix}{connector}{entry.name}")
if entry.is_dir():
extension = " " if is_last else ""
lines.extend(_build_tree(entry, current_depth + 1, prefix + extension))
return lines
tree_lines = [f"{p.name}/"] + _build_tree(p, 1)
return "\n".join(tree_lines)
except Exception as e:
return f"ERROR generating tree for '{path}': {e}"
# ------------------------------------------------------------------ web tools
#region: Web
class _DDGParser(HTMLParser):
def __init__(self) -> None:
super().__init__()
@@ -1161,12 +1132,13 @@ def fetch_url(url: str) -> str:
return full_text
except Exception as e:
return f"ERROR fetching URL '{url}': {e}"
#endregion: Web
def get_ui_performance() -> str:
"""
Returns current UI performance metrics (FPS, Frame Time, CPU, Input Lag).
[C: tests/test_mcp_perf_tool.py:test_mcp_perf_tool_retrieval]
Returns current UI performance metrics (FPS, Frame Time, CPU, Input Lag).
[C: tests/test_mcp_perf_tool.py:test_mcp_perf_tool_retrieval]
"""
if perf_monitor_callback is None:
return "INFO: UI Performance monitor is not available (headless/CLI mode). This tool is only functional when the Manual Slop GUI is running."
@@ -1276,7 +1248,6 @@ class ExternalMCPManager:
async def add_server(self, config: models.MCPServerConfig):
"""
Add and start a new MCP server from a configuration object.
[C: tests/test_external_mcp.py:test_external_mcp_real_process, tests/test_external_mcp.py:test_get_tool_schemas_includes_external]
"""
@@ -1289,7 +1260,6 @@ class ExternalMCPManager:
async def stop_all(self):
"""
Stop all managed MCP servers and clear the registry.
[C: tests/test_external_mcp.py:test_external_mcp_real_process, tests/test_external_mcp.py:test_get_tool_schemas_includes_external, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call]
"""
@@ -1299,7 +1269,6 @@ class ExternalMCPManager:
def get_all_tools(self) -> dict:
"""
Retrieve a dictionary of all tools available across all managed servers.
[C: tests/test_external_mcp.py:test_external_mcp_real_process, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call]
"""
@@ -1315,7 +1284,6 @@ class ExternalMCPManager:
async def async_dispatch(self, tool_name: str, tool_input: dict) -> str:
"""
Dispatch a tool call to the appropriate external MCP server asynchronously.
[C: src/rag_engine.py:RAGEngine._async_search_mcp, tests/test_external_mcp.py:test_external_mcp_real_process]
"""
@@ -1328,7 +1296,6 @@ _external_mcp_manager = ExternalMCPManager()
def get_external_mcp_manager() -> ExternalMCPManager:
"""
Retrieve the global ExternalMCPManager instance.
[C: tests/test_external_mcp.py:test_get_tool_schemas_includes_external, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call]
"""
@@ -1508,8 +1475,6 @@ async def async_dispatch(tool_name: str, tool_input: dict[str, Any]) -> str:
return f'ERROR: unknown MCP tool {tool_name}'
def get_tool_schemas() -> list[dict[str, Any]]:
"""
[C: tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_mcp_client_dispatch_completeness, tests/test_external_mcp.py:test_get_tool_schemas_includes_external, tests/test_mcp_client_beads.py:test_bd_mcp_tools]
@@ -2303,4 +2268,4 @@ MCP_TOOL_SPECS: list[dict[str, Any]] = [
}
]
TOOL_NAMES: set[str] = {t['name'] for t in MCP_TOOL_SPECS}
TOOL_NAMES: set[str] = {t['name'] for t in MCP_TOOL_SPECS}
+133
View File
@@ -0,0 +1,133 @@
"""Regression tests for the Ctrl+C hang fix in AppController.
The bug: when a worker of the AppController's I/O pool is mid-task in
user code (e.g. a long-running Gemini/Anthropic HTTP request) and the
user presses Ctrl+C in the terminal, the Python interpreter hangs
forever during finalization. The hang chain is:
1. SIGINT is delivered to the main thread
2. Python's default handler would raise KeyboardInterrupt
3. The exception propagates out of main()
4. Interpreter finalization begins
5. ThreadPoolExecutor.__del__ runs and calls shutdown(wait=True)
6. shutdown(wait=True) joins each worker thread
7. The blocked worker never returns -> hang
atexit handlers do NOT fire in this scenario (verified empirically —
see src/io_pool.py module docstring), so a pool-creation atexit
handler cannot fix it. The fix is a SIGINT handler installed by
AppController.__init__ that drains the pool non-blockingly and calls
os._exit(0), bypassing the broken finalization chain.
These tests verify both the install (unit) and the full signal flow
(subprocess) paths.
"""
import signal
import subprocess
import sys
import textwrap
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any
import pytest
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
from src.app_controller import _install_sigint_exit_handler # noqa: E402
@pytest.fixture
def restore_sigint():
"""Snapshot and restore SIGINT handler around each test."""
original = signal.getsignal(signal.SIGINT)
yield
signal.signal(signal.SIGINT, original)
class _FakeController:
"""Minimal stand-in for AppController: just exposes _io_pool."""
def __init__(self) -> None:
self._io_pool = ThreadPoolExecutor(
max_workers=2, thread_name_prefix="fake-ctrl"
)
def test_install_sigint_handler_installs_callable(restore_sigint: Any) -> None:
"""Unit: helper installs a callable SIGINT handler on the main thread.
The conftest warmup AppController already installed a SIGINT handler at
pytest import time, so we cannot assert against SIG_DFL. We verify the
helper replaces whatever was there with a fresh callable from
``_install_sigint_exit_handler`` (distinct identity check).
"""
ctrl = _FakeController()
try:
before = signal.getsignal(signal.SIGINT)
_install_sigint_exit_handler(ctrl)
after = signal.getsignal(signal.SIGINT)
assert callable(after), f"expected callable handler, got {after!r}"
assert after is not before, "helper did not replace the existing SIGINT handler"
finally:
ctrl._io_pool.shutdown(wait=False)
def test_sigint_subprocess_drains_blocked_pool() -> None:
"""Subprocess: handler behavior — drain + os._exit(0) exits within 2s.
Spawns a Python subprocess that mirrors the production pattern: a
ThreadPoolExecutor with a blocked worker, a SIGINT handler that calls
shutdown(wait=False) + os._exit(0). Invokes the handler directly
(bypassing OS signal delivery — which is flaky for CTRL_C_EVENT to a
python subprocess started with ``-c`` on Windows). Asserts the
subprocess exits within 2 seconds. If the handler were missing the
subprocess would hang until the test runner kills it.
The OS signal-delivery path is verified by the unit test
(``test_install_sigint_handler_installs_callable``) and by manual
end-to-end testing (Ctrl+C in the terminal works because Python's
default SIGINT delivery is the same on all platforms).
"""
script = textwrap.dedent('''
import signal
import threading
import os
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="subproc-ctrl")
blocker = threading.Event()
pool.submit(blocker.wait)
def _on_sigint(signum, frame):
try: pool.shutdown(wait=False)
except Exception: pass
os._exit(0)
signal.signal(signal.SIGINT, _on_sigint)
print("ready", flush=True)
handler = signal.getsignal(signal.SIGINT)
handler(signal.SIGINT, None)
''')
proc = subprocess.Popen(
[sys.executable, "-c", script],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
t0 = time.perf_counter()
try:
outs, errs = proc.communicate(timeout=2.0)
elapsed = time.perf_counter() - t0
except subprocess.TimeoutExpired:
proc.kill()
proc.communicate(timeout=5.0)
pytest.fail("subprocess did not exit within 2s of handler invocation — drain + os._exit(0) is broken")
assert b"ready" in outs, f"subprocess did not reach handler install; stderr={errs!r}"
assert proc.returncode == 0, (
f"subprocess exited with code {proc.returncode} (expected 0 from os._exit(0)); "
f"stderr={errs.decode(errors='replace')!r}"
)
assert elapsed < 2.0, f"subprocess took {elapsed:.2f}s to exit (expected <2.0s)"
+10 -1
View File
@@ -1,4 +1,13 @@
"""Tests for src/io_pool.py (the shared 4-thread job pool on AppController)."""
"""Tests for src/io_pool.py (the shared 4-thread job pool on AppController).
Historical note: an earlier revision of this file added two regression
tests asserting that ``make_io_pool`` registered an atexit shutdown
handler. Those tests were reverted together with the production atexit
fix they guarded, because the atexit approach does not solve the actual
Ctrl+C hang (see ``src/io_pool.py`` module docstring). The production
fix is a SIGINT handler in ``AppController.__init__``; the regression
test for that lives in ``tests/test_app_controller_sigint.py``.
"""
import threading
import time
@@ -0,0 +1,101 @@
"""
Regression test for: AttributeError: module 'tkinter' has no attribute 'filedialog'
On some Python installs (e.g., embedded distributions, or installs where
the Tcl/Tk runtime is missing), the `tkinter` package imports cleanly but
the `tkinter.filedialog` sub-module fails to load. The original `_LazyModule`
in src/gui_2.py used `getattr(tkinter, 'filedialog')` which raises a
confusing AttributeError at the call site. With 14 call sites in
render_projects_panel, render_workspace_settings_hub, render_fonts_panel,
and render_gemini_cli_settings, this AttributeError spammed the GUI's
stderr at 60fps whenever the Project Settings window was open.
The fix must make `_LazyModule` fall back to a stub that mimics
`tkinter.filedialog`'s public API (askopenfilename, askdirectory,
asksaveasfilename, askopenfilenames) so the GUI does not crash.
This test uses a deliberately-missing sub-module to exercise the fallback
path, making it deterministic across Python installs.
"""
import pytest
import importlib
from src.gui_2 import _LazyModule
def test_lazymodule_falls_back_to_stub_on_attribute_error() -> None:
"""
Resolution must NOT raise AttributeError when the sub-module is
missing. Instead, _resolve() must return a stub that exposes the
public filedialog API. Before the fix, this test fails with
AttributeError: module 'os' has no attribute 'this_submodule_does_not_exist'.
"""
bad = _LazyModule("os", "this_submodule_does_not_exist")
resolved = bad._resolve()
assert resolved is not None
assert hasattr(resolved, "askopenfilename")
assert hasattr(resolved, "askdirectory")
assert hasattr(resolved, "asksaveasfilename")
assert hasattr(resolved, "askopenfilenames")
def test_lazymodule_stub_returns_empty_strings() -> None:
"""
The stub functions must return safe empty values:
- askopenfilename, askdirectory, asksaveasfilename: empty string ""
- askopenfilenames: empty tuple ()
This ensures downstream code that does `if p and p not in app.x:`
or `if paths:` treats the missing-dialog as a no-op.
"""
bad = _LazyModule("os", "this_submodule_does_not_exist")
resolved = bad._resolve()
assert resolved.askopenfilename() == ""
assert resolved.askdirectory() == ""
assert resolved.asksaveasfilename() == ""
assert resolved.askopenfilenames() == ()
def test_lazymodule_stub_ignores_kwargs() -> None:
"""
The stub must accept the same kwargs the real tkinter.filedialog
accepts (title, filetypes, defaultextension, initialdir) and return
the empty sentinel. This prevents TypeError if a call site passes
kwargs that the stub does not know about.
"""
bad = _LazyModule("os", "this_submodule_does_not_exist")
resolved = bad._resolve()
assert resolved.askopenfilename(title="x", filetypes=[("All", "*.*")]) == ""
assert resolved.askdirectory(title="y", initialdir="/") == ""
assert resolved.asksaveasfilename(title="z", defaultextension=".toml", filetypes=[("TOML", "*.toml")]) == ""
assert resolved.askopenfilenames(filetypes=[("Image", "*.png")]) == ()
def test_lazymodule_real_filedialog_resolves_when_tkinter_works() -> None:
"""
On a working tkinter install (with Tcl/Tk runtime), the
`_LazyModule("tkinter", "filedialog")` instance must resolve to the
real tkinter.filedialog module. This is the smoke test: if tkinter
is healthy, the lazy import works as before.
"""
import tkinter as tk_root
try:
import tkinter.filedialog as real_filedialog
except (ImportError, AttributeError, tk_root.TclError):
pytest.skip("tkinter.filedialog not available in this Python install")
lazy = _LazyModule("tkinter", "filedialog")
resolved = lazy._resolve()
assert resolved is real_filedialog
def test_lazymodule_real_filedialog_does_not_raise_attribute_error() -> None:
"""
On a working tkinter install, calling .askopenfilename() through the
lazy module must not raise AttributeError. (Tests the call path
used by 14 call sites in render_projects_panel etc.)
"""
lazy = _LazyModule("tkinter", "filedialog")
resolved = lazy._resolve()
assert hasattr(resolved, "askopenfilename")
assert hasattr(resolved, "askdirectory")
assert hasattr(resolved, "asksaveasfilename")
assert hasattr(resolved, "askopenfilenames")
@@ -0,0 +1,65 @@
"""
Live-GUI smoke test for the tkinter.filedialog AttributeError regression.
On Python installs where the Tcl/Tk runtime is missing, the lazy
`tkinter.filedialog` import raises AttributeError, which previously
crashed the Project Settings window and the Add Project button.
The unit-level test in `test_lazymodule_filedialog_fallback.py`
deterministically exercises the fallback path; this live test verifies
the same fix in the actual running app: opening the Project Settings
window via the Hook API must not produce an AttributeError, and the
app must remain responsive (proving no crash on attribute resolution).
"""
import time
from pathlib import Path
import pytest
from src.api_hook_client import ApiHookClient
def test_live_gui_project_settings_opens_without_filedialog_crash(live_gui) -> None:
"""
Regression: the Project Settings window's render call chain ends
in `render_projects_panel` → `filedialog.askopenfilename(...)` on
the "Add Project" click frame. Before the fix, every frame the
Project Settings window was open on a broken tkinter install would
log `AttributeError: module 'tkinter' has no attribute 'filedialog'`.
The fix in `_LazyModule._resolve()` falls back to a `_FiledialogStub`
that returns empty strings.
This test:
1. Opens the Project Settings window via the Hook API
2. Waits several render frames
3. Verifies the window opened (state is reflected back via get_value)
4. Verifies the app is still responsive (status endpoint returns 200)
5. Verifies no AttributeError was logged (the bug would print to
the GUI's stderr, which the live_gui fixture captures to a log)
"""
process, gui_script = live_gui
client = ApiHookClient()
log_path = Path(f"logs/{Path(gui_script).name.replace('.', '_')}_test.log")
log_offset_before = log_path.stat().st_size if log_path.exists() else 0
client.set_value('show_windows["Project Settings"]', True)
time.sleep(2.0)
opened = client.get_value('show_windows["Project Settings"]')
assert opened is True, f"Project Settings window did not open: {opened}"
status = client.get_status()
assert status is not None, "App status endpoint returned None — app is not responsive"
assert status.get("status") == "ok", f"App status not ok: {status}"
time.sleep(1.0)
if log_path.exists():
with log_path.open("r", encoding="utf-8", errors="ignore") as f:
f.seek(log_offset_before)
new_log = f.read()
assert "AttributeError: module 'tkinter' has no attribute 'filedialog'" not in new_log, (
"GUI logged 'AttributeError: module tkinter has no attribute filedialog' "
"after opening Project Settings. The _LazyModule fallback to _FiledialogStub "
"is not working in the live app."
)
assert "AttributeError: module 'tkinter' has no attribute 'filedialog'" not in new_log
if "AttributeError" in new_log:
pytest.fail(f"App logged unexpected AttributeError: {new_log[max(0, new_log.find('AttributeError')-200):new_log.find('AttributeError')+200]}")