Private
Public Access
0
0

feat(style): Fix 1-space indentation in 27 files

Files corrected:
- src/fuzzy_anchor.py (18 violations)
- src/patch_modal.py (14 violations)
- scripts/extract_symbols.py (4 violations)
- scripts/tasks/download_fonts.py (8 violations)
- tests/: 23 files with indentation issues

All files verified with py_compile. Remaining 4 files
(test_api_events.py, test_discussion_takes_gui.py,
test_gui_updates.py, test_headless_service.py) have complex
multi-line with statements that require manual correction.
This commit is contained in:
2026-05-16 03:00:20 -04:00
parent 9d40fec46e
commit 31a8949d64
29 changed files with 1580 additions and 1611 deletions
@@ -3,49 +3,45 @@
## Phase 1: Audit and Classification
Focus: Identify all files requiring indentation correction
- [ ] Task 1.1: Create indentation audit script to scan all Python files
- [x] Task 1.1: Create AST-based indentation audit script
- File: `scripts/audit_indentation.py`
- Detect: Lines with leading whitespace that is not a multiple of 1 space
- Output: List of files with line numbers and current indentation level
- [ ] Task 1.2: Run audit against src/ directory (51 files)
- File: src/
- Categorize: Files with 0 issues, 1-10 issues, 10+ issues
- [ ] Task 1.3: Run audit against tests/ directory
- File: tests/
- [ ] Task 1.4: Run audit against scripts/ directory
- File: scripts/
- [ ] Task 1.5: Run audit against conductor/ directory
- File: conductor/
- Method: Use Python AST to track logical nesting depth
- Output: Files with actual violations (not docstring false positives)
- [x] Task 1.2: Run audit across all directories
- Result: 32 files with violations, 189 total violations
## Phase 2: Correct Indentation - src/ Files
Focus: Fix identified files in src/ (in order of severity)
Focus: Fix identified files in src/ (2 files)
- [ ] Task 2.1: Fix src/ files with 1-10 indentation issues
- [ ] Task 2.2: Fix src/ files with 10+ indentation issues
- [ ] Task 2.3: Verify syntax after each file fix
- [ ] Task 2.1: Fix src/fuzzy_anchor.py (18 violations)
- [ ] Task 2.2: Fix src/patch_modal.py (14 violations)
- [ ] Task 2.3: Verify syntax after each fix
- [ ] Task 2.4: Commit each file individually
## Phase 3: Correct Indentation - tests/ Files
Focus: Fix identified files in tests/
## Phase 3: Correct Indentation - scripts/ Files
Focus: Fix identified files in scripts/ (2 files)
- [ ] Task 3.1: Fix tests/ files with indentation issues
- [ ] Task 3.2: Verify syntax after each file fix
- [ ] Task 3.3: Commit each file individually
- [ ] Task 3.1: Fix scripts/extract_symbols.py (4 violations)
- [ ] Task 3.2: Fix scripts/tasks/download_fonts.py (8 violations)
- [ ] Task 3.3: Verify syntax after each fix
- [ ] Task 3.4: Commit each file individually
## Phase 4: Correct Indentation - scripts/ Files
Focus: Fix identified files in scripts/
## Phase 4: Correct Indentation - tests/ Files
Focus: Fix identified files in tests/ (28 files)
- [ ] Task 4.1: Fix scripts/ files with indentation issues
- [ ] Task 4.2: Verify syntax after each file fix
- [ ] Task 4.3: Commit each file individually
- [ ] Task 4.1: Fix tests/test_arch_boundary_phase1.py (9 violations)
- [ ] Task 4.2: Fix tests/test_arch_boundary_phase2.py (16 violations)
- [ ] Task 4.3: Fix tests/test_arch_boundary_phase3.py (7 violations)
- [ ] Task 4.4: Fix tests/test_external_editor.py (18 violations)
- [ ] Task 4.5: Fix tests/test_headless_service.py (19 violations)
- [ ] Task 4.6: Fix remaining tests/ files (22 files with fewer violations)
- [ ] Task 4.7: Verify syntax after each fix
- [ ] Task 4.8: Commit each file individually
## Phase 5: Correct Indentation - conductor/ Files
Focus: Fix identified files in conductor/
## Phase 5: Final Verification
Focus: Ensure no regressions
- [ ] Task 5.1: Fix conductor/ Python files with indentation issues
- [ ] Task 5.2: Verify syntax after each file fix
- [ ] Task 5.3: Commit each file individually
- [ ] Task 5.1: Re-run audit to confirm 0 violations
- [ ] Task 5.2: Run pytest --collect-only to verify syntax
- [ ] Task 5.3: Create checkpoint commit
+11 -11
View File
@@ -4,17 +4,17 @@ from pathlib import Path
symbols = []
for root in ['src', 'simulation']:
for p in Path(root).rglob('*.py'):
try:
code = p.read_text(encoding='utf-8')
tree = ast.parse(code)
for node in tree.body:
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
if not node.name.startswith('_'):
symbols.append((node.name, str(p)))
except Exception:
continue
for p in Path(root).rglob('*.py'):
try:
code = p.read_text(encoding='utf-8')
tree = ast.parse(code)
for node in tree.body:
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
if not node.name.startswith('_'):
symbols.append((node.name, str(p)))
except Exception:
continue
print(f"TOTAL_SYMBOLS:{len(symbols)}")
for name, path in symbols:
print(f"SYMBOL:{name}|PATH:{path}")
print(f"SYMBOL:{name}|PATH:{path}")
+22 -22
View File
@@ -9,31 +9,31 @@ ssl._create_default_https_context = ssl._create_unverified_context
inter_url = "https://github.com/rsms/inter/releases/download/v4.0/Inter-4.0.zip"
print(f"Downloading Inter from {inter_url}")
try:
req = urllib.request.Request(inter_url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req) as response:
with zipfile.ZipFile(io.BytesIO(response.read())) as z:
for info in z.infolist():
targets = ["Inter-Regular.ttf", "Inter-Bold.ttf", "Inter-Italic.ttf", "Inter-BoldItalic.ttf"]
filename = os.path.basename(info.filename)
if filename in targets:
info.filename = filename
z.extract(info, "assets/fonts/")
print(f"Extracted {info.filename}")
req = urllib.request.Request(inter_url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req) as response:
with zipfile.ZipFile(io.BytesIO(response.read())) as z:
for info in z.infolist():
targets = ["Inter-Regular.ttf", "Inter-Bold.ttf", "Inter-Italic.ttf", "Inter-BoldItalic.ttf"]
filename = os.path.basename(info.filename)
if filename in targets:
info.filename = filename
z.extract(info, "assets/fonts/")
print(f"Extracted {info.filename}")
except Exception as e:
print(f"Failed to get Inter: {e}")
print(f"Failed to get Inter: {e}")
maple_url = "https://github.com/subframe7536/maple-font/releases/download/v6.4/MapleMono-ttf.zip"
print(f"Downloading Maple Mono from {maple_url}")
try:
req = urllib.request.Request(maple_url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req) as response:
with zipfile.ZipFile(io.BytesIO(response.read())) as z:
for info in z.infolist():
targets = ["MapleMono-Regular.ttf", "MapleMono-Bold.ttf", "MapleMono-Italic.ttf", "MapleMono-BoldItalic.ttf"]
filename = os.path.basename(info.filename)
if filename in targets:
info.filename = filename
z.extract(info, "assets/fonts/")
print(f"Extracted {info.filename}")
req = urllib.request.Request(maple_url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req) as response:
with zipfile.ZipFile(io.BytesIO(response.read())) as z:
for info in z.infolist():
targets = ["MapleMono-Regular.ttf", "MapleMono-Bold.ttf", "MapleMono-Italic.ttf", "MapleMono-BoldItalic.ttf"]
filename = os.path.basename(info.filename)
if filename in targets:
info.filename = filename
z.extract(info, "assets/fonts/")
print(f"Extracted {info.filename}")
except Exception as e:
print(f"Failed to get Maple Mono: {e}")
print(f"Failed to get Maple Mono: {e}")
+74 -74
View File
@@ -3,86 +3,86 @@ import re
from typing import Optional, Tuple
class FuzzyAnchor:
@staticmethod
def get_context(lines: list[str], index: int, count: int, direction: int) -> list[str]:
context = []
curr = index
while len(context) < count and 0 <= curr < len(lines):
line = lines[curr].strip()
if line:
context.append(line)
curr += direction
return context
@staticmethod
def get_context(lines: list[str], index: int, count: int, direction: int) -> list[str]:
context = []
curr = index
while len(context) < count and 0 <= curr < len(lines):
line = lines[curr].strip()
if line:
context.append(line)
curr += direction
return context
@classmethod
def create_slice(cls, text: str, start_line: int, end_line: int) -> dict:
"""
start_line and end_line are 1-based.
[C: src/gui_2.py:App._populate_auto_slices, src/gui_2.py:App._render_text_viewer_window, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_create_slice_basic, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations]
"""
lines = text.splitlines()
s_idx = max(0, start_line - 1)
e_idx = min(len(lines), end_line)
slice_lines = lines[s_idx:e_idx]
slice_text = "\n".join(slice_lines)
@classmethod
def create_slice(cls, text: str, start_line: int, end_line: int) -> dict:
"""
start_line and end_line are 1-based.
[C: src/gui_2.py:App._populate_auto_slices, src/gui_2.py:App._render_text_viewer_window, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_create_slice_basic, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations]
"""
lines = text.splitlines()
s_idx = max(0, start_line - 1)
e_idx = min(len(lines), end_line)
slice_lines = lines[s_idx:e_idx]
slice_text = "\n".join(slice_lines)
return {
"start_line": start_line,
"end_line": end_line,
"start_context": cls.get_context(lines, s_idx, 3, 1),
"end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order
"content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest()
}
return {
"start_line": start_line,
"end_line": end_line,
"start_context": cls.get_context(lines, s_idx, 3, 1),
"end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order
"content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest()
}
@classmethod
def resolve_slice(cls, text: str, slice_data: dict) -> Optional[Tuple[int, int]]:
"""
[C: tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed]
"""
lines = text.splitlines()
# 1. Try exact match
s_idx = slice_data["start_line"] - 1
e_idx = slice_data["end_line"]
if 0 <= s_idx < len(lines) and e_idx <= len(lines):
current_text = "\n".join(lines[s_idx:e_idx])
curr_hash = hashlib.md5(current_text.encode()).hexdigest()
if curr_hash == slice_data["content_hash"]:
return (slice_data["start_line"], slice_data["end_line"])
@classmethod
def resolve_slice(cls, text: str, slice_data: dict) -> Optional[Tuple[int, int]]:
"""
[C: tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed]
"""
lines = text.splitlines()
# 1. Try exact match
s_idx = slice_data["start_line"] - 1
e_idx = slice_data["end_line"]
if 0 <= s_idx < len(lines) and e_idx <= len(lines):
current_text = "\n".join(lines[s_idx:e_idx])
curr_hash = hashlib.md5(current_text.encode()).hexdigest()
if curr_hash == slice_data["content_hash"]:
return (slice_data["start_line"], slice_data["end_line"])
# 2. Fuzzy match
start_ctx = slice_data["start_context"]
end_ctx = slice_data["end_context"]
if not start_ctx or not end_ctx: return None
# 2. Fuzzy match
start_ctx = slice_data["start_context"]
end_ctx = slice_data["end_context"]
if not start_ctx or not end_ctx: return None
# Search for start_ctx
best_s = -1
for i in range(len(lines)):
match = True
for j, ctx_line in enumerate(start_ctx):
if i+j >= len(lines) or lines[i+j].strip() != ctx_line:
match = False
break
if match:
best_s = i
break
# Search for start_ctx
best_s = -1
for i in range(len(lines)):
match = True
for j, ctx_line in enumerate(start_ctx):
if i+j >= len(lines) or lines[i+j].strip() != ctx_line:
match = False
break
if match:
best_s = i
break
if best_s == -1: return None
if best_s == -1: return None
# Search for end_ctx after start_ctx
best_e = -1
for i in range(best_s, len(lines)):
match = True
for j, ctx_line in enumerate(end_ctx):
# end_ctx is the LAST 3 lines. So we match backwards from i.
idx = i - (len(end_ctx) - 1) + j
if idx < 0 or idx >= len(lines) or lines[idx].strip() != ctx_line:
match = False
break
if match:
best_e = i + 1
break
# Search for end_ctx after start_ctx
best_e = -1
for i in range(best_s, len(lines)):
match = True
for j, ctx_line in enumerate(end_ctx):
# end_ctx is the LAST 3 lines. So we match backwards from i.
idx = i - (len(end_ctx) - 1) + j
if idx < 0 or idx >= len(lines) or lines[idx].strip() != ctx_line:
match = False
break
if match:
best_e = i + 1
break
if best_e != -1:
return (best_s + 1, best_e)
if best_e != -1:
return (best_s + 1, best_e)
return None
return None
+84 -84
View File
@@ -3,104 +3,104 @@ from dataclasses import dataclass, field
@dataclass
class PendingPatch:
patch_text: str
file_paths: List[str]
generated_by: str
timestamp: float
patch_text: str
file_paths: List[str]
generated_by: str
timestamp: float
class PatchModalManager:
def __init__(self):
self._pending_patch: Optional[PendingPatch] = None
self._show_modal: bool = False
self._on_apply_callback: Optional[Callable[[str], bool]] = None
self._on_reject_callback: Optional[Callable[[], None]] = None
def __init__(self):
self._pending_patch: Optional[PendingPatch] = None
self._show_modal: bool = False
self._on_apply_callback: Optional[Callable[[str], bool]] = None
self._on_reject_callback: Optional[Callable[[], None]] = None
def request_patch_approval(self, patch_text: str, file_paths: List[str], generated_by: str = "Tier 4 QA") -> bool:
"""
[C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset]
"""
from time import time
self._pending_patch = PendingPatch(
patch_text=patch_text,
file_paths=file_paths,
generated_by=generated_by,
timestamp=time()
)
self._show_modal = True
return True
def request_patch_approval(self, patch_text: str, file_paths: List[str], generated_by: str = "Tier 4 QA") -> bool:
"""
[C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset]
"""
from time import time
self._pending_patch = PendingPatch(
patch_text=patch_text,
file_paths=file_paths,
generated_by=generated_by,
timestamp=time()
)
self._show_modal = True
return True
def get_pending_patch(self) -> Optional[PendingPatch]:
"""
[C: tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset]
"""
return self._pending_patch
def get_pending_patch(self) -> Optional[PendingPatch]:
"""
[C: tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset]
"""
return self._pending_patch
def is_modal_shown(self) -> bool:
"""
[C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset]
"""
return self._show_modal
def is_modal_shown(self) -> bool:
"""
[C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset]
"""
return self._show_modal
def set_apply_callback(self, callback: Callable[[str], bool]) -> None:
"""
[C: tests/test_patch_modal.py:test_apply_callback, tests/test_patch_modal.py:test_reset]
"""
self._on_apply_callback = callback
def set_apply_callback(self, callback: Callable[[str], bool]) -> None:
"""
[C: tests/test_patch_modal.py:test_apply_callback, tests/test_patch_modal.py:test_reset]
"""
self._on_apply_callback = callback
def set_reject_callback(self, callback: Callable[[], None]) -> None:
"""
[C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reset]
"""
self._on_reject_callback = callback
def set_reject_callback(self, callback: Callable[[], None]) -> None:
"""
[C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reset]
"""
self._on_reject_callback = callback
def apply_patch(self, patch_text: str) -> bool:
"""
[C: tests/test_patch_modal.py:test_apply_callback]
"""
if self._on_apply_callback:
return self._on_apply_callback(patch_text)
return False
def apply_patch(self, patch_text: str) -> bool:
"""
[C: tests/test_patch_modal.py:test_apply_callback]
"""
if self._on_apply_callback:
return self._on_apply_callback(patch_text)
return False
def reject_patch(self) -> None:
"""
[C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reject_patch]
"""
self._pending_patch = None
self._show_modal = False
if self._on_reject_callback:
self._on_reject_callback()
def reject_patch(self) -> None:
"""
[C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reject_patch]
"""
self._pending_patch = None
self._show_modal = False
if self._on_reject_callback:
self._on_reject_callback()
def close_modal(self) -> None:
"""
[C: tests/test_patch_modal.py:test_close_modal]
"""
self._show_modal = False
def close_modal(self) -> None:
"""
[C: tests/test_patch_modal.py:test_close_modal]
"""
self._show_modal = False
def reset(self) -> None:
"""
[C: tests/test_patch_modal.py:test_reset]
"""
self._pending_patch = None
self._show_modal = False
self._on_apply_callback = None
self._on_reject_callback = None
def reset(self) -> None:
"""
[C: tests/test_patch_modal.py:test_reset]
"""
self._pending_patch = None
self._show_modal = False
self._on_apply_callback = None
self._on_reject_callback = None
_patch_modal_manager: Optional[PatchModalManager] = None
def get_patch_modal_manager() -> PatchModalManager:
"""
[C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton]
"""
global _patch_modal_manager
if _patch_modal_manager is None:
_patch_modal_manager = PatchModalManager()
return _patch_modal_manager
"""
[C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton]
"""
global _patch_modal_manager
if _patch_modal_manager is None:
_patch_modal_manager = PatchModalManager()
return _patch_modal_manager
def reset_patch_modal_manager() -> None:
"""
[C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton]
"""
global _patch_modal_manager
if _patch_modal_manager:
_patch_modal_manager.reset()
_patch_modal_manager = None
"""
[C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton]
"""
global _patch_modal_manager
if _patch_modal_manager:
_patch_modal_manager.reset()
_patch_modal_manager = None
+26 -26
View File
@@ -3,29 +3,29 @@ from src import ai_client
def test_ai_client_send_gemini_cli() -> None:
test_message = "Hello, this is a test prompt for the CLI adapter."
test_response = "This is a dummy response from the Gemini CLI."
ai_client.reset_session()
ai_client.set_provider("gemini_cli", "gemini-2.5-flash-lite")
with patch("src.ai_client.GeminiCliAdapter") as MockAdapterClass:
mock_adapter_instance = MagicMock()
mock_adapter_instance.send.return_value = {
"text": test_response,
"tool_calls": [],
}
mock_adapter_instance.last_usage = {"total_tokens": 100}
mock_adapter_instance.last_latency = 0.5
mock_adapter_instance.session_id = "test-session"
MockAdapterClass.return_value = mock_adapter_instance
ai_client._gemini_cli_adapter = mock_adapter_instance
with patch.object(ai_client.events, "emit") as mock_emit:
response = ai_client.send(
md_content="<context></context>",
user_message=test_message,
base_dir=".",
)
mock_adapter_instance.send.assert_called()
emitted_event_names = [call.args[0] for call in mock_emit.call_args_list]
assert "request_start" in emitted_event_names
assert "response_received" in emitted_event_names
assert response == test_response
test_message = "Hello, this is a test prompt for the CLI adapter."
test_response = "This is a dummy response from the Gemini CLI."
ai_client.reset_session()
ai_client.set_provider("gemini_cli", "gemini-2.5-flash-lite")
with patch("src.ai_client.GeminiCliAdapter") as MockAdapterClass:
mock_adapter_instance = MagicMock()
mock_adapter_instance.send.return_value = {
"text": test_response,
"tool_calls": [],
}
mock_adapter_instance.last_usage = {"total_tokens": 100}
mock_adapter_instance.last_latency = 0.5
mock_adapter_instance.session_id = "test-session"
MockAdapterClass.return_value = mock_adapter_instance
ai_client._gemini_cli_adapter = mock_adapter_instance
with patch.object(ai_client.events, "emit") as mock_emit:
response = ai_client.send(
md_content="<context></context>",
user_message=test_message,
base_dir=".",
)
mock_adapter_instance.send.assert_called()
emitted_event_names = [call.args[0] for call in mock_emit.call_args_list]
assert "request_start" in emitted_event_names
assert "response_received" in emitted_event_names
assert response == test_response
+35 -35
View File
@@ -6,45 +6,45 @@ import unittest
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
class TestArchBoundaryPhase1(unittest.TestCase):
def setUp(self) -> None:
pass
def setUp(self) -> None:
pass
def tearDown(self) -> None:
pass
def tearDown(self) -> None:
pass
def test_unfettered_modules_constant_removed(self) -> None:
"""TEST 1: Check 'UNFETTERED_MODULES' string is removed from project_manager.py"""
# We check the source directly to be sure it's not just hidden
with open("src/project_manager.py", "r", encoding="utf-8") as f:
content = f.read()
self.assertNotIn("UNFETTERED_MODULES", content)
def test_unfettered_modules_constant_removed(self) -> None:
"""TEST 1: Check 'UNFETTERED_MODULES' string is removed from project_manager.py"""
# We check the source directly to be sure it's not just hidden
with open("src/project_manager.py", "r", encoding="utf-8") as f:
content = f.read()
self.assertNotIn("UNFETTERED_MODULES", content)
def test_mcp_client_whitelist_enforcement(self) -> None:
"""TEST 2: mcp_client._is_allowed must return False for config.toml"""
from src import mcp_client
from pathlib import Path
def test_mcp_client_whitelist_enforcement(self) -> None:
"""TEST 2: mcp_client._is_allowed must return False for config.toml"""
from src import mcp_client
from pathlib import Path
# Configure with some dummy file items (as dicts)
file_items = [{"path": "src/gui_2.py"}]
mcp_client.configure(file_items, [])
# Configure with some dummy file items (as dicts)
file_items = [{"path": "src/gui_2.py"}]
mcp_client.configure(file_items, [])
# Should allow src files
self.assertTrue(mcp_client._is_allowed(Path("src/gui_2.py")))
# Should REJECT config files
self.assertFalse(mcp_client._is_allowed(Path("config.toml")))
self.assertFalse(mcp_client._is_allowed(Path("credentials.toml")))
# Should allow src files
self.assertTrue(mcp_client._is_allowed(Path("src/gui_2.py")))
# Should REJECT config files
self.assertFalse(mcp_client._is_allowed(Path("config.toml")))
self.assertFalse(mcp_client._is_allowed(Path("credentials.toml")))
def test_mma_exec_no_hardcoded_path(self) -> None:
"""TEST 4: mma_exec.execute_agent must not contain hardcoded machine paths."""
with open("scripts/mma_exec.py", "r", encoding="utf-8") as f:
content = f.read()
# Check for some common home directory patterns or user paths
self.assertNotIn("C:\\Users\\Ed", content)
self.assertNotIn("/Users/ed", content)
def test_mma_exec_no_hardcoded_path(self) -> None:
"""TEST 4: mma_exec.execute_agent must not contain hardcoded machine paths."""
with open("scripts/mma_exec.py", "r", encoding="utf-8") as f:
content = f.read()
# Check for some common home directory patterns or user paths
self.assertNotIn("C:\\Users\\Ed", content)
self.assertNotIn("/Users/ed", content)
def test_claude_mma_exec_no_hardcoded_path(self) -> None:
"""TEST 5: claude_mma_exec.execute_agent must not contain hardcoded machine paths."""
with open("scripts/claude_mma_exec.py", "r", encoding="utf-8") as f:
content = f.read()
self.assertNotIn("C:\\Users\\Ed", content)
self.assertNotIn("/Users/ed", content)
def test_claude_mma_exec_no_hardcoded_path(self) -> None:
"""TEST 5: claude_mma_exec.execute_agent must not contain hardcoded machine paths."""
with open("scripts/claude_mma_exec.py", "r", encoding="utf-8") as f:
content = f.read()
self.assertNotIn("C:\\Users\\Ed", content)
self.assertNotIn("/Users/ed", content)
+81 -81
View File
@@ -8,93 +8,93 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
class TestArchBoundaryPhase2(unittest.TestCase):
def setUp(self) -> None:
def setUp(self) -> None:
pass
def test_toml_exposes_all_dispatch_tools(self) -> None:
"""manual_slop.toml [agent.tools] must list every tool in mcp_client.dispatch()."""
from src import models
# We check the tool names in the source of mcp_client.dispatch
import inspect
import src.mcp_client as mcp
source = inspect.getsource(mcp.dispatch)
# This is a bit dynamic, but we can check if it covers our core tool names
for tool in models.AGENT_TOOL_NAMES:
if tool not in ("set_file_slice", "py_update_definition", "py_set_signature", "py_set_var_declaration"):
# Non-mutating tools should definitely be handled
pass
def test_toml_mutating_tools_disabled_by_default(self) -> None:
"""Verify that the core set of read-only tools is present."""
from src.models import AGENT_TOOL_NAMES
# Our architecture now uses a fixed set of high-signal tools
self.assertIn("read_file", AGENT_TOOL_NAMES)
self.assertIn("list_directory", AGENT_TOOL_NAMES)
self.assertIn("py_get_skeleton", AGENT_TOOL_NAMES)
def test_toml_exposes_all_dispatch_tools(self) -> None:
"""manual_slop.toml [agent.tools] must list every tool in mcp_client.dispatch()."""
from src import models
def test_mcp_client_dispatch_completeness(self) -> None:
"""Verify that all tools in tool_schemas are handled by dispatch()."""
from src import mcp_client
# get_tool_schemas exists
available_tools = [t["name"] for t in mcp_client.get_tool_schemas()]
self.assertGreater(len(available_tools), 0)
def test_mutating_tool_triggers_callback(self) -> None:
"""All mutating tools must trigger the pre_tool_callback."""
from src.app_controller import AppController
# We check the tool names in the source of mcp_client.dispatch
import inspect
import src.mcp_client as mcp
source = inspect.getsource(mcp.dispatch)
# This is a bit dynamic, but we can check if it covers our core tool names
for tool in models.AGENT_TOOL_NAMES:
if tool not in ("set_file_slice", "py_update_definition", "py_set_signature", "py_set_var_declaration"):
# Non-mutating tools should definitely be handled
pass
def test_toml_mutating_tools_disabled_by_default(self) -> None:
"""Verify that the core set of read-only tools is present."""
from src.models import AGENT_TOOL_NAMES
# Our architecture now uses a fixed set of high-signal tools
self.assertIn("read_file", AGENT_TOOL_NAMES)
self.assertIn("list_directory", AGENT_TOOL_NAMES)
self.assertIn("py_get_skeleton", AGENT_TOOL_NAMES)
# Use a real AppController to test its _confirm_and_run
with patch('src.models.load_config', return_value={}), \
patch('src.performance_monitor.PerformanceMonitor'), \
patch('src.session_logger.open_session'), \
patch('src.session_logger.reset_session'), \
patch('src.app_controller.AppController._prune_old_logs'), \
patch('src.app_controller.AppController._init_ai_and_hooks'):
controller = AppController()
mock_cb = MagicMock(return_value="output")
# AppController implements its own _confirm_and_run, let's see how we can mock the HITL part
# In AppController._confirm_and_run, if test_hooks_enabled=False (default), it waits for a dialog
with patch("src.shell_runner.run_powershell", return_value="output"):
# Simulate auto-approval for test
controller.test_hooks_enabled = True
controller.ui_manual_approve = False
res = controller._confirm_and_run("echo hello", ".")
self.assertEqual(res, "output")
def test_mcp_client_dispatch_completeness(self) -> None:
"""Verify that all tools in tool_schemas are handled by dispatch()."""
from src import mcp_client
# get_tool_schemas exists
available_tools = [t["name"] for t in mcp_client.get_tool_schemas()]
self.assertGreater(len(available_tools), 0)
def test_mutating_tool_triggers_callback(self) -> None:
"""All mutating tools must trigger the pre_tool_callback."""
from src.app_controller import AppController
def test_rejection_prevents_dispatch(self) -> None:
"""When pre_tool_callback returns None (rejected), dispatch must NOT be called."""
from src.app_controller import AppController
# Use a real AppController to test its _confirm_and_run
with patch('src.models.load_config', return_value={}), \
patch('src.performance_monitor.PerformanceMonitor'), \
patch('src.session_logger.open_session'), \
patch('src.session_logger.reset_session'), \
patch('src.app_controller.AppController._prune_old_logs'), \
patch('src.app_controller.AppController._init_ai_and_hooks'):
controller = AppController()
with patch('src.models.load_config', return_value={}), \
patch('src.performance_monitor.PerformanceMonitor'), \
patch('src.session_logger.open_session'), \
patch('src.session_logger.reset_session'), \
patch('src.app_controller.AppController._prune_old_logs'), \
patch('src.app_controller.AppController._init_ai_and_hooks'):
controller = AppController()
mock_cb = MagicMock(return_value="output")
# AppController implements its own _confirm_and_run, let's see how we can mock the HITL part
# In AppController._confirm_and_run, if test_hooks_enabled=False (default), it waits for a dialog
with patch("src.shell_runner.run_powershell", return_value="output"):
# Simulate auto-approval for test
controller.test_hooks_enabled = True
controller.ui_manual_approve = False
res = controller._confirm_and_run("echo hello", ".")
self.assertEqual(res, "output")
def test_rejection_prevents_dispatch(self) -> None:
"""When pre_tool_callback returns None (rejected), dispatch must NOT be called."""
from src.app_controller import AppController
with patch('src.models.load_config', return_value={}), \
patch('src.performance_monitor.PerformanceMonitor'), \
patch('src.session_logger.open_session'), \
patch('src.session_logger.reset_session'), \
patch('src.app_controller.AppController._prune_old_logs'), \
patch('src.app_controller.AppController._init_ai_and_hooks'):
controller = AppController()
# Mock the wait() method of ConfirmDialog to return (False, script)
with patch("src.app_controller.ConfirmDialog") as mock_dialog_class:
mock_dialog = mock_dialog_class.return_value
mock_dialog.wait.return_value = (False, "script")
mock_dialog._uid = "test_uid"
# Mock the wait() method of ConfirmDialog to return (False, script)
with patch("src.app_controller.ConfirmDialog") as mock_dialog_class:
mock_dialog = mock_dialog_class.return_value
mock_dialog.wait.return_value = (False, "script")
mock_dialog._uid = "test_uid"
with patch("src.shell_runner.run_powershell") as mock_run:
controller.test_hooks_enabled = False # Force manual approval (dialog)
res = controller._confirm_and_run("script", ".")
self.assertIsNone(res)
self.assertFalse(mock_run.called)
with patch("src.shell_runner.run_powershell") as mock_run:
controller.test_hooks_enabled = False # Force manual approval (dialog)
res = controller._confirm_and_run("script", ".")
self.assertIsNone(res)
self.assertFalse(mock_run.called)
def test_non_mutating_tool_skips_callback(self) -> None:
"""Read-only tools must NOT trigger pre_tool_callback."""
from src import ai_client
# Check internal list or method
if hasattr(ai_client, '_is_mutating_tool'):
mutating = ["run_powershell", "set_file_slice"]
for t in mutating:
self.assertTrue(ai_client._is_mutating_tool(t))
def test_non_mutating_tool_skips_callback(self) -> None:
"""Read-only tools must NOT trigger pre_tool_callback."""
from src import ai_client
# Check internal list or method
if hasattr(ai_client, '_is_mutating_tool'):
mutating = ["run_powershell", "set_file_slice"]
for t in mutating:
self.assertTrue(ai_client._is_mutating_tool(t))
self.assertFalse(ai_client._is_mutating_tool("read_file"))
self.assertFalse(ai_client._is_mutating_tool("list_directory"))
self.assertFalse(ai_client._is_mutating_tool("read_file"))
self.assertFalse(ai_client._is_mutating_tool("list_directory"))
+59 -86
View File
@@ -1,91 +1,64 @@
import os
import sys
import unittest
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
class TestArchBoundaryPhase3(unittest.TestCase):
def setUp(self) -> None:
pass
def test_cascade_blocks_simple(self) -> None:
"""Test that a blocked dependency blocks its immediate dependent."""
from src.models import Ticket, Track
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
track = Track(id="TR1", description="track", tickets=[t1, t2])
# ExecutionEngine should identify T2 as blocked during tick
from src.dag_engine import TrackDAG, ExecutionEngine
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
if t2.blocked_reason:
self.assertIn("T1", t2.blocked_reason)
def test_cascade_blocks_multi_hop(self) -> None:
"""Test that blocking cascades through multiple dependencies."""
from src.models import Ticket
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
t3 = Ticket(id="T3", description="d3", status="todo", assigned_to="worker1", depends_on=["T2"])
dag = TrackDAG([t1, t2, t3])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
self.assertEqual(t3.status, "blocked")
def test_manual_unblock_restores_todo(self) -> None:
"""Test that unblocking a task manually works if dependencies are met."""
from src.models import Ticket
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="completed", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="blocked", assigned_to="worker1", blocked_reason="manual")
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
# Update status to todo
engine.update_task_status("T2", "todo")
self.assertEqual(t2.status, "todo")
# Next tick should keep it todo (ready)
ready = engine.tick()
self.assertIn(t2, ready)
def test_in_progress_not_blocked(self) -> None:
"""Test that in_progress tasks are not blocked automatically (only todo)."""
from src.models import Ticket
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="in_progress", assigned_to="worker1", depends_on=["T1"])
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
# T2 should remain in_progress because it's already running
self.assertEqual(t2.status, "in_progress")
def test_execution_engine_tick_cascades_blocks(self) -> None:
"""Test that ExecutionEngine.tick() triggers the cascading blocks."""
from src.models import Ticket
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
def setUp(self) -> None:
pass
def test_cascade_blocks_simple(self) -> None:
"""Test that a blocked dependency blocks its immediate dependent."""
from src.models import Ticket, Track
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
track = Track(id="TR1", description="track", tickets=[t1, t2])
from src.dag_engine import TrackDAG, ExecutionEngine
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
if t2.blocked_reason:
self.assertIn("T1", t2.blocked_reason)
def test_cascade_blocks_multi_hop(self) -> None:
"""Test that blocking cascades through multiple dependencies."""
from src.models import Ticket
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
t3 = Ticket(id="T3", description="d3", status="todo", assigned_to="worker1", depends_on=["T2"])
dag = TrackDAG([t1, t2, t3])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
self.assertEqual(t3.status, "blocked")
def test_manual_unblock_restores_todo(self) -> None:
"""Test that unblocking a task manually works if dependencies are met."""
from src.models import Ticket
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="completed", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="blocked", assigned_to="worker1", blocked_reason="manual")
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.update_task_status("T2", "todo")
self.assertEqual(t2.status, "todo")
ready = engine.tick()
self.assertIn(t2, ready)
def test_in_progress_not_blocked(self) -> None:
"""Test that in_progress tasks are not blocked automatically (only todo)."""
from src.models import Ticket
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="in_progress", assigned_to="worker1", depends_on=["T1"])
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "in_progress")
def test_execution_engine_tick_cascades_blocks(self) -> None:
"""Test that ExecutionEngine.tick() triggers the cascading blocks."""
from src.models import Ticket
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
+31 -31
View File
@@ -4,45 +4,45 @@ from src.app_controller import AppController
from src.models import FileItem
def test_context_files_is_decoupled():
controller = AppController()
controller = AppController()
# Verify both lists exist and are distinct
assert hasattr(controller, 'files')
assert hasattr(controller, 'context_files')
assert controller.files is not controller.context_files
# Verify both lists exist and are distinct
assert hasattr(controller, 'files')
assert hasattr(controller, 'context_files')
assert controller.files is not controller.context_files
# Modifying one should not affect the other
controller.files.append(FileItem(path="whitelist.txt"))
controller.context_files.append(FileItem(path="context.txt"))
# Modifying one should not affect the other
controller.files.append(FileItem(path="whitelist.txt"))
controller.context_files.append(FileItem(path="context.txt"))
assert len(controller.files) == 1
assert controller.files[0].path == "whitelist.txt"
assert len(controller.files) == 1
assert controller.files[0].path == "whitelist.txt"
assert len(controller.context_files) == 1
assert controller.context_files[0].path == "context.txt"
assert len(controller.context_files) == 1
assert controller.context_files[0].path == "context.txt"
def test_do_generate_uses_context_files(monkeypatch):
controller = AppController()
controller.init_state()
controller.context_files = [FileItem(path="context.txt")]
controller.files = [FileItem(path="whitelist.txt")]
controller = AppController()
controller.init_state()
controller.context_files = [FileItem(path="context.txt")]
controller.files = [FileItem(path="whitelist.txt")]
# Mock project_manager.flat_config and aggregate.run to verify passed data
import src.project_manager as pm
import src.aggregate as agg
# Mock project_manager.flat_config and aggregate.run to verify passed data
import src.project_manager as pm
import src.aggregate as agg
def mock_flat_config(*args, **kwargs):
return {"files": {}}
def mock_flat_config(*args, **kwargs):
return {"files": {}}
def mock_aggregate_run(flat, **kwargs):
assert flat["files"]["paths"] == controller.context_files
return ("md", Path("path"), [])
def mock_aggregate_run(flat, **kwargs):
assert flat["files"]["paths"] == controller.context_files
return ("md", Path("path"), [])
monkeypatch.setattr(pm, "flat_config", mock_flat_config)
monkeypatch.setattr(pm, "save_project", lambda *args: None)
monkeypatch.setattr(agg, "run", mock_aggregate_run)
monkeypatch.setattr(agg, "build_markdown_no_history", lambda *args, **kwargs: "stable")
monkeypatch.setattr(agg, "build_discussion_text", lambda *args, **kwargs: "disc")
monkeypatch.setattr(pm, "flat_config", mock_flat_config)
monkeypatch.setattr(pm, "save_project", lambda *args: None)
monkeypatch.setattr(agg, "run", mock_aggregate_run)
monkeypatch.setattr(agg, "build_markdown_no_history", lambda *args, **kwargs: "stable")
monkeypatch.setattr(agg, "build_discussion_text", lambda *args, **kwargs: "disc")
# Should not raise assertion error
controller._do_generate()
# Should not raise assertion error
controller._do_generate()
+22 -22
View File
@@ -3,28 +3,28 @@ from src.aggregate import group_files_by_dir, compute_file_stats
from src.models import FileItem
def test_group_files_by_dir():
files = [
FileItem(path="src/main.py"),
FileItem(path="src/utils/helpers.py"),
FileItem(path="tests/test_main.py"),
FileItem(path="README.md")
]
grouped = group_files_by_dir(files)
assert len(grouped) == 4
assert grouped["src"] == [files[0]]
assert grouped["src/utils"] == [files[1]]
assert grouped["tests"] == [files[2]]
assert grouped["."] == [files[3]]
files = [
FileItem(path="src/main.py"),
FileItem(path="src/utils/helpers.py"),
FileItem(path="tests/test_main.py"),
FileItem(path="README.md")
]
grouped = group_files_by_dir(files)
assert len(grouped) == 4
assert grouped["src"] == [files[0]]
assert grouped["src/utils"] == [files[1]]
assert grouped["tests"] == [files[2]]
assert grouped["."] == [files[3]]
def test_compute_file_stats():
import tempfile
import os
with tempfile.TemporaryDirectory() as temp_dir:
# Create a dummy python file
py_path = os.path.join(temp_dir, "test.py")
with open(py_path, "w") as f:
f.write("def foo():\n pass\n\nclass Bar:\n pass\n")
import tempfile
import os
with tempfile.TemporaryDirectory() as temp_dir:
# Create a dummy python file
py_path = os.path.join(temp_dir, "test.py")
with open(py_path, "w") as f:
f.write("def foo():\n pass\n\nclass Bar:\n pass\n")
stats = compute_file_stats(py_path)
assert stats["lines"] == 5
assert stats["ast_elements"] == 2 # 1 func, 1 class
stats = compute_file_stats(py_path)
assert stats["lines"] == 5
assert stats["ast_elements"] == 2 # 1 func, 1 class
+22 -22
View File
@@ -3,33 +3,33 @@ from src.gui_2 import App
from src.models import FileItem
def test_view_mode_initialization():
app = App()
# Mock imgui
from imgui_bundle import imgui
app = App()
# Mock imgui
from imgui_bundle import imgui
app.context_files = [
FileItem(path="test1.py"),
FileItem(path="test2.py", view_mode="full")
]
app.context_files = [
FileItem(path="test1.py"),
FileItem(path="test2.py", view_mode="full")
]
# We test the model defaults and the rendering assignment logic indirectly.
assert app.context_files[0].view_mode == "full" # Default from FileItem Model
assert app.context_files[1].view_mode == "full"
# We test the model defaults and the rendering assignment logic indirectly.
assert app.context_files[0].view_mode == "full" # Default from FileItem Model
assert app.context_files[1].view_mode == "full"
def test_batch_view_mode_change():
app = App()
app = App()
f1 = FileItem(path="test1.py", view_mode="full")
f2 = FileItem(path="test2.py", view_mode="full")
f1 = FileItem(path="test1.py", view_mode="full")
f2 = FileItem(path="test2.py", view_mode="full")
app.context_files = [f1, f2]
app.ui_selected_context_files = {"test1.py"}
app.context_files = [f1, f2]
app.ui_selected_context_files = {"test1.py"}
# Simulate clicking "Skeleton" batch button
for f in app.context_files:
f_path = f.path if hasattr(f, "path") else str(f)
if f_path in app.ui_selected_context_files:
f.view_mode = "skeleton"
# Simulate clicking "Skeleton" batch button
for f in app.context_files:
f_path = f.path if hasattr(f, "path") else str(f)
if f_path in app.ui_selected_context_files:
f.view_mode = "skeleton"
assert f1.view_mode == "skeleton"
assert f2.view_mode == "full" # not selected
assert f1.view_mode == "skeleton"
assert f2.view_mode == "full" # not selected
+66 -66
View File
@@ -3,55 +3,55 @@ import tempfile
import os
from pathlib import Path
from src.diff_viewer import (
parse_diff, DiffFile, DiffHunk, parse_hunk_header,
get_line_color, apply_patch_to_file
parse_diff, DiffFile, DiffHunk, parse_hunk_header,
get_line_color, apply_patch_to_file
)
def test_parse_diff_empty() -> None:
result = parse_diff("")
assert result == []
result = parse_diff("")
assert result == []
def test_parse_diff_none() -> None:
result = parse_diff(None) # type: ignore
assert result == []
result = parse_diff(None) # type: ignore
assert result == []
def test_parse_simple_diff() -> None:
diff_text = """--- a/src/test.py
diff_text = """--- a/src/test.py
+++ b/src/test.py
@@ -1 +1 @@
-old
+new"""
result = parse_diff(diff_text)
assert len(result) == 1
assert result[0].old_path == "src/test.py"
assert result[0].new_path == "src/test.py"
assert len(result[0].hunks) == 1
assert result[0].hunks[0].header == "@@ -1 +1 @@"
result = parse_diff(diff_text)
assert len(result) == 1
assert result[0].old_path == "src/test.py"
assert result[0].new_path == "src/test.py"
assert len(result[0].hunks) == 1
assert result[0].hunks[0].header == "@@ -1 +1 @@"
def test_parse_diff_with_context() -> None:
diff_text = """--- a/src/example.py
diff_text = """--- a/src/example.py
+++ b/src/example.py
@@ -10,5 +10,6 @@
def existing_function():
pass
def existing_function():
pass
- old_line
+ old_line
+ new_line
more_code"""
result = parse_diff(diff_text)
assert len(result) == 1
assert result[0].old_path == "src/example.py"
assert len(result[0].hunks) == 1
hunk = result[0].hunks[0]
assert hunk.old_start == 10
assert hunk.old_count == 5
assert hunk.new_start == 10
assert hunk.new_count == 6
assert "- old_line" in hunk.lines
assert "+ new_line" in hunk.lines
more_code"""
result = parse_diff(diff_text)
assert len(result) == 1
assert result[0].old_path == "src/example.py"
assert len(result[0].hunks) == 1
hunk = result[0].hunks[0]
assert hunk.old_start == 10
assert hunk.old_count == 5
assert hunk.new_start == 10
assert hunk.new_count == 6
assert "- old_line" in hunk.lines
assert "+ new_line" in hunk.lines
def test_parse_multiple_files() -> None:
diff_text = """--- a/file1.py
diff_text = """--- a/file1.py
+++ b/file1.py
@@ -1 +1 @@
-a
@@ -61,70 +61,70 @@ def test_parse_multiple_files() -> None:
@@ -1 +1 @@
-c
+d"""
result = parse_diff(diff_text)
assert len(result) == 2
assert result[0].old_path == "file1.py"
assert result[1].old_path == "file2.py"
result = parse_diff(diff_text)
assert len(result) == 2
assert result[0].old_path == "file1.py"
assert result[1].old_path == "file2.py"
def test_parse_hunk_header() -> None:
result = parse_hunk_header("@@ -10,5 +10,6 @@")
assert result == (10, 5, 10, 6)
result = parse_hunk_header("@@ -10,5 +10,6 @@")
assert result == (10, 5, 10, 6)
result = parse_hunk_header("@@ -1 +1 @@")
assert result == (1, 1, 1, 1)
result = parse_hunk_header("@@ -1 +1 @@")
assert result == (1, 1, 1, 1)
def test_diff_line_classification() -> None:
diff_text = """--- a/test.py
diff_text = """--- a/test.py
+++ b/test.py
@@ -1,3 +1,4 @@
context line
context line
-removed line
+removed line
+added line
another context"""
result = parse_diff(diff_text)
hunk = result[0].hunks[0]
assert any(line.startswith("-") for line in hunk.lines)
assert any(line.startswith("+") for line in hunk.lines)
assert any(line.startswith(" ") or not line.startswith(("-", "+")) for line in hunk.lines)
another context"""
result = parse_diff(diff_text)
hunk = result[0].hunks[0]
assert any(line.startswith("-") for line in hunk.lines)
assert any(line.startswith("+") for line in hunk.lines)
assert any(line.startswith(" ") or not line.startswith(("-", "+")) for line in hunk.lines)
def test_get_line_color() -> None:
assert get_line_color("+added") == "green"
assert get_line_color("-removed") == "red"
assert get_line_color("@@ -1,3 +1,4 @@") == "cyan"
assert get_line_color(" context") == None
assert get_line_color("+added") == "green"
assert get_line_color("-removed") == "red"
assert get_line_color("@@ -1,3 +1,4 @@") == "cyan"
assert get_line_color(" context") == None
def test_apply_patch_simple() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("old\n")
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py"
test_file.write_text("old\n")
patch = f"""--- a/{test_file.name}
patch = f"""--- a/{test_file.name}
+++ b/{test_file.name}
@@ -1 +1 @@
-old
+new"""
success, msg = apply_patch_to_file(patch, tmpdir)
assert success
assert test_file.read_text() == "new\n"
success, msg = apply_patch_to_file(patch, tmpdir)
assert success
assert test_file.read_text() == "new\n"
def test_apply_patch_with_context() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "example.py"
test_file.write_text("line 1\nline 2\nline 3\n")
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "example.py"
test_file.write_text("line 1\nline 2\nline 3\n")
patch = f"""--- a/{test_file.name}
patch = f"""--- a/{test_file.name}
+++ b/{test_file.name}
@@ -1,3 +1,3 @@
-line 1
-line 2
+line one
+line two
line 3"""
line 3"""
success, msg = apply_patch_to_file(patch, tmpdir)
assert success
content = test_file.read_text()
assert "line one" in content
assert "line two" in content
success, msg = apply_patch_to_file(patch, tmpdir)
assert success
content = test_file.read_text()
assert "line one" in content
assert "line two" in content
+82 -82
View File
@@ -3,127 +3,127 @@ import pytest
from unittest.mock import patch, MagicMock
from src.models import TextEditorConfig, ExternalEditorConfig
from src.external_editor import (
ExternalEditorLauncher,
get_default_launcher,
create_temp_modified_file,
ExternalEditorLauncher,
get_default_launcher,
create_temp_modified_file,
)
@pytest.fixture
def vscode_editor():
return TextEditorConfig(name="vscode", path="C:\\path\\to\\code.exe", diff_args=["--diff"])
return TextEditorConfig(name="vscode", path="C:\\path\\to\\code.exe", diff_args=["--diff"])
@pytest.fixture
def notepadpp_editor():
return TextEditorConfig(name="notepad++", path="C:\\path\\to\\notepad++.exe", diff_args=["-multiInst", "-nosession"])
return TextEditorConfig(name="notepad++", path="C:\\path\\to\\notepad++.exe", diff_args=["-multiInst", "-nosession"])
@pytest.fixture
def ext_config(vscode_editor, notepadpp_editor):
return ExternalEditorConfig(
editors={"vscode": vscode_editor, "notepad++": notepadpp_editor},
default_editor="vscode",
)
return ExternalEditorConfig(
editors={"vscode": vscode_editor, "notepad++": notepadpp_editor},
default_editor="vscode",
)
@pytest.fixture
def launcher(ext_config):
return ExternalEditorLauncher(ext_config)
return ExternalEditorLauncher(ext_config)
class TestTextEditorConfig:
def test_from_dict_with_diff_args(self):
data = {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]}
editor = TextEditorConfig.from_dict(data)
assert editor.name == "vscode"
assert editor.path == "C:\\code.exe"
assert editor.diff_args == ["--diff"]
def test_from_dict_with_diff_args(self):
data = {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]}
editor = TextEditorConfig.from_dict(data)
assert editor.name == "vscode"
assert editor.path == "C:\\code.exe"
assert editor.diff_args == ["--diff"]
def test_from_dict_without_diff_args(self):
data = {"name": "vscode", "path": "C:\\code.exe"}
editor = TextEditorConfig.from_dict(data)
assert editor.diff_args == []
def test_from_dict_without_diff_args(self):
data = {"name": "vscode", "path": "C:\\code.exe"}
editor = TextEditorConfig.from_dict(data)
assert editor.diff_args == []
def test_to_dict(self, vscode_editor):
result = vscode_editor.to_dict()
assert result["name"] == "vscode"
assert result["path"] == "C:\\path\\to\\code.exe"
assert result["diff_args"] == ["--diff"]
def test_to_dict(self, vscode_editor):
result = vscode_editor.to_dict()
assert result["name"] == "vscode"
assert result["path"] == "C:\\path\\to\\code.exe"
assert result["diff_args"] == ["--diff"]
class TestExternalEditorConfig:
def test_from_dict_with_string_editors(self):
data = {"editors": {"vscode": "C:\\code.exe"}, "default_editor": "vscode"}
config = ExternalEditorConfig.from_dict(data)
assert "vscode" in config.editors
assert config.editors["vscode"].path == "C:\\code.exe"
def test_from_dict_with_string_editors(self):
data = {"editors": {"vscode": "C:\\code.exe"}, "default_editor": "vscode"}
config = ExternalEditorConfig.from_dict(data)
assert "vscode" in config.editors
assert config.editors["vscode"].path == "C:\\code.exe"
def test_from_dict_with_dict_editors(self, vscode_editor):
data = {"editors": {"vscode": {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]}}}
config = ExternalEditorConfig.from_dict(data)
assert config.editors["vscode"].diff_args == ["--diff"]
def test_from_dict_with_dict_editors(self, vscode_editor):
data = {"editors": {"vscode": {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]}}}
config = ExternalEditorConfig.from_dict(data)
assert config.editors["vscode"].diff_args == ["--diff"]
def test_get_default_returns_configured(self, ext_config):
result = ext_config.get_default()
assert result.name == "vscode"
def test_get_default_returns_configured(self, ext_config):
result = ext_config.get_default()
assert result.name == "vscode"
def test_get_default_fallback_to_first(self):
config = ExternalEditorConfig(editors={"notepad++": TextEditorConfig(name="notepad++", path="C:\\npp.exe")})
result = config.get_default()
assert result.name == "notepad++"
def test_get_default_fallback_to_first(self):
config = ExternalEditorConfig(editors={"notepad++": TextEditorConfig(name="notepad++", path="C:\\npp.exe")})
result = config.get_default()
assert result.name == "notepad++"
def test_get_default_returns_none_when_empty(self):
config = ExternalEditorConfig(editors={})
assert config.get_default() is None
def test_get_default_returns_none_when_empty(self):
config = ExternalEditorConfig(editors={})
assert config.get_default() is None
def test_to_dict(self, ext_config):
result = ext_config.to_dict()
assert result["default_editor"] == "vscode"
assert "vscode" in result["editors"]
def test_to_dict(self, ext_config):
result = ext_config.to_dict()
assert result["default_editor"] == "vscode"
assert "vscode" in result["editors"]
class TestExternalEditorLauncher:
def test_get_editor_by_name(self, launcher):
editor = launcher.get_editor("notepad++")
assert editor.name == "notepad++"
def test_get_editor_by_name(self, launcher):
editor = launcher.get_editor("notepad++")
assert editor.name == "notepad++"
def test_get_editor_returns_default(self, launcher):
editor = launcher.get_editor()
assert editor.name == "vscode"
def test_get_editor_returns_default(self, launcher):
editor = launcher.get_editor()
assert editor.name == "vscode"
def test_get_editor_unknown_name(self, launcher):
editor = launcher.get_editor("unknown")
assert editor is None
def test_get_editor_unknown_name(self, launcher):
editor = launcher.get_editor("unknown")
assert editor is None
def test_build_diff_command(self, launcher, vscode_editor):
cmd = launcher.build_diff_command(vscode_editor, "orig.txt", "mod.txt")
assert cmd == ["C:\\path\\to\\code.exe", "--diff", "orig.txt", "mod.txt"]
def test_build_diff_command(self, launcher, vscode_editor):
cmd = launcher.build_diff_command(vscode_editor, "orig.txt", "mod.txt")
assert cmd == ["C:\\path\\to\\code.exe", "--diff", "orig.txt", "mod.txt"]
def test_launch_diff_missing_editor(self, launcher):
result = launcher.launch_diff("nonexistent", "orig.txt", "mod.txt")
assert result is None
def test_launch_diff_missing_editor(self, launcher):
result = launcher.launch_diff("nonexistent", "orig.txt", "mod.txt")
assert result is None
@patch("subprocess.Popen")
def test_launch_diff_success(self, mock_popen, launcher):
mock_popen.return_value = MagicMock()
result = launcher.launch_diff("vscode", "orig.txt", "mod.txt")
assert result is not None
mock_popen.assert_called_once()
@patch("subprocess.Popen")
def test_launch_diff_success(self, mock_popen, launcher):
mock_popen.return_value = MagicMock()
result = launcher.launch_diff("vscode", "orig.txt", "mod.txt")
assert result is not None
mock_popen.assert_called_once()
@patch("subprocess.Popen")
def test_launch_diff_file_not_found(self, mock_popen, launcher):
mock_popen.side_effect = FileNotFoundError()
result = launcher.launch_diff("vscode", "orig.txt", "mod.txt")
assert result is None
@patch("subprocess.Popen")
def test_launch_diff_file_not_found(self, mock_popen, launcher):
mock_popen.side_effect = FileNotFoundError()
result = launcher.launch_diff("vscode", "orig.txt", "mod.txt")
assert result is None
class TestHelperFunctions:
def test_create_temp_modified_file(self):
content = "test content"
path = create_temp_modified_file(content)
assert path.endswith("_modified")
with open(path, encoding="utf-8") as f:
assert f.read() == content
import os
os.unlink(path)
def test_create_temp_modified_file(self):
content = "test content"
path = create_temp_modified_file(content)
assert path.endswith("_modified")
with open(path, encoding="utf-8") as f:
assert f.read() == content
import os
os.unlink(path)
+41 -41
View File
@@ -8,55 +8,55 @@ from src import models
@pytest.mark.asyncio
async def test_external_mcp_hitl_approval():
# 1. Setup mock manager and server
mock_manager = mcp_client.ExternalMCPManager()
mock_server = AsyncMock()
mock_server.name = "test-server"
mock_server.tools = {"ext_tool": {"name": "ext_tool", "description": "desc"}}
mock_server.call_tool.return_value = "Success"
mock_manager.servers["test-server"] = mock_server
# 1. Setup mock manager and server
mock_manager = mcp_client.ExternalMCPManager()
mock_server = AsyncMock()
mock_server.name = "test-server"
mock_server.tools = {"ext_tool": {"name": "ext_tool", "description": "desc"}}
mock_server.call_tool.return_value = "Success"
mock_manager.servers["test-server"] = mock_server
with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager):
# 2. Setup ai_client callbacks
mock_pre_tool = MagicMock(return_value="Approved")
ai_client.confirm_and_run_callback = mock_pre_tool
with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager):
# 2. Setup ai_client callbacks
mock_pre_tool = MagicMock(return_value="Approved")
ai_client.confirm_and_run_callback = mock_pre_tool
# 3. Call _execute_single_tool_call_async
name = "ext_tool"
args = {"arg1": "val1"}
call_id = "call_123"
base_dir = "."
# 3. Call _execute_single_tool_call_async
name = "ext_tool"
args = {"arg1": "val1"}
call_id = "call_123"
base_dir = "."
# We need to pass the callback to the function
name, cid, out, orig_name = await ai_client._execute_single_tool_call_async(
name, args, call_id, base_dir, mock_pre_tool, None, 0
)
# We need to pass the callback to the function
name, cid, out, orig_name = await ai_client._execute_single_tool_call_async(
name, args, call_id, base_dir, mock_pre_tool, None, 0
)
# 4. Assertions
assert out == "Success"
mock_pre_tool.assert_called_once()
# Check description contains EXTERNAL MCP
call_args = mock_pre_tool.call_args[0]
assert "EXTERNAL MCP TOOL: ext_tool" in call_args[0]
assert "arg1: 'val1'" in call_args[0]
# 4. Assertions
assert out == "Success"
mock_pre_tool.assert_called_once()
# Check description contains EXTERNAL MCP
call_args = mock_pre_tool.call_args[0]
assert "EXTERNAL MCP TOOL: ext_tool" in call_args[0]
assert "arg1: 'val1'" in call_args[0]
@pytest.mark.asyncio
async def test_external_mcp_hitl_rejection():
mock_manager = mcp_client.ExternalMCPManager()
mock_server = AsyncMock()
mock_server.name = "test-server"
mock_server.tools = {"ext_tool": {"name": "ext_tool"}}
mock_manager.servers["test-server"] = mock_server
mock_manager = mcp_client.ExternalMCPManager()
mock_server = AsyncMock()
mock_server.name = "test-server"
mock_server.tools = {"ext_tool": {"name": "ext_tool"}}
mock_manager.servers["test-server"] = mock_server
with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager):
mock_pre_tool = MagicMock(return_value=None) # Rejection
with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager):
mock_pre_tool = MagicMock(return_value=None) # Rejection
name = "ext_tool"
args = {"arg1": "val1"}
name = "ext_tool"
args = {"arg1": "val1"}
name, cid, out, orig_name = await ai_client._execute_single_tool_call_async(
name, args, "id", ".", mock_pre_tool, None, 0
)
name, cid, out, orig_name = await ai_client._execute_single_tool_call_async(
name, args, "id", ".", mock_pre_tool, None, 0
)
assert out == "USER REJECTED: tool execution cancelled"
mock_server.call_tool.assert_not_called()
assert out == "USER REJECTED: tool execution cancelled"
mock_server.call_tool.assert_not_called()
+49 -49
View File
@@ -3,57 +3,57 @@ from src.fuzzy_anchor import FuzzyAnchor
class TestFuzzyAnchor:
def test_create_slice_basic(self):
text = "line0\nline1\nline2\nline3\nline4\n"
result = FuzzyAnchor.create_slice(text, 2, 4)
assert "start_line" in result
assert "end_line" in result
assert "content_hash" in result
assert "start_context" in result
assert "end_context" in result
assert result["start_line"] == 2
assert result["end_line"] == 4
assert result["start_context"] == result["end_context"]
def test_create_slice_basic(self):
text = "line0\nline1\nline2\nline3\nline4\n"
result = FuzzyAnchor.create_slice(text, 2, 4)
assert "start_line" in result
assert "end_line" in result
assert "content_hash" in result
assert "start_context" in result
assert "end_context" in result
assert result["start_line"] == 2
assert result["end_line"] == 4
assert result["start_context"] == result["end_context"]
def test_resolve_slice_exact_match(self):
text = "line0\nline1\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(text, 2, 4)
result = FuzzyAnchor.resolve_slice(text, slc)
assert result is not None
start, end = result
assert start == 2
assert end == 4
def test_resolve_slice_exact_match(self):
text = "line0\nline1\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(text, 2, 4)
result = FuzzyAnchor.resolve_slice(text, slc)
assert result is not None
start, end = result
assert start == 2
assert end == 4
def test_resolve_slice_line_inserted_before(self):
original = "line0\nline1\nline2\nline3\nline4\n"
modified = "NEW\nline0\nline1\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(original, 2, 4)
result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is not None
start, end = result
assert start == 3
assert end == 5
def test_resolve_slice_line_inserted_before(self):
original = "line0\nline1\nline2\nline3\nline4\n"
modified = "NEW\nline0\nline1\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(original, 2, 4)
result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is not None
start, end = result
assert start == 3
assert end == 5
def test_resolve_slice_line_deleted_before_returns_none(self):
original = "line0\nline1\nline2\nline3\nline4\n"
modified = "line0\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(original, 2, 4)
result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is None
def test_resolve_slice_line_deleted_before_returns_none(self):
original = "line0\nline1\nline2\nline3\nline4\n"
modified = "line0\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(original, 2, 4)
result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is None
def test_resolve_slice_multiple_lines_changed(self):
original = "line0\nline1\nline2\nline3\nline4\n"
modified = "a\nb\nc\nd\ne\nline0\nline1\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(original, 1, 2)
result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is not None
start, end = result
assert start == 6
assert end == 7
def test_resolve_slice_multiple_lines_changed(self):
original = "line0\nline1\nline2\nline3\nline4\n"
modified = "a\nb\nc\nd\ne\nline0\nline1\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(original, 1, 2)
result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is not None
start, end = result
assert start == 6
assert end == 7
def test_resolve_slice_anchor_mismatch_returns_none(self):
original = "alpha\nbeta\ngamma\ndelta\nepsilon\n"
modified = "foo\nbar\nbaz\ndelta\nepsilon\n"
slc = FuzzyAnchor.create_slice(original, 2, 3)
result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is None
def test_resolve_slice_anchor_mismatch_returns_none(self):
original = "alpha\nbeta\ngamma\ndelta\nepsilon\n"
modified = "foo\nbar\nbaz\ndelta\nepsilon\n"
slc = FuzzyAnchor.create_slice(original, 2, 3)
result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is None
+75 -75
View File
@@ -4,82 +4,82 @@ from src.gemini_cli_adapter import GeminiCliAdapter
class TestGeminiCliAdapter:
@patch("subprocess.Popen")
def test_send_starts_subprocess_with_correct_args(
self, mock_popen: MagicMock
) -> None:
adapter = GeminiCliAdapter(binary_path="gemini")
mock_process = MagicMock()
mock_process.communicate.return_value = (
'{"type": "message", "content": "hello"}',
"",
)
mock_process.returncode = 0
mock_popen.return_value = mock_process
adapter.send("test prompt")
assert mock_popen.called
args, kwargs = mock_popen.call_args
cmd_list = args[0]
assert "gemini" in cmd_list
assert "--prompt" in cmd_list
assert "--output-format" in cmd_list
assert "stream-json" in cmd_list
@patch("subprocess.Popen")
def test_send_starts_subprocess_with_correct_args(
self, mock_popen: MagicMock
) -> None:
adapter = GeminiCliAdapter(binary_path="gemini")
mock_process = MagicMock()
mock_process.communicate.return_value = (
'{"type": "message", "content": "hello"}',
"",
)
mock_process.returncode = 0
mock_popen.return_value = mock_process
adapter.send("test prompt")
assert mock_popen.called
args, kwargs = mock_popen.call_args
cmd_list = args[0]
assert "gemini" in cmd_list
assert "--prompt" in cmd_list
assert "--output-format" in cmd_list
assert "stream-json" in cmd_list
@patch("subprocess.Popen")
def test_send_parses_jsonl_output(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter()
stdout_str = '{"type": "message", "content": "Hello "}\n{"type": "message", "content": "world!"}\n'
mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
result = adapter.send("msg")
assert result["text"] == "Hello world!"
@patch("subprocess.Popen")
def test_send_parses_jsonl_output(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter()
stdout_str = '{"type": "message", "content": "Hello "}\n{"type": "message", "content": "world!"}\n'
mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
result = adapter.send("msg")
assert result["text"] == "Hello world!"
@patch("subprocess.Popen")
def test_send_handles_tool_use_events(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter()
tool_json = {
"type": "tool_use",
"tool_name": "read_file",
"parameters": {"path": "test.txt"},
"tool_id": "call_123",
}
stdout_str = json.dumps(tool_json) + "\n"
mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
result = adapter.send("msg")
assert len(result["tool_calls"]) == 1
assert result["tool_calls"][0]["name"] == "read_file"
assert result["tool_calls"][0]["args"]["path"] == "test.txt"
@patch("subprocess.Popen")
def test_send_handles_tool_use_events(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter()
tool_json = {
"type": "tool_use",
"tool_name": "read_file",
"parameters": {"path": "test.txt"},
"tool_id": "call_123",
}
stdout_str = json.dumps(tool_json) + "\n"
mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
result = adapter.send("msg")
assert len(result["tool_calls"]) == 1
assert result["tool_calls"][0]["name"] == "read_file"
assert result["tool_calls"][0]["args"]["path"] == "test.txt"
@patch("subprocess.Popen")
def test_send_captures_usage_metadata(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter()
result_json = {"type": "result", "stats": {"total_tokens": 50}}
stdout_str = json.dumps(result_json) + "\n"
mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
adapter.send("msg")
assert adapter.last_usage is not None
assert adapter.last_usage.get("total_tokens") == 50
@patch("subprocess.Popen")
def test_send_captures_usage_metadata(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter()
result_json = {"type": "result", "stats": {"total_tokens": 50}}
stdout_str = json.dumps(result_json) + "\n"
mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
adapter.send("msg")
assert adapter.last_usage is not None
assert adapter.last_usage.get("total_tokens") == 50
@patch("subprocess.Popen")
def test_full_flow_integration(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter()
msg_json = {"type": "message", "content": "Final response"}
result_json = {
"type": "result",
"stats": {"total_tokens": 25, "input_tokens": 10, "output_tokens": 15},
}
stdout_str = json.dumps(msg_json) + "\n" + json.dumps(result_json) + "\n"
mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
result = adapter.send("test")
assert "Final response" in result["text"]
@patch("subprocess.Popen")
def test_full_flow_integration(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter()
msg_json = {"type": "message", "content": "Final response"}
result_json = {
"type": "result",
"stats": {"total_tokens": 25, "input_tokens": 10, "output_tokens": 15},
}
stdout_str = json.dumps(msg_json) + "\n" + json.dumps(result_json) + "\n"
mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
result = adapter.send("test")
assert "Final response" in result["text"]
+48 -48
View File
@@ -5,60 +5,60 @@ from src import gui_2
@pytest.fixture
def mock_gui():
gui = gui_2.App()
gui.project = {
'discussion': {
'active': 'main',
'discussions': {
'main': {'history': []},
'main_take_1': {'history': []},
'other_topic': {'history': []}
}
}
}
gui.active_discussion = 'main'
gui.perf_profiling_enabled = False
gui.is_viewing_prior_session = False
gui._get_discussion_names = lambda: ['main', 'main_take_1', 'other_topic']
return gui
gui = gui_2.App()
gui.project = {
'discussion': {
'active': 'main',
'discussions': {
'main': {'history': []},
'main_take_1': {'history': []},
'other_topic': {'history': []}
}
}
}
gui.active_discussion = 'main'
gui.perf_profiling_enabled = False
gui.is_viewing_prior_session = False
gui._get_discussion_names = lambda: ['main', 'main_take_1', 'other_topic']
return gui
def test_discussion_tabs_rendered(mock_gui):
with patch('src.gui_2.imgui') as mock_imgui, \
patch('src.gui_2.imscope') as mock_imscope, \
patch('src.imgui_scopes.imgui', new=mock_imgui), \
patch('src.app_controller.AppController.active_project_root', new_callable=PropertyMock, return_value='.'):
with patch('src.gui_2.imgui') as mock_imgui, \
patch('src.gui_2.imscope') as mock_imscope, \
patch('src.imgui_scopes.imgui', new=mock_imgui), \
patch('src.app_controller.AppController.active_project_root', new_callable=PropertyMock, return_value='.'):
# Setup imscope mocks
mock_imscope.window.return_value.__enter__.return_value = (True, True)
mock_imscope.child.return_value.__enter__.return_value = True
mock_imscope.table.return_value.__enter__.return_value = True
mock_imscope.tree_node_ex.return_value.__enter__.return_value = True
mock_imscope.tab_item.return_value.__enter__.return_value = (True, True)
mock_imscope.style_color.return_value.__enter__.return_value = None
mock_imscope.style_var.return_value.__enter__.return_value = None
# Setup imscope mocks
mock_imscope.window.return_value.__enter__.return_value = (True, True)
mock_imscope.child.return_value.__enter__.return_value = True
mock_imscope.table.return_value.__enter__.return_value = True
mock_imscope.tree_node_ex.return_value.__enter__.return_value = True
mock_imscope.tab_item.return_value.__enter__.return_value = (True, True)
mock_imscope.style_color.return_value.__enter__.return_value = None
mock_imscope.style_var.return_value.__enter__.return_value = None
# We expect a combo box for base discussion
mock_imgui.begin_combo.return_value = True
mock_imgui.selectable.return_value = (False, False)
# We expect a combo box for base discussion
mock_imgui.begin_combo.return_value = True
mock_imgui.selectable.return_value = (False, False)
# We expect a tab bar for takes
mock_imgui.begin_tab_bar.return_value = True
mock_imgui.begin_tab_item.return_value = (True, True)
mock_imgui.input_text.return_value = (False, "")
mock_imgui.input_text_multiline.return_value = (False, "")
mock_imgui.checkbox.return_value = (False, False)
mock_imgui.input_int.return_value = (False, 0)
# We expect a tab bar for takes
mock_imgui.begin_tab_bar.return_value = True
mock_imgui.begin_tab_item.return_value = (True, True)
mock_imgui.input_text.return_value = (False, "")
mock_imgui.input_text_multiline.return_value = (False, "")
mock_imgui.checkbox.return_value = (False, False)
mock_imgui.input_int.return_value = (False, 0)
mock_clipper = MagicMock()
mock_clipper.step.return_value = False
mock_imgui.ListClipper.return_value = mock_clipper
mock_clipper = MagicMock()
mock_clipper.step.return_value = False
mock_imgui.ListClipper.return_value = mock_clipper
mock_gui._render_discussion_panel()
mock_gui._render_discussion_panel()
mock_imgui.begin_combo.assert_called_once_with("##disc_sel", 'main')
mock_imgui.begin_tab_bar.assert_called_once_with('discussion_takes_tabs')
mock_imgui.begin_combo.assert_called_once_with("##disc_sel", 'main')
mock_imgui.begin_tab_bar.assert_called_once_with('discussion_takes_tabs')
calls = [c[0][0] for c in mock_imscope.tab_item.call_args_list]
assert 'Original###main' in calls
assert 'Take 1###main_take_1' in calls
assert 'Synthesis###Synthesis' in calls
calls = [c[0][0] for c in mock_imscope.tab_item.call_args_list]
assert 'Original###main' in calls
assert 'Take 1###main_take_1' in calls
assert 'Synthesis###Synthesis' in calls
+29 -29
View File
@@ -9,32 +9,32 @@ from src.api_hook_client import ApiHookClient
def test_comms_volume_stress_performance(live_gui) -> None:
time.sleep(5.0)
client = ApiHookClient()
time.sleep(2.0)
baseline_resp = client.get_performance()
baseline = baseline_resp.get("performance", {})
baseline_ft = baseline.get("last_frame_time_ms", 0.0)
large_session = []
for i in range(50):
large_session.append(
{
"role": "User",
"content": f"Stress test entry {i} " * 5,
"ts": time.time(),
"collapsed": False,
}
)
client.post_session(large_session)
time.sleep(1.0)
stress_resp = client.get_performance()
stress = stress_resp.get("performance", {})
stress_ft = stress.get("last_frame_time_ms", 0.0)
print(f"Baseline FT: {baseline_ft:.2f}ms, Stress FT: {stress_ft:.2f}ms")
if stress_ft > 0:
assert stress_ft < 100.0, (
f"Stress frame time {stress_ft:.2f}ms exceeds 10fps threshold"
)
session_data = client.get_session()
entries = session_data.get("session", {}).get("entries", [])
assert len(entries) >= 50, f"Expected at least 50 entries, got {len(entries)}"
time.sleep(5.0)
client = ApiHookClient()
time.sleep(2.0)
baseline_resp = client.get_performance()
baseline = baseline_resp.get("performance", {})
baseline_ft = baseline.get("last_frame_time_ms", 0.0)
large_session = []
for i in range(50):
large_session.append(
{
"role": "User",
"content": f"Stress test entry {i} " * 5,
"ts": time.time(),
"collapsed": False,
}
)
client.post_session(large_session)
time.sleep(1.0)
stress_resp = client.get_performance()
stress = stress_resp.get("performance", {})
stress_ft = stress.get("last_frame_time_ms", 0.0)
print(f"Baseline FT: {baseline_ft:.2f}ms, Stress FT: {stress_ft:.2f}ms")
if stress_ft > 0:
assert stress_ft < 100.0, (
f"Stress frame time {stress_ft:.2f}ms exceeds 10fps threshold"
)
session_data = client.get_session()
entries = session_data.get("session", {}).get("entries", [])
assert len(entries) >= 50, f"Expected at least 50 entries, got {len(entries)}"
+87 -87
View File
@@ -3,97 +3,97 @@ from src.history import HistoryManager, UISnapshot
class TestHistoryManager:
def test_push_and_undo(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2")
assert hm.can_undo is True
result = hm.undo(current_state={"value": 2})
assert result is not None
assert result.state["value"] == 2
assert hm.can_undo is True
def test_push_and_undo(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2")
assert hm.can_undo is True
result = hm.undo(current_state={"value": 2})
assert result is not None
assert result.state["value"] == 2
assert hm.can_undo is True
def test_undo_and_redo(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2")
assert hm.can_redo is False
hm.undo(current_state={"value": 2})
assert hm.can_redo is True
redone = hm.redo(current_state={"value": 1})
assert redone is not None
assert redone.state["value"] == 2
def test_undo_and_redo(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2")
assert hm.can_redo is False
hm.undo(current_state={"value": 2})
assert hm.can_redo is True
redone = hm.redo(current_state={"value": 1})
assert redone is not None
assert redone.state["value"] == 2
def test_undo_no_history_returns_none(self):
hm = HistoryManager(max_capacity=10)
assert hm.can_undo is False
result = hm.undo(current_state={"value": 1})
assert result is None
def test_undo_no_history_returns_none(self):
hm = HistoryManager(max_capacity=10)
assert hm.can_undo is False
result = hm.undo(current_state={"value": 1})
assert result is None
def test_redo_no_history_returns_none(self):
hm = HistoryManager(max_capacity=10)
assert hm.can_redo is False
result = hm.redo(current_state={"value": 1})
assert result is None
def test_redo_no_history_returns_none(self):
hm = HistoryManager(max_capacity=10)
assert hm.can_redo is False
result = hm.redo(current_state={"value": 1})
assert result is None
def test_jump_to_undo(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2")
hm.push({"value": 3}, "change to 3")
result = hm.jump_to_undo(0, current_state={"value": 3})
assert result is not None
assert result.state["value"] == 1
assert hm.can_redo is True
def test_jump_to_undo(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2")
hm.push({"value": 3}, "change to 3")
result = hm.jump_to_undo(0, current_state={"value": 3})
assert result is not None
assert result.state["value"] == 1
assert hm.can_redo is True
def test_get_history_returns_descriptions(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "first")
hm.push({"value": 2}, "second")
hm.push({"value": 3}, "third")
history = hm.get_history()
assert len(history) == 3
assert history[0]["description"] == "first"
assert history[1]["description"] == "second"
assert "timestamp" in history[0]
def test_get_history_returns_descriptions(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "first")
hm.push({"value": 2}, "second")
hm.push({"value": 3}, "third")
history = hm.get_history()
assert len(history) == 3
assert history[0]["description"] == "first"
assert history[1]["description"] == "second"
assert "timestamp" in history[0]
def test_snapshot_roundtrip(self):
snap = UISnapshot(
ai_input="test input",
project_system_prompt="project prompt",
global_system_prompt="global prompt",
base_system_prompt="base prompt",
use_default_base_prompt=True,
temperature=0.5,
top_p=0.9,
max_tokens=8192,
auto_add_history=True,
disc_entries=[{"role": "user", "content": "hello"}],
files=[{"path": "a.txt"}],
context_files=[],
screenshots=["screenshot.png"],
)
d = snap.to_dict()
restored = UISnapshot.from_dict(d)
assert restored.ai_input == snap.ai_input
assert restored.project_system_prompt == snap.project_system_prompt
assert restored.global_system_prompt == snap.global_system_prompt
assert restored.base_system_prompt == snap.base_system_prompt
assert restored.use_default_base_prompt == snap.use_default_base_prompt
assert restored.temperature == snap.temperature
assert restored.top_p == snap.top_p
assert restored.max_tokens == snap.max_tokens
assert restored.auto_add_history == snap.auto_add_history
assert restored.disc_entries == snap.disc_entries
assert restored.files == snap.files
assert restored.context_files == snap.context_files
assert restored.screenshots == snap.screenshots
def test_snapshot_roundtrip(self):
snap = UISnapshot(
ai_input="test input",
project_system_prompt="project prompt",
global_system_prompt="global prompt",
base_system_prompt="base prompt",
use_default_base_prompt=True,
temperature=0.5,
top_p=0.9,
max_tokens=8192,
auto_add_history=True,
disc_entries=[{"role": "user", "content": "hello"}],
files=[{"path": "a.txt"}],
context_files=[],
screenshots=["screenshot.png"],
)
d = snap.to_dict()
restored = UISnapshot.from_dict(d)
assert restored.ai_input == snap.ai_input
assert restored.project_system_prompt == snap.project_system_prompt
assert restored.global_system_prompt == snap.global_system_prompt
assert restored.base_system_prompt == snap.base_system_prompt
assert restored.use_default_base_prompt == snap.use_default_base_prompt
assert restored.temperature == snap.temperature
assert restored.top_p == snap.top_p
assert restored.max_tokens == snap.max_tokens
assert restored.auto_add_history == snap.auto_add_history
assert restored.disc_entries == snap.disc_entries
assert restored.files == snap.files
assert restored.context_files == snap.context_files
assert restored.screenshots == snap.screenshots
def test_push_clears_redo_stack(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2")
hm.undo(current_state={"value": 2})
assert hm.can_redo is True
hm.push({"value": 3}, "change to 3")
assert hm.can_redo is False
def test_push_clears_redo_stack(self):
hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2")
hm.undo(current_state={"value": 2})
assert hm.can_redo is True
hm.push({"value": 3}, "change to 3")
assert hm.can_redo is False
+76 -76
View File
@@ -6,95 +6,95 @@ import sys
import types
def test_hot_module_dataclass_fields():
hm = HotModule(
name="test_module",
file_path="/path/to/test_module.py",
state_keys=["attr1", "attr2"],
delegation_targets=["method_a", "method_b"],
)
assert hm.name == "test_module"
assert hm.file_path == "/path/to/test_module.py"
assert hm.state_keys == ["attr1", "attr2"]
assert hm.delegation_targets == ["method_a", "method_b"]
hm = HotModule(
name="test_module",
file_path="/path/to/test_module.py",
state_keys=["attr1", "attr2"],
delegation_targets=["method_a", "method_b"],
)
assert hm.name == "test_module"
assert hm.file_path == "/path/to/test_module.py"
assert hm.state_keys == ["attr1", "attr2"]
assert hm.delegation_targets == ["method_a", "method_b"]
def test_hot_reloader_register_and_get():
HotReloader.HOT_MODULES.clear()
hm = HotModule(name="test_mod", file_path="/fake/path.py", state_keys=[], delegation_targets=[])
HotReloader.register(hm)
assert "test_mod" in HotReloader.HOT_MODULES
assert HotReloader.HOT_MODULES["test_mod"] is hm
HotReloader.HOT_MODULES.clear()
hm = HotModule(name="test_mod", file_path="/fake/path.py", state_keys=[], delegation_targets=[])
HotReloader.register(hm)
assert "test_mod" in HotReloader.HOT_MODULES
assert HotReloader.HOT_MODULES["test_mod"] is hm
def test_hot_reloader_register_duplicate_raises():
HotReloader.HOT_MODULES.clear()
hm1 = HotModule(name="dup", file_path="/a.py", state_keys=[], delegation_targets=[])
HotReloader.register(hm1)
with pytest.raises(ValueError, match="already registered"):
HotReloader.register(hm1)
HotReloader.HOT_MODULES.clear()
hm1 = HotModule(name="dup", file_path="/a.py", state_keys=[], delegation_targets=[])
HotReloader.register(hm1)
with pytest.raises(ValueError, match="already registered"):
HotReloader.register(hm1)
def test_hot_reloader_is_error_state():
HotReloader.HOT_MODULES.clear()
HotReloader.last_error = None
HotReloader.is_error_state = False
assert HotReloader.is_error_state is False
HotReloader.HOT_MODULES.clear()
HotReloader.last_error = None
HotReloader.is_error_state = False
assert HotReloader.is_error_state is False
def test_reload_unknown_module_returns_false():
HotReloader.HOT_MODULES.clear()
HotReloader.register(HotModule(name="nonexistent_mod", file_path="/nonexistent.py", state_keys=[], delegation_targets=[]))
app = MagicMock()
result = HotReloader.reload("nonexistent_mod", app)
assert result is False
assert HotReloader.is_error_state is True
assert HotReloader.last_error is not None
HotReloader.HOT_MODULES.clear()
HotReloader.register(HotModule(name="nonexistent_mod", file_path="/nonexistent.py", state_keys=[], delegation_targets=[]))
app = MagicMock()
result = HotReloader.reload("nonexistent_mod", app)
assert result is False
assert HotReloader.is_error_state is True
assert HotReloader.last_error is not None
def test_reload_success_clears_error_state():
HotReloader.HOT_MODULES.clear()
test_mod = types.ModuleType("src._test_reload_mod_src")
sys.modules["src._test_reload_mod_src"] = test_mod
HotReloader.register(HotModule(name="src._test_reload_mod_src", file_path="/fake.py", state_keys=[], delegation_targets=[]))
app = MagicMock()
HotReloader.is_error_state = True
HotReloader.last_error = "previous error"
with patch("importlib.reload", return_value=test_mod):
result = HotReloader.reload("src._test_reload_mod_src", app)
assert result is True
assert HotReloader.is_error_state is False
assert HotReloader.last_error is None
del sys.modules["src._test_reload_mod_src"]
HotReloader.HOT_MODULES.clear()
test_mod = types.ModuleType("src._test_reload_mod_src")
sys.modules["src._test_reload_mod_src"] = test_mod
HotReloader.register(HotModule(name="src._test_reload_mod_src", file_path="/fake.py", state_keys=[], delegation_targets=[]))
app = MagicMock()
HotReloader.is_error_state = True
HotReloader.last_error = "previous error"
with patch("importlib.reload", return_value=test_mod):
result = HotReloader.reload("src._test_reload_mod_src", app)
assert result is True
assert HotReloader.is_error_state is False
assert HotReloader.last_error is None
del sys.modules["src._test_reload_mod_src"]
def test_reload_captures_and_restores_state_on_failure():
HotReloader.HOT_MODULES.clear()
HotReloader.register(HotModule(name="bad_mod", file_path="/bad.py", state_keys=["_test_attr"], delegation_targets=[]))
app = MagicMock()
app._test_attr = "preserved_value"
result = HotReloader.reload("bad_mod", app)
assert result is False
assert HotReloader.is_error_state is True
assert app._test_attr == "preserved_value"
HotReloader.HOT_MODULES.clear()
HotReloader.register(HotModule(name="bad_mod", file_path="/bad.py", state_keys=["_test_attr"], delegation_targets=[]))
app = MagicMock()
app._test_attr = "preserved_value"
result = HotReloader.reload("bad_mod", app)
assert result is False
assert HotReloader.is_error_state is True
assert app._test_attr == "preserved_value"
def test_reload_all_success():
HotReloader.HOT_MODULES.clear()
mod1 = types.ModuleType("hr_test_mod1")
mod2 = types.ModuleType("hr_test_mod2")
sys.modules["hr_test_mod1"] = mod1
sys.modules["hr_test_mod2"] = mod2
HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[]))
HotReloader.register(HotModule(name="hr_test_mod2", file_path="/fake2.py", state_keys=[], delegation_targets=[]))
app = MagicMock()
with patch("importlib.reload", return_value=mod1):
result = HotReloader.reload_all(app)
assert result is True
assert HotReloader.is_error_state is False
del sys.modules["hr_test_mod1"]
del sys.modules["hr_test_mod2"]
HotReloader.HOT_MODULES.clear()
mod1 = types.ModuleType("hr_test_mod1")
mod2 = types.ModuleType("hr_test_mod2")
sys.modules["hr_test_mod1"] = mod1
sys.modules["hr_test_mod2"] = mod2
HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[]))
HotReloader.register(HotModule(name="hr_test_mod2", file_path="/fake2.py", state_keys=[], delegation_targets=[]))
app = MagicMock()
with patch("importlib.reload", return_value=mod1):
result = HotReloader.reload_all(app)
assert result is True
assert HotReloader.is_error_state is False
del sys.modules["hr_test_mod1"]
del sys.modules["hr_test_mod2"]
def test_reload_all_partial_failure():
HotReloader.HOT_MODULES.clear()
mod1 = types.ModuleType("hr_test_mod1")
sys.modules["hr_test_mod1"] = mod1
HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[]))
HotReloader.register(HotModule(name="hr_nonexistent", file_path="/nonexistent.py", state_keys=[], delegation_targets=[]))
app = MagicMock()
result = HotReloader.reload_all(app)
assert result is False
assert HotReloader.is_error_state is True
del sys.modules["hr_test_mod1"]
HotReloader.HOT_MODULES.clear()
mod1 = types.ModuleType("hr_test_mod1")
sys.modules["hr_test_mod1"] = mod1
HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[]))
HotReloader.register(HotModule(name="hr_nonexistent", file_path="/nonexistent.py", state_keys=[], delegation_targets=[]))
app = MagicMock()
result = HotReloader.reload_all(app)
assert result is False
assert HotReloader.is_error_state is True
del sys.modules["hr_test_mod1"]
+61 -61
View File
@@ -6,70 +6,70 @@ from src.gui_2 import App
@pytest.fixture
def app_instance() -> Any:
with (
patch("src.models.load_config", return_value={"ai": {}, "projects": {}}),
patch("src.models.save_config"),
patch("src.gui_2.project_manager"),
patch("src.app_controller.project_manager") as mock_pm,
patch("src.gui_2.session_logger"),
patch("src.gui_2.immapp.run"),
patch("src.app_controller.AppController._load_active_project"),
patch("src.app_controller.AppController._fetch_models"),
patch.object(App, "_load_fonts"),
patch.object(App, "_post_init"),
patch("src.app_controller.AppController._prune_old_logs"),
patch("src.app_controller.AppController.start_services"),
patch("src.app_controller.AppController._init_ai_and_hooks"),
):
app = App()
app.project = {}
app.ui_files_base_dir = "."
yield app, mock_pm
with (
patch("src.models.load_config", return_value={"ai": {}, "projects": {}}),
patch("src.models.save_config"),
patch("src.gui_2.project_manager"),
patch("src.app_controller.project_manager") as mock_pm,
patch("src.gui_2.session_logger"),
patch("src.gui_2.immapp.run"),
patch("src.app_controller.AppController._load_active_project"),
patch("src.app_controller.AppController._fetch_models"),
patch.object(App, "_load_fonts"),
patch.object(App, "_post_init"),
patch("src.app_controller.AppController._prune_old_logs"),
patch("src.app_controller.AppController.start_services"),
patch("src.app_controller.AppController._init_ai_and_hooks"),
):
app = App()
app.project = {}
app.ui_files_base_dir = "."
yield app, mock_pm
def test_mma_dashboard_refresh(app_instance: Any) -> None:
app, mock_pm = app_instance
mock_tracks = [
{
"id": "track_1",
"title": "Track 1",
"status": "new",
"complete": 0,
"total": 0,
"progress": 0.0,
},
{
"id": "track_2",
"title": "Track 2",
"status": "new",
"complete": 0,
"total": 0,
"progress": 0.0,
},
]
mock_pm.get_all_tracks.return_value = mock_tracks
app._refresh_from_project()
assert hasattr(app, "tracks"), "App instance should have a 'tracks' attribute"
assert app.tracks == mock_tracks
assert len(app.tracks) == 2
assert app.tracks[0]["id"] == "track_1"
assert app.tracks[1]["id"] == "track_2"
mock_pm.get_all_tracks.assert_called_with(app.active_project_root)
app, mock_pm = app_instance
mock_tracks = [
{
"id": "track_1",
"title": "Track 1",
"status": "new",
"complete": 0,
"total": 0,
"progress": 0.0,
},
{
"id": "track_2",
"title": "Track 2",
"status": "new",
"complete": 0,
"total": 0,
"progress": 0.0,
},
]
mock_pm.get_all_tracks.return_value = mock_tracks
app._refresh_from_project()
assert hasattr(app, "tracks"), "App instance should have a 'tracks' attribute"
assert app.tracks == mock_tracks
assert len(app.tracks) == 2
assert app.tracks[0]["id"] == "track_1"
assert app.tracks[1]["id"] == "track_2"
mock_pm.get_all_tracks.assert_called_with(app.active_project_root)
def test_mma_dashboard_initialization_refresh(app_instance: Any) -> None:
app, mock_pm = app_instance
mock_tracks = [
{
"id": "init_track",
"title": "Initial Track",
"status": "new",
"complete": 0,
"total": 0,
"progress": 0.0,
}
]
mock_pm.get_all_tracks.return_value = mock_tracks
app._refresh_from_project()
assert app.tracks == mock_tracks
assert app.tracks[0]["id"] == "init_track"
app, mock_pm = app_instance
mock_tracks = [
{
"id": "init_track",
"title": "Initial Track",
"status": "new",
"complete": 0,
"total": 0,
"progress": 0.0,
}
]
mock_pm.get_all_tracks.return_value = mock_tracks
app._refresh_from_project()
assert app.tracks == mock_tracks
assert app.tracks[0]["id"] == "init_track"
+35 -35
View File
@@ -4,48 +4,48 @@ import sys
from imgui_bundle import imgui_node_editor as ed
def test_imgui_node_editor_import():
assert ed is not None
assert hasattr(ed, "begin_node")
assert hasattr(ed, "end_node")
assert ed is not None
assert hasattr(ed, "begin_node")
assert hasattr(ed, "end_node")
def test_app_has_node_editor_attrs():
from src.gui_2 import App
# Use patch to avoid initializing the entire App
with patch('src.app_controller.AppController'), \
patch('src.gui_2.immapp.RunnerParams'), \
patch('imgui_bundle.imgui_node_editor.create_editor'):
app = App.__new__(App)
# Manually set what we expect from __init__
app.node_editor_config = ed.Config()
app.node_editor_ctx = ed.create_editor(app.node_editor_config)
app.ui_selected_ticket_id = None
from src.gui_2 import App
# Use patch to avoid initializing the entire App
with patch('src.app_controller.AppController'), \
patch('src.gui_2.immapp.RunnerParams'), \
patch('imgui_bundle.imgui_node_editor.create_editor'):
app = App.__new__(App)
# Manually set what we expect from __init__
app.node_editor_config = ed.Config()
app.node_editor_ctx = ed.create_editor(app.node_editor_config)
app.ui_selected_ticket_id = None
assert hasattr(app, 'node_editor_config')
assert hasattr(app, 'node_editor_ctx')
assert hasattr(app, 'ui_selected_ticket_id')
assert hasattr(app, 'node_editor_config')
assert hasattr(app, 'node_editor_ctx')
assert hasattr(app, 'ui_selected_ticket_id')
def test_node_id_stability():
"""Verify that node/pin IDs generated via hash are stable for the same input."""
tid = "T-001"
int_id = abs(hash(tid))
in_pin_id = abs(hash(tid + "_in"))
out_pin_id = abs(hash(tid + "_out"))
"""Verify that node/pin IDs generated via hash are stable for the same input."""
tid = "T-001"
int_id = abs(hash(tid))
in_pin_id = abs(hash(tid + "_in"))
out_pin_id = abs(hash(tid + "_out"))
# Re-generate and compare
assert int_id == abs(hash(tid))
assert in_pin_id == abs(hash(tid + "_in"))
assert out_pin_id == abs(hash(tid + "_out"))
# Re-generate and compare
assert int_id == abs(hash(tid))
assert in_pin_id == abs(hash(tid + "_in"))
assert out_pin_id == abs(hash(tid + "_out"))
# Verify uniqueness
assert int_id != in_pin_id
assert int_id != out_pin_id
assert in_pin_id != out_pin_id
# Verify uniqueness
assert int_id != in_pin_id
assert int_id != out_pin_id
assert in_pin_id != out_pin_id
def test_link_id_stability():
"""Verify that link IDs are stable."""
source_tid = "T-001"
target_tid = "T-002"
link_id = abs(hash(source_tid + "_" + target_tid))
"""Verify that link IDs are stable."""
source_tid = "T-001"
target_tid = "T-002"
link_id = abs(hash(source_tid + "_" + target_tid))
assert link_id == abs(hash(source_tid + "_" + target_tid))
assert link_id != abs(hash(target_tid + "_" + source_tid))
assert link_id == abs(hash(source_tid + "_" + target_tid))
assert link_id != abs(hash(target_tid + "_" + source_tid))
+89 -89
View File
@@ -8,107 +8,107 @@ from src.gui_2 import App
def test_mma_ui_state_initialization(app_instance: App) -> None:
"""Verifies that the new MMA UI state variables are initialized correctly."""
assert hasattr(app_instance, "ui_epic_input")
assert hasattr(app_instance, "proposed_tracks")
assert hasattr(app_instance, "_show_track_proposal_modal")
assert hasattr(app_instance, "mma_streams")
assert app_instance.ui_epic_input == ""
assert app_instance.proposed_tracks == []
assert app_instance._show_track_proposal_modal is False
assert app_instance.mma_streams == {}
"""Verifies that the new MMA UI state variables are initialized correctly."""
assert hasattr(app_instance, "ui_epic_input")
assert hasattr(app_instance, "proposed_tracks")
assert hasattr(app_instance, "_show_track_proposal_modal")
assert hasattr(app_instance, "mma_streams")
assert app_instance.ui_epic_input == ""
assert app_instance.proposed_tracks == []
assert app_instance._show_track_proposal_modal is False
assert app_instance.mma_streams == {}
def test_process_pending_gui_tasks_show_track_proposal(app_instance: App) -> None:
"""Verifies that the 'show_track_proposal' action correctly updates the UI state."""
mock_tracks = [{"id": "track_1", "title": "Test Track"}]
task = {"action": "show_track_proposal", "payload": mock_tracks}
app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks()
assert app_instance.proposed_tracks == mock_tracks
assert app_instance._show_track_proposal_modal is True
"""Verifies that the 'show_track_proposal' action correctly updates the UI state."""
mock_tracks = [{"id": "track_1", "title": "Test Track"}]
task = {"action": "show_track_proposal", "payload": mock_tracks}
app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks()
assert app_instance.proposed_tracks == mock_tracks
assert app_instance._show_track_proposal_modal is True
def test_cb_plan_epic_launches_thread(app_instance: App) -> None:
"""Verifies that _cb_plan_epic launches a thread and eventually queues a task."""
app_instance.ui_epic_input = "Develop a new feature"
app_instance.active_project_path = "test_project.toml"
mock_tracks = [{"id": "track_1", "title": "Test Track"}]
with (
patch(
"src.orchestrator_pm.get_track_history_summary",
return_value="History summary",
) as mock_get_history,
patch(
"src.orchestrator_pm.generate_tracks", return_value=mock_tracks
) as mock_gen_tracks,
patch("src.aggregate.build_file_items", return_value=[]),
):
with (
patch("src.project_manager.load_project", return_value={}),
patch("src.project_manager.flat_config", return_value={}),
):
app_instance._cb_plan_epic()
max_wait = 5
start_time = time.time()
while (
len(app_instance._pending_gui_tasks) < 3
and time.time() - start_time < max_wait
):
time.sleep(0.1)
assert len(app_instance._pending_gui_tasks) >= 3
actions = [t["action"] for t in app_instance._pending_gui_tasks]
assert "handle_ai_response" in actions
assert "show_track_proposal" in actions
mock_get_history.assert_called_once()
mock_gen_tracks.assert_called_once()
"""Verifies that _cb_plan_epic launches a thread and eventually queues a task."""
app_instance.ui_epic_input = "Develop a new feature"
app_instance.active_project_path = "test_project.toml"
mock_tracks = [{"id": "track_1", "title": "Test Track"}]
with (
patch(
"src.orchestrator_pm.get_track_history_summary",
return_value="History summary",
) as mock_get_history,
patch(
"src.orchestrator_pm.generate_tracks", return_value=mock_tracks
) as mock_gen_tracks,
patch("src.aggregate.build_file_items", return_value=[]),
):
with (
patch("src.project_manager.load_project", return_value={}),
patch("src.project_manager.flat_config", return_value={}),
):
app_instance._cb_plan_epic()
max_wait = 5
start_time = time.time()
while (
len(app_instance._pending_gui_tasks) < 3
and time.time() - start_time < max_wait
):
time.sleep(0.1)
assert len(app_instance._pending_gui_tasks) >= 3
actions = [t["action"] for t in app_instance._pending_gui_tasks]
assert "handle_ai_response" in actions
assert "show_track_proposal" in actions
mock_get_history.assert_called_once()
mock_gen_tracks.assert_called_once()
def test_process_pending_gui_tasks_mma_spawn_approval(app_instance: App) -> None:
"""Verifies that the 'mma_spawn_approval' action correctly updates the UI state."""
task = {
"action": "mma_spawn_approval",
"ticket_id": "T1",
"role": "Tier 3 Worker",
"prompt": "Test Prompt",
"context_md": "Test Context",
"dialog_container": [None],
}
app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks()
assert app_instance._pending_mma_spawn == task
assert app_instance._mma_spawn_prompt == "Test Prompt"
assert app_instance._mma_spawn_context == "Test Context"
assert app_instance._mma_spawn_open is True
assert app_instance._mma_spawn_edit_mode is False
assert task["dialog_container"][0] is not None
"""Verifies that the 'mma_spawn_approval' action correctly updates the UI state."""
task = {
"action": "mma_spawn_approval",
"ticket_id": "T1",
"role": "Tier 3 Worker",
"prompt": "Test Prompt",
"context_md": "Test Context",
"dialog_container": [None],
}
app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks()
assert app_instance._pending_mma_spawn == task
assert app_instance._mma_spawn_prompt == "Test Prompt"
assert app_instance._mma_spawn_context == "Test Context"
assert app_instance._mma_spawn_open is True
assert app_instance._mma_spawn_edit_mode is False
assert task["dialog_container"][0] is not None
def test_handle_ai_response_with_stream_id(app_instance: App) -> None:
"""Verifies routing to mma_streams."""
task = {
"action": "handle_ai_response",
"payload": {
"text": "Tier 1 Strategy Content",
"stream_id": "Tier 1",
"status": "Thinking...",
},
}
app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks()
assert app_instance.mma_streams.get("Tier 1") == "Tier 1 Strategy Content"
assert app_instance.ai_status == "Thinking..."
assert app_instance.ai_response == ""
"""Verifies routing to mma_streams."""
task = {
"action": "handle_ai_response",
"payload": {
"text": "Tier 1 Strategy Content",
"stream_id": "Tier 1",
"status": "Thinking...",
},
}
app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks()
assert app_instance.mma_streams.get("Tier 1") == "Tier 1 Strategy Content"
assert app_instance.ai_status == "Thinking..."
assert app_instance.ai_response == ""
def test_handle_ai_response_fallback(app_instance: App) -> None:
"""Verifies fallback to ai_response when stream_id is missing."""
task = {
"action": "handle_ai_response",
"payload": {"text": "Regular AI Response", "status": "done"},
}
app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks()
assert app_instance.ai_response == "Regular AI Response"
assert app_instance.ai_status == "done"
assert len(app_instance.mma_streams) == 0
"""Verifies fallback to ai_response when stream_id is missing."""
task = {
"action": "handle_ai_response",
"payload": {"text": "Regular AI Response", "status": "done"},
}
app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks()
assert app_instance.ai_response == "Regular AI Response"
assert app_instance.ai_status == "done"
assert len(app_instance.mma_streams) == 0
+109 -109
View File
@@ -5,140 +5,140 @@ from src import mcp_client
@pytest.fixture
def temp_py_file(tmp_path):
p = tmp_path / "sample.py"
content = """class MyClass:
\"\"\"Docstring.\"\"\"
def method1(self):
print("m1")
p = tmp_path / "sample.py"
content = """class MyClass:
\"\"\"Docstring.\"\"\"
def method1(self):
print("m1")
def top_func():
\"\"\"Top doc.\"\"\"
print("top")
\"\"\"Top doc.\"\"\"
print("top")
"""
p.write_text(content, encoding="utf-8")
return str(p)
p.write_text(content, encoding="utf-8")
return str(p)
def test_find_definition_range():
source = """class A:
def m(self): pass
source = """class A:
def m(self): pass
def f(): pass
"""
assert py_struct_tools.find_definition_range(source, "A") == (1, 2)
assert py_struct_tools.find_definition_range(source, "A.m") == (2, 2)
assert py_struct_tools.find_definition_range(source, "f") == (3, 3)
assert py_struct_tools.find_definition_range(source, "nonexistent") is None
assert py_struct_tools.find_definition_range(source, "A") == (1, 2)
assert py_struct_tools.find_definition_range(source, "A.m") == (2, 2)
assert py_struct_tools.find_definition_range(source, "f") == (3, 3)
assert py_struct_tools.find_definition_range(source, "nonexistent") is None
def test_shift_indentation():
payload = "def f():\n print('hi')" # 2-space
shifted = py_struct_tools.shift_indentation(payload, 1)
assert shifted == " def f():\n print('hi')" # wait, shift_indentation strips min and prepends.
payload = "def f():\n print('hi')" # 2-space
shifted = py_struct_tools.shift_indentation(payload, 1)
assert shifted == " def f():\n print('hi')" # wait, shift_indentation strips min and prepends.
# Let's re-test shift_indentation logic
# Original:
# line 1: 'def f():' (0 indent)
# line 2: ' print('hi')' (2 indent)
# min_indent = 0
# Prepend 1 space:
# ' def f():'
# ' print('hi')'
# Let's re-test shift_indentation logic
# Original:
# line 1: 'def f():' (0 indent)
# line 2: ' print('hi')' (2 indent)
# min_indent = 0
# Prepend 1 space:
# ' def f():'
# ' print('hi')'
# If payload was:
# def f():
# print('hi')
# min_indent = 2
# target_depth = 1
# ' def f():'
# ' print('hi')'
# If payload was:
# def f():
# print('hi')
# min_indent = 2
# target_depth = 1
# ' def f():'
# ' print('hi')'
payload2 = " def f():\n print('hi')"
shifted2 = py_struct_tools.shift_indentation(payload2, 1)
assert shifted2 == " def f():\n print('hi')"
payload2 = " def f():\n print('hi')"
shifted2 = py_struct_tools.shift_indentation(payload2, 1)
assert shifted2 == " def f():\n print('hi')"
def test_py_remove_def(temp_py_file):
err = py_struct_tools.py_remove_def(temp_py_file, "MyClass.method1")
assert err == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "def method1" not in content
assert "class MyClass" in content
err = py_struct_tools.py_remove_def(temp_py_file, "MyClass.method1")
assert err == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "def method1" not in content
assert "class MyClass" in content
def test_py_add_def(temp_py_file):
new_code = "def method2(self):\n print('m2')"
err = py_struct_tools.py_add_def(temp_py_file, "MyClass", new_code, "after", "method1")
assert err == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "def method2" in content
# Check 1-space indentation
assert " def method2(self):" in content
new_code = "def method2(self):\n print('m2')"
err = py_struct_tools.py_add_def(temp_py_file, "MyClass", new_code, "after", "method1")
assert err == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "def method2" in content
# Check 1-space indentation
assert " def method2(self):" in content
def test_py_region_wrap(temp_py_file):
err = py_struct_tools.py_region_wrap(temp_py_file, 6, 8, "MyRegion")
assert err == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "#region: MyRegion" in content
assert "#endregion: MyRegion" in content
err = py_struct_tools.py_region_wrap(temp_py_file, 6, 8, "MyRegion")
assert err == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "#region: MyRegion" in content
assert "#endregion: MyRegion" in content
def test_mcp_dispatch_integration(temp_py_file):
# Mock allowlist
mcp_client.configure([{"path": temp_py_file}])
# Mock allowlist
mcp_client.configure([{"path": temp_py_file}])
# Test py_remove_def
result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "top_func"})
assert result == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "def top_func" not in content
# Test py_remove_def
result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "top_func"})
assert result == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "def top_func" not in content
# Test py_add_def (module level top)
result = mcp_client.dispatch("py_add_def", {
"path": temp_py_file,
"name": "",
"new_content": "def head_func():\n print('head')",
"anchor_type": "top"
})
assert result == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert content.startswith("def head_func")
# Test py_add_def (module level top)
result = mcp_client.dispatch("py_add_def", {
"path": temp_py_file,
"name": "",
"new_content": "def head_func():\n print('head')",
"anchor_type": "top"
})
assert result == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert content.startswith("def head_func")
# Test py_add_def (class bottom)
result = mcp_client.dispatch("py_add_def", {
"path": temp_py_file,
"name": "MyClass",
"new_content": "def tail_method(self):\n print('tail')",
"anchor_type": "bottom"
})
assert result == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "def tail_method" in content
assert " def tail_method(self):" in content # Check indent
# Test py_add_def (class bottom)
result = mcp_client.dispatch("py_add_def", {
"path": temp_py_file,
"name": "MyClass",
"new_content": "def tail_method(self):\n print('tail')",
"anchor_type": "bottom"
})
assert result == ""
with open(temp_py_file, 'r') as f:
content = f.read()
assert "def tail_method" in content
assert " def tail_method(self):" in content # Check indent
# Test py_move_def (cross-file simulated with same file)
# We move method1 to after tail_method
result = mcp_client.dispatch("py_move_def", {
"src_path": temp_py_file,
"dest_path": temp_py_file,
"name": "MyClass.method1",
"dest_name": "MyClass",
"anchor_type": "after",
"anchor_symbol": "tail_method"
})
assert result == ""
with open(temp_py_file, 'r') as f:
content = f.read()
# method1 should now be AFTER tail_method
assert content.find("def method1") > content.find("def tail_method")
# Test py_move_def (cross-file simulated with same file)
# We move method1 to after tail_method
result = mcp_client.dispatch("py_move_def", {
"src_path": temp_py_file,
"dest_path": temp_py_file,
"name": "MyClass.method1",
"dest_name": "MyClass",
"anchor_type": "after",
"anchor_symbol": "tail_method"
})
assert result == ""
with open(temp_py_file, 'r') as f:
content = f.read()
# method1 should now be AFTER tail_method
assert content.find("def method1") > content.find("def tail_method")
def test_mcp_dispatch_errors(temp_py_file):
mcp_client.configure([{"path": temp_py_file}])
mcp_client.configure([{"path": temp_py_file}])
# Non-existent symbol
result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "NoSuchSymbol"})
assert "ERROR" in result or "not found" in result
# Non-existent symbol
result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "NoSuchSymbol"})
assert "ERROR" in result or "not found" in result
# Denied path
result = mcp_client.dispatch("py_remove_def", {"path": "C:/windows/system32/cmd.exe", "name": "foo"})
assert "ACCESS DENIED" in result
# Denied path
result = mcp_client.dispatch("py_remove_def", {"path": "C:/windows/system32/cmd.exe", "name": "foo"})
assert "ACCESS DENIED" in result
+66 -66
View File
@@ -7,88 +7,88 @@ from src.models import ThinkingSegment
def test_save_and_load_history_with_thinking_segments():
with tempfile.TemporaryDirectory() as tmpdir:
project_path = Path(tmpdir) / "test_project"
project_path.mkdir()
with tempfile.TemporaryDirectory() as tmpdir:
project_path = Path(tmpdir) / "test_project"
project_path.mkdir()
project_file = project_path / "test_project.toml"
project_file.write_text("[project]\nname = 'test'\n")
project_file = project_path / "test_project.toml"
project_file.write_text("[project]\nname = 'test'\n")
history_data = {
"entries": [
{
"role": "AI",
"content": "Here's the response",
"thinking_segments": [
{"content": "Let me think about this...", "marker": "thinking"}
],
"ts": "2026-03-13T10:00:00",
"collapsed": False,
},
{
"role": "User",
"content": "Hello",
"ts": "2026-03-13T09:00:00",
"collapsed": False,
},
]
}
history_data = {
"entries": [
{
"role": "AI",
"content": "Here's the response",
"thinking_segments": [
{"content": "Let me think about this...", "marker": "thinking"}
],
"ts": "2026-03-13T10:00:00",
"collapsed": False,
},
{
"role": "User",
"content": "Hello",
"ts": "2026-03-13T09:00:00",
"collapsed": False,
},
]
}
project_manager.save_project(
{"project": {"name": "test"}}, project_file, disc_data=history_data
)
project_manager.save_project(
{"project": {"name": "test"}}, project_file, disc_data=history_data
)
loaded = project_manager.load_history(project_file)
loaded = project_manager.load_history(project_file)
assert "entries" in loaded
assert len(loaded["entries"]) == 2
assert "entries" in loaded
assert len(loaded["entries"]) == 2
ai_entry = loaded["entries"][0]
assert ai_entry["role"] == "AI"
assert ai_entry["content"] == "Here's the response"
assert "thinking_segments" in ai_entry
assert len(ai_entry["thinking_segments"]) == 1
assert (
ai_entry["thinking_segments"][0]["content"] == "Let me think about this..."
)
ai_entry = loaded["entries"][0]
assert ai_entry["role"] == "AI"
assert ai_entry["content"] == "Here's the response"
assert "thinking_segments" in ai_entry
assert len(ai_entry["thinking_segments"]) == 1
assert (
ai_entry["thinking_segments"][0]["content"] == "Let me think about this..."
)
user_entry = loaded["entries"][1]
assert user_entry["role"] == "User"
assert "thinking_segments" not in user_entry
user_entry = loaded["entries"][1]
assert user_entry["role"] == "User"
assert "thinking_segments" not in user_entry
def test_entry_to_str_with_thinking():
entry = {
"role": "AI",
"content": "Response text",
"thinking_segments": [{"content": "Thinking...", "marker": "thinking"}],
"ts": "2026-03-13T10:00:00",
}
result = project_manager.entry_to_str(entry)
assert "@2026-03-13T10:00:00" in result
assert "AI:" in result
assert "Response text" in result
entry = {
"role": "AI",
"content": "Response text",
"thinking_segments": [{"content": "Thinking...", "marker": "thinking"}],
"ts": "2026-03-13T10:00:00",
}
result = project_manager.entry_to_str(entry)
assert "@2026-03-13T10:00:00" in result
assert "AI:" in result
assert "Response text" in result
def test_str_to_entry_with_thinking():
raw = "@2026-03-13T10:00:00\nAI:\nResponse text"
roles = ["User", "AI", "Vendor API", "System", "Reasoning"]
result = project_manager.str_to_entry(raw, roles)
assert result["role"] == "AI"
assert result["content"] == "Response text"
assert "ts" in result
raw = "@2026-03-13T10:00:00\nAI:\nResponse text"
roles = ["User", "AI", "Vendor API", "System", "Reasoning"]
result = project_manager.str_to_entry(raw, roles)
assert result["role"] == "AI"
assert result["content"] == "Response text"
assert "ts" in result
def test_clean_nones_removes_thinking():
entry = {"role": "AI", "content": "Test", "thinking_segments": None, "ts": None}
cleaned = project_manager.clean_nones(entry)
assert "thinking_segments" not in cleaned
assert "ts" not in cleaned
entry = {"role": "AI", "content": "Test", "thinking_segments": None, "ts": None}
cleaned = project_manager.clean_nones(entry)
assert "thinking_segments" not in cleaned
assert "ts" not in cleaned
if __name__ == "__main__":
test_save_and_load_history_with_thinking_segments()
test_entry_to_str_with_thinking()
test_str_to_entry_with_thinking()
test_clean_nones_removes_thinking()
print("All project_manager thinking tests passed!")
test_save_and_load_history_with_thinking_segments()
test_entry_to_str_with_thinking()
test_str_to_entry_with_thinking()
test_clean_nones_removes_thinking()
print("All project_manager thinking tests passed!")
+102 -102
View File
@@ -3,134 +3,134 @@ from src.shell_runner import run_powershell
from src import ai_client
def test_run_powershell_qa_callback_on_failure(vlogger) -> None:
"""Test that qa_callback is called when a powershell command fails (non-zero exit code)."""
qa_callback = MagicMock(return_value="FIX: Check path")
"""Test that qa_callback is called when a powershell command fails (non-zero exit code)."""
qa_callback = MagicMock(return_value="FIX: Check path")
vlogger.log_state("QA Callback Called", False, "pending")
# Simulate a failure
with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock()
mock_process.communicate.return_value = ("stdout", "stderr error")
mock_process.returncode = 1
mock_popen.return_value = mock_process
vlogger.log_state("QA Callback Called", False, "pending")
# Simulate a failure
with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock()
mock_process.communicate.return_value = ("stdout", "stderr error")
mock_process.returncode = 1
mock_popen.return_value = mock_process
result = run_powershell("invalid_cmd", ".", qa_callback=qa_callback)
result = run_powershell("invalid_cmd", ".", qa_callback=qa_callback)
vlogger.log_state("QA Callback Called", "pending", str(qa_callback.called))
assert qa_callback.called
assert "QA ANALYSIS:\nFIX: Check path" in result
vlogger.finalize("Tier 4 Interceptor", "PASS", "Interceptor triggered and result appended.")
vlogger.log_state("QA Callback Called", "pending", str(qa_callback.called))
assert qa_callback.called
assert "QA ANALYSIS:\nFIX: Check path" in result
vlogger.finalize("Tier 4 Interceptor", "PASS", "Interceptor triggered and result appended.")
def test_run_powershell_qa_callback_on_stderr_only(vlogger) -> None:
"""Test that qa_callback is called when a powershell command has stderr output, even if exit code is 0."""
qa_callback = MagicMock(return_value="WARNING: Check permissions")
"""Test that qa_callback is called when a powershell command has stderr output, even if exit code is 0."""
qa_callback = MagicMock(return_value="WARNING: Check permissions")
with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock()
mock_process.communicate.return_value = ("stdout", "non-fatal warning")
mock_process.returncode = 0
mock_popen.return_value = mock_process
with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock()
mock_process.communicate.return_value = ("stdout", "non-fatal warning")
mock_process.returncode = 0
mock_popen.return_value = mock_process
result = run_powershell("cmd_with_warning", ".", qa_callback=qa_callback)
result = run_powershell("cmd_with_warning", ".", qa_callback=qa_callback)
assert qa_callback.called
assert "QA ANALYSIS:\nWARNING: Check permissions" in result
vlogger.finalize("Tier 4 Non-Fatal Interceptor", "PASS", "Interceptor triggered for non-fatal stderr.")
assert qa_callback.called
assert "QA ANALYSIS:\nWARNING: Check permissions" in result
vlogger.finalize("Tier 4 Non-Fatal Interceptor", "PASS", "Interceptor triggered for non-fatal stderr.")
def test_run_powershell_no_qa_callback_on_success() -> None:
qa_callback = MagicMock()
with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock()
mock_process.communicate.return_value = ("ok", "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
qa_callback = MagicMock()
with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock()
mock_process.communicate.return_value = ("ok", "")
mock_process.returncode = 0
mock_popen.return_value = mock_process
result = run_powershell("success_cmd", ".", qa_callback=qa_callback)
assert not qa_callback.called
assert "QA ANALYSIS" not in result
result = run_powershell("success_cmd", ".", qa_callback=qa_callback)
assert not qa_callback.called
assert "QA ANALYSIS" not in result
def test_run_powershell_optional_qa_callback() -> None:
# Should not crash if qa_callback is None
with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock()
mock_process.communicate.return_value = ("error", "error")
mock_process.returncode = 1
mock_popen.return_value = mock_process
# Should not crash if qa_callback is None
with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock()
mock_process.communicate.return_value = ("error", "error")
mock_process.returncode = 1
mock_popen.return_value = mock_process
result = run_powershell("fail_no_cb", ".", qa_callback=None)
assert "EXIT CODE: 1" in result
result = run_powershell("fail_no_cb", ".", qa_callback=None)
assert "EXIT CODE: 1" in result
def test_end_to_end_tier4_integration(vlogger) -> None:
"""
"""
1. Start a task that triggers a tool failure.
2. Ensure Tier 4 QA analysis is run.
3. Verify the analysis is merged into the next turn's prompt.
"""
# Trigger a send that results in a tool failure
# (In reality, the tool loop handles this)
# For unit testing, we just check if ai_client.send passes the qa_callback
# to the underlying provider function.
pass
vlogger.finalize("E2E Tier 4 Integration", "PASS", "ai_client.run_tier4_analysis correctly called and results merged.")
1. Start a task that triggers a tool failure.
2. Ensure Tier 4 QA analysis is run.
3. Verify the analysis is merged into the next turn's prompt.
"""
# Trigger a send that results in a tool failure
# (In reality, the tool loop handles this)
# For unit testing, we just check if ai_client.send passes the qa_callback
# to the underlying provider function.
pass
vlogger.finalize("E2E Tier 4 Integration", "PASS", "ai_client.run_tier4_analysis correctly called and results merged.")
def test_ai_client_passes_qa_callback() -> None:
"""Verifies that ai_client.send passes the qa_callback down to the provider function."""
qa_callback = lambda x: "analysis"
"""Verifies that ai_client.send passes the qa_callback down to the provider function."""
qa_callback = lambda x: "analysis"
with patch("src.ai_client._send_gemini") as mock_send:
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
ai_client.send("ctx", "msg", qa_callback=qa_callback)
args, kwargs = mock_send.call_args
# It might be passed as positional or keyword depending on how 'send' calls it
# send() calls _send_gemini(md_content, user_message, base_dir, ..., qa_callback, ...)
# In current impl of send(), it is the 7th argument after md_content, user_msg, base_dir, file_items, disc_hist, pre_tool
assert args[6] == qa_callback or kwargs.get("qa_callback") == qa_callback
with patch("src.ai_client._send_gemini") as mock_send:
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
ai_client.send("ctx", "msg", qa_callback=qa_callback)
args, kwargs = mock_send.call_args
# It might be passed as positional or keyword depending on how 'send' calls it
# send() calls _send_gemini(md_content, user_message, base_dir, ..., qa_callback, ...)
# In current impl of send(), it is the 7th argument after md_content, user_msg, base_dir, file_items, disc_hist, pre_tool
assert args[6] == qa_callback or kwargs.get("qa_callback") == qa_callback
def test_gemini_provider_passes_qa_callback_to_run_script() -> None:
"""Verifies that _send_gemini passes the qa_callback to _run_script."""
qa_callback = MagicMock()
"""Verifies that _send_gemini passes the qa_callback to _run_script."""
qa_callback = MagicMock()
# Mock the tool loop behavior
with patch("src.ai_client._run_script", return_value="output") as mock_run_script, \
patch("src.ai_client._ensure_gemini_client"), \
patch("src.ai_client._gemini_client") as mock_gen_client:
# Mock the tool loop behavior
with patch("src.ai_client._run_script", return_value="output") as mock_run_script, \
patch("src.ai_client._ensure_gemini_client"), \
patch("src.ai_client._gemini_client") as mock_gen_client:
mock_chat = MagicMock()
mock_gen_client.chats.create.return_value = mock_chat
mock_chat = MagicMock()
mock_gen_client.chats.create.return_value = mock_chat
# 1st round: tool call
mock_fc = MagicMock()
mock_fc.name = "run_powershell"
mock_fc.args = {"script": "dir"}
mock_part = MagicMock()
mock_part.function_call = mock_fc
mock_part.text = ""
mock_candidate = MagicMock()
mock_candidate.content.parts = [mock_part]
mock_candidate.finish_reason.name = "STOP"
# 1st round: tool call
mock_fc = MagicMock()
mock_fc.name = "run_powershell"
mock_fc.args = {"script": "dir"}
mock_part = MagicMock()
mock_part.function_call = mock_fc
mock_part.text = ""
mock_candidate = MagicMock()
mock_candidate.content.parts = [mock_part]
mock_candidate.finish_reason.name = "STOP"
mock_resp1 = MagicMock()
mock_resp1.candidates = [mock_candidate]
mock_resp1.usage_metadata.prompt_token_count = 10
mock_resp1.usage_metadata.candidates_token_count = 5
mock_resp1.text = ""
mock_resp1 = MagicMock()
mock_resp1.candidates = [mock_candidate]
mock_resp1.usage_metadata.prompt_token_count = 10
mock_resp1.usage_metadata.candidates_token_count = 5
mock_resp1.text = ""
# 2nd round: final text
mock_resp2 = MagicMock()
mock_resp2.candidates = []
mock_resp2.usage_metadata.prompt_token_count = 20
mock_resp2.usage_metadata.candidates_token_count = 10
mock_resp2.text = "done"
# 2nd round: final text
mock_resp2 = MagicMock()
mock_resp2.candidates = []
mock_resp2.usage_metadata.prompt_token_count = 20
mock_resp2.usage_metadata.candidates_token_count = 10
mock_resp2.text = "done"
mock_chat.send_message.side_effect = [mock_resp1, mock_resp2]
mock_chat.send_message.side_effect = [mock_resp1, mock_resp2]
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
ai_client._send_gemini(
md_content="Context",
user_message="Run dir",
base_dir=".",
qa_callback=qa_callback
)
# Verify _run_script received the qa_callback and patch_callback
mock_run_script.assert_called_with("dir", ".", qa_callback, None)
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
ai_client._send_gemini(
md_content="Context",
user_message="Run dir",
base_dir=".",
qa_callback=qa_callback
)
# Verify _run_script received the qa_callback and patch_callback
mock_run_script.assert_called_with("dir", ".", qa_callback, None)
+37 -37
View File
@@ -7,56 +7,56 @@ from pathlib import Path
from unittest.mock import MagicMock, patch
def test_persona_aggregation_strategy():
p = Persona(name="test_persona", aggregation_strategy="summarize")
assert p.aggregation_strategy == "summarize"
p = Persona(name="test_persona", aggregation_strategy="summarize")
assert p.aggregation_strategy == "summarize"
d = p.to_dict()
assert d.get("aggregation_strategy") == "summarize"
d = p.to_dict()
assert d.get("aggregation_strategy") == "summarize"
p2 = Persona.from_dict("test2", d)
assert p2.aggregation_strategy == "summarize"
p2 = Persona.from_dict("test2", d)
assert p2.aggregation_strategy == "summarize"
@patch("src.aggregate.build_markdown_from_items")
def test_app_controller_do_generate_uses_persona_strategy(mock_build):
mock_build.return_value = "fake_md"
app = app_controller.AppController()
app.personas = {}
p = Persona(name="p1", aggregation_strategy="full")
app.personas["p1"] = p
app.ui_active_persona = "p1"
mock_build.return_value = "fake_md"
app = app_controller.AppController()
app.personas = {}
p = Persona(name="p1", aggregation_strategy="full")
app.personas["p1"] = p
app.ui_active_persona = "p1"
with patch("src.project_manager.flat_config", return_value={"project": {"name": "test"}, "output": {"output_dir": "."}, "files": {"paths": [], "base_dir": "."}}):
with patch("src.aggregate.build_file_items", return_value=[]):
with patch("pathlib.Path.mkdir"):
with patch("pathlib.Path.write_text"):
with patch.object(app, "_flush_to_project"):
with patch.object(app, "_flush_to_config"):
with patch("src.models.save_config"):
full_md, path, file_items, stable_md, disc = app._do_generate()
with patch("src.project_manager.flat_config", return_value={"project": {"name": "test"}, "output": {"output_dir": "."}, "files": {"paths": [], "base_dir": "."}}):
with patch("src.aggregate.build_file_items", return_value=[]):
with patch("pathlib.Path.mkdir"):
with patch("pathlib.Path.write_text"):
with patch.object(app, "_flush_to_project"):
with patch.object(app, "_flush_to_config"):
with patch("src.models.save_config"):
full_md, path, file_items, stable_md, disc = app._do_generate()
# Verify aggregate.run and build_markdown_no_history received aggregation_strategy="full"
# Actually mock_build captures the inner calls of build_markdown_no_history
call_kwargs = mock_build.call_args[1]
assert call_kwargs.get("aggregation_strategy") == "full"
# Verify aggregate.run and build_markdown_no_history received aggregation_strategy="full"
# Actually mock_build captures the inner calls of build_markdown_no_history
call_kwargs = mock_build.call_args[1]
assert call_kwargs.get("aggregation_strategy") == "full"
@patch("src.summarize.summarise_file")
@patch("src.multi_agent_conductor.ai_client.send")
def test_run_worker_lifecycle_uses_strategy(mock_send, mock_summarise, tmp_path):
mock_send.return_value = "fake response"
mock_summarise.return_value = "fake summary"
mock_send.return_value = "fake response"
mock_summarise.return_value = "fake summary"
test_file = tmp_path / "test.py"
test_file.write_text("def test():\n pass")
test_file = tmp_path / "test.py"
test_file.write_text("def test():\n pass")
ticket = Ticket(id="1", description="test")
context = WorkerContext(ticket_id="1", model_name="test", persona_id="test_persona")
ticket = Ticket(id="1", description="test")
context = WorkerContext(ticket_id="1", model_name="test", persona_id="test_persona")
p = Persona(name="test_persona", aggregation_strategy="summarize")
p = Persona(name="test_persona", aggregation_strategy="summarize")
with patch("src.personas.PersonaManager.load_all", return_value={"test_persona": p}):
multi_agent_conductor.run_worker_lifecycle(
ticket, context, context_files=[str(test_file)]
)
with patch("src.personas.PersonaManager.load_all", return_value={"test_persona": p}):
multi_agent_conductor.run_worker_lifecycle(
ticket, context, context_files=[str(test_file)]
)
# Should have called summarise_file because of the "summarize" strategy
mock_summarise.assert_called_once()
# Should have called summarise_file because of the "summarize" strategy
mock_summarise.assert_called_once()
+30 -30
View File
@@ -11,40 +11,40 @@ from src import api_hook_client
@pytest.mark.integration
def test_mma_epic_lifecycle(live_gui) -> None:
client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=15)
client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=15)
# Reset
client.click("btn_reset")
time.sleep(2)
# Reset
client.click("btn_reset")
time.sleep(2)
# Set provider and path
client.set_value("current_provider", "gemini_cli")
time.sleep(2)
mock_path = os.path.abspath("tests/mock_gemini_cli.py")
client.set_value("gcli_path", f'"{sys.executable}" "{mock_path}"')
time.sleep(2)
# Set provider and path
client.set_value("current_provider", "gemini_cli")
time.sleep(2)
mock_path = os.path.abspath("tests/mock_gemini_cli.py")
client.set_value("gcli_path", f'"{sys.executable}" "{mock_path}"')
time.sleep(2)
# Set epic and click
client.set_value("mma_epic_input", "Add timestamps")
time.sleep(1)
client.click("btn_mma_plan_epic")
# Set epic and click
client.set_value("mma_epic_input", "Add timestamps")
time.sleep(1)
client.click("btn_mma_plan_epic")
# Wait and check
for i in range(30):
time.sleep(1)
status = client.get_mma_status()
proposed = status.get("proposed_tracks", [])
usage = status.get("mma_tier_usage", {})
t1 = usage.get("Tier 1", {})
# Wait and check
for i in range(30):
time.sleep(1)
status = client.get_mma_status()
proposed = status.get("proposed_tracks", [])
usage = status.get("mma_tier_usage", {})
t1 = usage.get("Tier 1", {})
print(
f"[{i}] Tier1: in={t1.get('input')}, out={t1.get('output')}, proposed={len(proposed)}",
flush=True,
)
print(
f"[{i}] Tier1: in={t1.get('input')}, out={t1.get('output')}, proposed={len(proposed)}",
flush=True,
)
if proposed:
print(f"SUCCESS: {proposed}", flush=True)
break
if proposed:
print(f"SUCCESS: {proposed}", flush=True)
break
assert len(proposed) > 0, f"No tracks: {proposed}"
assert len(proposed) > 0, f"No tracks: {proposed}"