Private
Public Access
0
0

feat(style): Fix 1-space indentation in 27 files

Files corrected:
- src/fuzzy_anchor.py (18 violations)
- src/patch_modal.py (14 violations)
- scripts/extract_symbols.py (4 violations)
- scripts/tasks/download_fonts.py (8 violations)
- tests/: 23 files with indentation issues

All files verified with py_compile. Remaining 4 files
(test_api_events.py, test_discussion_takes_gui.py,
test_gui_updates.py, test_headless_service.py) have complex
multi-line with statements that require manual correction.
This commit is contained in:
2026-05-16 03:00:20 -04:00
parent 9d40fec46e
commit 31a8949d64
29 changed files with 1580 additions and 1611 deletions
@@ -3,49 +3,45 @@
## Phase 1: Audit and Classification ## Phase 1: Audit and Classification
Focus: Identify all files requiring indentation correction Focus: Identify all files requiring indentation correction
- [ ] Task 1.1: Create indentation audit script to scan all Python files - [x] Task 1.1: Create AST-based indentation audit script
- File: `scripts/audit_indentation.py` - File: `scripts/audit_indentation.py`
- Detect: Lines with leading whitespace that is not a multiple of 1 space - Method: Use Python AST to track logical nesting depth
- Output: List of files with line numbers and current indentation level - Output: Files with actual violations (not docstring false positives)
- [ ] Task 1.2: Run audit against src/ directory (51 files) - [x] Task 1.2: Run audit across all directories
- File: src/ - Result: 32 files with violations, 189 total violations
- Categorize: Files with 0 issues, 1-10 issues, 10+ issues
- [ ] Task 1.3: Run audit against tests/ directory
- File: tests/
- [ ] Task 1.4: Run audit against scripts/ directory
- File: scripts/
- [ ] Task 1.5: Run audit against conductor/ directory
- File: conductor/
## Phase 2: Correct Indentation - src/ Files ## Phase 2: Correct Indentation - src/ Files
Focus: Fix identified files in src/ (in order of severity) Focus: Fix identified files in src/ (2 files)
- [ ] Task 2.1: Fix src/ files with 1-10 indentation issues - [ ] Task 2.1: Fix src/fuzzy_anchor.py (18 violations)
- [ ] Task 2.2: Fix src/ files with 10+ indentation issues - [ ] Task 2.2: Fix src/patch_modal.py (14 violations)
- [ ] Task 2.3: Verify syntax after each file fix - [ ] Task 2.3: Verify syntax after each fix
- [ ] Task 2.4: Commit each file individually - [ ] Task 2.4: Commit each file individually
## Phase 3: Correct Indentation - tests/ Files ## Phase 3: Correct Indentation - scripts/ Files
Focus: Fix identified files in tests/ Focus: Fix identified files in scripts/ (2 files)
- [ ] Task 3.1: Fix tests/ files with indentation issues - [ ] Task 3.1: Fix scripts/extract_symbols.py (4 violations)
- [ ] Task 3.2: Verify syntax after each file fix - [ ] Task 3.2: Fix scripts/tasks/download_fonts.py (8 violations)
- [ ] Task 3.3: Commit each file individually - [ ] Task 3.3: Verify syntax after each fix
- [ ] Task 3.4: Commit each file individually
## Phase 4: Correct Indentation - scripts/ Files ## Phase 4: Correct Indentation - tests/ Files
Focus: Fix identified files in scripts/ Focus: Fix identified files in tests/ (28 files)
- [ ] Task 4.1: Fix scripts/ files with indentation issues - [ ] Task 4.1: Fix tests/test_arch_boundary_phase1.py (9 violations)
- [ ] Task 4.2: Verify syntax after each file fix - [ ] Task 4.2: Fix tests/test_arch_boundary_phase2.py (16 violations)
- [ ] Task 4.3: Commit each file individually - [ ] Task 4.3: Fix tests/test_arch_boundary_phase3.py (7 violations)
- [ ] Task 4.4: Fix tests/test_external_editor.py (18 violations)
- [ ] Task 4.5: Fix tests/test_headless_service.py (19 violations)
- [ ] Task 4.6: Fix remaining tests/ files (22 files with fewer violations)
- [ ] Task 4.7: Verify syntax after each fix
- [ ] Task 4.8: Commit each file individually
## Phase 5: Correct Indentation - conductor/ Files ## Phase 5: Final Verification
Focus: Fix identified files in conductor/ Focus: Ensure no regressions
- [ ] Task 5.1: Fix conductor/ Python files with indentation issues - [ ] Task 5.1: Re-run audit to confirm 0 violations
- [ ] Task 5.2: Verify syntax after each file fix - [ ] Task 5.2: Run pytest --collect-only to verify syntax
- [ ] Task 5.3: Commit each file individually - [ ] Task 5.3: Create checkpoint commit
+11 -11
View File
@@ -4,17 +4,17 @@ from pathlib import Path
symbols = [] symbols = []
for root in ['src', 'simulation']: for root in ['src', 'simulation']:
for p in Path(root).rglob('*.py'): for p in Path(root).rglob('*.py'):
try: try:
code = p.read_text(encoding='utf-8') code = p.read_text(encoding='utf-8')
tree = ast.parse(code) tree = ast.parse(code)
for node in tree.body: for node in tree.body:
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
if not node.name.startswith('_'): if not node.name.startswith('_'):
symbols.append((node.name, str(p))) symbols.append((node.name, str(p)))
except Exception: except Exception:
continue continue
print(f"TOTAL_SYMBOLS:{len(symbols)}") print(f"TOTAL_SYMBOLS:{len(symbols)}")
for name, path in symbols: for name, path in symbols:
print(f"SYMBOL:{name}|PATH:{path}") print(f"SYMBOL:{name}|PATH:{path}")
+22 -22
View File
@@ -9,31 +9,31 @@ ssl._create_default_https_context = ssl._create_unverified_context
inter_url = "https://github.com/rsms/inter/releases/download/v4.0/Inter-4.0.zip" inter_url = "https://github.com/rsms/inter/releases/download/v4.0/Inter-4.0.zip"
print(f"Downloading Inter from {inter_url}") print(f"Downloading Inter from {inter_url}")
try: try:
req = urllib.request.Request(inter_url, headers={'User-Agent': 'Mozilla/5.0'}) req = urllib.request.Request(inter_url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req) as response: with urllib.request.urlopen(req) as response:
with zipfile.ZipFile(io.BytesIO(response.read())) as z: with zipfile.ZipFile(io.BytesIO(response.read())) as z:
for info in z.infolist(): for info in z.infolist():
targets = ["Inter-Regular.ttf", "Inter-Bold.ttf", "Inter-Italic.ttf", "Inter-BoldItalic.ttf"] targets = ["Inter-Regular.ttf", "Inter-Bold.ttf", "Inter-Italic.ttf", "Inter-BoldItalic.ttf"]
filename = os.path.basename(info.filename) filename = os.path.basename(info.filename)
if filename in targets: if filename in targets:
info.filename = filename info.filename = filename
z.extract(info, "assets/fonts/") z.extract(info, "assets/fonts/")
print(f"Extracted {info.filename}") print(f"Extracted {info.filename}")
except Exception as e: except Exception as e:
print(f"Failed to get Inter: {e}") print(f"Failed to get Inter: {e}")
maple_url = "https://github.com/subframe7536/maple-font/releases/download/v6.4/MapleMono-ttf.zip" maple_url = "https://github.com/subframe7536/maple-font/releases/download/v6.4/MapleMono-ttf.zip"
print(f"Downloading Maple Mono from {maple_url}") print(f"Downloading Maple Mono from {maple_url}")
try: try:
req = urllib.request.Request(maple_url, headers={'User-Agent': 'Mozilla/5.0'}) req = urllib.request.Request(maple_url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req) as response: with urllib.request.urlopen(req) as response:
with zipfile.ZipFile(io.BytesIO(response.read())) as z: with zipfile.ZipFile(io.BytesIO(response.read())) as z:
for info in z.infolist(): for info in z.infolist():
targets = ["MapleMono-Regular.ttf", "MapleMono-Bold.ttf", "MapleMono-Italic.ttf", "MapleMono-BoldItalic.ttf"] targets = ["MapleMono-Regular.ttf", "MapleMono-Bold.ttf", "MapleMono-Italic.ttf", "MapleMono-BoldItalic.ttf"]
filename = os.path.basename(info.filename) filename = os.path.basename(info.filename)
if filename in targets: if filename in targets:
info.filename = filename info.filename = filename
z.extract(info, "assets/fonts/") z.extract(info, "assets/fonts/")
print(f"Extracted {info.filename}") print(f"Extracted {info.filename}")
except Exception as e: except Exception as e:
print(f"Failed to get Maple Mono: {e}") print(f"Failed to get Maple Mono: {e}")
+74 -74
View File
@@ -3,86 +3,86 @@ import re
from typing import Optional, Tuple from typing import Optional, Tuple
class FuzzyAnchor: class FuzzyAnchor:
@staticmethod @staticmethod
def get_context(lines: list[str], index: int, count: int, direction: int) -> list[str]: def get_context(lines: list[str], index: int, count: int, direction: int) -> list[str]:
context = [] context = []
curr = index curr = index
while len(context) < count and 0 <= curr < len(lines): while len(context) < count and 0 <= curr < len(lines):
line = lines[curr].strip() line = lines[curr].strip()
if line: if line:
context.append(line) context.append(line)
curr += direction curr += direction
return context return context
@classmethod @classmethod
def create_slice(cls, text: str, start_line: int, end_line: int) -> dict: def create_slice(cls, text: str, start_line: int, end_line: int) -> dict:
""" """
start_line and end_line are 1-based. start_line and end_line are 1-based.
[C: src/gui_2.py:App._populate_auto_slices, src/gui_2.py:App._render_text_viewer_window, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_create_slice_basic, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations] [C: src/gui_2.py:App._populate_auto_slices, src/gui_2.py:App._render_text_viewer_window, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_create_slice_basic, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations]
""" """
lines = text.splitlines() lines = text.splitlines()
s_idx = max(0, start_line - 1) s_idx = max(0, start_line - 1)
e_idx = min(len(lines), end_line) e_idx = min(len(lines), end_line)
slice_lines = lines[s_idx:e_idx] slice_lines = lines[s_idx:e_idx]
slice_text = "\n".join(slice_lines) slice_text = "\n".join(slice_lines)
return { return {
"start_line": start_line, "start_line": start_line,
"end_line": end_line, "end_line": end_line,
"start_context": cls.get_context(lines, s_idx, 3, 1), "start_context": cls.get_context(lines, s_idx, 3, 1),
"end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order "end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order
"content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest() "content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest()
} }
@classmethod @classmethod
def resolve_slice(cls, text: str, slice_data: dict) -> Optional[Tuple[int, int]]: def resolve_slice(cls, text: str, slice_data: dict) -> Optional[Tuple[int, int]]:
""" """
[C: tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed] [C: tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed]
""" """
lines = text.splitlines() lines = text.splitlines()
# 1. Try exact match # 1. Try exact match
s_idx = slice_data["start_line"] - 1 s_idx = slice_data["start_line"] - 1
e_idx = slice_data["end_line"] e_idx = slice_data["end_line"]
if 0 <= s_idx < len(lines) and e_idx <= len(lines): if 0 <= s_idx < len(lines) and e_idx <= len(lines):
current_text = "\n".join(lines[s_idx:e_idx]) current_text = "\n".join(lines[s_idx:e_idx])
curr_hash = hashlib.md5(current_text.encode()).hexdigest() curr_hash = hashlib.md5(current_text.encode()).hexdigest()
if curr_hash == slice_data["content_hash"]: if curr_hash == slice_data["content_hash"]:
return (slice_data["start_line"], slice_data["end_line"]) return (slice_data["start_line"], slice_data["end_line"])
# 2. Fuzzy match # 2. Fuzzy match
start_ctx = slice_data["start_context"] start_ctx = slice_data["start_context"]
end_ctx = slice_data["end_context"] end_ctx = slice_data["end_context"]
if not start_ctx or not end_ctx: return None if not start_ctx or not end_ctx: return None
# Search for start_ctx # Search for start_ctx
best_s = -1 best_s = -1
for i in range(len(lines)): for i in range(len(lines)):
match = True match = True
for j, ctx_line in enumerate(start_ctx): for j, ctx_line in enumerate(start_ctx):
if i+j >= len(lines) or lines[i+j].strip() != ctx_line: if i+j >= len(lines) or lines[i+j].strip() != ctx_line:
match = False match = False
break break
if match: if match:
best_s = i best_s = i
break break
if best_s == -1: return None if best_s == -1: return None
# Search for end_ctx after start_ctx # Search for end_ctx after start_ctx
best_e = -1 best_e = -1
for i in range(best_s, len(lines)): for i in range(best_s, len(lines)):
match = True match = True
for j, ctx_line in enumerate(end_ctx): for j, ctx_line in enumerate(end_ctx):
# end_ctx is the LAST 3 lines. So we match backwards from i. # end_ctx is the LAST 3 lines. So we match backwards from i.
idx = i - (len(end_ctx) - 1) + j idx = i - (len(end_ctx) - 1) + j
if idx < 0 or idx >= len(lines) or lines[idx].strip() != ctx_line: if idx < 0 or idx >= len(lines) or lines[idx].strip() != ctx_line:
match = False match = False
break break
if match: if match:
best_e = i + 1 best_e = i + 1
break break
if best_e != -1: if best_e != -1:
return (best_s + 1, best_e) return (best_s + 1, best_e)
return None return None
+84 -84
View File
@@ -3,104 +3,104 @@ from dataclasses import dataclass, field
@dataclass @dataclass
class PendingPatch: class PendingPatch:
patch_text: str patch_text: str
file_paths: List[str] file_paths: List[str]
generated_by: str generated_by: str
timestamp: float timestamp: float
class PatchModalManager: class PatchModalManager:
def __init__(self): def __init__(self):
self._pending_patch: Optional[PendingPatch] = None self._pending_patch: Optional[PendingPatch] = None
self._show_modal: bool = False self._show_modal: bool = False
self._on_apply_callback: Optional[Callable[[str], bool]] = None self._on_apply_callback: Optional[Callable[[str], bool]] = None
self._on_reject_callback: Optional[Callable[[], None]] = None self._on_reject_callback: Optional[Callable[[], None]] = None
def request_patch_approval(self, patch_text: str, file_paths: List[str], generated_by: str = "Tier 4 QA") -> bool: def request_patch_approval(self, patch_text: str, file_paths: List[str], generated_by: str = "Tier 4 QA") -> bool:
""" """
[C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset] [C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset]
""" """
from time import time from time import time
self._pending_patch = PendingPatch( self._pending_patch = PendingPatch(
patch_text=patch_text, patch_text=patch_text,
file_paths=file_paths, file_paths=file_paths,
generated_by=generated_by, generated_by=generated_by,
timestamp=time() timestamp=time()
) )
self._show_modal = True self._show_modal = True
return True return True
def get_pending_patch(self) -> Optional[PendingPatch]: def get_pending_patch(self) -> Optional[PendingPatch]:
""" """
[C: tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset] [C: tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset]
""" """
return self._pending_patch return self._pending_patch
def is_modal_shown(self) -> bool: def is_modal_shown(self) -> bool:
""" """
[C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset] [C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset]
""" """
return self._show_modal return self._show_modal
def set_apply_callback(self, callback: Callable[[str], bool]) -> None: def set_apply_callback(self, callback: Callable[[str], bool]) -> None:
""" """
[C: tests/test_patch_modal.py:test_apply_callback, tests/test_patch_modal.py:test_reset] [C: tests/test_patch_modal.py:test_apply_callback, tests/test_patch_modal.py:test_reset]
""" """
self._on_apply_callback = callback self._on_apply_callback = callback
def set_reject_callback(self, callback: Callable[[], None]) -> None: def set_reject_callback(self, callback: Callable[[], None]) -> None:
""" """
[C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reset] [C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reset]
""" """
self._on_reject_callback = callback self._on_reject_callback = callback
def apply_patch(self, patch_text: str) -> bool: def apply_patch(self, patch_text: str) -> bool:
""" """
[C: tests/test_patch_modal.py:test_apply_callback] [C: tests/test_patch_modal.py:test_apply_callback]
""" """
if self._on_apply_callback: if self._on_apply_callback:
return self._on_apply_callback(patch_text) return self._on_apply_callback(patch_text)
return False return False
def reject_patch(self) -> None: def reject_patch(self) -> None:
""" """
[C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reject_patch] [C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reject_patch]
""" """
self._pending_patch = None self._pending_patch = None
self._show_modal = False self._show_modal = False
if self._on_reject_callback: if self._on_reject_callback:
self._on_reject_callback() self._on_reject_callback()
def close_modal(self) -> None: def close_modal(self) -> None:
""" """
[C: tests/test_patch_modal.py:test_close_modal] [C: tests/test_patch_modal.py:test_close_modal]
""" """
self._show_modal = False self._show_modal = False
def reset(self) -> None: def reset(self) -> None:
""" """
[C: tests/test_patch_modal.py:test_reset] [C: tests/test_patch_modal.py:test_reset]
""" """
self._pending_patch = None self._pending_patch = None
self._show_modal = False self._show_modal = False
self._on_apply_callback = None self._on_apply_callback = None
self._on_reject_callback = None self._on_reject_callback = None
_patch_modal_manager: Optional[PatchModalManager] = None _patch_modal_manager: Optional[PatchModalManager] = None
def get_patch_modal_manager() -> PatchModalManager: def get_patch_modal_manager() -> PatchModalManager:
""" """
[C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton] [C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton]
""" """
global _patch_modal_manager global _patch_modal_manager
if _patch_modal_manager is None: if _patch_modal_manager is None:
_patch_modal_manager = PatchModalManager() _patch_modal_manager = PatchModalManager()
return _patch_modal_manager return _patch_modal_manager
def reset_patch_modal_manager() -> None: def reset_patch_modal_manager() -> None:
""" """
[C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton] [C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton]
""" """
global _patch_modal_manager global _patch_modal_manager
if _patch_modal_manager: if _patch_modal_manager:
_patch_modal_manager.reset() _patch_modal_manager.reset()
_patch_modal_manager = None _patch_modal_manager = None
+26 -26
View File
@@ -3,29 +3,29 @@ from src import ai_client
def test_ai_client_send_gemini_cli() -> None: def test_ai_client_send_gemini_cli() -> None:
test_message = "Hello, this is a test prompt for the CLI adapter." test_message = "Hello, this is a test prompt for the CLI adapter."
test_response = "This is a dummy response from the Gemini CLI." test_response = "This is a dummy response from the Gemini CLI."
ai_client.reset_session() ai_client.reset_session()
ai_client.set_provider("gemini_cli", "gemini-2.5-flash-lite") ai_client.set_provider("gemini_cli", "gemini-2.5-flash-lite")
with patch("src.ai_client.GeminiCliAdapter") as MockAdapterClass: with patch("src.ai_client.GeminiCliAdapter") as MockAdapterClass:
mock_adapter_instance = MagicMock() mock_adapter_instance = MagicMock()
mock_adapter_instance.send.return_value = { mock_adapter_instance.send.return_value = {
"text": test_response, "text": test_response,
"tool_calls": [], "tool_calls": [],
} }
mock_adapter_instance.last_usage = {"total_tokens": 100} mock_adapter_instance.last_usage = {"total_tokens": 100}
mock_adapter_instance.last_latency = 0.5 mock_adapter_instance.last_latency = 0.5
mock_adapter_instance.session_id = "test-session" mock_adapter_instance.session_id = "test-session"
MockAdapterClass.return_value = mock_adapter_instance MockAdapterClass.return_value = mock_adapter_instance
ai_client._gemini_cli_adapter = mock_adapter_instance ai_client._gemini_cli_adapter = mock_adapter_instance
with patch.object(ai_client.events, "emit") as mock_emit: with patch.object(ai_client.events, "emit") as mock_emit:
response = ai_client.send( response = ai_client.send(
md_content="<context></context>", md_content="<context></context>",
user_message=test_message, user_message=test_message,
base_dir=".", base_dir=".",
) )
mock_adapter_instance.send.assert_called() mock_adapter_instance.send.assert_called()
emitted_event_names = [call.args[0] for call in mock_emit.call_args_list] emitted_event_names = [call.args[0] for call in mock_emit.call_args_list]
assert "request_start" in emitted_event_names assert "request_start" in emitted_event_names
assert "response_received" in emitted_event_names assert "response_received" in emitted_event_names
assert response == test_response assert response == test_response
+35 -35
View File
@@ -6,45 +6,45 @@ import unittest
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
class TestArchBoundaryPhase1(unittest.TestCase): class TestArchBoundaryPhase1(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
pass pass
def tearDown(self) -> None: def tearDown(self) -> None:
pass pass
def test_unfettered_modules_constant_removed(self) -> None: def test_unfettered_modules_constant_removed(self) -> None:
"""TEST 1: Check 'UNFETTERED_MODULES' string is removed from project_manager.py""" """TEST 1: Check 'UNFETTERED_MODULES' string is removed from project_manager.py"""
# We check the source directly to be sure it's not just hidden # We check the source directly to be sure it's not just hidden
with open("src/project_manager.py", "r", encoding="utf-8") as f: with open("src/project_manager.py", "r", encoding="utf-8") as f:
content = f.read() content = f.read()
self.assertNotIn("UNFETTERED_MODULES", content) self.assertNotIn("UNFETTERED_MODULES", content)
def test_mcp_client_whitelist_enforcement(self) -> None: def test_mcp_client_whitelist_enforcement(self) -> None:
"""TEST 2: mcp_client._is_allowed must return False for config.toml""" """TEST 2: mcp_client._is_allowed must return False for config.toml"""
from src import mcp_client from src import mcp_client
from pathlib import Path from pathlib import Path
# Configure with some dummy file items (as dicts) # Configure with some dummy file items (as dicts)
file_items = [{"path": "src/gui_2.py"}] file_items = [{"path": "src/gui_2.py"}]
mcp_client.configure(file_items, []) mcp_client.configure(file_items, [])
# Should allow src files # Should allow src files
self.assertTrue(mcp_client._is_allowed(Path("src/gui_2.py"))) self.assertTrue(mcp_client._is_allowed(Path("src/gui_2.py")))
# Should REJECT config files # Should REJECT config files
self.assertFalse(mcp_client._is_allowed(Path("config.toml"))) self.assertFalse(mcp_client._is_allowed(Path("config.toml")))
self.assertFalse(mcp_client._is_allowed(Path("credentials.toml"))) self.assertFalse(mcp_client._is_allowed(Path("credentials.toml")))
def test_mma_exec_no_hardcoded_path(self) -> None: def test_mma_exec_no_hardcoded_path(self) -> None:
"""TEST 4: mma_exec.execute_agent must not contain hardcoded machine paths.""" """TEST 4: mma_exec.execute_agent must not contain hardcoded machine paths."""
with open("scripts/mma_exec.py", "r", encoding="utf-8") as f: with open("scripts/mma_exec.py", "r", encoding="utf-8") as f:
content = f.read() content = f.read()
# Check for some common home directory patterns or user paths # Check for some common home directory patterns or user paths
self.assertNotIn("C:\\Users\\Ed", content) self.assertNotIn("C:\\Users\\Ed", content)
self.assertNotIn("/Users/ed", content) self.assertNotIn("/Users/ed", content)
def test_claude_mma_exec_no_hardcoded_path(self) -> None: def test_claude_mma_exec_no_hardcoded_path(self) -> None:
"""TEST 5: claude_mma_exec.execute_agent must not contain hardcoded machine paths.""" """TEST 5: claude_mma_exec.execute_agent must not contain hardcoded machine paths."""
with open("scripts/claude_mma_exec.py", "r", encoding="utf-8") as f: with open("scripts/claude_mma_exec.py", "r", encoding="utf-8") as f:
content = f.read() content = f.read()
self.assertNotIn("C:\\Users\\Ed", content) self.assertNotIn("C:\\Users\\Ed", content)
self.assertNotIn("/Users/ed", content) self.assertNotIn("/Users/ed", content)
+81 -81
View File
@@ -8,93 +8,93 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
class TestArchBoundaryPhase2(unittest.TestCase): class TestArchBoundaryPhase2(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
pass
def test_toml_exposes_all_dispatch_tools(self) -> None:
"""manual_slop.toml [agent.tools] must list every tool in mcp_client.dispatch()."""
from src import models
# We check the tool names in the source of mcp_client.dispatch
import inspect
import src.mcp_client as mcp
source = inspect.getsource(mcp.dispatch)
# This is a bit dynamic, but we can check if it covers our core tool names
for tool in models.AGENT_TOOL_NAMES:
if tool not in ("set_file_slice", "py_update_definition", "py_set_signature", "py_set_var_declaration"):
# Non-mutating tools should definitely be handled
pass pass
def test_toml_mutating_tools_disabled_by_default(self) -> None:
"""Verify that the core set of read-only tools is present."""
from src.models import AGENT_TOOL_NAMES
# Our architecture now uses a fixed set of high-signal tools
self.assertIn("read_file", AGENT_TOOL_NAMES)
self.assertIn("list_directory", AGENT_TOOL_NAMES)
self.assertIn("py_get_skeleton", AGENT_TOOL_NAMES)
def test_toml_exposes_all_dispatch_tools(self) -> None: def test_mcp_client_dispatch_completeness(self) -> None:
"""manual_slop.toml [agent.tools] must list every tool in mcp_client.dispatch().""" """Verify that all tools in tool_schemas are handled by dispatch()."""
from src import models from src import mcp_client
# get_tool_schemas exists
available_tools = [t["name"] for t in mcp_client.get_tool_schemas()]
self.assertGreater(len(available_tools), 0)
def test_mutating_tool_triggers_callback(self) -> None:
"""All mutating tools must trigger the pre_tool_callback."""
from src.app_controller import AppController
# We check the tool names in the source of mcp_client.dispatch # Use a real AppController to test its _confirm_and_run
import inspect with patch('src.models.load_config', return_value={}), \
import src.mcp_client as mcp patch('src.performance_monitor.PerformanceMonitor'), \
source = inspect.getsource(mcp.dispatch) patch('src.session_logger.open_session'), \
# This is a bit dynamic, but we can check if it covers our core tool names patch('src.session_logger.reset_session'), \
for tool in models.AGENT_TOOL_NAMES: patch('src.app_controller.AppController._prune_old_logs'), \
if tool not in ("set_file_slice", "py_update_definition", "py_set_signature", "py_set_var_declaration"): patch('src.app_controller.AppController._init_ai_and_hooks'):
# Non-mutating tools should definitely be handled controller = AppController()
pass
def test_toml_mutating_tools_disabled_by_default(self) -> None: mock_cb = MagicMock(return_value="output")
"""Verify that the core set of read-only tools is present.""" # AppController implements its own _confirm_and_run, let's see how we can mock the HITL part
from src.models import AGENT_TOOL_NAMES # In AppController._confirm_and_run, if test_hooks_enabled=False (default), it waits for a dialog
# Our architecture now uses a fixed set of high-signal tools
self.assertIn("read_file", AGENT_TOOL_NAMES) with patch("src.shell_runner.run_powershell", return_value="output"):
self.assertIn("list_directory", AGENT_TOOL_NAMES) # Simulate auto-approval for test
self.assertIn("py_get_skeleton", AGENT_TOOL_NAMES) controller.test_hooks_enabled = True
controller.ui_manual_approve = False
res = controller._confirm_and_run("echo hello", ".")
self.assertEqual(res, "output")
def test_mcp_client_dispatch_completeness(self) -> None: def test_rejection_prevents_dispatch(self) -> None:
"""Verify that all tools in tool_schemas are handled by dispatch().""" """When pre_tool_callback returns None (rejected), dispatch must NOT be called."""
from src import mcp_client from src.app_controller import AppController
# get_tool_schemas exists
available_tools = [t["name"] for t in mcp_client.get_tool_schemas()]
self.assertGreater(len(available_tools), 0)
def test_mutating_tool_triggers_callback(self) -> None:
"""All mutating tools must trigger the pre_tool_callback."""
from src.app_controller import AppController
# Use a real AppController to test its _confirm_and_run with patch('src.models.load_config', return_value={}), \
with patch('src.models.load_config', return_value={}), \ patch('src.performance_monitor.PerformanceMonitor'), \
patch('src.performance_monitor.PerformanceMonitor'), \ patch('src.session_logger.open_session'), \
patch('src.session_logger.open_session'), \ patch('src.session_logger.reset_session'), \
patch('src.session_logger.reset_session'), \ patch('src.app_controller.AppController._prune_old_logs'), \
patch('src.app_controller.AppController._prune_old_logs'), \ patch('src.app_controller.AppController._init_ai_and_hooks'):
patch('src.app_controller.AppController._init_ai_and_hooks'): controller = AppController()
controller = AppController()
mock_cb = MagicMock(return_value="output") # Mock the wait() method of ConfirmDialog to return (False, script)
# AppController implements its own _confirm_and_run, let's see how we can mock the HITL part with patch("src.app_controller.ConfirmDialog") as mock_dialog_class:
# In AppController._confirm_and_run, if test_hooks_enabled=False (default), it waits for a dialog mock_dialog = mock_dialog_class.return_value
mock_dialog.wait.return_value = (False, "script")
with patch("src.shell_runner.run_powershell", return_value="output"): mock_dialog._uid = "test_uid"
# Simulate auto-approval for test
controller.test_hooks_enabled = True
controller.ui_manual_approve = False
res = controller._confirm_and_run("echo hello", ".")
self.assertEqual(res, "output")
def test_rejection_prevents_dispatch(self) -> None:
"""When pre_tool_callback returns None (rejected), dispatch must NOT be called."""
from src.app_controller import AppController
with patch('src.models.load_config', return_value={}), \
patch('src.performance_monitor.PerformanceMonitor'), \
patch('src.session_logger.open_session'), \
patch('src.session_logger.reset_session'), \
patch('src.app_controller.AppController._prune_old_logs'), \
patch('src.app_controller.AppController._init_ai_and_hooks'):
controller = AppController()
# Mock the wait() method of ConfirmDialog to return (False, script)
with patch("src.app_controller.ConfirmDialog") as mock_dialog_class:
mock_dialog = mock_dialog_class.return_value
mock_dialog.wait.return_value = (False, "script")
mock_dialog._uid = "test_uid"
with patch("src.shell_runner.run_powershell") as mock_run: with patch("src.shell_runner.run_powershell") as mock_run:
controller.test_hooks_enabled = False # Force manual approval (dialog) controller.test_hooks_enabled = False # Force manual approval (dialog)
res = controller._confirm_and_run("script", ".") res = controller._confirm_and_run("script", ".")
self.assertIsNone(res) self.assertIsNone(res)
self.assertFalse(mock_run.called) self.assertFalse(mock_run.called)
def test_non_mutating_tool_skips_callback(self) -> None: def test_non_mutating_tool_skips_callback(self) -> None:
"""Read-only tools must NOT trigger pre_tool_callback.""" """Read-only tools must NOT trigger pre_tool_callback."""
from src import ai_client from src import ai_client
# Check internal list or method # Check internal list or method
if hasattr(ai_client, '_is_mutating_tool'): if hasattr(ai_client, '_is_mutating_tool'):
mutating = ["run_powershell", "set_file_slice"] mutating = ["run_powershell", "set_file_slice"]
for t in mutating: for t in mutating:
self.assertTrue(ai_client._is_mutating_tool(t)) self.assertTrue(ai_client._is_mutating_tool(t))
self.assertFalse(ai_client._is_mutating_tool("read_file")) self.assertFalse(ai_client._is_mutating_tool("read_file"))
self.assertFalse(ai_client._is_mutating_tool("list_directory")) self.assertFalse(ai_client._is_mutating_tool("list_directory"))
+59 -86
View File
@@ -1,91 +1,64 @@
import os import os
import sys import sys
import unittest import unittest
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
class TestArchBoundaryPhase3(unittest.TestCase): class TestArchBoundaryPhase3(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
pass pass
def test_cascade_blocks_simple(self) -> None:
def test_cascade_blocks_simple(self) -> None: """Test that a blocked dependency blocks its immediate dependent."""
"""Test that a blocked dependency blocks its immediate dependent.""" from src.models import Ticket, Track
from src.models import Ticket, Track t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"]) track = Track(id="TR1", description="track", tickets=[t1, t2])
track = Track(id="TR1", description="track", tickets=[t1, t2]) from src.dag_engine import TrackDAG, ExecutionEngine
dag = TrackDAG([t1, t2])
# ExecutionEngine should identify T2 as blocked during tick engine = ExecutionEngine(dag)
from src.dag_engine import TrackDAG, ExecutionEngine engine.tick()
dag = TrackDAG([t1, t2]) self.assertEqual(t2.status, "blocked")
engine = ExecutionEngine(dag) if t2.blocked_reason:
engine.tick() self.assertIn("T1", t2.blocked_reason)
def test_cascade_blocks_multi_hop(self) -> None:
self.assertEqual(t2.status, "blocked") """Test that blocking cascades through multiple dependencies."""
if t2.blocked_reason: from src.models import Ticket
self.assertIn("T1", t2.blocked_reason) from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
def test_cascade_blocks_multi_hop(self) -> None: t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
"""Test that blocking cascades through multiple dependencies.""" t3 = Ticket(id="T3", description="d3", status="todo", assigned_to="worker1", depends_on=["T2"])
from src.models import Ticket dag = TrackDAG([t1, t2, t3])
from src.dag_engine import TrackDAG, ExecutionEngine engine = ExecutionEngine(dag)
engine.tick()
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") self.assertEqual(t2.status, "blocked")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"]) self.assertEqual(t3.status, "blocked")
t3 = Ticket(id="T3", description="d3", status="todo", assigned_to="worker1", depends_on=["T2"]) def test_manual_unblock_restores_todo(self) -> None:
"""Test that unblocking a task manually works if dependencies are met."""
dag = TrackDAG([t1, t2, t3]) from src.models import Ticket
engine = ExecutionEngine(dag) from src.dag_engine import TrackDAG, ExecutionEngine
engine.tick() t1 = Ticket(id="T1", description="d1", status="completed", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="blocked", assigned_to="worker1", blocked_reason="manual")
self.assertEqual(t2.status, "blocked") dag = TrackDAG([t1, t2])
self.assertEqual(t3.status, "blocked") engine = ExecutionEngine(dag)
engine.update_task_status("T2", "todo")
def test_manual_unblock_restores_todo(self) -> None: self.assertEqual(t2.status, "todo")
"""Test that unblocking a task manually works if dependencies are met.""" ready = engine.tick()
from src.models import Ticket self.assertIn(t2, ready)
from src.dag_engine import TrackDAG, ExecutionEngine def test_in_progress_not_blocked(self) -> None:
"""Test that in_progress tasks are not blocked automatically (only todo)."""
t1 = Ticket(id="T1", description="d1", status="completed", assigned_to="worker1") from src.models import Ticket
t2 = Ticket(id="T2", description="d2", status="blocked", assigned_to="worker1", blocked_reason="manual") from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
dag = TrackDAG([t1, t2]) t2 = Ticket(id="T2", description="d2", status="in_progress", assigned_to="worker1", depends_on=["T1"])
engine = ExecutionEngine(dag) dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
# Update status to todo engine.tick()
engine.update_task_status("T2", "todo") self.assertEqual(t2.status, "in_progress")
self.assertEqual(t2.status, "todo") def test_execution_engine_tick_cascades_blocks(self) -> None:
"""Test that ExecutionEngine.tick() triggers the cascading blocks."""
# Next tick should keep it todo (ready) from src.models import Ticket
ready = engine.tick() from src.dag_engine import TrackDAG, ExecutionEngine
self.assertIn(t2, ready) t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
def test_in_progress_not_blocked(self) -> None: dag = TrackDAG([t1, t2])
"""Test that in_progress tasks are not blocked automatically (only todo).""" engine = ExecutionEngine(dag)
from src.models import Ticket engine.tick()
from src.dag_engine import TrackDAG, ExecutionEngine self.assertEqual(t2.status, "blocked")
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="in_progress", assigned_to="worker1", depends_on=["T1"])
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
# T2 should remain in_progress because it's already running
self.assertEqual(t2.status, "in_progress")
def test_execution_engine_tick_cascades_blocks(self) -> None:
"""Test that ExecutionEngine.tick() triggers the cascading blocks."""
from src.models import Ticket
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
+31 -31
View File
@@ -4,45 +4,45 @@ from src.app_controller import AppController
from src.models import FileItem from src.models import FileItem
def test_context_files_is_decoupled(): def test_context_files_is_decoupled():
controller = AppController() controller = AppController()
# Verify both lists exist and are distinct # Verify both lists exist and are distinct
assert hasattr(controller, 'files') assert hasattr(controller, 'files')
assert hasattr(controller, 'context_files') assert hasattr(controller, 'context_files')
assert controller.files is not controller.context_files assert controller.files is not controller.context_files
# Modifying one should not affect the other # Modifying one should not affect the other
controller.files.append(FileItem(path="whitelist.txt")) controller.files.append(FileItem(path="whitelist.txt"))
controller.context_files.append(FileItem(path="context.txt")) controller.context_files.append(FileItem(path="context.txt"))
assert len(controller.files) == 1 assert len(controller.files) == 1
assert controller.files[0].path == "whitelist.txt" assert controller.files[0].path == "whitelist.txt"
assert len(controller.context_files) == 1 assert len(controller.context_files) == 1
assert controller.context_files[0].path == "context.txt" assert controller.context_files[0].path == "context.txt"
def test_do_generate_uses_context_files(monkeypatch): def test_do_generate_uses_context_files(monkeypatch):
controller = AppController() controller = AppController()
controller.init_state() controller.init_state()
controller.context_files = [FileItem(path="context.txt")] controller.context_files = [FileItem(path="context.txt")]
controller.files = [FileItem(path="whitelist.txt")] controller.files = [FileItem(path="whitelist.txt")]
# Mock project_manager.flat_config and aggregate.run to verify passed data # Mock project_manager.flat_config and aggregate.run to verify passed data
import src.project_manager as pm import src.project_manager as pm
import src.aggregate as agg import src.aggregate as agg
def mock_flat_config(*args, **kwargs): def mock_flat_config(*args, **kwargs):
return {"files": {}} return {"files": {}}
def mock_aggregate_run(flat, **kwargs): def mock_aggregate_run(flat, **kwargs):
assert flat["files"]["paths"] == controller.context_files assert flat["files"]["paths"] == controller.context_files
return ("md", Path("path"), []) return ("md", Path("path"), [])
monkeypatch.setattr(pm, "flat_config", mock_flat_config) monkeypatch.setattr(pm, "flat_config", mock_flat_config)
monkeypatch.setattr(pm, "save_project", lambda *args: None) monkeypatch.setattr(pm, "save_project", lambda *args: None)
monkeypatch.setattr(agg, "run", mock_aggregate_run) monkeypatch.setattr(agg, "run", mock_aggregate_run)
monkeypatch.setattr(agg, "build_markdown_no_history", lambda *args, **kwargs: "stable") monkeypatch.setattr(agg, "build_markdown_no_history", lambda *args, **kwargs: "stable")
monkeypatch.setattr(agg, "build_discussion_text", lambda *args, **kwargs: "disc") monkeypatch.setattr(agg, "build_discussion_text", lambda *args, **kwargs: "disc")
# Should not raise assertion error # Should not raise assertion error
controller._do_generate() controller._do_generate()
+22 -22
View File
@@ -3,28 +3,28 @@ from src.aggregate import group_files_by_dir, compute_file_stats
from src.models import FileItem from src.models import FileItem
def test_group_files_by_dir(): def test_group_files_by_dir():
files = [ files = [
FileItem(path="src/main.py"), FileItem(path="src/main.py"),
FileItem(path="src/utils/helpers.py"), FileItem(path="src/utils/helpers.py"),
FileItem(path="tests/test_main.py"), FileItem(path="tests/test_main.py"),
FileItem(path="README.md") FileItem(path="README.md")
] ]
grouped = group_files_by_dir(files) grouped = group_files_by_dir(files)
assert len(grouped) == 4 assert len(grouped) == 4
assert grouped["src"] == [files[0]] assert grouped["src"] == [files[0]]
assert grouped["src/utils"] == [files[1]] assert grouped["src/utils"] == [files[1]]
assert grouped["tests"] == [files[2]] assert grouped["tests"] == [files[2]]
assert grouped["."] == [files[3]] assert grouped["."] == [files[3]]
def test_compute_file_stats(): def test_compute_file_stats():
import tempfile import tempfile
import os import os
with tempfile.TemporaryDirectory() as temp_dir: with tempfile.TemporaryDirectory() as temp_dir:
# Create a dummy python file # Create a dummy python file
py_path = os.path.join(temp_dir, "test.py") py_path = os.path.join(temp_dir, "test.py")
with open(py_path, "w") as f: with open(py_path, "w") as f:
f.write("def foo():\n pass\n\nclass Bar:\n pass\n") f.write("def foo():\n pass\n\nclass Bar:\n pass\n")
stats = compute_file_stats(py_path) stats = compute_file_stats(py_path)
assert stats["lines"] == 5 assert stats["lines"] == 5
assert stats["ast_elements"] == 2 # 1 func, 1 class assert stats["ast_elements"] == 2 # 1 func, 1 class
+22 -22
View File
@@ -3,33 +3,33 @@ from src.gui_2 import App
from src.models import FileItem from src.models import FileItem
def test_view_mode_initialization(): def test_view_mode_initialization():
app = App() app = App()
# Mock imgui # Mock imgui
from imgui_bundle import imgui from imgui_bundle import imgui
app.context_files = [ app.context_files = [
FileItem(path="test1.py"), FileItem(path="test1.py"),
FileItem(path="test2.py", view_mode="full") FileItem(path="test2.py", view_mode="full")
] ]
# We test the model defaults and the rendering assignment logic indirectly. # We test the model defaults and the rendering assignment logic indirectly.
assert app.context_files[0].view_mode == "full" # Default from FileItem Model assert app.context_files[0].view_mode == "full" # Default from FileItem Model
assert app.context_files[1].view_mode == "full" assert app.context_files[1].view_mode == "full"
def test_batch_view_mode_change(): def test_batch_view_mode_change():
app = App() app = App()
f1 = FileItem(path="test1.py", view_mode="full") f1 = FileItem(path="test1.py", view_mode="full")
f2 = FileItem(path="test2.py", view_mode="full") f2 = FileItem(path="test2.py", view_mode="full")
app.context_files = [f1, f2] app.context_files = [f1, f2]
app.ui_selected_context_files = {"test1.py"} app.ui_selected_context_files = {"test1.py"}
# Simulate clicking "Skeleton" batch button # Simulate clicking "Skeleton" batch button
for f in app.context_files: for f in app.context_files:
f_path = f.path if hasattr(f, "path") else str(f) f_path = f.path if hasattr(f, "path") else str(f)
if f_path in app.ui_selected_context_files: if f_path in app.ui_selected_context_files:
f.view_mode = "skeleton" f.view_mode = "skeleton"
assert f1.view_mode == "skeleton" assert f1.view_mode == "skeleton"
assert f2.view_mode == "full" # not selected assert f2.view_mode == "full" # not selected
+66 -66
View File
@@ -3,55 +3,55 @@ import tempfile
import os import os
from pathlib import Path from pathlib import Path
from src.diff_viewer import ( from src.diff_viewer import (
parse_diff, DiffFile, DiffHunk, parse_hunk_header, parse_diff, DiffFile, DiffHunk, parse_hunk_header,
get_line_color, apply_patch_to_file get_line_color, apply_patch_to_file
) )
def test_parse_diff_empty() -> None: def test_parse_diff_empty() -> None:
result = parse_diff("") result = parse_diff("")
assert result == [] assert result == []
def test_parse_diff_none() -> None: def test_parse_diff_none() -> None:
result = parse_diff(None) # type: ignore result = parse_diff(None) # type: ignore
assert result == [] assert result == []
def test_parse_simple_diff() -> None: def test_parse_simple_diff() -> None:
diff_text = """--- a/src/test.py diff_text = """--- a/src/test.py
+++ b/src/test.py +++ b/src/test.py
@@ -1 +1 @@ @@ -1 +1 @@
-old -old
+new""" +new"""
result = parse_diff(diff_text) result = parse_diff(diff_text)
assert len(result) == 1 assert len(result) == 1
assert result[0].old_path == "src/test.py" assert result[0].old_path == "src/test.py"
assert result[0].new_path == "src/test.py" assert result[0].new_path == "src/test.py"
assert len(result[0].hunks) == 1 assert len(result[0].hunks) == 1
assert result[0].hunks[0].header == "@@ -1 +1 @@" assert result[0].hunks[0].header == "@@ -1 +1 @@"
def test_parse_diff_with_context() -> None: def test_parse_diff_with_context() -> None:
diff_text = """--- a/src/example.py diff_text = """--- a/src/example.py
+++ b/src/example.py +++ b/src/example.py
@@ -10,5 +10,6 @@ @@ -10,5 +10,6 @@
def existing_function(): def existing_function():
pass pass
- old_line - old_line
+ old_line + old_line
+ new_line + new_line
more_code""" more_code"""
result = parse_diff(diff_text) result = parse_diff(diff_text)
assert len(result) == 1 assert len(result) == 1
assert result[0].old_path == "src/example.py" assert result[0].old_path == "src/example.py"
assert len(result[0].hunks) == 1 assert len(result[0].hunks) == 1
hunk = result[0].hunks[0] hunk = result[0].hunks[0]
assert hunk.old_start == 10 assert hunk.old_start == 10
assert hunk.old_count == 5 assert hunk.old_count == 5
assert hunk.new_start == 10 assert hunk.new_start == 10
assert hunk.new_count == 6 assert hunk.new_count == 6
assert "- old_line" in hunk.lines assert "- old_line" in hunk.lines
assert "+ new_line" in hunk.lines assert "+ new_line" in hunk.lines
def test_parse_multiple_files() -> None: def test_parse_multiple_files() -> None:
diff_text = """--- a/file1.py diff_text = """--- a/file1.py
+++ b/file1.py +++ b/file1.py
@@ -1 +1 @@ @@ -1 +1 @@
-a -a
@@ -61,70 +61,70 @@ def test_parse_multiple_files() -> None:
@@ -1 +1 @@ @@ -1 +1 @@
-c -c
+d""" +d"""
result = parse_diff(diff_text) result = parse_diff(diff_text)
assert len(result) == 2 assert len(result) == 2
assert result[0].old_path == "file1.py" assert result[0].old_path == "file1.py"
assert result[1].old_path == "file2.py" assert result[1].old_path == "file2.py"
def test_parse_hunk_header() -> None: def test_parse_hunk_header() -> None:
result = parse_hunk_header("@@ -10,5 +10,6 @@") result = parse_hunk_header("@@ -10,5 +10,6 @@")
assert result == (10, 5, 10, 6) assert result == (10, 5, 10, 6)
result = parse_hunk_header("@@ -1 +1 @@") result = parse_hunk_header("@@ -1 +1 @@")
assert result == (1, 1, 1, 1) assert result == (1, 1, 1, 1)
def test_diff_line_classification() -> None: def test_diff_line_classification() -> None:
diff_text = """--- a/test.py diff_text = """--- a/test.py
+++ b/test.py +++ b/test.py
@@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
context line context line
-removed line -removed line
+removed line +removed line
+added line +added line
another context""" another context"""
result = parse_diff(diff_text) result = parse_diff(diff_text)
hunk = result[0].hunks[0] hunk = result[0].hunks[0]
assert any(line.startswith("-") for line in hunk.lines) assert any(line.startswith("-") for line in hunk.lines)
assert any(line.startswith("+") for line in hunk.lines) assert any(line.startswith("+") for line in hunk.lines)
assert any(line.startswith(" ") or not line.startswith(("-", "+")) for line in hunk.lines) assert any(line.startswith(" ") or not line.startswith(("-", "+")) for line in hunk.lines)
def test_get_line_color() -> None: def test_get_line_color() -> None:
assert get_line_color("+added") == "green" assert get_line_color("+added") == "green"
assert get_line_color("-removed") == "red" assert get_line_color("-removed") == "red"
assert get_line_color("@@ -1,3 +1,4 @@") == "cyan" assert get_line_color("@@ -1,3 +1,4 @@") == "cyan"
assert get_line_color(" context") == None assert get_line_color(" context") == None
def test_apply_patch_simple() -> None: def test_apply_patch_simple() -> None:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "test.py" test_file = Path(tmpdir) / "test.py"
test_file.write_text("old\n") test_file.write_text("old\n")
patch = f"""--- a/{test_file.name} patch = f"""--- a/{test_file.name}
+++ b/{test_file.name} +++ b/{test_file.name}
@@ -1 +1 @@ @@ -1 +1 @@
-old -old
+new""" +new"""
success, msg = apply_patch_to_file(patch, tmpdir) success, msg = apply_patch_to_file(patch, tmpdir)
assert success assert success
assert test_file.read_text() == "new\n" assert test_file.read_text() == "new\n"
def test_apply_patch_with_context() -> None: def test_apply_patch_with_context() -> None:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / "example.py" test_file = Path(tmpdir) / "example.py"
test_file.write_text("line 1\nline 2\nline 3\n") test_file.write_text("line 1\nline 2\nline 3\n")
patch = f"""--- a/{test_file.name} patch = f"""--- a/{test_file.name}
+++ b/{test_file.name} +++ b/{test_file.name}
@@ -1,3 +1,3 @@ @@ -1,3 +1,3 @@
-line 1 -line 1
-line 2 -line 2
+line one +line one
+line two +line two
line 3""" line 3"""
success, msg = apply_patch_to_file(patch, tmpdir) success, msg = apply_patch_to_file(patch, tmpdir)
assert success assert success
content = test_file.read_text() content = test_file.read_text()
assert "line one" in content assert "line one" in content
assert "line two" in content assert "line two" in content
+82 -82
View File
@@ -3,127 +3,127 @@ import pytest
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
from src.models import TextEditorConfig, ExternalEditorConfig from src.models import TextEditorConfig, ExternalEditorConfig
from src.external_editor import ( from src.external_editor import (
ExternalEditorLauncher, ExternalEditorLauncher,
get_default_launcher, get_default_launcher,
create_temp_modified_file, create_temp_modified_file,
) )
@pytest.fixture @pytest.fixture
def vscode_editor(): def vscode_editor():
return TextEditorConfig(name="vscode", path="C:\\path\\to\\code.exe", diff_args=["--diff"]) return TextEditorConfig(name="vscode", path="C:\\path\\to\\code.exe", diff_args=["--diff"])
@pytest.fixture @pytest.fixture
def notepadpp_editor(): def notepadpp_editor():
return TextEditorConfig(name="notepad++", path="C:\\path\\to\\notepad++.exe", diff_args=["-multiInst", "-nosession"]) return TextEditorConfig(name="notepad++", path="C:\\path\\to\\notepad++.exe", diff_args=["-multiInst", "-nosession"])
@pytest.fixture @pytest.fixture
def ext_config(vscode_editor, notepadpp_editor): def ext_config(vscode_editor, notepadpp_editor):
return ExternalEditorConfig( return ExternalEditorConfig(
editors={"vscode": vscode_editor, "notepad++": notepadpp_editor}, editors={"vscode": vscode_editor, "notepad++": notepadpp_editor},
default_editor="vscode", default_editor="vscode",
) )
@pytest.fixture @pytest.fixture
def launcher(ext_config): def launcher(ext_config):
return ExternalEditorLauncher(ext_config) return ExternalEditorLauncher(ext_config)
class TestTextEditorConfig: class TestTextEditorConfig:
def test_from_dict_with_diff_args(self): def test_from_dict_with_diff_args(self):
data = {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]} data = {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]}
editor = TextEditorConfig.from_dict(data) editor = TextEditorConfig.from_dict(data)
assert editor.name == "vscode" assert editor.name == "vscode"
assert editor.path == "C:\\code.exe" assert editor.path == "C:\\code.exe"
assert editor.diff_args == ["--diff"] assert editor.diff_args == ["--diff"]
def test_from_dict_without_diff_args(self): def test_from_dict_without_diff_args(self):
data = {"name": "vscode", "path": "C:\\code.exe"} data = {"name": "vscode", "path": "C:\\code.exe"}
editor = TextEditorConfig.from_dict(data) editor = TextEditorConfig.from_dict(data)
assert editor.diff_args == [] assert editor.diff_args == []
def test_to_dict(self, vscode_editor): def test_to_dict(self, vscode_editor):
result = vscode_editor.to_dict() result = vscode_editor.to_dict()
assert result["name"] == "vscode" assert result["name"] == "vscode"
assert result["path"] == "C:\\path\\to\\code.exe" assert result["path"] == "C:\\path\\to\\code.exe"
assert result["diff_args"] == ["--diff"] assert result["diff_args"] == ["--diff"]
class TestExternalEditorConfig: class TestExternalEditorConfig:
def test_from_dict_with_string_editors(self): def test_from_dict_with_string_editors(self):
data = {"editors": {"vscode": "C:\\code.exe"}, "default_editor": "vscode"} data = {"editors": {"vscode": "C:\\code.exe"}, "default_editor": "vscode"}
config = ExternalEditorConfig.from_dict(data) config = ExternalEditorConfig.from_dict(data)
assert "vscode" in config.editors assert "vscode" in config.editors
assert config.editors["vscode"].path == "C:\\code.exe" assert config.editors["vscode"].path == "C:\\code.exe"
def test_from_dict_with_dict_editors(self, vscode_editor): def test_from_dict_with_dict_editors(self, vscode_editor):
data = {"editors": {"vscode": {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]}}} data = {"editors": {"vscode": {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]}}}
config = ExternalEditorConfig.from_dict(data) config = ExternalEditorConfig.from_dict(data)
assert config.editors["vscode"].diff_args == ["--diff"] assert config.editors["vscode"].diff_args == ["--diff"]
def test_get_default_returns_configured(self, ext_config): def test_get_default_returns_configured(self, ext_config):
result = ext_config.get_default() result = ext_config.get_default()
assert result.name == "vscode" assert result.name == "vscode"
def test_get_default_fallback_to_first(self): def test_get_default_fallback_to_first(self):
config = ExternalEditorConfig(editors={"notepad++": TextEditorConfig(name="notepad++", path="C:\\npp.exe")}) config = ExternalEditorConfig(editors={"notepad++": TextEditorConfig(name="notepad++", path="C:\\npp.exe")})
result = config.get_default() result = config.get_default()
assert result.name == "notepad++" assert result.name == "notepad++"
def test_get_default_returns_none_when_empty(self): def test_get_default_returns_none_when_empty(self):
config = ExternalEditorConfig(editors={}) config = ExternalEditorConfig(editors={})
assert config.get_default() is None assert config.get_default() is None
def test_to_dict(self, ext_config): def test_to_dict(self, ext_config):
result = ext_config.to_dict() result = ext_config.to_dict()
assert result["default_editor"] == "vscode" assert result["default_editor"] == "vscode"
assert "vscode" in result["editors"] assert "vscode" in result["editors"]
class TestExternalEditorLauncher: class TestExternalEditorLauncher:
def test_get_editor_by_name(self, launcher): def test_get_editor_by_name(self, launcher):
editor = launcher.get_editor("notepad++") editor = launcher.get_editor("notepad++")
assert editor.name == "notepad++" assert editor.name == "notepad++"
def test_get_editor_returns_default(self, launcher): def test_get_editor_returns_default(self, launcher):
editor = launcher.get_editor() editor = launcher.get_editor()
assert editor.name == "vscode" assert editor.name == "vscode"
def test_get_editor_unknown_name(self, launcher): def test_get_editor_unknown_name(self, launcher):
editor = launcher.get_editor("unknown") editor = launcher.get_editor("unknown")
assert editor is None assert editor is None
def test_build_diff_command(self, launcher, vscode_editor): def test_build_diff_command(self, launcher, vscode_editor):
cmd = launcher.build_diff_command(vscode_editor, "orig.txt", "mod.txt") cmd = launcher.build_diff_command(vscode_editor, "orig.txt", "mod.txt")
assert cmd == ["C:\\path\\to\\code.exe", "--diff", "orig.txt", "mod.txt"] assert cmd == ["C:\\path\\to\\code.exe", "--diff", "orig.txt", "mod.txt"]
def test_launch_diff_missing_editor(self, launcher): def test_launch_diff_missing_editor(self, launcher):
result = launcher.launch_diff("nonexistent", "orig.txt", "mod.txt") result = launcher.launch_diff("nonexistent", "orig.txt", "mod.txt")
assert result is None assert result is None
@patch("subprocess.Popen") @patch("subprocess.Popen")
def test_launch_diff_success(self, mock_popen, launcher): def test_launch_diff_success(self, mock_popen, launcher):
mock_popen.return_value = MagicMock() mock_popen.return_value = MagicMock()
result = launcher.launch_diff("vscode", "orig.txt", "mod.txt") result = launcher.launch_diff("vscode", "orig.txt", "mod.txt")
assert result is not None assert result is not None
mock_popen.assert_called_once() mock_popen.assert_called_once()
@patch("subprocess.Popen") @patch("subprocess.Popen")
def test_launch_diff_file_not_found(self, mock_popen, launcher): def test_launch_diff_file_not_found(self, mock_popen, launcher):
mock_popen.side_effect = FileNotFoundError() mock_popen.side_effect = FileNotFoundError()
result = launcher.launch_diff("vscode", "orig.txt", "mod.txt") result = launcher.launch_diff("vscode", "orig.txt", "mod.txt")
assert result is None assert result is None
class TestHelperFunctions: class TestHelperFunctions:
def test_create_temp_modified_file(self): def test_create_temp_modified_file(self):
content = "test content" content = "test content"
path = create_temp_modified_file(content) path = create_temp_modified_file(content)
assert path.endswith("_modified") assert path.endswith("_modified")
with open(path, encoding="utf-8") as f: with open(path, encoding="utf-8") as f:
assert f.read() == content assert f.read() == content
import os import os
os.unlink(path) os.unlink(path)
+41 -41
View File
@@ -8,55 +8,55 @@ from src import models
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_external_mcp_hitl_approval(): async def test_external_mcp_hitl_approval():
# 1. Setup mock manager and server # 1. Setup mock manager and server
mock_manager = mcp_client.ExternalMCPManager() mock_manager = mcp_client.ExternalMCPManager()
mock_server = AsyncMock() mock_server = AsyncMock()
mock_server.name = "test-server" mock_server.name = "test-server"
mock_server.tools = {"ext_tool": {"name": "ext_tool", "description": "desc"}} mock_server.tools = {"ext_tool": {"name": "ext_tool", "description": "desc"}}
mock_server.call_tool.return_value = "Success" mock_server.call_tool.return_value = "Success"
mock_manager.servers["test-server"] = mock_server mock_manager.servers["test-server"] = mock_server
with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager): with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager):
# 2. Setup ai_client callbacks # 2. Setup ai_client callbacks
mock_pre_tool = MagicMock(return_value="Approved") mock_pre_tool = MagicMock(return_value="Approved")
ai_client.confirm_and_run_callback = mock_pre_tool ai_client.confirm_and_run_callback = mock_pre_tool
# 3. Call _execute_single_tool_call_async # 3. Call _execute_single_tool_call_async
name = "ext_tool" name = "ext_tool"
args = {"arg1": "val1"} args = {"arg1": "val1"}
call_id = "call_123" call_id = "call_123"
base_dir = "." base_dir = "."
# We need to pass the callback to the function # We need to pass the callback to the function
name, cid, out, orig_name = await ai_client._execute_single_tool_call_async( name, cid, out, orig_name = await ai_client._execute_single_tool_call_async(
name, args, call_id, base_dir, mock_pre_tool, None, 0 name, args, call_id, base_dir, mock_pre_tool, None, 0
) )
# 4. Assertions # 4. Assertions
assert out == "Success" assert out == "Success"
mock_pre_tool.assert_called_once() mock_pre_tool.assert_called_once()
# Check description contains EXTERNAL MCP # Check description contains EXTERNAL MCP
call_args = mock_pre_tool.call_args[0] call_args = mock_pre_tool.call_args[0]
assert "EXTERNAL MCP TOOL: ext_tool" in call_args[0] assert "EXTERNAL MCP TOOL: ext_tool" in call_args[0]
assert "arg1: 'val1'" in call_args[0] assert "arg1: 'val1'" in call_args[0]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_external_mcp_hitl_rejection(): async def test_external_mcp_hitl_rejection():
mock_manager = mcp_client.ExternalMCPManager() mock_manager = mcp_client.ExternalMCPManager()
mock_server = AsyncMock() mock_server = AsyncMock()
mock_server.name = "test-server" mock_server.name = "test-server"
mock_server.tools = {"ext_tool": {"name": "ext_tool"}} mock_server.tools = {"ext_tool": {"name": "ext_tool"}}
mock_manager.servers["test-server"] = mock_server mock_manager.servers["test-server"] = mock_server
with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager): with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager):
mock_pre_tool = MagicMock(return_value=None) # Rejection mock_pre_tool = MagicMock(return_value=None) # Rejection
name = "ext_tool" name = "ext_tool"
args = {"arg1": "val1"} args = {"arg1": "val1"}
name, cid, out, orig_name = await ai_client._execute_single_tool_call_async( name, cid, out, orig_name = await ai_client._execute_single_tool_call_async(
name, args, "id", ".", mock_pre_tool, None, 0 name, args, "id", ".", mock_pre_tool, None, 0
) )
assert out == "USER REJECTED: tool execution cancelled" assert out == "USER REJECTED: tool execution cancelled"
mock_server.call_tool.assert_not_called() mock_server.call_tool.assert_not_called()
+49 -49
View File
@@ -3,57 +3,57 @@ from src.fuzzy_anchor import FuzzyAnchor
class TestFuzzyAnchor: class TestFuzzyAnchor:
def test_create_slice_basic(self): def test_create_slice_basic(self):
text = "line0\nline1\nline2\nline3\nline4\n" text = "line0\nline1\nline2\nline3\nline4\n"
result = FuzzyAnchor.create_slice(text, 2, 4) result = FuzzyAnchor.create_slice(text, 2, 4)
assert "start_line" in result assert "start_line" in result
assert "end_line" in result assert "end_line" in result
assert "content_hash" in result assert "content_hash" in result
assert "start_context" in result assert "start_context" in result
assert "end_context" in result assert "end_context" in result
assert result["start_line"] == 2 assert result["start_line"] == 2
assert result["end_line"] == 4 assert result["end_line"] == 4
assert result["start_context"] == result["end_context"] assert result["start_context"] == result["end_context"]
def test_resolve_slice_exact_match(self): def test_resolve_slice_exact_match(self):
text = "line0\nline1\nline2\nline3\nline4\n" text = "line0\nline1\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(text, 2, 4) slc = FuzzyAnchor.create_slice(text, 2, 4)
result = FuzzyAnchor.resolve_slice(text, slc) result = FuzzyAnchor.resolve_slice(text, slc)
assert result is not None assert result is not None
start, end = result start, end = result
assert start == 2 assert start == 2
assert end == 4 assert end == 4
def test_resolve_slice_line_inserted_before(self): def test_resolve_slice_line_inserted_before(self):
original = "line0\nline1\nline2\nline3\nline4\n" original = "line0\nline1\nline2\nline3\nline4\n"
modified = "NEW\nline0\nline1\nline2\nline3\nline4\n" modified = "NEW\nline0\nline1\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(original, 2, 4) slc = FuzzyAnchor.create_slice(original, 2, 4)
result = FuzzyAnchor.resolve_slice(modified, slc) result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is not None assert result is not None
start, end = result start, end = result
assert start == 3 assert start == 3
assert end == 5 assert end == 5
def test_resolve_slice_line_deleted_before_returns_none(self): def test_resolve_slice_line_deleted_before_returns_none(self):
original = "line0\nline1\nline2\nline3\nline4\n" original = "line0\nline1\nline2\nline3\nline4\n"
modified = "line0\nline2\nline3\nline4\n" modified = "line0\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(original, 2, 4) slc = FuzzyAnchor.create_slice(original, 2, 4)
result = FuzzyAnchor.resolve_slice(modified, slc) result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is None assert result is None
def test_resolve_slice_multiple_lines_changed(self): def test_resolve_slice_multiple_lines_changed(self):
original = "line0\nline1\nline2\nline3\nline4\n" original = "line0\nline1\nline2\nline3\nline4\n"
modified = "a\nb\nc\nd\ne\nline0\nline1\nline2\nline3\nline4\n" modified = "a\nb\nc\nd\ne\nline0\nline1\nline2\nline3\nline4\n"
slc = FuzzyAnchor.create_slice(original, 1, 2) slc = FuzzyAnchor.create_slice(original, 1, 2)
result = FuzzyAnchor.resolve_slice(modified, slc) result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is not None assert result is not None
start, end = result start, end = result
assert start == 6 assert start == 6
assert end == 7 assert end == 7
def test_resolve_slice_anchor_mismatch_returns_none(self): def test_resolve_slice_anchor_mismatch_returns_none(self):
original = "alpha\nbeta\ngamma\ndelta\nepsilon\n" original = "alpha\nbeta\ngamma\ndelta\nepsilon\n"
modified = "foo\nbar\nbaz\ndelta\nepsilon\n" modified = "foo\nbar\nbaz\ndelta\nepsilon\n"
slc = FuzzyAnchor.create_slice(original, 2, 3) slc = FuzzyAnchor.create_slice(original, 2, 3)
result = FuzzyAnchor.resolve_slice(modified, slc) result = FuzzyAnchor.resolve_slice(modified, slc)
assert result is None assert result is None
+75 -75
View File
@@ -4,82 +4,82 @@ from src.gemini_cli_adapter import GeminiCliAdapter
class TestGeminiCliAdapter: class TestGeminiCliAdapter:
@patch("subprocess.Popen") @patch("subprocess.Popen")
def test_send_starts_subprocess_with_correct_args( def test_send_starts_subprocess_with_correct_args(
self, mock_popen: MagicMock self, mock_popen: MagicMock
) -> None: ) -> None:
adapter = GeminiCliAdapter(binary_path="gemini") adapter = GeminiCliAdapter(binary_path="gemini")
mock_process = MagicMock() mock_process = MagicMock()
mock_process.communicate.return_value = ( mock_process.communicate.return_value = (
'{"type": "message", "content": "hello"}', '{"type": "message", "content": "hello"}',
"", "",
) )
mock_process.returncode = 0 mock_process.returncode = 0
mock_popen.return_value = mock_process mock_popen.return_value = mock_process
adapter.send("test prompt") adapter.send("test prompt")
assert mock_popen.called assert mock_popen.called
args, kwargs = mock_popen.call_args args, kwargs = mock_popen.call_args
cmd_list = args[0] cmd_list = args[0]
assert "gemini" in cmd_list assert "gemini" in cmd_list
assert "--prompt" in cmd_list assert "--prompt" in cmd_list
assert "--output-format" in cmd_list assert "--output-format" in cmd_list
assert "stream-json" in cmd_list assert "stream-json" in cmd_list
@patch("subprocess.Popen") @patch("subprocess.Popen")
def test_send_parses_jsonl_output(self, mock_popen: MagicMock) -> None: def test_send_parses_jsonl_output(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter() adapter = GeminiCliAdapter()
stdout_str = '{"type": "message", "content": "Hello "}\n{"type": "message", "content": "world!"}\n' stdout_str = '{"type": "message", "content": "Hello "}\n{"type": "message", "content": "world!"}\n'
mock_process = MagicMock() mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "") mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0 mock_process.returncode = 0
mock_popen.return_value = mock_process mock_popen.return_value = mock_process
result = adapter.send("msg") result = adapter.send("msg")
assert result["text"] == "Hello world!" assert result["text"] == "Hello world!"
@patch("subprocess.Popen") @patch("subprocess.Popen")
def test_send_handles_tool_use_events(self, mock_popen: MagicMock) -> None: def test_send_handles_tool_use_events(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter() adapter = GeminiCliAdapter()
tool_json = { tool_json = {
"type": "tool_use", "type": "tool_use",
"tool_name": "read_file", "tool_name": "read_file",
"parameters": {"path": "test.txt"}, "parameters": {"path": "test.txt"},
"tool_id": "call_123", "tool_id": "call_123",
} }
stdout_str = json.dumps(tool_json) + "\n" stdout_str = json.dumps(tool_json) + "\n"
mock_process = MagicMock() mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "") mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0 mock_process.returncode = 0
mock_popen.return_value = mock_process mock_popen.return_value = mock_process
result = adapter.send("msg") result = adapter.send("msg")
assert len(result["tool_calls"]) == 1 assert len(result["tool_calls"]) == 1
assert result["tool_calls"][0]["name"] == "read_file" assert result["tool_calls"][0]["name"] == "read_file"
assert result["tool_calls"][0]["args"]["path"] == "test.txt" assert result["tool_calls"][0]["args"]["path"] == "test.txt"
@patch("subprocess.Popen") @patch("subprocess.Popen")
def test_send_captures_usage_metadata(self, mock_popen: MagicMock) -> None: def test_send_captures_usage_metadata(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter() adapter = GeminiCliAdapter()
result_json = {"type": "result", "stats": {"total_tokens": 50}} result_json = {"type": "result", "stats": {"total_tokens": 50}}
stdout_str = json.dumps(result_json) + "\n" stdout_str = json.dumps(result_json) + "\n"
mock_process = MagicMock() mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "") mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0 mock_process.returncode = 0
mock_popen.return_value = mock_process mock_popen.return_value = mock_process
adapter.send("msg") adapter.send("msg")
assert adapter.last_usage is not None assert adapter.last_usage is not None
assert adapter.last_usage.get("total_tokens") == 50 assert adapter.last_usage.get("total_tokens") == 50
@patch("subprocess.Popen") @patch("subprocess.Popen")
def test_full_flow_integration(self, mock_popen: MagicMock) -> None: def test_full_flow_integration(self, mock_popen: MagicMock) -> None:
adapter = GeminiCliAdapter() adapter = GeminiCliAdapter()
msg_json = {"type": "message", "content": "Final response"} msg_json = {"type": "message", "content": "Final response"}
result_json = { result_json = {
"type": "result", "type": "result",
"stats": {"total_tokens": 25, "input_tokens": 10, "output_tokens": 15}, "stats": {"total_tokens": 25, "input_tokens": 10, "output_tokens": 15},
} }
stdout_str = json.dumps(msg_json) + "\n" + json.dumps(result_json) + "\n" stdout_str = json.dumps(msg_json) + "\n" + json.dumps(result_json) + "\n"
mock_process = MagicMock() mock_process = MagicMock()
mock_process.communicate.return_value = (stdout_str, "") mock_process.communicate.return_value = (stdout_str, "")
mock_process.returncode = 0 mock_process.returncode = 0
mock_popen.return_value = mock_process mock_popen.return_value = mock_process
result = adapter.send("test") result = adapter.send("test")
assert "Final response" in result["text"] assert "Final response" in result["text"]
+48 -48
View File
@@ -5,60 +5,60 @@ from src import gui_2
@pytest.fixture @pytest.fixture
def mock_gui(): def mock_gui():
gui = gui_2.App() gui = gui_2.App()
gui.project = { gui.project = {
'discussion': { 'discussion': {
'active': 'main', 'active': 'main',
'discussions': { 'discussions': {
'main': {'history': []}, 'main': {'history': []},
'main_take_1': {'history': []}, 'main_take_1': {'history': []},
'other_topic': {'history': []} 'other_topic': {'history': []}
} }
} }
} }
gui.active_discussion = 'main' gui.active_discussion = 'main'
gui.perf_profiling_enabled = False gui.perf_profiling_enabled = False
gui.is_viewing_prior_session = False gui.is_viewing_prior_session = False
gui._get_discussion_names = lambda: ['main', 'main_take_1', 'other_topic'] gui._get_discussion_names = lambda: ['main', 'main_take_1', 'other_topic']
return gui return gui
def test_discussion_tabs_rendered(mock_gui): def test_discussion_tabs_rendered(mock_gui):
with patch('src.gui_2.imgui') as mock_imgui, \ with patch('src.gui_2.imgui') as mock_imgui, \
patch('src.gui_2.imscope') as mock_imscope, \ patch('src.gui_2.imscope') as mock_imscope, \
patch('src.imgui_scopes.imgui', new=mock_imgui), \ patch('src.imgui_scopes.imgui', new=mock_imgui), \
patch('src.app_controller.AppController.active_project_root', new_callable=PropertyMock, return_value='.'): patch('src.app_controller.AppController.active_project_root', new_callable=PropertyMock, return_value='.'):
# Setup imscope mocks # Setup imscope mocks
mock_imscope.window.return_value.__enter__.return_value = (True, True) mock_imscope.window.return_value.__enter__.return_value = (True, True)
mock_imscope.child.return_value.__enter__.return_value = True mock_imscope.child.return_value.__enter__.return_value = True
mock_imscope.table.return_value.__enter__.return_value = True mock_imscope.table.return_value.__enter__.return_value = True
mock_imscope.tree_node_ex.return_value.__enter__.return_value = True mock_imscope.tree_node_ex.return_value.__enter__.return_value = True
mock_imscope.tab_item.return_value.__enter__.return_value = (True, True) mock_imscope.tab_item.return_value.__enter__.return_value = (True, True)
mock_imscope.style_color.return_value.__enter__.return_value = None mock_imscope.style_color.return_value.__enter__.return_value = None
mock_imscope.style_var.return_value.__enter__.return_value = None mock_imscope.style_var.return_value.__enter__.return_value = None
# We expect a combo box for base discussion # We expect a combo box for base discussion
mock_imgui.begin_combo.return_value = True mock_imgui.begin_combo.return_value = True
mock_imgui.selectable.return_value = (False, False) mock_imgui.selectable.return_value = (False, False)
# We expect a tab bar for takes # We expect a tab bar for takes
mock_imgui.begin_tab_bar.return_value = True mock_imgui.begin_tab_bar.return_value = True
mock_imgui.begin_tab_item.return_value = (True, True) mock_imgui.begin_tab_item.return_value = (True, True)
mock_imgui.input_text.return_value = (False, "") mock_imgui.input_text.return_value = (False, "")
mock_imgui.input_text_multiline.return_value = (False, "") mock_imgui.input_text_multiline.return_value = (False, "")
mock_imgui.checkbox.return_value = (False, False) mock_imgui.checkbox.return_value = (False, False)
mock_imgui.input_int.return_value = (False, 0) mock_imgui.input_int.return_value = (False, 0)
mock_clipper = MagicMock() mock_clipper = MagicMock()
mock_clipper.step.return_value = False mock_clipper.step.return_value = False
mock_imgui.ListClipper.return_value = mock_clipper mock_imgui.ListClipper.return_value = mock_clipper
mock_gui._render_discussion_panel() mock_gui._render_discussion_panel()
mock_imgui.begin_combo.assert_called_once_with("##disc_sel", 'main') mock_imgui.begin_combo.assert_called_once_with("##disc_sel", 'main')
mock_imgui.begin_tab_bar.assert_called_once_with('discussion_takes_tabs') mock_imgui.begin_tab_bar.assert_called_once_with('discussion_takes_tabs')
calls = [c[0][0] for c in mock_imscope.tab_item.call_args_list] calls = [c[0][0] for c in mock_imscope.tab_item.call_args_list]
assert 'Original###main' in calls assert 'Original###main' in calls
assert 'Take 1###main_take_1' in calls assert 'Take 1###main_take_1' in calls
assert 'Synthesis###Synthesis' in calls assert 'Synthesis###Synthesis' in calls
+29 -29
View File
@@ -9,32 +9,32 @@ from src.api_hook_client import ApiHookClient
def test_comms_volume_stress_performance(live_gui) -> None: def test_comms_volume_stress_performance(live_gui) -> None:
time.sleep(5.0) time.sleep(5.0)
client = ApiHookClient() client = ApiHookClient()
time.sleep(2.0) time.sleep(2.0)
baseline_resp = client.get_performance() baseline_resp = client.get_performance()
baseline = baseline_resp.get("performance", {}) baseline = baseline_resp.get("performance", {})
baseline_ft = baseline.get("last_frame_time_ms", 0.0) baseline_ft = baseline.get("last_frame_time_ms", 0.0)
large_session = [] large_session = []
for i in range(50): for i in range(50):
large_session.append( large_session.append(
{ {
"role": "User", "role": "User",
"content": f"Stress test entry {i} " * 5, "content": f"Stress test entry {i} " * 5,
"ts": time.time(), "ts": time.time(),
"collapsed": False, "collapsed": False,
} }
) )
client.post_session(large_session) client.post_session(large_session)
time.sleep(1.0) time.sleep(1.0)
stress_resp = client.get_performance() stress_resp = client.get_performance()
stress = stress_resp.get("performance", {}) stress = stress_resp.get("performance", {})
stress_ft = stress.get("last_frame_time_ms", 0.0) stress_ft = stress.get("last_frame_time_ms", 0.0)
print(f"Baseline FT: {baseline_ft:.2f}ms, Stress FT: {stress_ft:.2f}ms") print(f"Baseline FT: {baseline_ft:.2f}ms, Stress FT: {stress_ft:.2f}ms")
if stress_ft > 0: if stress_ft > 0:
assert stress_ft < 100.0, ( assert stress_ft < 100.0, (
f"Stress frame time {stress_ft:.2f}ms exceeds 10fps threshold" f"Stress frame time {stress_ft:.2f}ms exceeds 10fps threshold"
) )
session_data = client.get_session() session_data = client.get_session()
entries = session_data.get("session", {}).get("entries", []) entries = session_data.get("session", {}).get("entries", [])
assert len(entries) >= 50, f"Expected at least 50 entries, got {len(entries)}" assert len(entries) >= 50, f"Expected at least 50 entries, got {len(entries)}"
+87 -87
View File
@@ -3,97 +3,97 @@ from src.history import HistoryManager, UISnapshot
class TestHistoryManager: class TestHistoryManager:
def test_push_and_undo(self): def test_push_and_undo(self):
hm = HistoryManager(max_capacity=10) hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial") hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2") hm.push({"value": 2}, "change to 2")
assert hm.can_undo is True assert hm.can_undo is True
result = hm.undo(current_state={"value": 2}) result = hm.undo(current_state={"value": 2})
assert result is not None assert result is not None
assert result.state["value"] == 2 assert result.state["value"] == 2
assert hm.can_undo is True assert hm.can_undo is True
def test_undo_and_redo(self): def test_undo_and_redo(self):
hm = HistoryManager(max_capacity=10) hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial") hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2") hm.push({"value": 2}, "change to 2")
assert hm.can_redo is False assert hm.can_redo is False
hm.undo(current_state={"value": 2}) hm.undo(current_state={"value": 2})
assert hm.can_redo is True assert hm.can_redo is True
redone = hm.redo(current_state={"value": 1}) redone = hm.redo(current_state={"value": 1})
assert redone is not None assert redone is not None
assert redone.state["value"] == 2 assert redone.state["value"] == 2
def test_undo_no_history_returns_none(self): def test_undo_no_history_returns_none(self):
hm = HistoryManager(max_capacity=10) hm = HistoryManager(max_capacity=10)
assert hm.can_undo is False assert hm.can_undo is False
result = hm.undo(current_state={"value": 1}) result = hm.undo(current_state={"value": 1})
assert result is None assert result is None
def test_redo_no_history_returns_none(self): def test_redo_no_history_returns_none(self):
hm = HistoryManager(max_capacity=10) hm = HistoryManager(max_capacity=10)
assert hm.can_redo is False assert hm.can_redo is False
result = hm.redo(current_state={"value": 1}) result = hm.redo(current_state={"value": 1})
assert result is None assert result is None
def test_jump_to_undo(self): def test_jump_to_undo(self):
hm = HistoryManager(max_capacity=10) hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial") hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2") hm.push({"value": 2}, "change to 2")
hm.push({"value": 3}, "change to 3") hm.push({"value": 3}, "change to 3")
result = hm.jump_to_undo(0, current_state={"value": 3}) result = hm.jump_to_undo(0, current_state={"value": 3})
assert result is not None assert result is not None
assert result.state["value"] == 1 assert result.state["value"] == 1
assert hm.can_redo is True assert hm.can_redo is True
def test_get_history_returns_descriptions(self): def test_get_history_returns_descriptions(self):
hm = HistoryManager(max_capacity=10) hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "first") hm.push({"value": 1}, "first")
hm.push({"value": 2}, "second") hm.push({"value": 2}, "second")
hm.push({"value": 3}, "third") hm.push({"value": 3}, "third")
history = hm.get_history() history = hm.get_history()
assert len(history) == 3 assert len(history) == 3
assert history[0]["description"] == "first" assert history[0]["description"] == "first"
assert history[1]["description"] == "second" assert history[1]["description"] == "second"
assert "timestamp" in history[0] assert "timestamp" in history[0]
def test_snapshot_roundtrip(self): def test_snapshot_roundtrip(self):
snap = UISnapshot( snap = UISnapshot(
ai_input="test input", ai_input="test input",
project_system_prompt="project prompt", project_system_prompt="project prompt",
global_system_prompt="global prompt", global_system_prompt="global prompt",
base_system_prompt="base prompt", base_system_prompt="base prompt",
use_default_base_prompt=True, use_default_base_prompt=True,
temperature=0.5, temperature=0.5,
top_p=0.9, top_p=0.9,
max_tokens=8192, max_tokens=8192,
auto_add_history=True, auto_add_history=True,
disc_entries=[{"role": "user", "content": "hello"}], disc_entries=[{"role": "user", "content": "hello"}],
files=[{"path": "a.txt"}], files=[{"path": "a.txt"}],
context_files=[], context_files=[],
screenshots=["screenshot.png"], screenshots=["screenshot.png"],
) )
d = snap.to_dict() d = snap.to_dict()
restored = UISnapshot.from_dict(d) restored = UISnapshot.from_dict(d)
assert restored.ai_input == snap.ai_input assert restored.ai_input == snap.ai_input
assert restored.project_system_prompt == snap.project_system_prompt assert restored.project_system_prompt == snap.project_system_prompt
assert restored.global_system_prompt == snap.global_system_prompt assert restored.global_system_prompt == snap.global_system_prompt
assert restored.base_system_prompt == snap.base_system_prompt assert restored.base_system_prompt == snap.base_system_prompt
assert restored.use_default_base_prompt == snap.use_default_base_prompt assert restored.use_default_base_prompt == snap.use_default_base_prompt
assert restored.temperature == snap.temperature assert restored.temperature == snap.temperature
assert restored.top_p == snap.top_p assert restored.top_p == snap.top_p
assert restored.max_tokens == snap.max_tokens assert restored.max_tokens == snap.max_tokens
assert restored.auto_add_history == snap.auto_add_history assert restored.auto_add_history == snap.auto_add_history
assert restored.disc_entries == snap.disc_entries assert restored.disc_entries == snap.disc_entries
assert restored.files == snap.files assert restored.files == snap.files
assert restored.context_files == snap.context_files assert restored.context_files == snap.context_files
assert restored.screenshots == snap.screenshots assert restored.screenshots == snap.screenshots
def test_push_clears_redo_stack(self): def test_push_clears_redo_stack(self):
hm = HistoryManager(max_capacity=10) hm = HistoryManager(max_capacity=10)
hm.push({"value": 1}, "initial") hm.push({"value": 1}, "initial")
hm.push({"value": 2}, "change to 2") hm.push({"value": 2}, "change to 2")
hm.undo(current_state={"value": 2}) hm.undo(current_state={"value": 2})
assert hm.can_redo is True assert hm.can_redo is True
hm.push({"value": 3}, "change to 3") hm.push({"value": 3}, "change to 3")
assert hm.can_redo is False assert hm.can_redo is False
+76 -76
View File
@@ -6,95 +6,95 @@ import sys
import types import types
def test_hot_module_dataclass_fields(): def test_hot_module_dataclass_fields():
hm = HotModule( hm = HotModule(
name="test_module", name="test_module",
file_path="/path/to/test_module.py", file_path="/path/to/test_module.py",
state_keys=["attr1", "attr2"], state_keys=["attr1", "attr2"],
delegation_targets=["method_a", "method_b"], delegation_targets=["method_a", "method_b"],
) )
assert hm.name == "test_module" assert hm.name == "test_module"
assert hm.file_path == "/path/to/test_module.py" assert hm.file_path == "/path/to/test_module.py"
assert hm.state_keys == ["attr1", "attr2"] assert hm.state_keys == ["attr1", "attr2"]
assert hm.delegation_targets == ["method_a", "method_b"] assert hm.delegation_targets == ["method_a", "method_b"]
def test_hot_reloader_register_and_get(): def test_hot_reloader_register_and_get():
HotReloader.HOT_MODULES.clear() HotReloader.HOT_MODULES.clear()
hm = HotModule(name="test_mod", file_path="/fake/path.py", state_keys=[], delegation_targets=[]) hm = HotModule(name="test_mod", file_path="/fake/path.py", state_keys=[], delegation_targets=[])
HotReloader.register(hm) HotReloader.register(hm)
assert "test_mod" in HotReloader.HOT_MODULES assert "test_mod" in HotReloader.HOT_MODULES
assert HotReloader.HOT_MODULES["test_mod"] is hm assert HotReloader.HOT_MODULES["test_mod"] is hm
def test_hot_reloader_register_duplicate_raises(): def test_hot_reloader_register_duplicate_raises():
HotReloader.HOT_MODULES.clear() HotReloader.HOT_MODULES.clear()
hm1 = HotModule(name="dup", file_path="/a.py", state_keys=[], delegation_targets=[]) hm1 = HotModule(name="dup", file_path="/a.py", state_keys=[], delegation_targets=[])
HotReloader.register(hm1) HotReloader.register(hm1)
with pytest.raises(ValueError, match="already registered"): with pytest.raises(ValueError, match="already registered"):
HotReloader.register(hm1) HotReloader.register(hm1)
def test_hot_reloader_is_error_state(): def test_hot_reloader_is_error_state():
HotReloader.HOT_MODULES.clear() HotReloader.HOT_MODULES.clear()
HotReloader.last_error = None HotReloader.last_error = None
HotReloader.is_error_state = False HotReloader.is_error_state = False
assert HotReloader.is_error_state is False assert HotReloader.is_error_state is False
def test_reload_unknown_module_returns_false(): def test_reload_unknown_module_returns_false():
HotReloader.HOT_MODULES.clear() HotReloader.HOT_MODULES.clear()
HotReloader.register(HotModule(name="nonexistent_mod", file_path="/nonexistent.py", state_keys=[], delegation_targets=[])) HotReloader.register(HotModule(name="nonexistent_mod", file_path="/nonexistent.py", state_keys=[], delegation_targets=[]))
app = MagicMock() app = MagicMock()
result = HotReloader.reload("nonexistent_mod", app) result = HotReloader.reload("nonexistent_mod", app)
assert result is False assert result is False
assert HotReloader.is_error_state is True assert HotReloader.is_error_state is True
assert HotReloader.last_error is not None assert HotReloader.last_error is not None
def test_reload_success_clears_error_state(): def test_reload_success_clears_error_state():
HotReloader.HOT_MODULES.clear() HotReloader.HOT_MODULES.clear()
test_mod = types.ModuleType("src._test_reload_mod_src") test_mod = types.ModuleType("src._test_reload_mod_src")
sys.modules["src._test_reload_mod_src"] = test_mod sys.modules["src._test_reload_mod_src"] = test_mod
HotReloader.register(HotModule(name="src._test_reload_mod_src", file_path="/fake.py", state_keys=[], delegation_targets=[])) HotReloader.register(HotModule(name="src._test_reload_mod_src", file_path="/fake.py", state_keys=[], delegation_targets=[]))
app = MagicMock() app = MagicMock()
HotReloader.is_error_state = True HotReloader.is_error_state = True
HotReloader.last_error = "previous error" HotReloader.last_error = "previous error"
with patch("importlib.reload", return_value=test_mod): with patch("importlib.reload", return_value=test_mod):
result = HotReloader.reload("src._test_reload_mod_src", app) result = HotReloader.reload("src._test_reload_mod_src", app)
assert result is True assert result is True
assert HotReloader.is_error_state is False assert HotReloader.is_error_state is False
assert HotReloader.last_error is None assert HotReloader.last_error is None
del sys.modules["src._test_reload_mod_src"] del sys.modules["src._test_reload_mod_src"]
def test_reload_captures_and_restores_state_on_failure(): def test_reload_captures_and_restores_state_on_failure():
HotReloader.HOT_MODULES.clear() HotReloader.HOT_MODULES.clear()
HotReloader.register(HotModule(name="bad_mod", file_path="/bad.py", state_keys=["_test_attr"], delegation_targets=[])) HotReloader.register(HotModule(name="bad_mod", file_path="/bad.py", state_keys=["_test_attr"], delegation_targets=[]))
app = MagicMock() app = MagicMock()
app._test_attr = "preserved_value" app._test_attr = "preserved_value"
result = HotReloader.reload("bad_mod", app) result = HotReloader.reload("bad_mod", app)
assert result is False assert result is False
assert HotReloader.is_error_state is True assert HotReloader.is_error_state is True
assert app._test_attr == "preserved_value" assert app._test_attr == "preserved_value"
def test_reload_all_success(): def test_reload_all_success():
HotReloader.HOT_MODULES.clear() HotReloader.HOT_MODULES.clear()
mod1 = types.ModuleType("hr_test_mod1") mod1 = types.ModuleType("hr_test_mod1")
mod2 = types.ModuleType("hr_test_mod2") mod2 = types.ModuleType("hr_test_mod2")
sys.modules["hr_test_mod1"] = mod1 sys.modules["hr_test_mod1"] = mod1
sys.modules["hr_test_mod2"] = mod2 sys.modules["hr_test_mod2"] = mod2
HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[])) HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[]))
HotReloader.register(HotModule(name="hr_test_mod2", file_path="/fake2.py", state_keys=[], delegation_targets=[])) HotReloader.register(HotModule(name="hr_test_mod2", file_path="/fake2.py", state_keys=[], delegation_targets=[]))
app = MagicMock() app = MagicMock()
with patch("importlib.reload", return_value=mod1): with patch("importlib.reload", return_value=mod1):
result = HotReloader.reload_all(app) result = HotReloader.reload_all(app)
assert result is True assert result is True
assert HotReloader.is_error_state is False assert HotReloader.is_error_state is False
del sys.modules["hr_test_mod1"] del sys.modules["hr_test_mod1"]
del sys.modules["hr_test_mod2"] del sys.modules["hr_test_mod2"]
def test_reload_all_partial_failure(): def test_reload_all_partial_failure():
HotReloader.HOT_MODULES.clear() HotReloader.HOT_MODULES.clear()
mod1 = types.ModuleType("hr_test_mod1") mod1 = types.ModuleType("hr_test_mod1")
sys.modules["hr_test_mod1"] = mod1 sys.modules["hr_test_mod1"] = mod1
HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[])) HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[]))
HotReloader.register(HotModule(name="hr_nonexistent", file_path="/nonexistent.py", state_keys=[], delegation_targets=[])) HotReloader.register(HotModule(name="hr_nonexistent", file_path="/nonexistent.py", state_keys=[], delegation_targets=[]))
app = MagicMock() app = MagicMock()
result = HotReloader.reload_all(app) result = HotReloader.reload_all(app)
assert result is False assert result is False
assert HotReloader.is_error_state is True assert HotReloader.is_error_state is True
del sys.modules["hr_test_mod1"] del sys.modules["hr_test_mod1"]
+61 -61
View File
@@ -6,70 +6,70 @@ from src.gui_2 import App
@pytest.fixture @pytest.fixture
def app_instance() -> Any: def app_instance() -> Any:
with ( with (
patch("src.models.load_config", return_value={"ai": {}, "projects": {}}), patch("src.models.load_config", return_value={"ai": {}, "projects": {}}),
patch("src.models.save_config"), patch("src.models.save_config"),
patch("src.gui_2.project_manager"), patch("src.gui_2.project_manager"),
patch("src.app_controller.project_manager") as mock_pm, patch("src.app_controller.project_manager") as mock_pm,
patch("src.gui_2.session_logger"), patch("src.gui_2.session_logger"),
patch("src.gui_2.immapp.run"), patch("src.gui_2.immapp.run"),
patch("src.app_controller.AppController._load_active_project"), patch("src.app_controller.AppController._load_active_project"),
patch("src.app_controller.AppController._fetch_models"), patch("src.app_controller.AppController._fetch_models"),
patch.object(App, "_load_fonts"), patch.object(App, "_load_fonts"),
patch.object(App, "_post_init"), patch.object(App, "_post_init"),
patch("src.app_controller.AppController._prune_old_logs"), patch("src.app_controller.AppController._prune_old_logs"),
patch("src.app_controller.AppController.start_services"), patch("src.app_controller.AppController.start_services"),
patch("src.app_controller.AppController._init_ai_and_hooks"), patch("src.app_controller.AppController._init_ai_and_hooks"),
): ):
app = App() app = App()
app.project = {} app.project = {}
app.ui_files_base_dir = "." app.ui_files_base_dir = "."
yield app, mock_pm yield app, mock_pm
def test_mma_dashboard_refresh(app_instance: Any) -> None: def test_mma_dashboard_refresh(app_instance: Any) -> None:
app, mock_pm = app_instance app, mock_pm = app_instance
mock_tracks = [ mock_tracks = [
{ {
"id": "track_1", "id": "track_1",
"title": "Track 1", "title": "Track 1",
"status": "new", "status": "new",
"complete": 0, "complete": 0,
"total": 0, "total": 0,
"progress": 0.0, "progress": 0.0,
}, },
{ {
"id": "track_2", "id": "track_2",
"title": "Track 2", "title": "Track 2",
"status": "new", "status": "new",
"complete": 0, "complete": 0,
"total": 0, "total": 0,
"progress": 0.0, "progress": 0.0,
}, },
] ]
mock_pm.get_all_tracks.return_value = mock_tracks mock_pm.get_all_tracks.return_value = mock_tracks
app._refresh_from_project() app._refresh_from_project()
assert hasattr(app, "tracks"), "App instance should have a 'tracks' attribute" assert hasattr(app, "tracks"), "App instance should have a 'tracks' attribute"
assert app.tracks == mock_tracks assert app.tracks == mock_tracks
assert len(app.tracks) == 2 assert len(app.tracks) == 2
assert app.tracks[0]["id"] == "track_1" assert app.tracks[0]["id"] == "track_1"
assert app.tracks[1]["id"] == "track_2" assert app.tracks[1]["id"] == "track_2"
mock_pm.get_all_tracks.assert_called_with(app.active_project_root) mock_pm.get_all_tracks.assert_called_with(app.active_project_root)
def test_mma_dashboard_initialization_refresh(app_instance: Any) -> None: def test_mma_dashboard_initialization_refresh(app_instance: Any) -> None:
app, mock_pm = app_instance app, mock_pm = app_instance
mock_tracks = [ mock_tracks = [
{ {
"id": "init_track", "id": "init_track",
"title": "Initial Track", "title": "Initial Track",
"status": "new", "status": "new",
"complete": 0, "complete": 0,
"total": 0, "total": 0,
"progress": 0.0, "progress": 0.0,
} }
] ]
mock_pm.get_all_tracks.return_value = mock_tracks mock_pm.get_all_tracks.return_value = mock_tracks
app._refresh_from_project() app._refresh_from_project()
assert app.tracks == mock_tracks assert app.tracks == mock_tracks
assert app.tracks[0]["id"] == "init_track" assert app.tracks[0]["id"] == "init_track"
+35 -35
View File
@@ -4,48 +4,48 @@ import sys
from imgui_bundle import imgui_node_editor as ed from imgui_bundle import imgui_node_editor as ed
def test_imgui_node_editor_import(): def test_imgui_node_editor_import():
assert ed is not None assert ed is not None
assert hasattr(ed, "begin_node") assert hasattr(ed, "begin_node")
assert hasattr(ed, "end_node") assert hasattr(ed, "end_node")
def test_app_has_node_editor_attrs(): def test_app_has_node_editor_attrs():
from src.gui_2 import App from src.gui_2 import App
# Use patch to avoid initializing the entire App # Use patch to avoid initializing the entire App
with patch('src.app_controller.AppController'), \ with patch('src.app_controller.AppController'), \
patch('src.gui_2.immapp.RunnerParams'), \ patch('src.gui_2.immapp.RunnerParams'), \
patch('imgui_bundle.imgui_node_editor.create_editor'): patch('imgui_bundle.imgui_node_editor.create_editor'):
app = App.__new__(App) app = App.__new__(App)
# Manually set what we expect from __init__ # Manually set what we expect from __init__
app.node_editor_config = ed.Config() app.node_editor_config = ed.Config()
app.node_editor_ctx = ed.create_editor(app.node_editor_config) app.node_editor_ctx = ed.create_editor(app.node_editor_config)
app.ui_selected_ticket_id = None app.ui_selected_ticket_id = None
assert hasattr(app, 'node_editor_config') assert hasattr(app, 'node_editor_config')
assert hasattr(app, 'node_editor_ctx') assert hasattr(app, 'node_editor_ctx')
assert hasattr(app, 'ui_selected_ticket_id') assert hasattr(app, 'ui_selected_ticket_id')
def test_node_id_stability(): def test_node_id_stability():
"""Verify that node/pin IDs generated via hash are stable for the same input.""" """Verify that node/pin IDs generated via hash are stable for the same input."""
tid = "T-001" tid = "T-001"
int_id = abs(hash(tid)) int_id = abs(hash(tid))
in_pin_id = abs(hash(tid + "_in")) in_pin_id = abs(hash(tid + "_in"))
out_pin_id = abs(hash(tid + "_out")) out_pin_id = abs(hash(tid + "_out"))
# Re-generate and compare # Re-generate and compare
assert int_id == abs(hash(tid)) assert int_id == abs(hash(tid))
assert in_pin_id == abs(hash(tid + "_in")) assert in_pin_id == abs(hash(tid + "_in"))
assert out_pin_id == abs(hash(tid + "_out")) assert out_pin_id == abs(hash(tid + "_out"))
# Verify uniqueness # Verify uniqueness
assert int_id != in_pin_id assert int_id != in_pin_id
assert int_id != out_pin_id assert int_id != out_pin_id
assert in_pin_id != out_pin_id assert in_pin_id != out_pin_id
def test_link_id_stability(): def test_link_id_stability():
"""Verify that link IDs are stable.""" """Verify that link IDs are stable."""
source_tid = "T-001" source_tid = "T-001"
target_tid = "T-002" target_tid = "T-002"
link_id = abs(hash(source_tid + "_" + target_tid)) link_id = abs(hash(source_tid + "_" + target_tid))
assert link_id == abs(hash(source_tid + "_" + target_tid)) assert link_id == abs(hash(source_tid + "_" + target_tid))
assert link_id != abs(hash(target_tid + "_" + source_tid)) assert link_id != abs(hash(target_tid + "_" + source_tid))
+89 -89
View File
@@ -8,107 +8,107 @@ from src.gui_2 import App
def test_mma_ui_state_initialization(app_instance: App) -> None: def test_mma_ui_state_initialization(app_instance: App) -> None:
"""Verifies that the new MMA UI state variables are initialized correctly.""" """Verifies that the new MMA UI state variables are initialized correctly."""
assert hasattr(app_instance, "ui_epic_input") assert hasattr(app_instance, "ui_epic_input")
assert hasattr(app_instance, "proposed_tracks") assert hasattr(app_instance, "proposed_tracks")
assert hasattr(app_instance, "_show_track_proposal_modal") assert hasattr(app_instance, "_show_track_proposal_modal")
assert hasattr(app_instance, "mma_streams") assert hasattr(app_instance, "mma_streams")
assert app_instance.ui_epic_input == "" assert app_instance.ui_epic_input == ""
assert app_instance.proposed_tracks == [] assert app_instance.proposed_tracks == []
assert app_instance._show_track_proposal_modal is False assert app_instance._show_track_proposal_modal is False
assert app_instance.mma_streams == {} assert app_instance.mma_streams == {}
def test_process_pending_gui_tasks_show_track_proposal(app_instance: App) -> None: def test_process_pending_gui_tasks_show_track_proposal(app_instance: App) -> None:
"""Verifies that the 'show_track_proposal' action correctly updates the UI state.""" """Verifies that the 'show_track_proposal' action correctly updates the UI state."""
mock_tracks = [{"id": "track_1", "title": "Test Track"}] mock_tracks = [{"id": "track_1", "title": "Test Track"}]
task = {"action": "show_track_proposal", "payload": mock_tracks} task = {"action": "show_track_proposal", "payload": mock_tracks}
app_instance._pending_gui_tasks.append(task) app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks() app_instance._process_pending_gui_tasks()
assert app_instance.proposed_tracks == mock_tracks assert app_instance.proposed_tracks == mock_tracks
assert app_instance._show_track_proposal_modal is True assert app_instance._show_track_proposal_modal is True
def test_cb_plan_epic_launches_thread(app_instance: App) -> None: def test_cb_plan_epic_launches_thread(app_instance: App) -> None:
"""Verifies that _cb_plan_epic launches a thread and eventually queues a task.""" """Verifies that _cb_plan_epic launches a thread and eventually queues a task."""
app_instance.ui_epic_input = "Develop a new feature" app_instance.ui_epic_input = "Develop a new feature"
app_instance.active_project_path = "test_project.toml" app_instance.active_project_path = "test_project.toml"
mock_tracks = [{"id": "track_1", "title": "Test Track"}] mock_tracks = [{"id": "track_1", "title": "Test Track"}]
with ( with (
patch( patch(
"src.orchestrator_pm.get_track_history_summary", "src.orchestrator_pm.get_track_history_summary",
return_value="History summary", return_value="History summary",
) as mock_get_history, ) as mock_get_history,
patch( patch(
"src.orchestrator_pm.generate_tracks", return_value=mock_tracks "src.orchestrator_pm.generate_tracks", return_value=mock_tracks
) as mock_gen_tracks, ) as mock_gen_tracks,
patch("src.aggregate.build_file_items", return_value=[]), patch("src.aggregate.build_file_items", return_value=[]),
): ):
with ( with (
patch("src.project_manager.load_project", return_value={}), patch("src.project_manager.load_project", return_value={}),
patch("src.project_manager.flat_config", return_value={}), patch("src.project_manager.flat_config", return_value={}),
): ):
app_instance._cb_plan_epic() app_instance._cb_plan_epic()
max_wait = 5 max_wait = 5
start_time = time.time() start_time = time.time()
while ( while (
len(app_instance._pending_gui_tasks) < 3 len(app_instance._pending_gui_tasks) < 3
and time.time() - start_time < max_wait and time.time() - start_time < max_wait
): ):
time.sleep(0.1) time.sleep(0.1)
assert len(app_instance._pending_gui_tasks) >= 3 assert len(app_instance._pending_gui_tasks) >= 3
actions = [t["action"] for t in app_instance._pending_gui_tasks] actions = [t["action"] for t in app_instance._pending_gui_tasks]
assert "handle_ai_response" in actions assert "handle_ai_response" in actions
assert "show_track_proposal" in actions assert "show_track_proposal" in actions
mock_get_history.assert_called_once() mock_get_history.assert_called_once()
mock_gen_tracks.assert_called_once() mock_gen_tracks.assert_called_once()
def test_process_pending_gui_tasks_mma_spawn_approval(app_instance: App) -> None: def test_process_pending_gui_tasks_mma_spawn_approval(app_instance: App) -> None:
"""Verifies that the 'mma_spawn_approval' action correctly updates the UI state.""" """Verifies that the 'mma_spawn_approval' action correctly updates the UI state."""
task = { task = {
"action": "mma_spawn_approval", "action": "mma_spawn_approval",
"ticket_id": "T1", "ticket_id": "T1",
"role": "Tier 3 Worker", "role": "Tier 3 Worker",
"prompt": "Test Prompt", "prompt": "Test Prompt",
"context_md": "Test Context", "context_md": "Test Context",
"dialog_container": [None], "dialog_container": [None],
} }
app_instance._pending_gui_tasks.append(task) app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks() app_instance._process_pending_gui_tasks()
assert app_instance._pending_mma_spawn == task assert app_instance._pending_mma_spawn == task
assert app_instance._mma_spawn_prompt == "Test Prompt" assert app_instance._mma_spawn_prompt == "Test Prompt"
assert app_instance._mma_spawn_context == "Test Context" assert app_instance._mma_spawn_context == "Test Context"
assert app_instance._mma_spawn_open is True assert app_instance._mma_spawn_open is True
assert app_instance._mma_spawn_edit_mode is False assert app_instance._mma_spawn_edit_mode is False
assert task["dialog_container"][0] is not None assert task["dialog_container"][0] is not None
def test_handle_ai_response_with_stream_id(app_instance: App) -> None: def test_handle_ai_response_with_stream_id(app_instance: App) -> None:
"""Verifies routing to mma_streams.""" """Verifies routing to mma_streams."""
task = { task = {
"action": "handle_ai_response", "action": "handle_ai_response",
"payload": { "payload": {
"text": "Tier 1 Strategy Content", "text": "Tier 1 Strategy Content",
"stream_id": "Tier 1", "stream_id": "Tier 1",
"status": "Thinking...", "status": "Thinking...",
}, },
} }
app_instance._pending_gui_tasks.append(task) app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks() app_instance._process_pending_gui_tasks()
assert app_instance.mma_streams.get("Tier 1") == "Tier 1 Strategy Content" assert app_instance.mma_streams.get("Tier 1") == "Tier 1 Strategy Content"
assert app_instance.ai_status == "Thinking..." assert app_instance.ai_status == "Thinking..."
assert app_instance.ai_response == "" assert app_instance.ai_response == ""
def test_handle_ai_response_fallback(app_instance: App) -> None: def test_handle_ai_response_fallback(app_instance: App) -> None:
"""Verifies fallback to ai_response when stream_id is missing.""" """Verifies fallback to ai_response when stream_id is missing."""
task = { task = {
"action": "handle_ai_response", "action": "handle_ai_response",
"payload": {"text": "Regular AI Response", "status": "done"}, "payload": {"text": "Regular AI Response", "status": "done"},
} }
app_instance._pending_gui_tasks.append(task) app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks() app_instance._process_pending_gui_tasks()
assert app_instance.ai_response == "Regular AI Response" assert app_instance.ai_response == "Regular AI Response"
assert app_instance.ai_status == "done" assert app_instance.ai_status == "done"
assert len(app_instance.mma_streams) == 0 assert len(app_instance.mma_streams) == 0
+109 -109
View File
@@ -5,140 +5,140 @@ from src import mcp_client
@pytest.fixture @pytest.fixture
def temp_py_file(tmp_path): def temp_py_file(tmp_path):
p = tmp_path / "sample.py" p = tmp_path / "sample.py"
content = """class MyClass: content = """class MyClass:
\"\"\"Docstring.\"\"\" \"\"\"Docstring.\"\"\"
def method1(self): def method1(self):
print("m1") print("m1")
def top_func(): def top_func():
\"\"\"Top doc.\"\"\" \"\"\"Top doc.\"\"\"
print("top") print("top")
""" """
p.write_text(content, encoding="utf-8") p.write_text(content, encoding="utf-8")
return str(p) return str(p)
def test_find_definition_range(): def test_find_definition_range():
source = """class A: source = """class A:
def m(self): pass def m(self): pass
def f(): pass def f(): pass
""" """
assert py_struct_tools.find_definition_range(source, "A") == (1, 2) assert py_struct_tools.find_definition_range(source, "A") == (1, 2)
assert py_struct_tools.find_definition_range(source, "A.m") == (2, 2) assert py_struct_tools.find_definition_range(source, "A.m") == (2, 2)
assert py_struct_tools.find_definition_range(source, "f") == (3, 3) assert py_struct_tools.find_definition_range(source, "f") == (3, 3)
assert py_struct_tools.find_definition_range(source, "nonexistent") is None assert py_struct_tools.find_definition_range(source, "nonexistent") is None
def test_shift_indentation(): def test_shift_indentation():
payload = "def f():\n print('hi')" # 2-space payload = "def f():\n print('hi')" # 2-space
shifted = py_struct_tools.shift_indentation(payload, 1) shifted = py_struct_tools.shift_indentation(payload, 1)
assert shifted == " def f():\n print('hi')" # wait, shift_indentation strips min and prepends. assert shifted == " def f():\n print('hi')" # wait, shift_indentation strips min and prepends.
# Let's re-test shift_indentation logic # Let's re-test shift_indentation logic
# Original: # Original:
# line 1: 'def f():' (0 indent) # line 1: 'def f():' (0 indent)
# line 2: ' print('hi')' (2 indent) # line 2: ' print('hi')' (2 indent)
# min_indent = 0 # min_indent = 0
# Prepend 1 space: # Prepend 1 space:
# ' def f():' # ' def f():'
# ' print('hi')' # ' print('hi')'
# If payload was: # If payload was:
# def f(): # def f():
# print('hi') # print('hi')
# min_indent = 2 # min_indent = 2
# target_depth = 1 # target_depth = 1
# ' def f():' # ' def f():'
# ' print('hi')' # ' print('hi')'
payload2 = " def f():\n print('hi')" payload2 = " def f():\n print('hi')"
shifted2 = py_struct_tools.shift_indentation(payload2, 1) shifted2 = py_struct_tools.shift_indentation(payload2, 1)
assert shifted2 == " def f():\n print('hi')" assert shifted2 == " def f():\n print('hi')"
def test_py_remove_def(temp_py_file): def test_py_remove_def(temp_py_file):
err = py_struct_tools.py_remove_def(temp_py_file, "MyClass.method1") err = py_struct_tools.py_remove_def(temp_py_file, "MyClass.method1")
assert err == "" assert err == ""
with open(temp_py_file, 'r') as f: with open(temp_py_file, 'r') as f:
content = f.read() content = f.read()
assert "def method1" not in content assert "def method1" not in content
assert "class MyClass" in content assert "class MyClass" in content
def test_py_add_def(temp_py_file): def test_py_add_def(temp_py_file):
new_code = "def method2(self):\n print('m2')" new_code = "def method2(self):\n print('m2')"
err = py_struct_tools.py_add_def(temp_py_file, "MyClass", new_code, "after", "method1") err = py_struct_tools.py_add_def(temp_py_file, "MyClass", new_code, "after", "method1")
assert err == "" assert err == ""
with open(temp_py_file, 'r') as f: with open(temp_py_file, 'r') as f:
content = f.read() content = f.read()
assert "def method2" in content assert "def method2" in content
# Check 1-space indentation # Check 1-space indentation
assert " def method2(self):" in content assert " def method2(self):" in content
def test_py_region_wrap(temp_py_file): def test_py_region_wrap(temp_py_file):
err = py_struct_tools.py_region_wrap(temp_py_file, 6, 8, "MyRegion") err = py_struct_tools.py_region_wrap(temp_py_file, 6, 8, "MyRegion")
assert err == "" assert err == ""
with open(temp_py_file, 'r') as f: with open(temp_py_file, 'r') as f:
content = f.read() content = f.read()
assert "#region: MyRegion" in content assert "#region: MyRegion" in content
assert "#endregion: MyRegion" in content assert "#endregion: MyRegion" in content
def test_mcp_dispatch_integration(temp_py_file): def test_mcp_dispatch_integration(temp_py_file):
# Mock allowlist # Mock allowlist
mcp_client.configure([{"path": temp_py_file}]) mcp_client.configure([{"path": temp_py_file}])
# Test py_remove_def # Test py_remove_def
result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "top_func"}) result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "top_func"})
assert result == "" assert result == ""
with open(temp_py_file, 'r') as f: with open(temp_py_file, 'r') as f:
content = f.read() content = f.read()
assert "def top_func" not in content assert "def top_func" not in content
# Test py_add_def (module level top) # Test py_add_def (module level top)
result = mcp_client.dispatch("py_add_def", { result = mcp_client.dispatch("py_add_def", {
"path": temp_py_file, "path": temp_py_file,
"name": "", "name": "",
"new_content": "def head_func():\n print('head')", "new_content": "def head_func():\n print('head')",
"anchor_type": "top" "anchor_type": "top"
}) })
assert result == "" assert result == ""
with open(temp_py_file, 'r') as f: with open(temp_py_file, 'r') as f:
content = f.read() content = f.read()
assert content.startswith("def head_func") assert content.startswith("def head_func")
# Test py_add_def (class bottom) # Test py_add_def (class bottom)
result = mcp_client.dispatch("py_add_def", { result = mcp_client.dispatch("py_add_def", {
"path": temp_py_file, "path": temp_py_file,
"name": "MyClass", "name": "MyClass",
"new_content": "def tail_method(self):\n print('tail')", "new_content": "def tail_method(self):\n print('tail')",
"anchor_type": "bottom" "anchor_type": "bottom"
}) })
assert result == "" assert result == ""
with open(temp_py_file, 'r') as f: with open(temp_py_file, 'r') as f:
content = f.read() content = f.read()
assert "def tail_method" in content assert "def tail_method" in content
assert " def tail_method(self):" in content # Check indent assert " def tail_method(self):" in content # Check indent
# Test py_move_def (cross-file simulated with same file) # Test py_move_def (cross-file simulated with same file)
# We move method1 to after tail_method # We move method1 to after tail_method
result = mcp_client.dispatch("py_move_def", { result = mcp_client.dispatch("py_move_def", {
"src_path": temp_py_file, "src_path": temp_py_file,
"dest_path": temp_py_file, "dest_path": temp_py_file,
"name": "MyClass.method1", "name": "MyClass.method1",
"dest_name": "MyClass", "dest_name": "MyClass",
"anchor_type": "after", "anchor_type": "after",
"anchor_symbol": "tail_method" "anchor_symbol": "tail_method"
}) })
assert result == "" assert result == ""
with open(temp_py_file, 'r') as f: with open(temp_py_file, 'r') as f:
content = f.read() content = f.read()
# method1 should now be AFTER tail_method # method1 should now be AFTER tail_method
assert content.find("def method1") > content.find("def tail_method") assert content.find("def method1") > content.find("def tail_method")
def test_mcp_dispatch_errors(temp_py_file): def test_mcp_dispatch_errors(temp_py_file):
mcp_client.configure([{"path": temp_py_file}]) mcp_client.configure([{"path": temp_py_file}])
# Non-existent symbol # Non-existent symbol
result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "NoSuchSymbol"}) result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "NoSuchSymbol"})
assert "ERROR" in result or "not found" in result assert "ERROR" in result or "not found" in result
# Denied path # Denied path
result = mcp_client.dispatch("py_remove_def", {"path": "C:/windows/system32/cmd.exe", "name": "foo"}) result = mcp_client.dispatch("py_remove_def", {"path": "C:/windows/system32/cmd.exe", "name": "foo"})
assert "ACCESS DENIED" in result assert "ACCESS DENIED" in result
+66 -66
View File
@@ -7,88 +7,88 @@ from src.models import ThinkingSegment
def test_save_and_load_history_with_thinking_segments(): def test_save_and_load_history_with_thinking_segments():
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
project_path = Path(tmpdir) / "test_project" project_path = Path(tmpdir) / "test_project"
project_path.mkdir() project_path.mkdir()
project_file = project_path / "test_project.toml" project_file = project_path / "test_project.toml"
project_file.write_text("[project]\nname = 'test'\n") project_file.write_text("[project]\nname = 'test'\n")
history_data = { history_data = {
"entries": [ "entries": [
{ {
"role": "AI", "role": "AI",
"content": "Here's the response", "content": "Here's the response",
"thinking_segments": [ "thinking_segments": [
{"content": "Let me think about this...", "marker": "thinking"} {"content": "Let me think about this...", "marker": "thinking"}
], ],
"ts": "2026-03-13T10:00:00", "ts": "2026-03-13T10:00:00",
"collapsed": False, "collapsed": False,
}, },
{ {
"role": "User", "role": "User",
"content": "Hello", "content": "Hello",
"ts": "2026-03-13T09:00:00", "ts": "2026-03-13T09:00:00",
"collapsed": False, "collapsed": False,
}, },
] ]
} }
project_manager.save_project( project_manager.save_project(
{"project": {"name": "test"}}, project_file, disc_data=history_data {"project": {"name": "test"}}, project_file, disc_data=history_data
) )
loaded = project_manager.load_history(project_file) loaded = project_manager.load_history(project_file)
assert "entries" in loaded assert "entries" in loaded
assert len(loaded["entries"]) == 2 assert len(loaded["entries"]) == 2
ai_entry = loaded["entries"][0] ai_entry = loaded["entries"][0]
assert ai_entry["role"] == "AI" assert ai_entry["role"] == "AI"
assert ai_entry["content"] == "Here's the response" assert ai_entry["content"] == "Here's the response"
assert "thinking_segments" in ai_entry assert "thinking_segments" in ai_entry
assert len(ai_entry["thinking_segments"]) == 1 assert len(ai_entry["thinking_segments"]) == 1
assert ( assert (
ai_entry["thinking_segments"][0]["content"] == "Let me think about this..." ai_entry["thinking_segments"][0]["content"] == "Let me think about this..."
) )
user_entry = loaded["entries"][1] user_entry = loaded["entries"][1]
assert user_entry["role"] == "User" assert user_entry["role"] == "User"
assert "thinking_segments" not in user_entry assert "thinking_segments" not in user_entry
def test_entry_to_str_with_thinking(): def test_entry_to_str_with_thinking():
entry = { entry = {
"role": "AI", "role": "AI",
"content": "Response text", "content": "Response text",
"thinking_segments": [{"content": "Thinking...", "marker": "thinking"}], "thinking_segments": [{"content": "Thinking...", "marker": "thinking"}],
"ts": "2026-03-13T10:00:00", "ts": "2026-03-13T10:00:00",
} }
result = project_manager.entry_to_str(entry) result = project_manager.entry_to_str(entry)
assert "@2026-03-13T10:00:00" in result assert "@2026-03-13T10:00:00" in result
assert "AI:" in result assert "AI:" in result
assert "Response text" in result assert "Response text" in result
def test_str_to_entry_with_thinking(): def test_str_to_entry_with_thinking():
raw = "@2026-03-13T10:00:00\nAI:\nResponse text" raw = "@2026-03-13T10:00:00\nAI:\nResponse text"
roles = ["User", "AI", "Vendor API", "System", "Reasoning"] roles = ["User", "AI", "Vendor API", "System", "Reasoning"]
result = project_manager.str_to_entry(raw, roles) result = project_manager.str_to_entry(raw, roles)
assert result["role"] == "AI" assert result["role"] == "AI"
assert result["content"] == "Response text" assert result["content"] == "Response text"
assert "ts" in result assert "ts" in result
def test_clean_nones_removes_thinking(): def test_clean_nones_removes_thinking():
entry = {"role": "AI", "content": "Test", "thinking_segments": None, "ts": None} entry = {"role": "AI", "content": "Test", "thinking_segments": None, "ts": None}
cleaned = project_manager.clean_nones(entry) cleaned = project_manager.clean_nones(entry)
assert "thinking_segments" not in cleaned assert "thinking_segments" not in cleaned
assert "ts" not in cleaned assert "ts" not in cleaned
if __name__ == "__main__": if __name__ == "__main__":
test_save_and_load_history_with_thinking_segments() test_save_and_load_history_with_thinking_segments()
test_entry_to_str_with_thinking() test_entry_to_str_with_thinking()
test_str_to_entry_with_thinking() test_str_to_entry_with_thinking()
test_clean_nones_removes_thinking() test_clean_nones_removes_thinking()
print("All project_manager thinking tests passed!") print("All project_manager thinking tests passed!")
+102 -102
View File
@@ -3,134 +3,134 @@ from src.shell_runner import run_powershell
from src import ai_client from src import ai_client
def test_run_powershell_qa_callback_on_failure(vlogger) -> None: def test_run_powershell_qa_callback_on_failure(vlogger) -> None:
"""Test that qa_callback is called when a powershell command fails (non-zero exit code).""" """Test that qa_callback is called when a powershell command fails (non-zero exit code)."""
qa_callback = MagicMock(return_value="FIX: Check path") qa_callback = MagicMock(return_value="FIX: Check path")
vlogger.log_state("QA Callback Called", False, "pending") vlogger.log_state("QA Callback Called", False, "pending")
# Simulate a failure # Simulate a failure
with patch("subprocess.Popen") as mock_popen: with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock() mock_process = MagicMock()
mock_process.communicate.return_value = ("stdout", "stderr error") mock_process.communicate.return_value = ("stdout", "stderr error")
mock_process.returncode = 1 mock_process.returncode = 1
mock_popen.return_value = mock_process mock_popen.return_value = mock_process
result = run_powershell("invalid_cmd", ".", qa_callback=qa_callback) result = run_powershell("invalid_cmd", ".", qa_callback=qa_callback)
vlogger.log_state("QA Callback Called", "pending", str(qa_callback.called)) vlogger.log_state("QA Callback Called", "pending", str(qa_callback.called))
assert qa_callback.called assert qa_callback.called
assert "QA ANALYSIS:\nFIX: Check path" in result assert "QA ANALYSIS:\nFIX: Check path" in result
vlogger.finalize("Tier 4 Interceptor", "PASS", "Interceptor triggered and result appended.") vlogger.finalize("Tier 4 Interceptor", "PASS", "Interceptor triggered and result appended.")
def test_run_powershell_qa_callback_on_stderr_only(vlogger) -> None: def test_run_powershell_qa_callback_on_stderr_only(vlogger) -> None:
"""Test that qa_callback is called when a powershell command has stderr output, even if exit code is 0.""" """Test that qa_callback is called when a powershell command has stderr output, even if exit code is 0."""
qa_callback = MagicMock(return_value="WARNING: Check permissions") qa_callback = MagicMock(return_value="WARNING: Check permissions")
with patch("subprocess.Popen") as mock_popen: with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock() mock_process = MagicMock()
mock_process.communicate.return_value = ("stdout", "non-fatal warning") mock_process.communicate.return_value = ("stdout", "non-fatal warning")
mock_process.returncode = 0 mock_process.returncode = 0
mock_popen.return_value = mock_process mock_popen.return_value = mock_process
result = run_powershell("cmd_with_warning", ".", qa_callback=qa_callback) result = run_powershell("cmd_with_warning", ".", qa_callback=qa_callback)
assert qa_callback.called assert qa_callback.called
assert "QA ANALYSIS:\nWARNING: Check permissions" in result assert "QA ANALYSIS:\nWARNING: Check permissions" in result
vlogger.finalize("Tier 4 Non-Fatal Interceptor", "PASS", "Interceptor triggered for non-fatal stderr.") vlogger.finalize("Tier 4 Non-Fatal Interceptor", "PASS", "Interceptor triggered for non-fatal stderr.")
def test_run_powershell_no_qa_callback_on_success() -> None: def test_run_powershell_no_qa_callback_on_success() -> None:
qa_callback = MagicMock() qa_callback = MagicMock()
with patch("subprocess.Popen") as mock_popen: with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock() mock_process = MagicMock()
mock_process.communicate.return_value = ("ok", "") mock_process.communicate.return_value = ("ok", "")
mock_process.returncode = 0 mock_process.returncode = 0
mock_popen.return_value = mock_process mock_popen.return_value = mock_process
result = run_powershell("success_cmd", ".", qa_callback=qa_callback) result = run_powershell("success_cmd", ".", qa_callback=qa_callback)
assert not qa_callback.called assert not qa_callback.called
assert "QA ANALYSIS" not in result assert "QA ANALYSIS" not in result
def test_run_powershell_optional_qa_callback() -> None: def test_run_powershell_optional_qa_callback() -> None:
# Should not crash if qa_callback is None # Should not crash if qa_callback is None
with patch("subprocess.Popen") as mock_popen: with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock() mock_process = MagicMock()
mock_process.communicate.return_value = ("error", "error") mock_process.communicate.return_value = ("error", "error")
mock_process.returncode = 1 mock_process.returncode = 1
mock_popen.return_value = mock_process mock_popen.return_value = mock_process
result = run_powershell("fail_no_cb", ".", qa_callback=None) result = run_powershell("fail_no_cb", ".", qa_callback=None)
assert "EXIT CODE: 1" in result assert "EXIT CODE: 1" in result
def test_end_to_end_tier4_integration(vlogger) -> None: def test_end_to_end_tier4_integration(vlogger) -> None:
""" """
1. Start a task that triggers a tool failure. 1. Start a task that triggers a tool failure.
2. Ensure Tier 4 QA analysis is run. 2. Ensure Tier 4 QA analysis is run.
3. Verify the analysis is merged into the next turn's prompt. 3. Verify the analysis is merged into the next turn's prompt.
""" """
# Trigger a send that results in a tool failure # Trigger a send that results in a tool failure
# (In reality, the tool loop handles this) # (In reality, the tool loop handles this)
# For unit testing, we just check if ai_client.send passes the qa_callback # For unit testing, we just check if ai_client.send passes the qa_callback
# to the underlying provider function. # to the underlying provider function.
pass pass
vlogger.finalize("E2E Tier 4 Integration", "PASS", "ai_client.run_tier4_analysis correctly called and results merged.") vlogger.finalize("E2E Tier 4 Integration", "PASS", "ai_client.run_tier4_analysis correctly called and results merged.")
def test_ai_client_passes_qa_callback() -> None: def test_ai_client_passes_qa_callback() -> None:
"""Verifies that ai_client.send passes the qa_callback down to the provider function.""" """Verifies that ai_client.send passes the qa_callback down to the provider function."""
qa_callback = lambda x: "analysis" qa_callback = lambda x: "analysis"
with patch("src.ai_client._send_gemini") as mock_send: with patch("src.ai_client._send_gemini") as mock_send:
ai_client.set_provider("gemini", "gemini-2.5-flash-lite") ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
ai_client.send("ctx", "msg", qa_callback=qa_callback) ai_client.send("ctx", "msg", qa_callback=qa_callback)
args, kwargs = mock_send.call_args args, kwargs = mock_send.call_args
# It might be passed as positional or keyword depending on how 'send' calls it # It might be passed as positional or keyword depending on how 'send' calls it
# send() calls _send_gemini(md_content, user_message, base_dir, ..., qa_callback, ...) # send() calls _send_gemini(md_content, user_message, base_dir, ..., qa_callback, ...)
# In current impl of send(), it is the 7th argument after md_content, user_msg, base_dir, file_items, disc_hist, pre_tool # In current impl of send(), it is the 7th argument after md_content, user_msg, base_dir, file_items, disc_hist, pre_tool
assert args[6] == qa_callback or kwargs.get("qa_callback") == qa_callback assert args[6] == qa_callback or kwargs.get("qa_callback") == qa_callback
def test_gemini_provider_passes_qa_callback_to_run_script() -> None: def test_gemini_provider_passes_qa_callback_to_run_script() -> None:
"""Verifies that _send_gemini passes the qa_callback to _run_script.""" """Verifies that _send_gemini passes the qa_callback to _run_script."""
qa_callback = MagicMock() qa_callback = MagicMock()
# Mock the tool loop behavior # Mock the tool loop behavior
with patch("src.ai_client._run_script", return_value="output") as mock_run_script, \ with patch("src.ai_client._run_script", return_value="output") as mock_run_script, \
patch("src.ai_client._ensure_gemini_client"), \ patch("src.ai_client._ensure_gemini_client"), \
patch("src.ai_client._gemini_client") as mock_gen_client: patch("src.ai_client._gemini_client") as mock_gen_client:
mock_chat = MagicMock() mock_chat = MagicMock()
mock_gen_client.chats.create.return_value = mock_chat mock_gen_client.chats.create.return_value = mock_chat
# 1st round: tool call # 1st round: tool call
mock_fc = MagicMock() mock_fc = MagicMock()
mock_fc.name = "run_powershell" mock_fc.name = "run_powershell"
mock_fc.args = {"script": "dir"} mock_fc.args = {"script": "dir"}
mock_part = MagicMock() mock_part = MagicMock()
mock_part.function_call = mock_fc mock_part.function_call = mock_fc
mock_part.text = "" mock_part.text = ""
mock_candidate = MagicMock() mock_candidate = MagicMock()
mock_candidate.content.parts = [mock_part] mock_candidate.content.parts = [mock_part]
mock_candidate.finish_reason.name = "STOP" mock_candidate.finish_reason.name = "STOP"
mock_resp1 = MagicMock() mock_resp1 = MagicMock()
mock_resp1.candidates = [mock_candidate] mock_resp1.candidates = [mock_candidate]
mock_resp1.usage_metadata.prompt_token_count = 10 mock_resp1.usage_metadata.prompt_token_count = 10
mock_resp1.usage_metadata.candidates_token_count = 5 mock_resp1.usage_metadata.candidates_token_count = 5
mock_resp1.text = "" mock_resp1.text = ""
# 2nd round: final text # 2nd round: final text
mock_resp2 = MagicMock() mock_resp2 = MagicMock()
mock_resp2.candidates = [] mock_resp2.candidates = []
mock_resp2.usage_metadata.prompt_token_count = 20 mock_resp2.usage_metadata.prompt_token_count = 20
mock_resp2.usage_metadata.candidates_token_count = 10 mock_resp2.usage_metadata.candidates_token_count = 10
mock_resp2.text = "done" mock_resp2.text = "done"
mock_chat.send_message.side_effect = [mock_resp1, mock_resp2] mock_chat.send_message.side_effect = [mock_resp1, mock_resp2]
ai_client.set_provider("gemini", "gemini-2.5-flash-lite") ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
ai_client._send_gemini( ai_client._send_gemini(
md_content="Context", md_content="Context",
user_message="Run dir", user_message="Run dir",
base_dir=".", base_dir=".",
qa_callback=qa_callback qa_callback=qa_callback
) )
# Verify _run_script received the qa_callback and patch_callback # Verify _run_script received the qa_callback and patch_callback
mock_run_script.assert_called_with("dir", ".", qa_callback, None) mock_run_script.assert_called_with("dir", ".", qa_callback, None)
+37 -37
View File
@@ -7,56 +7,56 @@ from pathlib import Path
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
def test_persona_aggregation_strategy(): def test_persona_aggregation_strategy():
p = Persona(name="test_persona", aggregation_strategy="summarize") p = Persona(name="test_persona", aggregation_strategy="summarize")
assert p.aggregation_strategy == "summarize" assert p.aggregation_strategy == "summarize"
d = p.to_dict() d = p.to_dict()
assert d.get("aggregation_strategy") == "summarize" assert d.get("aggregation_strategy") == "summarize"
p2 = Persona.from_dict("test2", d) p2 = Persona.from_dict("test2", d)
assert p2.aggregation_strategy == "summarize" assert p2.aggregation_strategy == "summarize"
@patch("src.aggregate.build_markdown_from_items") @patch("src.aggregate.build_markdown_from_items")
def test_app_controller_do_generate_uses_persona_strategy(mock_build): def test_app_controller_do_generate_uses_persona_strategy(mock_build):
mock_build.return_value = "fake_md" mock_build.return_value = "fake_md"
app = app_controller.AppController() app = app_controller.AppController()
app.personas = {} app.personas = {}
p = Persona(name="p1", aggregation_strategy="full") p = Persona(name="p1", aggregation_strategy="full")
app.personas["p1"] = p app.personas["p1"] = p
app.ui_active_persona = "p1" app.ui_active_persona = "p1"
with patch("src.project_manager.flat_config", return_value={"project": {"name": "test"}, "output": {"output_dir": "."}, "files": {"paths": [], "base_dir": "."}}): with patch("src.project_manager.flat_config", return_value={"project": {"name": "test"}, "output": {"output_dir": "."}, "files": {"paths": [], "base_dir": "."}}):
with patch("src.aggregate.build_file_items", return_value=[]): with patch("src.aggregate.build_file_items", return_value=[]):
with patch("pathlib.Path.mkdir"): with patch("pathlib.Path.mkdir"):
with patch("pathlib.Path.write_text"): with patch("pathlib.Path.write_text"):
with patch.object(app, "_flush_to_project"): with patch.object(app, "_flush_to_project"):
with patch.object(app, "_flush_to_config"): with patch.object(app, "_flush_to_config"):
with patch("src.models.save_config"): with patch("src.models.save_config"):
full_md, path, file_items, stable_md, disc = app._do_generate() full_md, path, file_items, stable_md, disc = app._do_generate()
# Verify aggregate.run and build_markdown_no_history received aggregation_strategy="full" # Verify aggregate.run and build_markdown_no_history received aggregation_strategy="full"
# Actually mock_build captures the inner calls of build_markdown_no_history # Actually mock_build captures the inner calls of build_markdown_no_history
call_kwargs = mock_build.call_args[1] call_kwargs = mock_build.call_args[1]
assert call_kwargs.get("aggregation_strategy") == "full" assert call_kwargs.get("aggregation_strategy") == "full"
@patch("src.summarize.summarise_file") @patch("src.summarize.summarise_file")
@patch("src.multi_agent_conductor.ai_client.send") @patch("src.multi_agent_conductor.ai_client.send")
def test_run_worker_lifecycle_uses_strategy(mock_send, mock_summarise, tmp_path): def test_run_worker_lifecycle_uses_strategy(mock_send, mock_summarise, tmp_path):
mock_send.return_value = "fake response" mock_send.return_value = "fake response"
mock_summarise.return_value = "fake summary" mock_summarise.return_value = "fake summary"
test_file = tmp_path / "test.py" test_file = tmp_path / "test.py"
test_file.write_text("def test():\n pass") test_file.write_text("def test():\n pass")
ticket = Ticket(id="1", description="test") ticket = Ticket(id="1", description="test")
context = WorkerContext(ticket_id="1", model_name="test", persona_id="test_persona") context = WorkerContext(ticket_id="1", model_name="test", persona_id="test_persona")
p = Persona(name="test_persona", aggregation_strategy="summarize") p = Persona(name="test_persona", aggregation_strategy="summarize")
with patch("src.personas.PersonaManager.load_all", return_value={"test_persona": p}): with patch("src.personas.PersonaManager.load_all", return_value={"test_persona": p}):
multi_agent_conductor.run_worker_lifecycle( multi_agent_conductor.run_worker_lifecycle(
ticket, context, context_files=[str(test_file)] ticket, context, context_files=[str(test_file)]
) )
# Should have called summarise_file because of the "summarize" strategy # Should have called summarise_file because of the "summarize" strategy
mock_summarise.assert_called_once() mock_summarise.assert_called_once()
+30 -30
View File
@@ -11,40 +11,40 @@ from src import api_hook_client
@pytest.mark.integration @pytest.mark.integration
def test_mma_epic_lifecycle(live_gui) -> None: def test_mma_epic_lifecycle(live_gui) -> None:
client = api_hook_client.ApiHookClient() client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=15) assert client.wait_for_server(timeout=15)
# Reset # Reset
client.click("btn_reset") client.click("btn_reset")
time.sleep(2) time.sleep(2)
# Set provider and path # Set provider and path
client.set_value("current_provider", "gemini_cli") client.set_value("current_provider", "gemini_cli")
time.sleep(2) time.sleep(2)
mock_path = os.path.abspath("tests/mock_gemini_cli.py") mock_path = os.path.abspath("tests/mock_gemini_cli.py")
client.set_value("gcli_path", f'"{sys.executable}" "{mock_path}"') client.set_value("gcli_path", f'"{sys.executable}" "{mock_path}"')
time.sleep(2) time.sleep(2)
# Set epic and click # Set epic and click
client.set_value("mma_epic_input", "Add timestamps") client.set_value("mma_epic_input", "Add timestamps")
time.sleep(1) time.sleep(1)
client.click("btn_mma_plan_epic") client.click("btn_mma_plan_epic")
# Wait and check # Wait and check
for i in range(30): for i in range(30):
time.sleep(1) time.sleep(1)
status = client.get_mma_status() status = client.get_mma_status()
proposed = status.get("proposed_tracks", []) proposed = status.get("proposed_tracks", [])
usage = status.get("mma_tier_usage", {}) usage = status.get("mma_tier_usage", {})
t1 = usage.get("Tier 1", {}) t1 = usage.get("Tier 1", {})
print( print(
f"[{i}] Tier1: in={t1.get('input')}, out={t1.get('output')}, proposed={len(proposed)}", f"[{i}] Tier1: in={t1.get('input')}, out={t1.get('output')}, proposed={len(proposed)}",
flush=True, flush=True,
) )
if proposed: if proposed:
print(f"SUCCESS: {proposed}", flush=True) print(f"SUCCESS: {proposed}", flush=True)
break break
assert len(proposed) > 0, f"No tracks: {proposed}" assert len(proposed) > 0, f"No tracks: {proposed}"