diff --git a/conductor/tracks/fix_indentation_1space_20260516/plan.md b/conductor/tracks/fix_indentation_1space_20260516/plan.md index f1f91839..7c770f73 100644 --- a/conductor/tracks/fix_indentation_1space_20260516/plan.md +++ b/conductor/tracks/fix_indentation_1space_20260516/plan.md @@ -3,49 +3,45 @@ ## Phase 1: Audit and Classification Focus: Identify all files requiring indentation correction -- [ ] Task 1.1: Create indentation audit script to scan all Python files +- [x] Task 1.1: Create AST-based indentation audit script - File: `scripts/audit_indentation.py` - - Detect: Lines with leading whitespace that is not a multiple of 1 space - - Output: List of files with line numbers and current indentation level - -- [ ] Task 1.2: Run audit against src/ directory (51 files) - - File: src/ - - Categorize: Files with 0 issues, 1-10 issues, 10+ issues - -- [ ] Task 1.3: Run audit against tests/ directory - - File: tests/ - -- [ ] Task 1.4: Run audit against scripts/ directory - - File: scripts/ - -- [ ] Task 1.5: Run audit against conductor/ directory - - File: conductor/ + - Method: Use Python AST to track logical nesting depth + - Output: Files with actual violations (not docstring false positives) + +- [x] Task 1.2: Run audit across all directories + - Result: 32 files with violations, 189 total violations ## Phase 2: Correct Indentation - src/ Files -Focus: Fix identified files in src/ (in order of severity) +Focus: Fix identified files in src/ (2 files) -- [ ] Task 2.1: Fix src/ files with 1-10 indentation issues -- [ ] Task 2.2: Fix src/ files with 10+ indentation issues -- [ ] Task 2.3: Verify syntax after each file fix +- [ ] Task 2.1: Fix src/fuzzy_anchor.py (18 violations) +- [ ] Task 2.2: Fix src/patch_modal.py (14 violations) +- [ ] Task 2.3: Verify syntax after each fix - [ ] Task 2.4: Commit each file individually -## Phase 3: Correct Indentation - tests/ Files -Focus: Fix identified files in tests/ +## Phase 3: Correct Indentation - scripts/ Files +Focus: Fix identified files in scripts/ (2 files) -- [ ] Task 3.1: Fix tests/ files with indentation issues -- [ ] Task 3.2: Verify syntax after each file fix -- [ ] Task 3.3: Commit each file individually +- [ ] Task 3.1: Fix scripts/extract_symbols.py (4 violations) +- [ ] Task 3.2: Fix scripts/tasks/download_fonts.py (8 violations) +- [ ] Task 3.3: Verify syntax after each fix +- [ ] Task 3.4: Commit each file individually -## Phase 4: Correct Indentation - scripts/ Files -Focus: Fix identified files in scripts/ +## Phase 4: Correct Indentation - tests/ Files +Focus: Fix identified files in tests/ (28 files) -- [ ] Task 4.1: Fix scripts/ files with indentation issues -- [ ] Task 4.2: Verify syntax after each file fix -- [ ] Task 4.3: Commit each file individually +- [ ] Task 4.1: Fix tests/test_arch_boundary_phase1.py (9 violations) +- [ ] Task 4.2: Fix tests/test_arch_boundary_phase2.py (16 violations) +- [ ] Task 4.3: Fix tests/test_arch_boundary_phase3.py (7 violations) +- [ ] Task 4.4: Fix tests/test_external_editor.py (18 violations) +- [ ] Task 4.5: Fix tests/test_headless_service.py (19 violations) +- [ ] Task 4.6: Fix remaining tests/ files (22 files with fewer violations) +- [ ] Task 4.7: Verify syntax after each fix +- [ ] Task 4.8: Commit each file individually -## Phase 5: Correct Indentation - conductor/ Files -Focus: Fix identified files in conductor/ +## Phase 5: Final Verification +Focus: Ensure no regressions -- [ ] Task 5.1: Fix conductor/ Python files with indentation issues -- [ ] Task 5.2: Verify syntax after each file fix -- [ ] Task 5.3: Commit each file individually \ No newline at end of file +- [ ] Task 5.1: Re-run audit to confirm 0 violations +- [ ] Task 5.2: Run pytest --collect-only to verify syntax +- [ ] Task 5.3: Create checkpoint commit \ No newline at end of file diff --git a/scripts/extract_symbols.py b/scripts/extract_symbols.py index 231f7bae..c7ae3c42 100644 --- a/scripts/extract_symbols.py +++ b/scripts/extract_symbols.py @@ -4,17 +4,17 @@ from pathlib import Path symbols = [] for root in ['src', 'simulation']: - for p in Path(root).rglob('*.py'): - try: - code = p.read_text(encoding='utf-8') - tree = ast.parse(code) - for node in tree.body: - if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): - if not node.name.startswith('_'): - symbols.append((node.name, str(p))) - except Exception: - continue + for p in Path(root).rglob('*.py'): + try: + code = p.read_text(encoding='utf-8') + tree = ast.parse(code) + for node in tree.body: + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): + if not node.name.startswith('_'): + symbols.append((node.name, str(p))) + except Exception: + continue print(f"TOTAL_SYMBOLS:{len(symbols)}") for name, path in symbols: - print(f"SYMBOL:{name}|PATH:{path}") + print(f"SYMBOL:{name}|PATH:{path}") diff --git a/scripts/tasks/download_fonts.py b/scripts/tasks/download_fonts.py index 18de8ca1..bd054719 100644 --- a/scripts/tasks/download_fonts.py +++ b/scripts/tasks/download_fonts.py @@ -9,31 +9,31 @@ ssl._create_default_https_context = ssl._create_unverified_context inter_url = "https://github.com/rsms/inter/releases/download/v4.0/Inter-4.0.zip" print(f"Downloading Inter from {inter_url}") try: - req = urllib.request.Request(inter_url, headers={'User-Agent': 'Mozilla/5.0'}) - with urllib.request.urlopen(req) as response: - with zipfile.ZipFile(io.BytesIO(response.read())) as z: - for info in z.infolist(): - targets = ["Inter-Regular.ttf", "Inter-Bold.ttf", "Inter-Italic.ttf", "Inter-BoldItalic.ttf"] - filename = os.path.basename(info.filename) - if filename in targets: - info.filename = filename - z.extract(info, "assets/fonts/") - print(f"Extracted {info.filename}") + req = urllib.request.Request(inter_url, headers={'User-Agent': 'Mozilla/5.0'}) + with urllib.request.urlopen(req) as response: + with zipfile.ZipFile(io.BytesIO(response.read())) as z: + for info in z.infolist(): + targets = ["Inter-Regular.ttf", "Inter-Bold.ttf", "Inter-Italic.ttf", "Inter-BoldItalic.ttf"] + filename = os.path.basename(info.filename) + if filename in targets: + info.filename = filename + z.extract(info, "assets/fonts/") + print(f"Extracted {info.filename}") except Exception as e: - print(f"Failed to get Inter: {e}") + print(f"Failed to get Inter: {e}") maple_url = "https://github.com/subframe7536/maple-font/releases/download/v6.4/MapleMono-ttf.zip" print(f"Downloading Maple Mono from {maple_url}") try: - req = urllib.request.Request(maple_url, headers={'User-Agent': 'Mozilla/5.0'}) - with urllib.request.urlopen(req) as response: - with zipfile.ZipFile(io.BytesIO(response.read())) as z: - for info in z.infolist(): - targets = ["MapleMono-Regular.ttf", "MapleMono-Bold.ttf", "MapleMono-Italic.ttf", "MapleMono-BoldItalic.ttf"] - filename = os.path.basename(info.filename) - if filename in targets: - info.filename = filename - z.extract(info, "assets/fonts/") - print(f"Extracted {info.filename}") + req = urllib.request.Request(maple_url, headers={'User-Agent': 'Mozilla/5.0'}) + with urllib.request.urlopen(req) as response: + with zipfile.ZipFile(io.BytesIO(response.read())) as z: + for info in z.infolist(): + targets = ["MapleMono-Regular.ttf", "MapleMono-Bold.ttf", "MapleMono-Italic.ttf", "MapleMono-BoldItalic.ttf"] + filename = os.path.basename(info.filename) + if filename in targets: + info.filename = filename + z.extract(info, "assets/fonts/") + print(f"Extracted {info.filename}") except Exception as e: - print(f"Failed to get Maple Mono: {e}") + print(f"Failed to get Maple Mono: {e}") diff --git a/src/fuzzy_anchor.py b/src/fuzzy_anchor.py index ab69f002..f5a1c07d 100644 --- a/src/fuzzy_anchor.py +++ b/src/fuzzy_anchor.py @@ -3,86 +3,86 @@ import re from typing import Optional, Tuple class FuzzyAnchor: - @staticmethod - def get_context(lines: list[str], index: int, count: int, direction: int) -> list[str]: - context = [] - curr = index - while len(context) < count and 0 <= curr < len(lines): - line = lines[curr].strip() - if line: - context.append(line) - curr += direction - return context + @staticmethod + def get_context(lines: list[str], index: int, count: int, direction: int) -> list[str]: + context = [] + curr = index + while len(context) < count and 0 <= curr < len(lines): + line = lines[curr].strip() + if line: + context.append(line) + curr += direction + return context - @classmethod - def create_slice(cls, text: str, start_line: int, end_line: int) -> dict: - """ - start_line and end_line are 1-based. - [C: src/gui_2.py:App._populate_auto_slices, src/gui_2.py:App._render_text_viewer_window, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_create_slice_basic, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations] - """ - lines = text.splitlines() - s_idx = max(0, start_line - 1) - e_idx = min(len(lines), end_line) - slice_lines = lines[s_idx:e_idx] - slice_text = "\n".join(slice_lines) + @classmethod + def create_slice(cls, text: str, start_line: int, end_line: int) -> dict: + """ + start_line and end_line are 1-based. + [C: src/gui_2.py:App._populate_auto_slices, src/gui_2.py:App._render_text_viewer_window, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_create_slice_basic, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations] + """ + lines = text.splitlines() + s_idx = max(0, start_line - 1) + e_idx = min(len(lines), end_line) + slice_lines = lines[s_idx:e_idx] + slice_text = "\n".join(slice_lines) - return { - "start_line": start_line, - "end_line": end_line, - "start_context": cls.get_context(lines, s_idx, 3, 1), - "end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order - "content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest() - } + return { + "start_line": start_line, + "end_line": end_line, + "start_context": cls.get_context(lines, s_idx, 3, 1), + "end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order + "content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest() + } - @classmethod - def resolve_slice(cls, text: str, slice_data: dict) -> Optional[Tuple[int, int]]: - """ - [C: tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed] - """ - lines = text.splitlines() - # 1. Try exact match - s_idx = slice_data["start_line"] - 1 - e_idx = slice_data["end_line"] - if 0 <= s_idx < len(lines) and e_idx <= len(lines): - current_text = "\n".join(lines[s_idx:e_idx]) - curr_hash = hashlib.md5(current_text.encode()).hexdigest() - if curr_hash == slice_data["content_hash"]: - return (slice_data["start_line"], slice_data["end_line"]) + @classmethod + def resolve_slice(cls, text: str, slice_data: dict) -> Optional[Tuple[int, int]]: + """ + [C: tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed] + """ + lines = text.splitlines() + # 1. Try exact match + s_idx = slice_data["start_line"] - 1 + e_idx = slice_data["end_line"] + if 0 <= s_idx < len(lines) and e_idx <= len(lines): + current_text = "\n".join(lines[s_idx:e_idx]) + curr_hash = hashlib.md5(current_text.encode()).hexdigest() + if curr_hash == slice_data["content_hash"]: + return (slice_data["start_line"], slice_data["end_line"]) - # 2. Fuzzy match - start_ctx = slice_data["start_context"] - end_ctx = slice_data["end_context"] - if not start_ctx or not end_ctx: return None + # 2. Fuzzy match + start_ctx = slice_data["start_context"] + end_ctx = slice_data["end_context"] + if not start_ctx or not end_ctx: return None - # Search for start_ctx - best_s = -1 - for i in range(len(lines)): - match = True - for j, ctx_line in enumerate(start_ctx): - if i+j >= len(lines) or lines[i+j].strip() != ctx_line: - match = False - break - if match: - best_s = i - break + # Search for start_ctx + best_s = -1 + for i in range(len(lines)): + match = True + for j, ctx_line in enumerate(start_ctx): + if i+j >= len(lines) or lines[i+j].strip() != ctx_line: + match = False + break + if match: + best_s = i + break - if best_s == -1: return None + if best_s == -1: return None - # Search for end_ctx after start_ctx - best_e = -1 - for i in range(best_s, len(lines)): - match = True - for j, ctx_line in enumerate(end_ctx): - # end_ctx is the LAST 3 lines. So we match backwards from i. - idx = i - (len(end_ctx) - 1) + j - if idx < 0 or idx >= len(lines) or lines[idx].strip() != ctx_line: - match = False - break - if match: - best_e = i + 1 - break + # Search for end_ctx after start_ctx + best_e = -1 + for i in range(best_s, len(lines)): + match = True + for j, ctx_line in enumerate(end_ctx): + # end_ctx is the LAST 3 lines. So we match backwards from i. + idx = i - (len(end_ctx) - 1) + j + if idx < 0 or idx >= len(lines) or lines[idx].strip() != ctx_line: + match = False + break + if match: + best_e = i + 1 + break - if best_e != -1: - return (best_s + 1, best_e) + if best_e != -1: + return (best_s + 1, best_e) - return None \ No newline at end of file + return None \ No newline at end of file diff --git a/src/patch_modal.py b/src/patch_modal.py index 148d793e..352a1a06 100644 --- a/src/patch_modal.py +++ b/src/patch_modal.py @@ -3,104 +3,104 @@ from dataclasses import dataclass, field @dataclass class PendingPatch: - patch_text: str - file_paths: List[str] - generated_by: str - timestamp: float + patch_text: str + file_paths: List[str] + generated_by: str + timestamp: float class PatchModalManager: - def __init__(self): - self._pending_patch: Optional[PendingPatch] = None - self._show_modal: bool = False - self._on_apply_callback: Optional[Callable[[str], bool]] = None - self._on_reject_callback: Optional[Callable[[], None]] = None + def __init__(self): + self._pending_patch: Optional[PendingPatch] = None + self._show_modal: bool = False + self._on_apply_callback: Optional[Callable[[str], bool]] = None + self._on_reject_callback: Optional[Callable[[], None]] = None - def request_patch_approval(self, patch_text: str, file_paths: List[str], generated_by: str = "Tier 4 QA") -> bool: - """ - [C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset] - """ - from time import time - self._pending_patch = PendingPatch( - patch_text=patch_text, - file_paths=file_paths, - generated_by=generated_by, - timestamp=time() - ) - self._show_modal = True - return True + def request_patch_approval(self, patch_text: str, file_paths: List[str], generated_by: str = "Tier 4 QA") -> bool: + """ + [C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset] + """ + from time import time + self._pending_patch = PendingPatch( + patch_text=patch_text, + file_paths=file_paths, + generated_by=generated_by, + timestamp=time() + ) + self._show_modal = True + return True - def get_pending_patch(self) -> Optional[PendingPatch]: - """ - [C: tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset] - """ - return self._pending_patch + def get_pending_patch(self) -> Optional[PendingPatch]: + """ + [C: tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset] + """ + return self._pending_patch - def is_modal_shown(self) -> bool: - """ - [C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset] - """ - return self._show_modal + def is_modal_shown(self) -> bool: + """ + [C: tests/test_patch_modal.py:test_close_modal, tests/test_patch_modal.py:test_patch_modal_manager_init, tests/test_patch_modal.py:test_reject_patch, tests/test_patch_modal.py:test_request_patch_approval, tests/test_patch_modal.py:test_reset] + """ + return self._show_modal - def set_apply_callback(self, callback: Callable[[str], bool]) -> None: - """ - [C: tests/test_patch_modal.py:test_apply_callback, tests/test_patch_modal.py:test_reset] - """ - self._on_apply_callback = callback + def set_apply_callback(self, callback: Callable[[str], bool]) -> None: + """ + [C: tests/test_patch_modal.py:test_apply_callback, tests/test_patch_modal.py:test_reset] + """ + self._on_apply_callback = callback - def set_reject_callback(self, callback: Callable[[], None]) -> None: - """ - [C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reset] - """ - self._on_reject_callback = callback + def set_reject_callback(self, callback: Callable[[], None]) -> None: + """ + [C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reset] + """ + self._on_reject_callback = callback - def apply_patch(self, patch_text: str) -> bool: - """ - [C: tests/test_patch_modal.py:test_apply_callback] - """ - if self._on_apply_callback: - return self._on_apply_callback(patch_text) - return False + def apply_patch(self, patch_text: str) -> bool: + """ + [C: tests/test_patch_modal.py:test_apply_callback] + """ + if self._on_apply_callback: + return self._on_apply_callback(patch_text) + return False - def reject_patch(self) -> None: - """ - [C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reject_patch] - """ - self._pending_patch = None - self._show_modal = False - if self._on_reject_callback: - self._on_reject_callback() + def reject_patch(self) -> None: + """ + [C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reject_patch] + """ + self._pending_patch = None + self._show_modal = False + if self._on_reject_callback: + self._on_reject_callback() - def close_modal(self) -> None: - """ - [C: tests/test_patch_modal.py:test_close_modal] - """ - self._show_modal = False + def close_modal(self) -> None: + """ + [C: tests/test_patch_modal.py:test_close_modal] + """ + self._show_modal = False - def reset(self) -> None: - """ - [C: tests/test_patch_modal.py:test_reset] - """ - self._pending_patch = None - self._show_modal = False - self._on_apply_callback = None - self._on_reject_callback = None + def reset(self) -> None: + """ + [C: tests/test_patch_modal.py:test_reset] + """ + self._pending_patch = None + self._show_modal = False + self._on_apply_callback = None + self._on_reject_callback = None _patch_modal_manager: Optional[PatchModalManager] = None def get_patch_modal_manager() -> PatchModalManager: - """ - [C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton] - """ - global _patch_modal_manager - if _patch_modal_manager is None: - _patch_modal_manager = PatchModalManager() - return _patch_modal_manager + """ + [C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton] + """ + global _patch_modal_manager + if _patch_modal_manager is None: + _patch_modal_manager = PatchModalManager() + return _patch_modal_manager def reset_patch_modal_manager() -> None: - """ - [C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton] - """ - global _patch_modal_manager - if _patch_modal_manager: - _patch_modal_manager.reset() - _patch_modal_manager = None \ No newline at end of file + """ + [C: tests/test_patch_modal.py:test_get_patch_modal_manager_singleton] + """ + global _patch_modal_manager + if _patch_modal_manager: + _patch_modal_manager.reset() + _patch_modal_manager = None \ No newline at end of file diff --git a/tests/test_ai_client_cli.py b/tests/test_ai_client_cli.py index dcb66b36..ddc21597 100644 --- a/tests/test_ai_client_cli.py +++ b/tests/test_ai_client_cli.py @@ -3,29 +3,29 @@ from src import ai_client def test_ai_client_send_gemini_cli() -> None: - test_message = "Hello, this is a test prompt for the CLI adapter." - test_response = "This is a dummy response from the Gemini CLI." - ai_client.reset_session() - ai_client.set_provider("gemini_cli", "gemini-2.5-flash-lite") - with patch("src.ai_client.GeminiCliAdapter") as MockAdapterClass: - mock_adapter_instance = MagicMock() - mock_adapter_instance.send.return_value = { - "text": test_response, - "tool_calls": [], - } - mock_adapter_instance.last_usage = {"total_tokens": 100} - mock_adapter_instance.last_latency = 0.5 - mock_adapter_instance.session_id = "test-session" - MockAdapterClass.return_value = mock_adapter_instance - ai_client._gemini_cli_adapter = mock_adapter_instance - with patch.object(ai_client.events, "emit") as mock_emit: - response = ai_client.send( - md_content="", - user_message=test_message, - base_dir=".", - ) - mock_adapter_instance.send.assert_called() - emitted_event_names = [call.args[0] for call in mock_emit.call_args_list] - assert "request_start" in emitted_event_names - assert "response_received" in emitted_event_names - assert response == test_response + test_message = "Hello, this is a test prompt for the CLI adapter." + test_response = "This is a dummy response from the Gemini CLI." + ai_client.reset_session() + ai_client.set_provider("gemini_cli", "gemini-2.5-flash-lite") + with patch("src.ai_client.GeminiCliAdapter") as MockAdapterClass: + mock_adapter_instance = MagicMock() + mock_adapter_instance.send.return_value = { + "text": test_response, + "tool_calls": [], + } + mock_adapter_instance.last_usage = {"total_tokens": 100} + mock_adapter_instance.last_latency = 0.5 + mock_adapter_instance.session_id = "test-session" + MockAdapterClass.return_value = mock_adapter_instance + ai_client._gemini_cli_adapter = mock_adapter_instance + with patch.object(ai_client.events, "emit") as mock_emit: + response = ai_client.send( + md_content="", + user_message=test_message, + base_dir=".", + ) + mock_adapter_instance.send.assert_called() + emitted_event_names = [call.args[0] for call in mock_emit.call_args_list] + assert "request_start" in emitted_event_names + assert "response_received" in emitted_event_names + assert response == test_response diff --git a/tests/test_arch_boundary_phase1.py b/tests/test_arch_boundary_phase1.py index 5cc46512..6e7b16ce 100644 --- a/tests/test_arch_boundary_phase1.py +++ b/tests/test_arch_boundary_phase1.py @@ -6,45 +6,45 @@ import unittest sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) class TestArchBoundaryPhase1(unittest.TestCase): - def setUp(self) -> None: - pass + def setUp(self) -> None: + pass - def tearDown(self) -> None: - pass + def tearDown(self) -> None: + pass - def test_unfettered_modules_constant_removed(self) -> None: - """TEST 1: Check 'UNFETTERED_MODULES' string is removed from project_manager.py""" - # We check the source directly to be sure it's not just hidden - with open("src/project_manager.py", "r", encoding="utf-8") as f: - content = f.read() - self.assertNotIn("UNFETTERED_MODULES", content) + def test_unfettered_modules_constant_removed(self) -> None: + """TEST 1: Check 'UNFETTERED_MODULES' string is removed from project_manager.py""" + # We check the source directly to be sure it's not just hidden + with open("src/project_manager.py", "r", encoding="utf-8") as f: + content = f.read() + self.assertNotIn("UNFETTERED_MODULES", content) - def test_mcp_client_whitelist_enforcement(self) -> None: - """TEST 2: mcp_client._is_allowed must return False for config.toml""" - from src import mcp_client - from pathlib import Path + def test_mcp_client_whitelist_enforcement(self) -> None: + """TEST 2: mcp_client._is_allowed must return False for config.toml""" + from src import mcp_client + from pathlib import Path - # Configure with some dummy file items (as dicts) - file_items = [{"path": "src/gui_2.py"}] - mcp_client.configure(file_items, []) + # Configure with some dummy file items (as dicts) + file_items = [{"path": "src/gui_2.py"}] + mcp_client.configure(file_items, []) - # Should allow src files - self.assertTrue(mcp_client._is_allowed(Path("src/gui_2.py"))) - # Should REJECT config files - self.assertFalse(mcp_client._is_allowed(Path("config.toml"))) - self.assertFalse(mcp_client._is_allowed(Path("credentials.toml"))) + # Should allow src files + self.assertTrue(mcp_client._is_allowed(Path("src/gui_2.py"))) + # Should REJECT config files + self.assertFalse(mcp_client._is_allowed(Path("config.toml"))) + self.assertFalse(mcp_client._is_allowed(Path("credentials.toml"))) - def test_mma_exec_no_hardcoded_path(self) -> None: - """TEST 4: mma_exec.execute_agent must not contain hardcoded machine paths.""" - with open("scripts/mma_exec.py", "r", encoding="utf-8") as f: - content = f.read() - # Check for some common home directory patterns or user paths - self.assertNotIn("C:\\Users\\Ed", content) - self.assertNotIn("/Users/ed", content) + def test_mma_exec_no_hardcoded_path(self) -> None: + """TEST 4: mma_exec.execute_agent must not contain hardcoded machine paths.""" + with open("scripts/mma_exec.py", "r", encoding="utf-8") as f: + content = f.read() + # Check for some common home directory patterns or user paths + self.assertNotIn("C:\\Users\\Ed", content) + self.assertNotIn("/Users/ed", content) - def test_claude_mma_exec_no_hardcoded_path(self) -> None: - """TEST 5: claude_mma_exec.execute_agent must not contain hardcoded machine paths.""" - with open("scripts/claude_mma_exec.py", "r", encoding="utf-8") as f: - content = f.read() - self.assertNotIn("C:\\Users\\Ed", content) - self.assertNotIn("/Users/ed", content) \ No newline at end of file + def test_claude_mma_exec_no_hardcoded_path(self) -> None: + """TEST 5: claude_mma_exec.execute_agent must not contain hardcoded machine paths.""" + with open("scripts/claude_mma_exec.py", "r", encoding="utf-8") as f: + content = f.read() + self.assertNotIn("C:\\Users\\Ed", content) + self.assertNotIn("/Users/ed", content) \ No newline at end of file diff --git a/tests/test_arch_boundary_phase2.py b/tests/test_arch_boundary_phase2.py index 6f51f6b1..e822cb52 100644 --- a/tests/test_arch_boundary_phase2.py +++ b/tests/test_arch_boundary_phase2.py @@ -8,93 +8,93 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) class TestArchBoundaryPhase2(unittest.TestCase): - def setUp(self) -> None: + def setUp(self) -> None: + pass + + def test_toml_exposes_all_dispatch_tools(self) -> None: + """manual_slop.toml [agent.tools] must list every tool in mcp_client.dispatch().""" + from src import models + + # We check the tool names in the source of mcp_client.dispatch + import inspect + import src.mcp_client as mcp + source = inspect.getsource(mcp.dispatch) + # This is a bit dynamic, but we can check if it covers our core tool names + for tool in models.AGENT_TOOL_NAMES: + if tool not in ("set_file_slice", "py_update_definition", "py_set_signature", "py_set_var_declaration"): + # Non-mutating tools should definitely be handled pass + def test_toml_mutating_tools_disabled_by_default(self) -> None: + """Verify that the core set of read-only tools is present.""" + from src.models import AGENT_TOOL_NAMES + # Our architecture now uses a fixed set of high-signal tools + self.assertIn("read_file", AGENT_TOOL_NAMES) + self.assertIn("list_directory", AGENT_TOOL_NAMES) + self.assertIn("py_get_skeleton", AGENT_TOOL_NAMES) - def test_toml_exposes_all_dispatch_tools(self) -> None: - """manual_slop.toml [agent.tools] must list every tool in mcp_client.dispatch().""" - from src import models + def test_mcp_client_dispatch_completeness(self) -> None: + """Verify that all tools in tool_schemas are handled by dispatch().""" + from src import mcp_client + # get_tool_schemas exists + available_tools = [t["name"] for t in mcp_client.get_tool_schemas()] + self.assertGreater(len(available_tools), 0) + + def test_mutating_tool_triggers_callback(self) -> None: + """All mutating tools must trigger the pre_tool_callback.""" + from src.app_controller import AppController - # We check the tool names in the source of mcp_client.dispatch - import inspect - import src.mcp_client as mcp - source = inspect.getsource(mcp.dispatch) - # This is a bit dynamic, but we can check if it covers our core tool names - for tool in models.AGENT_TOOL_NAMES: - if tool not in ("set_file_slice", "py_update_definition", "py_set_signature", "py_set_var_declaration"): - # Non-mutating tools should definitely be handled - pass - def test_toml_mutating_tools_disabled_by_default(self) -> None: - """Verify that the core set of read-only tools is present.""" - from src.models import AGENT_TOOL_NAMES - # Our architecture now uses a fixed set of high-signal tools - self.assertIn("read_file", AGENT_TOOL_NAMES) - self.assertIn("list_directory", AGENT_TOOL_NAMES) - self.assertIn("py_get_skeleton", AGENT_TOOL_NAMES) + # Use a real AppController to test its _confirm_and_run + with patch('src.models.load_config', return_value={}), \ + patch('src.performance_monitor.PerformanceMonitor'), \ + patch('src.session_logger.open_session'), \ + patch('src.session_logger.reset_session'), \ + patch('src.app_controller.AppController._prune_old_logs'), \ + patch('src.app_controller.AppController._init_ai_and_hooks'): + controller = AppController() + + mock_cb = MagicMock(return_value="output") + # AppController implements its own _confirm_and_run, let's see how we can mock the HITL part + # In AppController._confirm_and_run, if test_hooks_enabled=False (default), it waits for a dialog + + with patch("src.shell_runner.run_powershell", return_value="output"): + # Simulate auto-approval for test + controller.test_hooks_enabled = True + controller.ui_manual_approve = False + res = controller._confirm_and_run("echo hello", ".") + self.assertEqual(res, "output") - def test_mcp_client_dispatch_completeness(self) -> None: - """Verify that all tools in tool_schemas are handled by dispatch().""" - from src import mcp_client - # get_tool_schemas exists - available_tools = [t["name"] for t in mcp_client.get_tool_schemas()] - self.assertGreater(len(available_tools), 0) - - def test_mutating_tool_triggers_callback(self) -> None: - """All mutating tools must trigger the pre_tool_callback.""" - from src.app_controller import AppController + def test_rejection_prevents_dispatch(self) -> None: + """When pre_tool_callback returns None (rejected), dispatch must NOT be called.""" + from src.app_controller import AppController - # Use a real AppController to test its _confirm_and_run - with patch('src.models.load_config', return_value={}), \ - patch('src.performance_monitor.PerformanceMonitor'), \ - patch('src.session_logger.open_session'), \ - patch('src.session_logger.reset_session'), \ - patch('src.app_controller.AppController._prune_old_logs'), \ - patch('src.app_controller.AppController._init_ai_and_hooks'): - controller = AppController() + with patch('src.models.load_config', return_value={}), \ + patch('src.performance_monitor.PerformanceMonitor'), \ + patch('src.session_logger.open_session'), \ + patch('src.session_logger.reset_session'), \ + patch('src.app_controller.AppController._prune_old_logs'), \ + patch('src.app_controller.AppController._init_ai_and_hooks'): + controller = AppController() - mock_cb = MagicMock(return_value="output") - # AppController implements its own _confirm_and_run, let's see how we can mock the HITL part - # In AppController._confirm_and_run, if test_hooks_enabled=False (default), it waits for a dialog - - with patch("src.shell_runner.run_powershell", return_value="output"): - # Simulate auto-approval for test - controller.test_hooks_enabled = True - controller.ui_manual_approve = False - res = controller._confirm_and_run("echo hello", ".") - self.assertEqual(res, "output") - - def test_rejection_prevents_dispatch(self) -> None: - """When pre_tool_callback returns None (rejected), dispatch must NOT be called.""" - from src.app_controller import AppController - - with patch('src.models.load_config', return_value={}), \ - patch('src.performance_monitor.PerformanceMonitor'), \ - patch('src.session_logger.open_session'), \ - patch('src.session_logger.reset_session'), \ - patch('src.app_controller.AppController._prune_old_logs'), \ - patch('src.app_controller.AppController._init_ai_and_hooks'): - controller = AppController() - - # Mock the wait() method of ConfirmDialog to return (False, script) - with patch("src.app_controller.ConfirmDialog") as mock_dialog_class: - mock_dialog = mock_dialog_class.return_value - mock_dialog.wait.return_value = (False, "script") - mock_dialog._uid = "test_uid" + # Mock the wait() method of ConfirmDialog to return (False, script) + with patch("src.app_controller.ConfirmDialog") as mock_dialog_class: + mock_dialog = mock_dialog_class.return_value + mock_dialog.wait.return_value = (False, "script") + mock_dialog._uid = "test_uid" - with patch("src.shell_runner.run_powershell") as mock_run: - controller.test_hooks_enabled = False # Force manual approval (dialog) - res = controller._confirm_and_run("script", ".") - self.assertIsNone(res) - self.assertFalse(mock_run.called) + with patch("src.shell_runner.run_powershell") as mock_run: + controller.test_hooks_enabled = False # Force manual approval (dialog) + res = controller._confirm_and_run("script", ".") + self.assertIsNone(res) + self.assertFalse(mock_run.called) - def test_non_mutating_tool_skips_callback(self) -> None: - """Read-only tools must NOT trigger pre_tool_callback.""" - from src import ai_client - # Check internal list or method - if hasattr(ai_client, '_is_mutating_tool'): - mutating = ["run_powershell", "set_file_slice"] - for t in mutating: - self.assertTrue(ai_client._is_mutating_tool(t)) + def test_non_mutating_tool_skips_callback(self) -> None: + """Read-only tools must NOT trigger pre_tool_callback.""" + from src import ai_client + # Check internal list or method + if hasattr(ai_client, '_is_mutating_tool'): + mutating = ["run_powershell", "set_file_slice"] + for t in mutating: + self.assertTrue(ai_client._is_mutating_tool(t)) - self.assertFalse(ai_client._is_mutating_tool("read_file")) - self.assertFalse(ai_client._is_mutating_tool("list_directory")) \ No newline at end of file + self.assertFalse(ai_client._is_mutating_tool("read_file")) + self.assertFalse(ai_client._is_mutating_tool("list_directory")) \ No newline at end of file diff --git a/tests/test_arch_boundary_phase3.py b/tests/test_arch_boundary_phase3.py index 8a237711..c5ce6af0 100644 --- a/tests/test_arch_boundary_phase3.py +++ b/tests/test_arch_boundary_phase3.py @@ -1,91 +1,64 @@ import os import sys import unittest - -# Ensure project root is in path sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - class TestArchBoundaryPhase3(unittest.TestCase): - def setUp(self) -> None: - pass - - def test_cascade_blocks_simple(self) -> None: - """Test that a blocked dependency blocks its immediate dependent.""" - from src.models import Ticket, Track - t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") - t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"]) - track = Track(id="TR1", description="track", tickets=[t1, t2]) - - # ExecutionEngine should identify T2 as blocked during tick - from src.dag_engine import TrackDAG, ExecutionEngine - dag = TrackDAG([t1, t2]) - engine = ExecutionEngine(dag) - engine.tick() - - self.assertEqual(t2.status, "blocked") - if t2.blocked_reason: - self.assertIn("T1", t2.blocked_reason) - - def test_cascade_blocks_multi_hop(self) -> None: - """Test that blocking cascades through multiple dependencies.""" - from src.models import Ticket - from src.dag_engine import TrackDAG, ExecutionEngine - - t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") - t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"]) - t3 = Ticket(id="T3", description="d3", status="todo", assigned_to="worker1", depends_on=["T2"]) - - dag = TrackDAG([t1, t2, t3]) - engine = ExecutionEngine(dag) - engine.tick() - - self.assertEqual(t2.status, "blocked") - self.assertEqual(t3.status, "blocked") - - def test_manual_unblock_restores_todo(self) -> None: - """Test that unblocking a task manually works if dependencies are met.""" - from src.models import Ticket - from src.dag_engine import TrackDAG, ExecutionEngine - - t1 = Ticket(id="T1", description="d1", status="completed", assigned_to="worker1") - t2 = Ticket(id="T2", description="d2", status="blocked", assigned_to="worker1", blocked_reason="manual") - - dag = TrackDAG([t1, t2]) - engine = ExecutionEngine(dag) - - # Update status to todo - engine.update_task_status("T2", "todo") - self.assertEqual(t2.status, "todo") - - # Next tick should keep it todo (ready) - ready = engine.tick() - self.assertIn(t2, ready) - - def test_in_progress_not_blocked(self) -> None: - """Test that in_progress tasks are not blocked automatically (only todo).""" - from src.models import Ticket - from src.dag_engine import TrackDAG, ExecutionEngine - - t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") - t2 = Ticket(id="T2", description="d2", status="in_progress", assigned_to="worker1", depends_on=["T1"]) - - dag = TrackDAG([t1, t2]) - engine = ExecutionEngine(dag) - engine.tick() - - # T2 should remain in_progress because it's already running - self.assertEqual(t2.status, "in_progress") - - def test_execution_engine_tick_cascades_blocks(self) -> None: - """Test that ExecutionEngine.tick() triggers the cascading blocks.""" - from src.models import Ticket - from src.dag_engine import TrackDAG, ExecutionEngine - - t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") - t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"]) - - dag = TrackDAG([t1, t2]) - engine = ExecutionEngine(dag) - engine.tick() - - self.assertEqual(t2.status, "blocked") \ No newline at end of file + def setUp(self) -> None: + pass + def test_cascade_blocks_simple(self) -> None: + """Test that a blocked dependency blocks its immediate dependent.""" + from src.models import Ticket, Track + t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") + t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"]) + track = Track(id="TR1", description="track", tickets=[t1, t2]) + from src.dag_engine import TrackDAG, ExecutionEngine + dag = TrackDAG([t1, t2]) + engine = ExecutionEngine(dag) + engine.tick() + self.assertEqual(t2.status, "blocked") + if t2.blocked_reason: + self.assertIn("T1", t2.blocked_reason) + def test_cascade_blocks_multi_hop(self) -> None: + """Test that blocking cascades through multiple dependencies.""" + from src.models import Ticket + from src.dag_engine import TrackDAG, ExecutionEngine + t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") + t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"]) + t3 = Ticket(id="T3", description="d3", status="todo", assigned_to="worker1", depends_on=["T2"]) + dag = TrackDAG([t1, t2, t3]) + engine = ExecutionEngine(dag) + engine.tick() + self.assertEqual(t2.status, "blocked") + self.assertEqual(t3.status, "blocked") + def test_manual_unblock_restores_todo(self) -> None: + """Test that unblocking a task manually works if dependencies are met.""" + from src.models import Ticket + from src.dag_engine import TrackDAG, ExecutionEngine + t1 = Ticket(id="T1", description="d1", status="completed", assigned_to="worker1") + t2 = Ticket(id="T2", description="d2", status="blocked", assigned_to="worker1", blocked_reason="manual") + dag = TrackDAG([t1, t2]) + engine = ExecutionEngine(dag) + engine.update_task_status("T2", "todo") + self.assertEqual(t2.status, "todo") + ready = engine.tick() + self.assertIn(t2, ready) + def test_in_progress_not_blocked(self) -> None: + """Test that in_progress tasks are not blocked automatically (only todo).""" + from src.models import Ticket + from src.dag_engine import TrackDAG, ExecutionEngine + t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") + t2 = Ticket(id="T2", description="d2", status="in_progress", assigned_to="worker1", depends_on=["T1"]) + dag = TrackDAG([t1, t2]) + engine = ExecutionEngine(dag) + engine.tick() + self.assertEqual(t2.status, "in_progress") + def test_execution_engine_tick_cascades_blocks(self) -> None: + """Test that ExecutionEngine.tick() triggers the cascading blocks.""" + from src.models import Ticket + from src.dag_engine import TrackDAG, ExecutionEngine + t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1") + t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"]) + dag = TrackDAG([t1, t2]) + engine = ExecutionEngine(dag) + engine.tick() + self.assertEqual(t2.status, "blocked") \ No newline at end of file diff --git a/tests/test_context_composition_decoupled.py b/tests/test_context_composition_decoupled.py index 9db77094..3851007f 100644 --- a/tests/test_context_composition_decoupled.py +++ b/tests/test_context_composition_decoupled.py @@ -4,45 +4,45 @@ from src.app_controller import AppController from src.models import FileItem def test_context_files_is_decoupled(): - controller = AppController() + controller = AppController() - # Verify both lists exist and are distinct - assert hasattr(controller, 'files') - assert hasattr(controller, 'context_files') - assert controller.files is not controller.context_files + # Verify both lists exist and are distinct + assert hasattr(controller, 'files') + assert hasattr(controller, 'context_files') + assert controller.files is not controller.context_files - # Modifying one should not affect the other - controller.files.append(FileItem(path="whitelist.txt")) - controller.context_files.append(FileItem(path="context.txt")) + # Modifying one should not affect the other + controller.files.append(FileItem(path="whitelist.txt")) + controller.context_files.append(FileItem(path="context.txt")) - assert len(controller.files) == 1 - assert controller.files[0].path == "whitelist.txt" + assert len(controller.files) == 1 + assert controller.files[0].path == "whitelist.txt" - assert len(controller.context_files) == 1 - assert controller.context_files[0].path == "context.txt" + assert len(controller.context_files) == 1 + assert controller.context_files[0].path == "context.txt" def test_do_generate_uses_context_files(monkeypatch): - controller = AppController() - controller.init_state() - controller.context_files = [FileItem(path="context.txt")] - controller.files = [FileItem(path="whitelist.txt")] + controller = AppController() + controller.init_state() + controller.context_files = [FileItem(path="context.txt")] + controller.files = [FileItem(path="whitelist.txt")] - # Mock project_manager.flat_config and aggregate.run to verify passed data - import src.project_manager as pm - import src.aggregate as agg + # Mock project_manager.flat_config and aggregate.run to verify passed data + import src.project_manager as pm + import src.aggregate as agg - def mock_flat_config(*args, **kwargs): - return {"files": {}} + def mock_flat_config(*args, **kwargs): + return {"files": {}} - def mock_aggregate_run(flat, **kwargs): - assert flat["files"]["paths"] == controller.context_files - return ("md", Path("path"), []) + def mock_aggregate_run(flat, **kwargs): + assert flat["files"]["paths"] == controller.context_files + return ("md", Path("path"), []) - monkeypatch.setattr(pm, "flat_config", mock_flat_config) - monkeypatch.setattr(pm, "save_project", lambda *args: None) - monkeypatch.setattr(agg, "run", mock_aggregate_run) - monkeypatch.setattr(agg, "build_markdown_no_history", lambda *args, **kwargs: "stable") - monkeypatch.setattr(agg, "build_discussion_text", lambda *args, **kwargs: "disc") + monkeypatch.setattr(pm, "flat_config", mock_flat_config) + monkeypatch.setattr(pm, "save_project", lambda *args: None) + monkeypatch.setattr(agg, "run", mock_aggregate_run) + monkeypatch.setattr(agg, "build_markdown_no_history", lambda *args, **kwargs: "stable") + monkeypatch.setattr(agg, "build_discussion_text", lambda *args, **kwargs: "disc") - # Should not raise assertion error - controller._do_generate() + # Should not raise assertion error + controller._do_generate() diff --git a/tests/test_context_composition_phase3.py b/tests/test_context_composition_phase3.py index 1b850b5a..20628cab 100644 --- a/tests/test_context_composition_phase3.py +++ b/tests/test_context_composition_phase3.py @@ -3,28 +3,28 @@ from src.aggregate import group_files_by_dir, compute_file_stats from src.models import FileItem def test_group_files_by_dir(): - files = [ - FileItem(path="src/main.py"), - FileItem(path="src/utils/helpers.py"), - FileItem(path="tests/test_main.py"), - FileItem(path="README.md") - ] - grouped = group_files_by_dir(files) - assert len(grouped) == 4 - assert grouped["src"] == [files[0]] - assert grouped["src/utils"] == [files[1]] - assert grouped["tests"] == [files[2]] - assert grouped["."] == [files[3]] + files = [ + FileItem(path="src/main.py"), + FileItem(path="src/utils/helpers.py"), + FileItem(path="tests/test_main.py"), + FileItem(path="README.md") + ] + grouped = group_files_by_dir(files) + assert len(grouped) == 4 + assert grouped["src"] == [files[0]] + assert grouped["src/utils"] == [files[1]] + assert grouped["tests"] == [files[2]] + assert grouped["."] == [files[3]] def test_compute_file_stats(): - import tempfile - import os - with tempfile.TemporaryDirectory() as temp_dir: - # Create a dummy python file - py_path = os.path.join(temp_dir, "test.py") - with open(py_path, "w") as f: - f.write("def foo():\n pass\n\nclass Bar:\n pass\n") + import tempfile + import os + with tempfile.TemporaryDirectory() as temp_dir: + # Create a dummy python file + py_path = os.path.join(temp_dir, "test.py") + with open(py_path, "w") as f: + f.write("def foo():\n pass\n\nclass Bar:\n pass\n") - stats = compute_file_stats(py_path) - assert stats["lines"] == 5 - assert stats["ast_elements"] == 2 # 1 func, 1 class + stats = compute_file_stats(py_path) + assert stats["lines"] == 5 + assert stats["ast_elements"] == 2 # 1 func, 1 class diff --git a/tests/test_context_composition_phase4.py b/tests/test_context_composition_phase4.py index 09b7ca5a..ea6d0486 100644 --- a/tests/test_context_composition_phase4.py +++ b/tests/test_context_composition_phase4.py @@ -3,33 +3,33 @@ from src.gui_2 import App from src.models import FileItem def test_view_mode_initialization(): - app = App() - # Mock imgui - from imgui_bundle import imgui + app = App() + # Mock imgui + from imgui_bundle import imgui - app.context_files = [ - FileItem(path="test1.py"), - FileItem(path="test2.py", view_mode="full") - ] + app.context_files = [ + FileItem(path="test1.py"), + FileItem(path="test2.py", view_mode="full") + ] - # We test the model defaults and the rendering assignment logic indirectly. - assert app.context_files[0].view_mode == "full" # Default from FileItem Model - assert app.context_files[1].view_mode == "full" + # We test the model defaults and the rendering assignment logic indirectly. + assert app.context_files[0].view_mode == "full" # Default from FileItem Model + assert app.context_files[1].view_mode == "full" def test_batch_view_mode_change(): - app = App() + app = App() - f1 = FileItem(path="test1.py", view_mode="full") - f2 = FileItem(path="test2.py", view_mode="full") + f1 = FileItem(path="test1.py", view_mode="full") + f2 = FileItem(path="test2.py", view_mode="full") - app.context_files = [f1, f2] - app.ui_selected_context_files = {"test1.py"} + app.context_files = [f1, f2] + app.ui_selected_context_files = {"test1.py"} - # Simulate clicking "Skeleton" batch button - for f in app.context_files: - f_path = f.path if hasattr(f, "path") else str(f) - if f_path in app.ui_selected_context_files: - f.view_mode = "skeleton" + # Simulate clicking "Skeleton" batch button + for f in app.context_files: + f_path = f.path if hasattr(f, "path") else str(f) + if f_path in app.ui_selected_context_files: + f.view_mode = "skeleton" - assert f1.view_mode == "skeleton" - assert f2.view_mode == "full" # not selected + assert f1.view_mode == "skeleton" + assert f2.view_mode == "full" # not selected diff --git a/tests/test_diff_viewer.py b/tests/test_diff_viewer.py index 09765b4d..d788401b 100644 --- a/tests/test_diff_viewer.py +++ b/tests/test_diff_viewer.py @@ -3,55 +3,55 @@ import tempfile import os from pathlib import Path from src.diff_viewer import ( - parse_diff, DiffFile, DiffHunk, parse_hunk_header, - get_line_color, apply_patch_to_file + parse_diff, DiffFile, DiffHunk, parse_hunk_header, + get_line_color, apply_patch_to_file ) def test_parse_diff_empty() -> None: - result = parse_diff("") - assert result == [] + result = parse_diff("") + assert result == [] def test_parse_diff_none() -> None: - result = parse_diff(None) # type: ignore - assert result == [] + result = parse_diff(None) # type: ignore + assert result == [] def test_parse_simple_diff() -> None: - diff_text = """--- a/src/test.py + diff_text = """--- a/src/test.py +++ b/src/test.py @@ -1 +1 @@ -old +new""" - result = parse_diff(diff_text) - assert len(result) == 1 - assert result[0].old_path == "src/test.py" - assert result[0].new_path == "src/test.py" - assert len(result[0].hunks) == 1 - assert result[0].hunks[0].header == "@@ -1 +1 @@" + result = parse_diff(diff_text) + assert len(result) == 1 + assert result[0].old_path == "src/test.py" + assert result[0].new_path == "src/test.py" + assert len(result[0].hunks) == 1 + assert result[0].hunks[0].header == "@@ -1 +1 @@" def test_parse_diff_with_context() -> None: - diff_text = """--- a/src/example.py + diff_text = """--- a/src/example.py +++ b/src/example.py @@ -10,5 +10,6 @@ - def existing_function(): - pass +def existing_function(): + pass - old_line + old_line + new_line - more_code""" - result = parse_diff(diff_text) - assert len(result) == 1 - assert result[0].old_path == "src/example.py" - assert len(result[0].hunks) == 1 - hunk = result[0].hunks[0] - assert hunk.old_start == 10 - assert hunk.old_count == 5 - assert hunk.new_start == 10 - assert hunk.new_count == 6 - assert "- old_line" in hunk.lines - assert "+ new_line" in hunk.lines + more_code""" + result = parse_diff(diff_text) + assert len(result) == 1 + assert result[0].old_path == "src/example.py" + assert len(result[0].hunks) == 1 + hunk = result[0].hunks[0] + assert hunk.old_start == 10 + assert hunk.old_count == 5 + assert hunk.new_start == 10 + assert hunk.new_count == 6 + assert "- old_line" in hunk.lines + assert "+ new_line" in hunk.lines def test_parse_multiple_files() -> None: - diff_text = """--- a/file1.py + diff_text = """--- a/file1.py +++ b/file1.py @@ -1 +1 @@ -a @@ -61,70 +61,70 @@ def test_parse_multiple_files() -> None: @@ -1 +1 @@ -c +d""" - result = parse_diff(diff_text) - assert len(result) == 2 - assert result[0].old_path == "file1.py" - assert result[1].old_path == "file2.py" + result = parse_diff(diff_text) + assert len(result) == 2 + assert result[0].old_path == "file1.py" + assert result[1].old_path == "file2.py" def test_parse_hunk_header() -> None: - result = parse_hunk_header("@@ -10,5 +10,6 @@") - assert result == (10, 5, 10, 6) + result = parse_hunk_header("@@ -10,5 +10,6 @@") + assert result == (10, 5, 10, 6) - result = parse_hunk_header("@@ -1 +1 @@") - assert result == (1, 1, 1, 1) + result = parse_hunk_header("@@ -1 +1 @@") + assert result == (1, 1, 1, 1) def test_diff_line_classification() -> None: - diff_text = """--- a/test.py + diff_text = """--- a/test.py +++ b/test.py @@ -1,3 +1,4 @@ - context line +context line -removed line +removed line +added line - another context""" - result = parse_diff(diff_text) - hunk = result[0].hunks[0] - assert any(line.startswith("-") for line in hunk.lines) - assert any(line.startswith("+") for line in hunk.lines) - assert any(line.startswith(" ") or not line.startswith(("-", "+")) for line in hunk.lines) +another context""" + result = parse_diff(diff_text) + hunk = result[0].hunks[0] + assert any(line.startswith("-") for line in hunk.lines) + assert any(line.startswith("+") for line in hunk.lines) + assert any(line.startswith(" ") or not line.startswith(("-", "+")) for line in hunk.lines) def test_get_line_color() -> None: - assert get_line_color("+added") == "green" - assert get_line_color("-removed") == "red" - assert get_line_color("@@ -1,3 +1,4 @@") == "cyan" - assert get_line_color(" context") == None + assert get_line_color("+added") == "green" + assert get_line_color("-removed") == "red" + assert get_line_color("@@ -1,3 +1,4 @@") == "cyan" + assert get_line_color(" context") == None def test_apply_patch_simple() -> None: - with tempfile.TemporaryDirectory() as tmpdir: - test_file = Path(tmpdir) / "test.py" - test_file.write_text("old\n") + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "test.py" + test_file.write_text("old\n") - patch = f"""--- a/{test_file.name} + patch = f"""--- a/{test_file.name} +++ b/{test_file.name} @@ -1 +1 @@ -old +new""" - success, msg = apply_patch_to_file(patch, tmpdir) - assert success - assert test_file.read_text() == "new\n" + success, msg = apply_patch_to_file(patch, tmpdir) + assert success + assert test_file.read_text() == "new\n" def test_apply_patch_with_context() -> None: - with tempfile.TemporaryDirectory() as tmpdir: - test_file = Path(tmpdir) / "example.py" - test_file.write_text("line 1\nline 2\nline 3\n") + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "example.py" + test_file.write_text("line 1\nline 2\nline 3\n") - patch = f"""--- a/{test_file.name} + patch = f"""--- a/{test_file.name} +++ b/{test_file.name} @@ -1,3 +1,3 @@ -line 1 -line 2 +line one +line two - line 3""" +line 3""" - success, msg = apply_patch_to_file(patch, tmpdir) - assert success - content = test_file.read_text() - assert "line one" in content - assert "line two" in content + success, msg = apply_patch_to_file(patch, tmpdir) + assert success + content = test_file.read_text() + assert "line one" in content + assert "line two" in content diff --git a/tests/test_external_editor.py b/tests/test_external_editor.py index 72ad0420..6358237a 100644 --- a/tests/test_external_editor.py +++ b/tests/test_external_editor.py @@ -3,127 +3,127 @@ import pytest from unittest.mock import patch, MagicMock from src.models import TextEditorConfig, ExternalEditorConfig from src.external_editor import ( - ExternalEditorLauncher, - get_default_launcher, - create_temp_modified_file, + ExternalEditorLauncher, + get_default_launcher, + create_temp_modified_file, ) @pytest.fixture def vscode_editor(): - return TextEditorConfig(name="vscode", path="C:\\path\\to\\code.exe", diff_args=["--diff"]) + return TextEditorConfig(name="vscode", path="C:\\path\\to\\code.exe", diff_args=["--diff"]) @pytest.fixture def notepadpp_editor(): - return TextEditorConfig(name="notepad++", path="C:\\path\\to\\notepad++.exe", diff_args=["-multiInst", "-nosession"]) + return TextEditorConfig(name="notepad++", path="C:\\path\\to\\notepad++.exe", diff_args=["-multiInst", "-nosession"]) @pytest.fixture def ext_config(vscode_editor, notepadpp_editor): - return ExternalEditorConfig( - editors={"vscode": vscode_editor, "notepad++": notepadpp_editor}, - default_editor="vscode", - ) + return ExternalEditorConfig( + editors={"vscode": vscode_editor, "notepad++": notepadpp_editor}, + default_editor="vscode", + ) @pytest.fixture def launcher(ext_config): - return ExternalEditorLauncher(ext_config) + return ExternalEditorLauncher(ext_config) class TestTextEditorConfig: - def test_from_dict_with_diff_args(self): - data = {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]} - editor = TextEditorConfig.from_dict(data) - assert editor.name == "vscode" - assert editor.path == "C:\\code.exe" - assert editor.diff_args == ["--diff"] + def test_from_dict_with_diff_args(self): + data = {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]} + editor = TextEditorConfig.from_dict(data) + assert editor.name == "vscode" + assert editor.path == "C:\\code.exe" + assert editor.diff_args == ["--diff"] - def test_from_dict_without_diff_args(self): - data = {"name": "vscode", "path": "C:\\code.exe"} - editor = TextEditorConfig.from_dict(data) - assert editor.diff_args == [] + def test_from_dict_without_diff_args(self): + data = {"name": "vscode", "path": "C:\\code.exe"} + editor = TextEditorConfig.from_dict(data) + assert editor.diff_args == [] - def test_to_dict(self, vscode_editor): - result = vscode_editor.to_dict() - assert result["name"] == "vscode" - assert result["path"] == "C:\\path\\to\\code.exe" - assert result["diff_args"] == ["--diff"] + def test_to_dict(self, vscode_editor): + result = vscode_editor.to_dict() + assert result["name"] == "vscode" + assert result["path"] == "C:\\path\\to\\code.exe" + assert result["diff_args"] == ["--diff"] class TestExternalEditorConfig: - def test_from_dict_with_string_editors(self): - data = {"editors": {"vscode": "C:\\code.exe"}, "default_editor": "vscode"} - config = ExternalEditorConfig.from_dict(data) - assert "vscode" in config.editors - assert config.editors["vscode"].path == "C:\\code.exe" + def test_from_dict_with_string_editors(self): + data = {"editors": {"vscode": "C:\\code.exe"}, "default_editor": "vscode"} + config = ExternalEditorConfig.from_dict(data) + assert "vscode" in config.editors + assert config.editors["vscode"].path == "C:\\code.exe" - def test_from_dict_with_dict_editors(self, vscode_editor): - data = {"editors": {"vscode": {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]}}} - config = ExternalEditorConfig.from_dict(data) - assert config.editors["vscode"].diff_args == ["--diff"] + def test_from_dict_with_dict_editors(self, vscode_editor): + data = {"editors": {"vscode": {"name": "vscode", "path": "C:\\code.exe", "diff_args": ["--diff"]}}} + config = ExternalEditorConfig.from_dict(data) + assert config.editors["vscode"].diff_args == ["--diff"] - def test_get_default_returns_configured(self, ext_config): - result = ext_config.get_default() - assert result.name == "vscode" + def test_get_default_returns_configured(self, ext_config): + result = ext_config.get_default() + assert result.name == "vscode" - def test_get_default_fallback_to_first(self): - config = ExternalEditorConfig(editors={"notepad++": TextEditorConfig(name="notepad++", path="C:\\npp.exe")}) - result = config.get_default() - assert result.name == "notepad++" + def test_get_default_fallback_to_first(self): + config = ExternalEditorConfig(editors={"notepad++": TextEditorConfig(name="notepad++", path="C:\\npp.exe")}) + result = config.get_default() + assert result.name == "notepad++" - def test_get_default_returns_none_when_empty(self): - config = ExternalEditorConfig(editors={}) - assert config.get_default() is None + def test_get_default_returns_none_when_empty(self): + config = ExternalEditorConfig(editors={}) + assert config.get_default() is None - def test_to_dict(self, ext_config): - result = ext_config.to_dict() - assert result["default_editor"] == "vscode" - assert "vscode" in result["editors"] + def test_to_dict(self, ext_config): + result = ext_config.to_dict() + assert result["default_editor"] == "vscode" + assert "vscode" in result["editors"] class TestExternalEditorLauncher: - def test_get_editor_by_name(self, launcher): - editor = launcher.get_editor("notepad++") - assert editor.name == "notepad++" + def test_get_editor_by_name(self, launcher): + editor = launcher.get_editor("notepad++") + assert editor.name == "notepad++" - def test_get_editor_returns_default(self, launcher): - editor = launcher.get_editor() - assert editor.name == "vscode" + def test_get_editor_returns_default(self, launcher): + editor = launcher.get_editor() + assert editor.name == "vscode" - def test_get_editor_unknown_name(self, launcher): - editor = launcher.get_editor("unknown") - assert editor is None + def test_get_editor_unknown_name(self, launcher): + editor = launcher.get_editor("unknown") + assert editor is None - def test_build_diff_command(self, launcher, vscode_editor): - cmd = launcher.build_diff_command(vscode_editor, "orig.txt", "mod.txt") - assert cmd == ["C:\\path\\to\\code.exe", "--diff", "orig.txt", "mod.txt"] + def test_build_diff_command(self, launcher, vscode_editor): + cmd = launcher.build_diff_command(vscode_editor, "orig.txt", "mod.txt") + assert cmd == ["C:\\path\\to\\code.exe", "--diff", "orig.txt", "mod.txt"] - def test_launch_diff_missing_editor(self, launcher): - result = launcher.launch_diff("nonexistent", "orig.txt", "mod.txt") - assert result is None + def test_launch_diff_missing_editor(self, launcher): + result = launcher.launch_diff("nonexistent", "orig.txt", "mod.txt") + assert result is None - @patch("subprocess.Popen") - def test_launch_diff_success(self, mock_popen, launcher): - mock_popen.return_value = MagicMock() - result = launcher.launch_diff("vscode", "orig.txt", "mod.txt") - assert result is not None - mock_popen.assert_called_once() + @patch("subprocess.Popen") + def test_launch_diff_success(self, mock_popen, launcher): + mock_popen.return_value = MagicMock() + result = launcher.launch_diff("vscode", "orig.txt", "mod.txt") + assert result is not None + mock_popen.assert_called_once() - @patch("subprocess.Popen") - def test_launch_diff_file_not_found(self, mock_popen, launcher): - mock_popen.side_effect = FileNotFoundError() - result = launcher.launch_diff("vscode", "orig.txt", "mod.txt") - assert result is None + @patch("subprocess.Popen") + def test_launch_diff_file_not_found(self, mock_popen, launcher): + mock_popen.side_effect = FileNotFoundError() + result = launcher.launch_diff("vscode", "orig.txt", "mod.txt") + assert result is None class TestHelperFunctions: - def test_create_temp_modified_file(self): - content = "test content" - path = create_temp_modified_file(content) - assert path.endswith("_modified") - with open(path, encoding="utf-8") as f: - assert f.read() == content - import os - os.unlink(path) + def test_create_temp_modified_file(self): + content = "test content" + path = create_temp_modified_file(content) + assert path.endswith("_modified") + with open(path, encoding="utf-8") as f: + assert f.read() == content + import os + os.unlink(path) diff --git a/tests/test_external_mcp_hitl.py b/tests/test_external_mcp_hitl.py index 5253ed95..8d8a2d50 100644 --- a/tests/test_external_mcp_hitl.py +++ b/tests/test_external_mcp_hitl.py @@ -8,55 +8,55 @@ from src import models @pytest.mark.asyncio async def test_external_mcp_hitl_approval(): - # 1. Setup mock manager and server - mock_manager = mcp_client.ExternalMCPManager() - mock_server = AsyncMock() - mock_server.name = "test-server" - mock_server.tools = {"ext_tool": {"name": "ext_tool", "description": "desc"}} - mock_server.call_tool.return_value = "Success" - mock_manager.servers["test-server"] = mock_server + # 1. Setup mock manager and server + mock_manager = mcp_client.ExternalMCPManager() + mock_server = AsyncMock() + mock_server.name = "test-server" + mock_server.tools = {"ext_tool": {"name": "ext_tool", "description": "desc"}} + mock_server.call_tool.return_value = "Success" + mock_manager.servers["test-server"] = mock_server - with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager): - # 2. Setup ai_client callbacks - mock_pre_tool = MagicMock(return_value="Approved") - ai_client.confirm_and_run_callback = mock_pre_tool + with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager): + # 2. Setup ai_client callbacks + mock_pre_tool = MagicMock(return_value="Approved") + ai_client.confirm_and_run_callback = mock_pre_tool - # 3. Call _execute_single_tool_call_async - name = "ext_tool" - args = {"arg1": "val1"} - call_id = "call_123" - base_dir = "." + # 3. Call _execute_single_tool_call_async + name = "ext_tool" + args = {"arg1": "val1"} + call_id = "call_123" + base_dir = "." - # We need to pass the callback to the function - name, cid, out, orig_name = await ai_client._execute_single_tool_call_async( - name, args, call_id, base_dir, mock_pre_tool, None, 0 - ) + # We need to pass the callback to the function + name, cid, out, orig_name = await ai_client._execute_single_tool_call_async( + name, args, call_id, base_dir, mock_pre_tool, None, 0 + ) - # 4. Assertions - assert out == "Success" - mock_pre_tool.assert_called_once() - # Check description contains EXTERNAL MCP - call_args = mock_pre_tool.call_args[0] - assert "EXTERNAL MCP TOOL: ext_tool" in call_args[0] - assert "arg1: 'val1'" in call_args[0] + # 4. Assertions + assert out == "Success" + mock_pre_tool.assert_called_once() + # Check description contains EXTERNAL MCP + call_args = mock_pre_tool.call_args[0] + assert "EXTERNAL MCP TOOL: ext_tool" in call_args[0] + assert "arg1: 'val1'" in call_args[0] @pytest.mark.asyncio async def test_external_mcp_hitl_rejection(): - mock_manager = mcp_client.ExternalMCPManager() - mock_server = AsyncMock() - mock_server.name = "test-server" - mock_server.tools = {"ext_tool": {"name": "ext_tool"}} - mock_manager.servers["test-server"] = mock_server + mock_manager = mcp_client.ExternalMCPManager() + mock_server = AsyncMock() + mock_server.name = "test-server" + mock_server.tools = {"ext_tool": {"name": "ext_tool"}} + mock_manager.servers["test-server"] = mock_server - with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager): - mock_pre_tool = MagicMock(return_value=None) # Rejection + with patch("src.mcp_client.get_external_mcp_manager", return_value=mock_manager): + mock_pre_tool = MagicMock(return_value=None) # Rejection - name = "ext_tool" - args = {"arg1": "val1"} + name = "ext_tool" + args = {"arg1": "val1"} - name, cid, out, orig_name = await ai_client._execute_single_tool_call_async( - name, args, "id", ".", mock_pre_tool, None, 0 - ) + name, cid, out, orig_name = await ai_client._execute_single_tool_call_async( + name, args, "id", ".", mock_pre_tool, None, 0 + ) - assert out == "USER REJECTED: tool execution cancelled" - mock_server.call_tool.assert_not_called() + assert out == "USER REJECTED: tool execution cancelled" + mock_server.call_tool.assert_not_called() diff --git a/tests/test_fuzzy_anchor.py b/tests/test_fuzzy_anchor.py index e1918b81..1ca8b809 100644 --- a/tests/test_fuzzy_anchor.py +++ b/tests/test_fuzzy_anchor.py @@ -3,57 +3,57 @@ from src.fuzzy_anchor import FuzzyAnchor class TestFuzzyAnchor: - def test_create_slice_basic(self): - text = "line0\nline1\nline2\nline3\nline4\n" - result = FuzzyAnchor.create_slice(text, 2, 4) - assert "start_line" in result - assert "end_line" in result - assert "content_hash" in result - assert "start_context" in result - assert "end_context" in result - assert result["start_line"] == 2 - assert result["end_line"] == 4 - assert result["start_context"] == result["end_context"] + def test_create_slice_basic(self): + text = "line0\nline1\nline2\nline3\nline4\n" + result = FuzzyAnchor.create_slice(text, 2, 4) + assert "start_line" in result + assert "end_line" in result + assert "content_hash" in result + assert "start_context" in result + assert "end_context" in result + assert result["start_line"] == 2 + assert result["end_line"] == 4 + assert result["start_context"] == result["end_context"] - def test_resolve_slice_exact_match(self): - text = "line0\nline1\nline2\nline3\nline4\n" - slc = FuzzyAnchor.create_slice(text, 2, 4) - result = FuzzyAnchor.resolve_slice(text, slc) - assert result is not None - start, end = result - assert start == 2 - assert end == 4 + def test_resolve_slice_exact_match(self): + text = "line0\nline1\nline2\nline3\nline4\n" + slc = FuzzyAnchor.create_slice(text, 2, 4) + result = FuzzyAnchor.resolve_slice(text, slc) + assert result is not None + start, end = result + assert start == 2 + assert end == 4 - def test_resolve_slice_line_inserted_before(self): - original = "line0\nline1\nline2\nline3\nline4\n" - modified = "NEW\nline0\nline1\nline2\nline3\nline4\n" - slc = FuzzyAnchor.create_slice(original, 2, 4) - result = FuzzyAnchor.resolve_slice(modified, slc) - assert result is not None - start, end = result - assert start == 3 - assert end == 5 + def test_resolve_slice_line_inserted_before(self): + original = "line0\nline1\nline2\nline3\nline4\n" + modified = "NEW\nline0\nline1\nline2\nline3\nline4\n" + slc = FuzzyAnchor.create_slice(original, 2, 4) + result = FuzzyAnchor.resolve_slice(modified, slc) + assert result is not None + start, end = result + assert start == 3 + assert end == 5 - def test_resolve_slice_line_deleted_before_returns_none(self): - original = "line0\nline1\nline2\nline3\nline4\n" - modified = "line0\nline2\nline3\nline4\n" - slc = FuzzyAnchor.create_slice(original, 2, 4) - result = FuzzyAnchor.resolve_slice(modified, slc) - assert result is None + def test_resolve_slice_line_deleted_before_returns_none(self): + original = "line0\nline1\nline2\nline3\nline4\n" + modified = "line0\nline2\nline3\nline4\n" + slc = FuzzyAnchor.create_slice(original, 2, 4) + result = FuzzyAnchor.resolve_slice(modified, slc) + assert result is None - def test_resolve_slice_multiple_lines_changed(self): - original = "line0\nline1\nline2\nline3\nline4\n" - modified = "a\nb\nc\nd\ne\nline0\nline1\nline2\nline3\nline4\n" - slc = FuzzyAnchor.create_slice(original, 1, 2) - result = FuzzyAnchor.resolve_slice(modified, slc) - assert result is not None - start, end = result - assert start == 6 - assert end == 7 + def test_resolve_slice_multiple_lines_changed(self): + original = "line0\nline1\nline2\nline3\nline4\n" + modified = "a\nb\nc\nd\ne\nline0\nline1\nline2\nline3\nline4\n" + slc = FuzzyAnchor.create_slice(original, 1, 2) + result = FuzzyAnchor.resolve_slice(modified, slc) + assert result is not None + start, end = result + assert start == 6 + assert end == 7 - def test_resolve_slice_anchor_mismatch_returns_none(self): - original = "alpha\nbeta\ngamma\ndelta\nepsilon\n" - modified = "foo\nbar\nbaz\ndelta\nepsilon\n" - slc = FuzzyAnchor.create_slice(original, 2, 3) - result = FuzzyAnchor.resolve_slice(modified, slc) - assert result is None + def test_resolve_slice_anchor_mismatch_returns_none(self): + original = "alpha\nbeta\ngamma\ndelta\nepsilon\n" + modified = "foo\nbar\nbaz\ndelta\nepsilon\n" + slc = FuzzyAnchor.create_slice(original, 2, 3) + result = FuzzyAnchor.resolve_slice(modified, slc) + assert result is None diff --git a/tests/test_gemini_cli_adapter.py b/tests/test_gemini_cli_adapter.py index 2b09e0f4..2effe2e7 100644 --- a/tests/test_gemini_cli_adapter.py +++ b/tests/test_gemini_cli_adapter.py @@ -4,82 +4,82 @@ from src.gemini_cli_adapter import GeminiCliAdapter class TestGeminiCliAdapter: - @patch("subprocess.Popen") - def test_send_starts_subprocess_with_correct_args( - self, mock_popen: MagicMock - ) -> None: - adapter = GeminiCliAdapter(binary_path="gemini") - mock_process = MagicMock() - mock_process.communicate.return_value = ( - '{"type": "message", "content": "hello"}', - "", - ) - mock_process.returncode = 0 - mock_popen.return_value = mock_process - adapter.send("test prompt") - assert mock_popen.called - args, kwargs = mock_popen.call_args - cmd_list = args[0] - assert "gemini" in cmd_list - assert "--prompt" in cmd_list - assert "--output-format" in cmd_list - assert "stream-json" in cmd_list + @patch("subprocess.Popen") + def test_send_starts_subprocess_with_correct_args( + self, mock_popen: MagicMock + ) -> None: + adapter = GeminiCliAdapter(binary_path="gemini") + mock_process = MagicMock() + mock_process.communicate.return_value = ( + '{"type": "message", "content": "hello"}', + "", + ) + mock_process.returncode = 0 + mock_popen.return_value = mock_process + adapter.send("test prompt") + assert mock_popen.called + args, kwargs = mock_popen.call_args + cmd_list = args[0] + assert "gemini" in cmd_list + assert "--prompt" in cmd_list + assert "--output-format" in cmd_list + assert "stream-json" in cmd_list - @patch("subprocess.Popen") - def test_send_parses_jsonl_output(self, mock_popen: MagicMock) -> None: - adapter = GeminiCliAdapter() - stdout_str = '{"type": "message", "content": "Hello "}\n{"type": "message", "content": "world!"}\n' - mock_process = MagicMock() - mock_process.communicate.return_value = (stdout_str, "") - mock_process.returncode = 0 - mock_popen.return_value = mock_process - result = adapter.send("msg") - assert result["text"] == "Hello world!" + @patch("subprocess.Popen") + def test_send_parses_jsonl_output(self, mock_popen: MagicMock) -> None: + adapter = GeminiCliAdapter() + stdout_str = '{"type": "message", "content": "Hello "}\n{"type": "message", "content": "world!"}\n' + mock_process = MagicMock() + mock_process.communicate.return_value = (stdout_str, "") + mock_process.returncode = 0 + mock_popen.return_value = mock_process + result = adapter.send("msg") + assert result["text"] == "Hello world!" - @patch("subprocess.Popen") - def test_send_handles_tool_use_events(self, mock_popen: MagicMock) -> None: - adapter = GeminiCliAdapter() - tool_json = { - "type": "tool_use", - "tool_name": "read_file", - "parameters": {"path": "test.txt"}, - "tool_id": "call_123", - } - stdout_str = json.dumps(tool_json) + "\n" - mock_process = MagicMock() - mock_process.communicate.return_value = (stdout_str, "") - mock_process.returncode = 0 - mock_popen.return_value = mock_process - result = adapter.send("msg") - assert len(result["tool_calls"]) == 1 - assert result["tool_calls"][0]["name"] == "read_file" - assert result["tool_calls"][0]["args"]["path"] == "test.txt" + @patch("subprocess.Popen") + def test_send_handles_tool_use_events(self, mock_popen: MagicMock) -> None: + adapter = GeminiCliAdapter() + tool_json = { + "type": "tool_use", + "tool_name": "read_file", + "parameters": {"path": "test.txt"}, + "tool_id": "call_123", + } + stdout_str = json.dumps(tool_json) + "\n" + mock_process = MagicMock() + mock_process.communicate.return_value = (stdout_str, "") + mock_process.returncode = 0 + mock_popen.return_value = mock_process + result = adapter.send("msg") + assert len(result["tool_calls"]) == 1 + assert result["tool_calls"][0]["name"] == "read_file" + assert result["tool_calls"][0]["args"]["path"] == "test.txt" - @patch("subprocess.Popen") - def test_send_captures_usage_metadata(self, mock_popen: MagicMock) -> None: - adapter = GeminiCliAdapter() - result_json = {"type": "result", "stats": {"total_tokens": 50}} - stdout_str = json.dumps(result_json) + "\n" - mock_process = MagicMock() - mock_process.communicate.return_value = (stdout_str, "") - mock_process.returncode = 0 - mock_popen.return_value = mock_process - adapter.send("msg") - assert adapter.last_usage is not None - assert adapter.last_usage.get("total_tokens") == 50 + @patch("subprocess.Popen") + def test_send_captures_usage_metadata(self, mock_popen: MagicMock) -> None: + adapter = GeminiCliAdapter() + result_json = {"type": "result", "stats": {"total_tokens": 50}} + stdout_str = json.dumps(result_json) + "\n" + mock_process = MagicMock() + mock_process.communicate.return_value = (stdout_str, "") + mock_process.returncode = 0 + mock_popen.return_value = mock_process + adapter.send("msg") + assert adapter.last_usage is not None + assert adapter.last_usage.get("total_tokens") == 50 - @patch("subprocess.Popen") - def test_full_flow_integration(self, mock_popen: MagicMock) -> None: - adapter = GeminiCliAdapter() - msg_json = {"type": "message", "content": "Final response"} - result_json = { - "type": "result", - "stats": {"total_tokens": 25, "input_tokens": 10, "output_tokens": 15}, - } - stdout_str = json.dumps(msg_json) + "\n" + json.dumps(result_json) + "\n" - mock_process = MagicMock() - mock_process.communicate.return_value = (stdout_str, "") - mock_process.returncode = 0 - mock_popen.return_value = mock_process - result = adapter.send("test") - assert "Final response" in result["text"] + @patch("subprocess.Popen") + def test_full_flow_integration(self, mock_popen: MagicMock) -> None: + adapter = GeminiCliAdapter() + msg_json = {"type": "message", "content": "Final response"} + result_json = { + "type": "result", + "stats": {"total_tokens": 25, "input_tokens": 10, "output_tokens": 15}, + } + stdout_str = json.dumps(msg_json) + "\n" + json.dumps(result_json) + "\n" + mock_process = MagicMock() + mock_process.communicate.return_value = (stdout_str, "") + mock_process.returncode = 0 + mock_popen.return_value = mock_process + result = adapter.send("test") + assert "Final response" in result["text"] diff --git a/tests/test_gui_discussion_tabs.py b/tests/test_gui_discussion_tabs.py index f2901287..76380fcf 100644 --- a/tests/test_gui_discussion_tabs.py +++ b/tests/test_gui_discussion_tabs.py @@ -5,60 +5,60 @@ from src import gui_2 @pytest.fixture def mock_gui(): - gui = gui_2.App() - gui.project = { - 'discussion': { - 'active': 'main', - 'discussions': { - 'main': {'history': []}, - 'main_take_1': {'history': []}, - 'other_topic': {'history': []} - } - } - } - gui.active_discussion = 'main' - gui.perf_profiling_enabled = False - gui.is_viewing_prior_session = False - gui._get_discussion_names = lambda: ['main', 'main_take_1', 'other_topic'] - return gui + gui = gui_2.App() + gui.project = { + 'discussion': { + 'active': 'main', + 'discussions': { + 'main': {'history': []}, + 'main_take_1': {'history': []}, + 'other_topic': {'history': []} + } + } + } + gui.active_discussion = 'main' + gui.perf_profiling_enabled = False + gui.is_viewing_prior_session = False + gui._get_discussion_names = lambda: ['main', 'main_take_1', 'other_topic'] + return gui def test_discussion_tabs_rendered(mock_gui): - with patch('src.gui_2.imgui') as mock_imgui, \ - patch('src.gui_2.imscope') as mock_imscope, \ - patch('src.imgui_scopes.imgui', new=mock_imgui), \ - patch('src.app_controller.AppController.active_project_root', new_callable=PropertyMock, return_value='.'): + with patch('src.gui_2.imgui') as mock_imgui, \ + patch('src.gui_2.imscope') as mock_imscope, \ + patch('src.imgui_scopes.imgui', new=mock_imgui), \ + patch('src.app_controller.AppController.active_project_root', new_callable=PropertyMock, return_value='.'): - # Setup imscope mocks - mock_imscope.window.return_value.__enter__.return_value = (True, True) - mock_imscope.child.return_value.__enter__.return_value = True - mock_imscope.table.return_value.__enter__.return_value = True - mock_imscope.tree_node_ex.return_value.__enter__.return_value = True - mock_imscope.tab_item.return_value.__enter__.return_value = (True, True) - mock_imscope.style_color.return_value.__enter__.return_value = None - mock_imscope.style_var.return_value.__enter__.return_value = None + # Setup imscope mocks + mock_imscope.window.return_value.__enter__.return_value = (True, True) + mock_imscope.child.return_value.__enter__.return_value = True + mock_imscope.table.return_value.__enter__.return_value = True + mock_imscope.tree_node_ex.return_value.__enter__.return_value = True + mock_imscope.tab_item.return_value.__enter__.return_value = (True, True) + mock_imscope.style_color.return_value.__enter__.return_value = None + mock_imscope.style_var.return_value.__enter__.return_value = None - # We expect a combo box for base discussion - mock_imgui.begin_combo.return_value = True - mock_imgui.selectable.return_value = (False, False) + # We expect a combo box for base discussion + mock_imgui.begin_combo.return_value = True + mock_imgui.selectable.return_value = (False, False) - # We expect a tab bar for takes - mock_imgui.begin_tab_bar.return_value = True - mock_imgui.begin_tab_item.return_value = (True, True) - mock_imgui.input_text.return_value = (False, "") - mock_imgui.input_text_multiline.return_value = (False, "") - mock_imgui.checkbox.return_value = (False, False) - mock_imgui.input_int.return_value = (False, 0) + # We expect a tab bar for takes + mock_imgui.begin_tab_bar.return_value = True + mock_imgui.begin_tab_item.return_value = (True, True) + mock_imgui.input_text.return_value = (False, "") + mock_imgui.input_text_multiline.return_value = (False, "") + mock_imgui.checkbox.return_value = (False, False) + mock_imgui.input_int.return_value = (False, 0) - mock_clipper = MagicMock() - mock_clipper.step.return_value = False - mock_imgui.ListClipper.return_value = mock_clipper + mock_clipper = MagicMock() + mock_clipper.step.return_value = False + mock_imgui.ListClipper.return_value = mock_clipper - mock_gui._render_discussion_panel() + mock_gui._render_discussion_panel() - mock_imgui.begin_combo.assert_called_once_with("##disc_sel", 'main') - mock_imgui.begin_tab_bar.assert_called_once_with('discussion_takes_tabs') + mock_imgui.begin_combo.assert_called_once_with("##disc_sel", 'main') + mock_imgui.begin_tab_bar.assert_called_once_with('discussion_takes_tabs') - calls = [c[0][0] for c in mock_imscope.tab_item.call_args_list] - assert 'Original###main' in calls - assert 'Take 1###main_take_1' in calls - assert 'Synthesis###Synthesis' in calls + calls = [c[0][0] for c in mock_imscope.tab_item.call_args_list] + assert 'Original###main' in calls + assert 'Take 1###main_take_1' in calls + assert 'Synthesis###Synthesis' in calls diff --git a/tests/test_gui_stress_performance.py b/tests/test_gui_stress_performance.py index 12ef2067..b7e616d8 100644 --- a/tests/test_gui_stress_performance.py +++ b/tests/test_gui_stress_performance.py @@ -9,32 +9,32 @@ from src.api_hook_client import ApiHookClient def test_comms_volume_stress_performance(live_gui) -> None: - time.sleep(5.0) - client = ApiHookClient() - time.sleep(2.0) - baseline_resp = client.get_performance() - baseline = baseline_resp.get("performance", {}) - baseline_ft = baseline.get("last_frame_time_ms", 0.0) - large_session = [] - for i in range(50): - large_session.append( - { - "role": "User", - "content": f"Stress test entry {i} " * 5, - "ts": time.time(), - "collapsed": False, - } - ) - client.post_session(large_session) - time.sleep(1.0) - stress_resp = client.get_performance() - stress = stress_resp.get("performance", {}) - stress_ft = stress.get("last_frame_time_ms", 0.0) - print(f"Baseline FT: {baseline_ft:.2f}ms, Stress FT: {stress_ft:.2f}ms") - if stress_ft > 0: - assert stress_ft < 100.0, ( - f"Stress frame time {stress_ft:.2f}ms exceeds 10fps threshold" - ) - session_data = client.get_session() - entries = session_data.get("session", {}).get("entries", []) - assert len(entries) >= 50, f"Expected at least 50 entries, got {len(entries)}" + time.sleep(5.0) + client = ApiHookClient() + time.sleep(2.0) + baseline_resp = client.get_performance() + baseline = baseline_resp.get("performance", {}) + baseline_ft = baseline.get("last_frame_time_ms", 0.0) + large_session = [] + for i in range(50): + large_session.append( + { + "role": "User", + "content": f"Stress test entry {i} " * 5, + "ts": time.time(), + "collapsed": False, + } + ) + client.post_session(large_session) + time.sleep(1.0) + stress_resp = client.get_performance() + stress = stress_resp.get("performance", {}) + stress_ft = stress.get("last_frame_time_ms", 0.0) + print(f"Baseline FT: {baseline_ft:.2f}ms, Stress FT: {stress_ft:.2f}ms") + if stress_ft > 0: + assert stress_ft < 100.0, ( + f"Stress frame time {stress_ft:.2f}ms exceeds 10fps threshold" + ) + session_data = client.get_session() + entries = session_data.get("session", {}).get("entries", []) + assert len(entries) >= 50, f"Expected at least 50 entries, got {len(entries)}" diff --git a/tests/test_history_manager.py b/tests/test_history_manager.py index e9f903a5..8af79379 100644 --- a/tests/test_history_manager.py +++ b/tests/test_history_manager.py @@ -3,97 +3,97 @@ from src.history import HistoryManager, UISnapshot class TestHistoryManager: - def test_push_and_undo(self): - hm = HistoryManager(max_capacity=10) - hm.push({"value": 1}, "initial") - hm.push({"value": 2}, "change to 2") - assert hm.can_undo is True - result = hm.undo(current_state={"value": 2}) - assert result is not None - assert result.state["value"] == 2 - assert hm.can_undo is True + def test_push_and_undo(self): + hm = HistoryManager(max_capacity=10) + hm.push({"value": 1}, "initial") + hm.push({"value": 2}, "change to 2") + assert hm.can_undo is True + result = hm.undo(current_state={"value": 2}) + assert result is not None + assert result.state["value"] == 2 + assert hm.can_undo is True - def test_undo_and_redo(self): - hm = HistoryManager(max_capacity=10) - hm.push({"value": 1}, "initial") - hm.push({"value": 2}, "change to 2") - assert hm.can_redo is False - hm.undo(current_state={"value": 2}) - assert hm.can_redo is True - redone = hm.redo(current_state={"value": 1}) - assert redone is not None - assert redone.state["value"] == 2 + def test_undo_and_redo(self): + hm = HistoryManager(max_capacity=10) + hm.push({"value": 1}, "initial") + hm.push({"value": 2}, "change to 2") + assert hm.can_redo is False + hm.undo(current_state={"value": 2}) + assert hm.can_redo is True + redone = hm.redo(current_state={"value": 1}) + assert redone is not None + assert redone.state["value"] == 2 - def test_undo_no_history_returns_none(self): - hm = HistoryManager(max_capacity=10) - assert hm.can_undo is False - result = hm.undo(current_state={"value": 1}) - assert result is None + def test_undo_no_history_returns_none(self): + hm = HistoryManager(max_capacity=10) + assert hm.can_undo is False + result = hm.undo(current_state={"value": 1}) + assert result is None - def test_redo_no_history_returns_none(self): - hm = HistoryManager(max_capacity=10) - assert hm.can_redo is False - result = hm.redo(current_state={"value": 1}) - assert result is None + def test_redo_no_history_returns_none(self): + hm = HistoryManager(max_capacity=10) + assert hm.can_redo is False + result = hm.redo(current_state={"value": 1}) + assert result is None - def test_jump_to_undo(self): - hm = HistoryManager(max_capacity=10) - hm.push({"value": 1}, "initial") - hm.push({"value": 2}, "change to 2") - hm.push({"value": 3}, "change to 3") - result = hm.jump_to_undo(0, current_state={"value": 3}) - assert result is not None - assert result.state["value"] == 1 - assert hm.can_redo is True + def test_jump_to_undo(self): + hm = HistoryManager(max_capacity=10) + hm.push({"value": 1}, "initial") + hm.push({"value": 2}, "change to 2") + hm.push({"value": 3}, "change to 3") + result = hm.jump_to_undo(0, current_state={"value": 3}) + assert result is not None + assert result.state["value"] == 1 + assert hm.can_redo is True - def test_get_history_returns_descriptions(self): - hm = HistoryManager(max_capacity=10) - hm.push({"value": 1}, "first") - hm.push({"value": 2}, "second") - hm.push({"value": 3}, "third") - history = hm.get_history() - assert len(history) == 3 - assert history[0]["description"] == "first" - assert history[1]["description"] == "second" - assert "timestamp" in history[0] + def test_get_history_returns_descriptions(self): + hm = HistoryManager(max_capacity=10) + hm.push({"value": 1}, "first") + hm.push({"value": 2}, "second") + hm.push({"value": 3}, "third") + history = hm.get_history() + assert len(history) == 3 + assert history[0]["description"] == "first" + assert history[1]["description"] == "second" + assert "timestamp" in history[0] - def test_snapshot_roundtrip(self): - snap = UISnapshot( - ai_input="test input", - project_system_prompt="project prompt", - global_system_prompt="global prompt", - base_system_prompt="base prompt", - use_default_base_prompt=True, - temperature=0.5, - top_p=0.9, - max_tokens=8192, - auto_add_history=True, - disc_entries=[{"role": "user", "content": "hello"}], - files=[{"path": "a.txt"}], - context_files=[], - screenshots=["screenshot.png"], - ) - d = snap.to_dict() - restored = UISnapshot.from_dict(d) - assert restored.ai_input == snap.ai_input - assert restored.project_system_prompt == snap.project_system_prompt - assert restored.global_system_prompt == snap.global_system_prompt - assert restored.base_system_prompt == snap.base_system_prompt - assert restored.use_default_base_prompt == snap.use_default_base_prompt - assert restored.temperature == snap.temperature - assert restored.top_p == snap.top_p - assert restored.max_tokens == snap.max_tokens - assert restored.auto_add_history == snap.auto_add_history - assert restored.disc_entries == snap.disc_entries - assert restored.files == snap.files - assert restored.context_files == snap.context_files - assert restored.screenshots == snap.screenshots + def test_snapshot_roundtrip(self): + snap = UISnapshot( + ai_input="test input", + project_system_prompt="project prompt", + global_system_prompt="global prompt", + base_system_prompt="base prompt", + use_default_base_prompt=True, + temperature=0.5, + top_p=0.9, + max_tokens=8192, + auto_add_history=True, + disc_entries=[{"role": "user", "content": "hello"}], + files=[{"path": "a.txt"}], + context_files=[], + screenshots=["screenshot.png"], + ) + d = snap.to_dict() + restored = UISnapshot.from_dict(d) + assert restored.ai_input == snap.ai_input + assert restored.project_system_prompt == snap.project_system_prompt + assert restored.global_system_prompt == snap.global_system_prompt + assert restored.base_system_prompt == snap.base_system_prompt + assert restored.use_default_base_prompt == snap.use_default_base_prompt + assert restored.temperature == snap.temperature + assert restored.top_p == snap.top_p + assert restored.max_tokens == snap.max_tokens + assert restored.auto_add_history == snap.auto_add_history + assert restored.disc_entries == snap.disc_entries + assert restored.files == snap.files + assert restored.context_files == snap.context_files + assert restored.screenshots == snap.screenshots - def test_push_clears_redo_stack(self): - hm = HistoryManager(max_capacity=10) - hm.push({"value": 1}, "initial") - hm.push({"value": 2}, "change to 2") - hm.undo(current_state={"value": 2}) - assert hm.can_redo is True - hm.push({"value": 3}, "change to 3") - assert hm.can_redo is False + def test_push_clears_redo_stack(self): + hm = HistoryManager(max_capacity=10) + hm.push({"value": 1}, "initial") + hm.push({"value": 2}, "change to 2") + hm.undo(current_state={"value": 2}) + assert hm.can_redo is True + hm.push({"value": 3}, "change to 3") + assert hm.can_redo is False diff --git a/tests/test_hot_reloader.py b/tests/test_hot_reloader.py index abb42adb..44f8c34e 100644 --- a/tests/test_hot_reloader.py +++ b/tests/test_hot_reloader.py @@ -6,95 +6,95 @@ import sys import types def test_hot_module_dataclass_fields(): - hm = HotModule( - name="test_module", - file_path="/path/to/test_module.py", - state_keys=["attr1", "attr2"], - delegation_targets=["method_a", "method_b"], - ) - assert hm.name == "test_module" - assert hm.file_path == "/path/to/test_module.py" - assert hm.state_keys == ["attr1", "attr2"] - assert hm.delegation_targets == ["method_a", "method_b"] + hm = HotModule( + name="test_module", + file_path="/path/to/test_module.py", + state_keys=["attr1", "attr2"], + delegation_targets=["method_a", "method_b"], + ) + assert hm.name == "test_module" + assert hm.file_path == "/path/to/test_module.py" + assert hm.state_keys == ["attr1", "attr2"] + assert hm.delegation_targets == ["method_a", "method_b"] def test_hot_reloader_register_and_get(): - HotReloader.HOT_MODULES.clear() - hm = HotModule(name="test_mod", file_path="/fake/path.py", state_keys=[], delegation_targets=[]) - HotReloader.register(hm) - assert "test_mod" in HotReloader.HOT_MODULES - assert HotReloader.HOT_MODULES["test_mod"] is hm + HotReloader.HOT_MODULES.clear() + hm = HotModule(name="test_mod", file_path="/fake/path.py", state_keys=[], delegation_targets=[]) + HotReloader.register(hm) + assert "test_mod" in HotReloader.HOT_MODULES + assert HotReloader.HOT_MODULES["test_mod"] is hm def test_hot_reloader_register_duplicate_raises(): - HotReloader.HOT_MODULES.clear() - hm1 = HotModule(name="dup", file_path="/a.py", state_keys=[], delegation_targets=[]) - HotReloader.register(hm1) - with pytest.raises(ValueError, match="already registered"): - HotReloader.register(hm1) + HotReloader.HOT_MODULES.clear() + hm1 = HotModule(name="dup", file_path="/a.py", state_keys=[], delegation_targets=[]) + HotReloader.register(hm1) + with pytest.raises(ValueError, match="already registered"): + HotReloader.register(hm1) def test_hot_reloader_is_error_state(): - HotReloader.HOT_MODULES.clear() - HotReloader.last_error = None - HotReloader.is_error_state = False - assert HotReloader.is_error_state is False + HotReloader.HOT_MODULES.clear() + HotReloader.last_error = None + HotReloader.is_error_state = False + assert HotReloader.is_error_state is False def test_reload_unknown_module_returns_false(): - HotReloader.HOT_MODULES.clear() - HotReloader.register(HotModule(name="nonexistent_mod", file_path="/nonexistent.py", state_keys=[], delegation_targets=[])) - app = MagicMock() - result = HotReloader.reload("nonexistent_mod", app) - assert result is False - assert HotReloader.is_error_state is True - assert HotReloader.last_error is not None + HotReloader.HOT_MODULES.clear() + HotReloader.register(HotModule(name="nonexistent_mod", file_path="/nonexistent.py", state_keys=[], delegation_targets=[])) + app = MagicMock() + result = HotReloader.reload("nonexistent_mod", app) + assert result is False + assert HotReloader.is_error_state is True + assert HotReloader.last_error is not None def test_reload_success_clears_error_state(): - HotReloader.HOT_MODULES.clear() - test_mod = types.ModuleType("src._test_reload_mod_src") - sys.modules["src._test_reload_mod_src"] = test_mod - HotReloader.register(HotModule(name="src._test_reload_mod_src", file_path="/fake.py", state_keys=[], delegation_targets=[])) - app = MagicMock() - HotReloader.is_error_state = True - HotReloader.last_error = "previous error" - with patch("importlib.reload", return_value=test_mod): - result = HotReloader.reload("src._test_reload_mod_src", app) - assert result is True - assert HotReloader.is_error_state is False - assert HotReloader.last_error is None - del sys.modules["src._test_reload_mod_src"] + HotReloader.HOT_MODULES.clear() + test_mod = types.ModuleType("src._test_reload_mod_src") + sys.modules["src._test_reload_mod_src"] = test_mod + HotReloader.register(HotModule(name="src._test_reload_mod_src", file_path="/fake.py", state_keys=[], delegation_targets=[])) + app = MagicMock() + HotReloader.is_error_state = True + HotReloader.last_error = "previous error" + with patch("importlib.reload", return_value=test_mod): + result = HotReloader.reload("src._test_reload_mod_src", app) + assert result is True + assert HotReloader.is_error_state is False + assert HotReloader.last_error is None + del sys.modules["src._test_reload_mod_src"] def test_reload_captures_and_restores_state_on_failure(): - HotReloader.HOT_MODULES.clear() - HotReloader.register(HotModule(name="bad_mod", file_path="/bad.py", state_keys=["_test_attr"], delegation_targets=[])) - app = MagicMock() - app._test_attr = "preserved_value" - result = HotReloader.reload("bad_mod", app) - assert result is False - assert HotReloader.is_error_state is True - assert app._test_attr == "preserved_value" + HotReloader.HOT_MODULES.clear() + HotReloader.register(HotModule(name="bad_mod", file_path="/bad.py", state_keys=["_test_attr"], delegation_targets=[])) + app = MagicMock() + app._test_attr = "preserved_value" + result = HotReloader.reload("bad_mod", app) + assert result is False + assert HotReloader.is_error_state is True + assert app._test_attr == "preserved_value" def test_reload_all_success(): - HotReloader.HOT_MODULES.clear() - mod1 = types.ModuleType("hr_test_mod1") - mod2 = types.ModuleType("hr_test_mod2") - sys.modules["hr_test_mod1"] = mod1 - sys.modules["hr_test_mod2"] = mod2 - HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[])) - HotReloader.register(HotModule(name="hr_test_mod2", file_path="/fake2.py", state_keys=[], delegation_targets=[])) - app = MagicMock() - with patch("importlib.reload", return_value=mod1): - result = HotReloader.reload_all(app) - assert result is True - assert HotReloader.is_error_state is False - del sys.modules["hr_test_mod1"] - del sys.modules["hr_test_mod2"] + HotReloader.HOT_MODULES.clear() + mod1 = types.ModuleType("hr_test_mod1") + mod2 = types.ModuleType("hr_test_mod2") + sys.modules["hr_test_mod1"] = mod1 + sys.modules["hr_test_mod2"] = mod2 + HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[])) + HotReloader.register(HotModule(name="hr_test_mod2", file_path="/fake2.py", state_keys=[], delegation_targets=[])) + app = MagicMock() + with patch("importlib.reload", return_value=mod1): + result = HotReloader.reload_all(app) + assert result is True + assert HotReloader.is_error_state is False + del sys.modules["hr_test_mod1"] + del sys.modules["hr_test_mod2"] def test_reload_all_partial_failure(): - HotReloader.HOT_MODULES.clear() - mod1 = types.ModuleType("hr_test_mod1") - sys.modules["hr_test_mod1"] = mod1 - HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[])) - HotReloader.register(HotModule(name="hr_nonexistent", file_path="/nonexistent.py", state_keys=[], delegation_targets=[])) - app = MagicMock() - result = HotReloader.reload_all(app) - assert result is False - assert HotReloader.is_error_state is True - del sys.modules["hr_test_mod1"] \ No newline at end of file + HotReloader.HOT_MODULES.clear() + mod1 = types.ModuleType("hr_test_mod1") + sys.modules["hr_test_mod1"] = mod1 + HotReloader.register(HotModule(name="hr_test_mod1", file_path="/fake1.py", state_keys=[], delegation_targets=[])) + HotReloader.register(HotModule(name="hr_nonexistent", file_path="/nonexistent.py", state_keys=[], delegation_targets=[])) + app = MagicMock() + result = HotReloader.reload_all(app) + assert result is False + assert HotReloader.is_error_state is True + del sys.modules["hr_test_mod1"] \ No newline at end of file diff --git a/tests/test_mma_dashboard_refresh.py b/tests/test_mma_dashboard_refresh.py index 8529e99e..dd04ed37 100644 --- a/tests/test_mma_dashboard_refresh.py +++ b/tests/test_mma_dashboard_refresh.py @@ -6,70 +6,70 @@ from src.gui_2 import App @pytest.fixture def app_instance() -> Any: - with ( - patch("src.models.load_config", return_value={"ai": {}, "projects": {}}), - patch("src.models.save_config"), - patch("src.gui_2.project_manager"), - patch("src.app_controller.project_manager") as mock_pm, - patch("src.gui_2.session_logger"), - patch("src.gui_2.immapp.run"), - patch("src.app_controller.AppController._load_active_project"), - patch("src.app_controller.AppController._fetch_models"), - patch.object(App, "_load_fonts"), - patch.object(App, "_post_init"), - patch("src.app_controller.AppController._prune_old_logs"), - patch("src.app_controller.AppController.start_services"), - patch("src.app_controller.AppController._init_ai_and_hooks"), - ): - app = App() - app.project = {} - app.ui_files_base_dir = "." - yield app, mock_pm + with ( + patch("src.models.load_config", return_value={"ai": {}, "projects": {}}), + patch("src.models.save_config"), + patch("src.gui_2.project_manager"), + patch("src.app_controller.project_manager") as mock_pm, + patch("src.gui_2.session_logger"), + patch("src.gui_2.immapp.run"), + patch("src.app_controller.AppController._load_active_project"), + patch("src.app_controller.AppController._fetch_models"), + patch.object(App, "_load_fonts"), + patch.object(App, "_post_init"), + patch("src.app_controller.AppController._prune_old_logs"), + patch("src.app_controller.AppController.start_services"), + patch("src.app_controller.AppController._init_ai_and_hooks"), + ): + app = App() + app.project = {} + app.ui_files_base_dir = "." + yield app, mock_pm def test_mma_dashboard_refresh(app_instance: Any) -> None: - app, mock_pm = app_instance - mock_tracks = [ - { - "id": "track_1", - "title": "Track 1", - "status": "new", - "complete": 0, - "total": 0, - "progress": 0.0, - }, - { - "id": "track_2", - "title": "Track 2", - "status": "new", - "complete": 0, - "total": 0, - "progress": 0.0, - }, - ] - mock_pm.get_all_tracks.return_value = mock_tracks - app._refresh_from_project() - assert hasattr(app, "tracks"), "App instance should have a 'tracks' attribute" - assert app.tracks == mock_tracks - assert len(app.tracks) == 2 - assert app.tracks[0]["id"] == "track_1" - assert app.tracks[1]["id"] == "track_2" - mock_pm.get_all_tracks.assert_called_with(app.active_project_root) + app, mock_pm = app_instance + mock_tracks = [ + { + "id": "track_1", + "title": "Track 1", + "status": "new", + "complete": 0, + "total": 0, + "progress": 0.0, + }, + { + "id": "track_2", + "title": "Track 2", + "status": "new", + "complete": 0, + "total": 0, + "progress": 0.0, + }, + ] + mock_pm.get_all_tracks.return_value = mock_tracks + app._refresh_from_project() + assert hasattr(app, "tracks"), "App instance should have a 'tracks' attribute" + assert app.tracks == mock_tracks + assert len(app.tracks) == 2 + assert app.tracks[0]["id"] == "track_1" + assert app.tracks[1]["id"] == "track_2" + mock_pm.get_all_tracks.assert_called_with(app.active_project_root) def test_mma_dashboard_initialization_refresh(app_instance: Any) -> None: - app, mock_pm = app_instance - mock_tracks = [ - { - "id": "init_track", - "title": "Initial Track", - "status": "new", - "complete": 0, - "total": 0, - "progress": 0.0, - } - ] - mock_pm.get_all_tracks.return_value = mock_tracks - app._refresh_from_project() - assert app.tracks == mock_tracks - assert app.tracks[0]["id"] == "init_track" + app, mock_pm = app_instance + mock_tracks = [ + { + "id": "init_track", + "title": "Initial Track", + "status": "new", + "complete": 0, + "total": 0, + "progress": 0.0, + } + ] + mock_pm.get_all_tracks.return_value = mock_tracks + app._refresh_from_project() + assert app.tracks == mock_tracks + assert app.tracks[0]["id"] == "init_track" diff --git a/tests/test_mma_node_editor.py b/tests/test_mma_node_editor.py index 60c8cc61..f6552cad 100644 --- a/tests/test_mma_node_editor.py +++ b/tests/test_mma_node_editor.py @@ -4,48 +4,48 @@ import sys from imgui_bundle import imgui_node_editor as ed def test_imgui_node_editor_import(): - assert ed is not None - assert hasattr(ed, "begin_node") - assert hasattr(ed, "end_node") + assert ed is not None + assert hasattr(ed, "begin_node") + assert hasattr(ed, "end_node") def test_app_has_node_editor_attrs(): - from src.gui_2 import App - # Use patch to avoid initializing the entire App - with patch('src.app_controller.AppController'), \ - patch('src.gui_2.immapp.RunnerParams'), \ - patch('imgui_bundle.imgui_node_editor.create_editor'): - app = App.__new__(App) - # Manually set what we expect from __init__ - app.node_editor_config = ed.Config() - app.node_editor_ctx = ed.create_editor(app.node_editor_config) - app.ui_selected_ticket_id = None + from src.gui_2 import App + # Use patch to avoid initializing the entire App + with patch('src.app_controller.AppController'), \ + patch('src.gui_2.immapp.RunnerParams'), \ + patch('imgui_bundle.imgui_node_editor.create_editor'): + app = App.__new__(App) + # Manually set what we expect from __init__ + app.node_editor_config = ed.Config() + app.node_editor_ctx = ed.create_editor(app.node_editor_config) + app.ui_selected_ticket_id = None - assert hasattr(app, 'node_editor_config') - assert hasattr(app, 'node_editor_ctx') - assert hasattr(app, 'ui_selected_ticket_id') + assert hasattr(app, 'node_editor_config') + assert hasattr(app, 'node_editor_ctx') + assert hasattr(app, 'ui_selected_ticket_id') def test_node_id_stability(): - """Verify that node/pin IDs generated via hash are stable for the same input.""" - tid = "T-001" - int_id = abs(hash(tid)) - in_pin_id = abs(hash(tid + "_in")) - out_pin_id = abs(hash(tid + "_out")) + """Verify that node/pin IDs generated via hash are stable for the same input.""" + tid = "T-001" + int_id = abs(hash(tid)) + in_pin_id = abs(hash(tid + "_in")) + out_pin_id = abs(hash(tid + "_out")) - # Re-generate and compare - assert int_id == abs(hash(tid)) - assert in_pin_id == abs(hash(tid + "_in")) - assert out_pin_id == abs(hash(tid + "_out")) + # Re-generate and compare + assert int_id == abs(hash(tid)) + assert in_pin_id == abs(hash(tid + "_in")) + assert out_pin_id == abs(hash(tid + "_out")) - # Verify uniqueness - assert int_id != in_pin_id - assert int_id != out_pin_id - assert in_pin_id != out_pin_id + # Verify uniqueness + assert int_id != in_pin_id + assert int_id != out_pin_id + assert in_pin_id != out_pin_id def test_link_id_stability(): - """Verify that link IDs are stable.""" - source_tid = "T-001" - target_tid = "T-002" - link_id = abs(hash(source_tid + "_" + target_tid)) + """Verify that link IDs are stable.""" + source_tid = "T-001" + target_tid = "T-002" + link_id = abs(hash(source_tid + "_" + target_tid)) - assert link_id == abs(hash(source_tid + "_" + target_tid)) - assert link_id != abs(hash(target_tid + "_" + source_tid)) \ No newline at end of file + assert link_id == abs(hash(source_tid + "_" + target_tid)) + assert link_id != abs(hash(target_tid + "_" + source_tid)) \ No newline at end of file diff --git a/tests/test_mma_orchestration_gui.py b/tests/test_mma_orchestration_gui.py index b089c7be..b4345b37 100644 --- a/tests/test_mma_orchestration_gui.py +++ b/tests/test_mma_orchestration_gui.py @@ -8,107 +8,107 @@ from src.gui_2 import App def test_mma_ui_state_initialization(app_instance: App) -> None: - """Verifies that the new MMA UI state variables are initialized correctly.""" - assert hasattr(app_instance, "ui_epic_input") - assert hasattr(app_instance, "proposed_tracks") - assert hasattr(app_instance, "_show_track_proposal_modal") - assert hasattr(app_instance, "mma_streams") - assert app_instance.ui_epic_input == "" - assert app_instance.proposed_tracks == [] - assert app_instance._show_track_proposal_modal is False - assert app_instance.mma_streams == {} + """Verifies that the new MMA UI state variables are initialized correctly.""" + assert hasattr(app_instance, "ui_epic_input") + assert hasattr(app_instance, "proposed_tracks") + assert hasattr(app_instance, "_show_track_proposal_modal") + assert hasattr(app_instance, "mma_streams") + assert app_instance.ui_epic_input == "" + assert app_instance.proposed_tracks == [] + assert app_instance._show_track_proposal_modal is False + assert app_instance.mma_streams == {} def test_process_pending_gui_tasks_show_track_proposal(app_instance: App) -> None: - """Verifies that the 'show_track_proposal' action correctly updates the UI state.""" - mock_tracks = [{"id": "track_1", "title": "Test Track"}] - task = {"action": "show_track_proposal", "payload": mock_tracks} - app_instance._pending_gui_tasks.append(task) - app_instance._process_pending_gui_tasks() - assert app_instance.proposed_tracks == mock_tracks - assert app_instance._show_track_proposal_modal is True + """Verifies that the 'show_track_proposal' action correctly updates the UI state.""" + mock_tracks = [{"id": "track_1", "title": "Test Track"}] + task = {"action": "show_track_proposal", "payload": mock_tracks} + app_instance._pending_gui_tasks.append(task) + app_instance._process_pending_gui_tasks() + assert app_instance.proposed_tracks == mock_tracks + assert app_instance._show_track_proposal_modal is True def test_cb_plan_epic_launches_thread(app_instance: App) -> None: - """Verifies that _cb_plan_epic launches a thread and eventually queues a task.""" - app_instance.ui_epic_input = "Develop a new feature" - app_instance.active_project_path = "test_project.toml" - mock_tracks = [{"id": "track_1", "title": "Test Track"}] - with ( - patch( - "src.orchestrator_pm.get_track_history_summary", - return_value="History summary", - ) as mock_get_history, - patch( - "src.orchestrator_pm.generate_tracks", return_value=mock_tracks - ) as mock_gen_tracks, - patch("src.aggregate.build_file_items", return_value=[]), - ): - with ( - patch("src.project_manager.load_project", return_value={}), - patch("src.project_manager.flat_config", return_value={}), - ): - app_instance._cb_plan_epic() - max_wait = 5 - start_time = time.time() - while ( - len(app_instance._pending_gui_tasks) < 3 - and time.time() - start_time < max_wait - ): - time.sleep(0.1) - assert len(app_instance._pending_gui_tasks) >= 3 - actions = [t["action"] for t in app_instance._pending_gui_tasks] - assert "handle_ai_response" in actions - assert "show_track_proposal" in actions - mock_get_history.assert_called_once() - mock_gen_tracks.assert_called_once() + """Verifies that _cb_plan_epic launches a thread and eventually queues a task.""" + app_instance.ui_epic_input = "Develop a new feature" + app_instance.active_project_path = "test_project.toml" + mock_tracks = [{"id": "track_1", "title": "Test Track"}] + with ( + patch( + "src.orchestrator_pm.get_track_history_summary", + return_value="History summary", + ) as mock_get_history, + patch( + "src.orchestrator_pm.generate_tracks", return_value=mock_tracks + ) as mock_gen_tracks, + patch("src.aggregate.build_file_items", return_value=[]), + ): + with ( + patch("src.project_manager.load_project", return_value={}), + patch("src.project_manager.flat_config", return_value={}), + ): + app_instance._cb_plan_epic() + max_wait = 5 + start_time = time.time() + while ( + len(app_instance._pending_gui_tasks) < 3 + and time.time() - start_time < max_wait + ): + time.sleep(0.1) + assert len(app_instance._pending_gui_tasks) >= 3 + actions = [t["action"] for t in app_instance._pending_gui_tasks] + assert "handle_ai_response" in actions + assert "show_track_proposal" in actions + mock_get_history.assert_called_once() + mock_gen_tracks.assert_called_once() def test_process_pending_gui_tasks_mma_spawn_approval(app_instance: App) -> None: - """Verifies that the 'mma_spawn_approval' action correctly updates the UI state.""" - task = { - "action": "mma_spawn_approval", - "ticket_id": "T1", - "role": "Tier 3 Worker", - "prompt": "Test Prompt", - "context_md": "Test Context", - "dialog_container": [None], - } - app_instance._pending_gui_tasks.append(task) - app_instance._process_pending_gui_tasks() - assert app_instance._pending_mma_spawn == task - assert app_instance._mma_spawn_prompt == "Test Prompt" - assert app_instance._mma_spawn_context == "Test Context" - assert app_instance._mma_spawn_open is True - assert app_instance._mma_spawn_edit_mode is False - assert task["dialog_container"][0] is not None + """Verifies that the 'mma_spawn_approval' action correctly updates the UI state.""" + task = { + "action": "mma_spawn_approval", + "ticket_id": "T1", + "role": "Tier 3 Worker", + "prompt": "Test Prompt", + "context_md": "Test Context", + "dialog_container": [None], + } + app_instance._pending_gui_tasks.append(task) + app_instance._process_pending_gui_tasks() + assert app_instance._pending_mma_spawn == task + assert app_instance._mma_spawn_prompt == "Test Prompt" + assert app_instance._mma_spawn_context == "Test Context" + assert app_instance._mma_spawn_open is True + assert app_instance._mma_spawn_edit_mode is False + assert task["dialog_container"][0] is not None def test_handle_ai_response_with_stream_id(app_instance: App) -> None: - """Verifies routing to mma_streams.""" - task = { - "action": "handle_ai_response", - "payload": { - "text": "Tier 1 Strategy Content", - "stream_id": "Tier 1", - "status": "Thinking...", - }, - } - app_instance._pending_gui_tasks.append(task) - app_instance._process_pending_gui_tasks() - assert app_instance.mma_streams.get("Tier 1") == "Tier 1 Strategy Content" - assert app_instance.ai_status == "Thinking..." - assert app_instance.ai_response == "" + """Verifies routing to mma_streams.""" + task = { + "action": "handle_ai_response", + "payload": { + "text": "Tier 1 Strategy Content", + "stream_id": "Tier 1", + "status": "Thinking...", + }, + } + app_instance._pending_gui_tasks.append(task) + app_instance._process_pending_gui_tasks() + assert app_instance.mma_streams.get("Tier 1") == "Tier 1 Strategy Content" + assert app_instance.ai_status == "Thinking..." + assert app_instance.ai_response == "" def test_handle_ai_response_fallback(app_instance: App) -> None: - """Verifies fallback to ai_response when stream_id is missing.""" - task = { - "action": "handle_ai_response", - "payload": {"text": "Regular AI Response", "status": "done"}, - } - app_instance._pending_gui_tasks.append(task) - app_instance._process_pending_gui_tasks() - assert app_instance.ai_response == "Regular AI Response" - assert app_instance.ai_status == "done" - assert len(app_instance.mma_streams) == 0 \ No newline at end of file + """Verifies fallback to ai_response when stream_id is missing.""" + task = { + "action": "handle_ai_response", + "payload": {"text": "Regular AI Response", "status": "done"}, + } + app_instance._pending_gui_tasks.append(task) + app_instance._process_pending_gui_tasks() + assert app_instance.ai_response == "Regular AI Response" + assert app_instance.ai_status == "done" + assert len(app_instance.mma_streams) == 0 \ No newline at end of file diff --git a/tests/test_py_struct_tools.py b/tests/test_py_struct_tools.py index 2b44c7d7..a8676980 100644 --- a/tests/test_py_struct_tools.py +++ b/tests/test_py_struct_tools.py @@ -5,140 +5,140 @@ from src import mcp_client @pytest.fixture def temp_py_file(tmp_path): - p = tmp_path / "sample.py" - content = """class MyClass: - \"\"\"Docstring.\"\"\" - def method1(self): - print("m1") + p = tmp_path / "sample.py" + content = """class MyClass: +\"\"\"Docstring.\"\"\" +def method1(self): +print("m1") def top_func(): - \"\"\"Top doc.\"\"\" - print("top") +\"\"\"Top doc.\"\"\" +print("top") """ - p.write_text(content, encoding="utf-8") - return str(p) + p.write_text(content, encoding="utf-8") + return str(p) def test_find_definition_range(): - source = """class A: - def m(self): pass + source = """class A: +def m(self): pass def f(): pass """ - assert py_struct_tools.find_definition_range(source, "A") == (1, 2) - assert py_struct_tools.find_definition_range(source, "A.m") == (2, 2) - assert py_struct_tools.find_definition_range(source, "f") == (3, 3) - assert py_struct_tools.find_definition_range(source, "nonexistent") is None + assert py_struct_tools.find_definition_range(source, "A") == (1, 2) + assert py_struct_tools.find_definition_range(source, "A.m") == (2, 2) + assert py_struct_tools.find_definition_range(source, "f") == (3, 3) + assert py_struct_tools.find_definition_range(source, "nonexistent") is None def test_shift_indentation(): - payload = "def f():\n print('hi')" # 2-space - shifted = py_struct_tools.shift_indentation(payload, 1) - assert shifted == " def f():\n print('hi')" # wait, shift_indentation strips min and prepends. + payload = "def f():\n print('hi')" # 2-space + shifted = py_struct_tools.shift_indentation(payload, 1) + assert shifted == " def f():\n print('hi')" # wait, shift_indentation strips min and prepends. - # Let's re-test shift_indentation logic - # Original: - # line 1: 'def f():' (0 indent) - # line 2: ' print('hi')' (2 indent) - # min_indent = 0 - # Prepend 1 space: - # ' def f():' - # ' print('hi')' + # Let's re-test shift_indentation logic + # Original: + # line 1: 'def f():' (0 indent) + # line 2: ' print('hi')' (2 indent) + # min_indent = 0 + # Prepend 1 space: + # ' def f():' + # ' print('hi')' - # If payload was: - # def f(): - # print('hi') - # min_indent = 2 - # target_depth = 1 - # ' def f():' - # ' print('hi')' + # If payload was: + # def f(): + # print('hi') + # min_indent = 2 + # target_depth = 1 + # ' def f():' + # ' print('hi')' - payload2 = " def f():\n print('hi')" - shifted2 = py_struct_tools.shift_indentation(payload2, 1) - assert shifted2 == " def f():\n print('hi')" + payload2 = " def f():\n print('hi')" + shifted2 = py_struct_tools.shift_indentation(payload2, 1) + assert shifted2 == " def f():\n print('hi')" def test_py_remove_def(temp_py_file): - err = py_struct_tools.py_remove_def(temp_py_file, "MyClass.method1") - assert err == "" - with open(temp_py_file, 'r') as f: - content = f.read() - assert "def method1" not in content - assert "class MyClass" in content + err = py_struct_tools.py_remove_def(temp_py_file, "MyClass.method1") + assert err == "" + with open(temp_py_file, 'r') as f: + content = f.read() + assert "def method1" not in content + assert "class MyClass" in content def test_py_add_def(temp_py_file): - new_code = "def method2(self):\n print('m2')" - err = py_struct_tools.py_add_def(temp_py_file, "MyClass", new_code, "after", "method1") - assert err == "" - with open(temp_py_file, 'r') as f: - content = f.read() - assert "def method2" in content - # Check 1-space indentation - assert " def method2(self):" in content + new_code = "def method2(self):\n print('m2')" + err = py_struct_tools.py_add_def(temp_py_file, "MyClass", new_code, "after", "method1") + assert err == "" + with open(temp_py_file, 'r') as f: + content = f.read() + assert "def method2" in content + # Check 1-space indentation + assert " def method2(self):" in content def test_py_region_wrap(temp_py_file): - err = py_struct_tools.py_region_wrap(temp_py_file, 6, 8, "MyRegion") - assert err == "" - with open(temp_py_file, 'r') as f: - content = f.read() - assert "#region: MyRegion" in content - assert "#endregion: MyRegion" in content + err = py_struct_tools.py_region_wrap(temp_py_file, 6, 8, "MyRegion") + assert err == "" + with open(temp_py_file, 'r') as f: + content = f.read() + assert "#region: MyRegion" in content + assert "#endregion: MyRegion" in content def test_mcp_dispatch_integration(temp_py_file): - # Mock allowlist - mcp_client.configure([{"path": temp_py_file}]) + # Mock allowlist + mcp_client.configure([{"path": temp_py_file}]) - # Test py_remove_def - result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "top_func"}) - assert result == "" - with open(temp_py_file, 'r') as f: - content = f.read() - assert "def top_func" not in content + # Test py_remove_def + result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "top_func"}) + assert result == "" + with open(temp_py_file, 'r') as f: + content = f.read() + assert "def top_func" not in content - # Test py_add_def (module level top) - result = mcp_client.dispatch("py_add_def", { - "path": temp_py_file, - "name": "", - "new_content": "def head_func():\n print('head')", - "anchor_type": "top" - }) - assert result == "" - with open(temp_py_file, 'r') as f: - content = f.read() - assert content.startswith("def head_func") + # Test py_add_def (module level top) + result = mcp_client.dispatch("py_add_def", { + "path": temp_py_file, + "name": "", + "new_content": "def head_func():\n print('head')", + "anchor_type": "top" + }) + assert result == "" + with open(temp_py_file, 'r') as f: + content = f.read() + assert content.startswith("def head_func") - # Test py_add_def (class bottom) - result = mcp_client.dispatch("py_add_def", { - "path": temp_py_file, - "name": "MyClass", - "new_content": "def tail_method(self):\n print('tail')", - "anchor_type": "bottom" - }) - assert result == "" - with open(temp_py_file, 'r') as f: - content = f.read() - assert "def tail_method" in content - assert " def tail_method(self):" in content # Check indent + # Test py_add_def (class bottom) + result = mcp_client.dispatch("py_add_def", { + "path": temp_py_file, + "name": "MyClass", + "new_content": "def tail_method(self):\n print('tail')", + "anchor_type": "bottom" + }) + assert result == "" + with open(temp_py_file, 'r') as f: + content = f.read() + assert "def tail_method" in content + assert " def tail_method(self):" in content # Check indent - # Test py_move_def (cross-file simulated with same file) - # We move method1 to after tail_method - result = mcp_client.dispatch("py_move_def", { - "src_path": temp_py_file, - "dest_path": temp_py_file, - "name": "MyClass.method1", - "dest_name": "MyClass", - "anchor_type": "after", - "anchor_symbol": "tail_method" - }) - assert result == "" - with open(temp_py_file, 'r') as f: - content = f.read() - # method1 should now be AFTER tail_method - assert content.find("def method1") > content.find("def tail_method") + # Test py_move_def (cross-file simulated with same file) + # We move method1 to after tail_method + result = mcp_client.dispatch("py_move_def", { + "src_path": temp_py_file, + "dest_path": temp_py_file, + "name": "MyClass.method1", + "dest_name": "MyClass", + "anchor_type": "after", + "anchor_symbol": "tail_method" + }) + assert result == "" + with open(temp_py_file, 'r') as f: + content = f.read() + # method1 should now be AFTER tail_method + assert content.find("def method1") > content.find("def tail_method") def test_mcp_dispatch_errors(temp_py_file): - mcp_client.configure([{"path": temp_py_file}]) + mcp_client.configure([{"path": temp_py_file}]) - # Non-existent symbol - result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "NoSuchSymbol"}) - assert "ERROR" in result or "not found" in result + # Non-existent symbol + result = mcp_client.dispatch("py_remove_def", {"path": temp_py_file, "name": "NoSuchSymbol"}) + assert "ERROR" in result or "not found" in result - # Denied path - result = mcp_client.dispatch("py_remove_def", {"path": "C:/windows/system32/cmd.exe", "name": "foo"}) - assert "ACCESS DENIED" in result + # Denied path + result = mcp_client.dispatch("py_remove_def", {"path": "C:/windows/system32/cmd.exe", "name": "foo"}) + assert "ACCESS DENIED" in result diff --git a/tests/test_thinking_persistence.py b/tests/test_thinking_persistence.py index c30f61c3..fa10ff79 100644 --- a/tests/test_thinking_persistence.py +++ b/tests/test_thinking_persistence.py @@ -7,88 +7,88 @@ from src.models import ThinkingSegment def test_save_and_load_history_with_thinking_segments(): - with tempfile.TemporaryDirectory() as tmpdir: - project_path = Path(tmpdir) / "test_project" - project_path.mkdir() + with tempfile.TemporaryDirectory() as tmpdir: + project_path = Path(tmpdir) / "test_project" + project_path.mkdir() - project_file = project_path / "test_project.toml" - project_file.write_text("[project]\nname = 'test'\n") + project_file = project_path / "test_project.toml" + project_file.write_text("[project]\nname = 'test'\n") - history_data = { - "entries": [ - { - "role": "AI", - "content": "Here's the response", - "thinking_segments": [ - {"content": "Let me think about this...", "marker": "thinking"} - ], - "ts": "2026-03-13T10:00:00", - "collapsed": False, - }, - { - "role": "User", - "content": "Hello", - "ts": "2026-03-13T09:00:00", - "collapsed": False, - }, - ] - } + history_data = { + "entries": [ + { + "role": "AI", + "content": "Here's the response", + "thinking_segments": [ + {"content": "Let me think about this...", "marker": "thinking"} + ], + "ts": "2026-03-13T10:00:00", + "collapsed": False, + }, + { + "role": "User", + "content": "Hello", + "ts": "2026-03-13T09:00:00", + "collapsed": False, + }, + ] + } - project_manager.save_project( - {"project": {"name": "test"}}, project_file, disc_data=history_data - ) + project_manager.save_project( + {"project": {"name": "test"}}, project_file, disc_data=history_data + ) - loaded = project_manager.load_history(project_file) + loaded = project_manager.load_history(project_file) - assert "entries" in loaded - assert len(loaded["entries"]) == 2 + assert "entries" in loaded + assert len(loaded["entries"]) == 2 - ai_entry = loaded["entries"][0] - assert ai_entry["role"] == "AI" - assert ai_entry["content"] == "Here's the response" - assert "thinking_segments" in ai_entry - assert len(ai_entry["thinking_segments"]) == 1 - assert ( - ai_entry["thinking_segments"][0]["content"] == "Let me think about this..." - ) + ai_entry = loaded["entries"][0] + assert ai_entry["role"] == "AI" + assert ai_entry["content"] == "Here's the response" + assert "thinking_segments" in ai_entry + assert len(ai_entry["thinking_segments"]) == 1 + assert ( + ai_entry["thinking_segments"][0]["content"] == "Let me think about this..." + ) - user_entry = loaded["entries"][1] - assert user_entry["role"] == "User" - assert "thinking_segments" not in user_entry + user_entry = loaded["entries"][1] + assert user_entry["role"] == "User" + assert "thinking_segments" not in user_entry def test_entry_to_str_with_thinking(): - entry = { - "role": "AI", - "content": "Response text", - "thinking_segments": [{"content": "Thinking...", "marker": "thinking"}], - "ts": "2026-03-13T10:00:00", - } - result = project_manager.entry_to_str(entry) - assert "@2026-03-13T10:00:00" in result - assert "AI:" in result - assert "Response text" in result + entry = { + "role": "AI", + "content": "Response text", + "thinking_segments": [{"content": "Thinking...", "marker": "thinking"}], + "ts": "2026-03-13T10:00:00", + } + result = project_manager.entry_to_str(entry) + assert "@2026-03-13T10:00:00" in result + assert "AI:" in result + assert "Response text" in result def test_str_to_entry_with_thinking(): - raw = "@2026-03-13T10:00:00\nAI:\nResponse text" - roles = ["User", "AI", "Vendor API", "System", "Reasoning"] - result = project_manager.str_to_entry(raw, roles) - assert result["role"] == "AI" - assert result["content"] == "Response text" - assert "ts" in result + raw = "@2026-03-13T10:00:00\nAI:\nResponse text" + roles = ["User", "AI", "Vendor API", "System", "Reasoning"] + result = project_manager.str_to_entry(raw, roles) + assert result["role"] == "AI" + assert result["content"] == "Response text" + assert "ts" in result def test_clean_nones_removes_thinking(): - entry = {"role": "AI", "content": "Test", "thinking_segments": None, "ts": None} - cleaned = project_manager.clean_nones(entry) - assert "thinking_segments" not in cleaned - assert "ts" not in cleaned + entry = {"role": "AI", "content": "Test", "thinking_segments": None, "ts": None} + cleaned = project_manager.clean_nones(entry) + assert "thinking_segments" not in cleaned + assert "ts" not in cleaned if __name__ == "__main__": - test_save_and_load_history_with_thinking_segments() - test_entry_to_str_with_thinking() - test_str_to_entry_with_thinking() - test_clean_nones_removes_thinking() - print("All project_manager thinking tests passed!") + test_save_and_load_history_with_thinking_segments() + test_entry_to_str_with_thinking() + test_str_to_entry_with_thinking() + test_clean_nones_removes_thinking() + print("All project_manager thinking tests passed!") diff --git a/tests/test_tier4_interceptor.py b/tests/test_tier4_interceptor.py index d20d88ca..5dbcc2e3 100644 --- a/tests/test_tier4_interceptor.py +++ b/tests/test_tier4_interceptor.py @@ -3,134 +3,134 @@ from src.shell_runner import run_powershell from src import ai_client def test_run_powershell_qa_callback_on_failure(vlogger) -> None: - """Test that qa_callback is called when a powershell command fails (non-zero exit code).""" - qa_callback = MagicMock(return_value="FIX: Check path") + """Test that qa_callback is called when a powershell command fails (non-zero exit code).""" + qa_callback = MagicMock(return_value="FIX: Check path") - vlogger.log_state("QA Callback Called", False, "pending") - # Simulate a failure - with patch("subprocess.Popen") as mock_popen: - mock_process = MagicMock() - mock_process.communicate.return_value = ("stdout", "stderr error") - mock_process.returncode = 1 - mock_popen.return_value = mock_process + vlogger.log_state("QA Callback Called", False, "pending") + # Simulate a failure + with patch("subprocess.Popen") as mock_popen: + mock_process = MagicMock() + mock_process.communicate.return_value = ("stdout", "stderr error") + mock_process.returncode = 1 + mock_popen.return_value = mock_process - result = run_powershell("invalid_cmd", ".", qa_callback=qa_callback) + result = run_powershell("invalid_cmd", ".", qa_callback=qa_callback) - vlogger.log_state("QA Callback Called", "pending", str(qa_callback.called)) - assert qa_callback.called - assert "QA ANALYSIS:\nFIX: Check path" in result - vlogger.finalize("Tier 4 Interceptor", "PASS", "Interceptor triggered and result appended.") + vlogger.log_state("QA Callback Called", "pending", str(qa_callback.called)) + assert qa_callback.called + assert "QA ANALYSIS:\nFIX: Check path" in result + vlogger.finalize("Tier 4 Interceptor", "PASS", "Interceptor triggered and result appended.") def test_run_powershell_qa_callback_on_stderr_only(vlogger) -> None: - """Test that qa_callback is called when a powershell command has stderr output, even if exit code is 0.""" - qa_callback = MagicMock(return_value="WARNING: Check permissions") + """Test that qa_callback is called when a powershell command has stderr output, even if exit code is 0.""" + qa_callback = MagicMock(return_value="WARNING: Check permissions") - with patch("subprocess.Popen") as mock_popen: - mock_process = MagicMock() - mock_process.communicate.return_value = ("stdout", "non-fatal warning") - mock_process.returncode = 0 - mock_popen.return_value = mock_process + with patch("subprocess.Popen") as mock_popen: + mock_process = MagicMock() + mock_process.communicate.return_value = ("stdout", "non-fatal warning") + mock_process.returncode = 0 + mock_popen.return_value = mock_process - result = run_powershell("cmd_with_warning", ".", qa_callback=qa_callback) + result = run_powershell("cmd_with_warning", ".", qa_callback=qa_callback) - assert qa_callback.called - assert "QA ANALYSIS:\nWARNING: Check permissions" in result - vlogger.finalize("Tier 4 Non-Fatal Interceptor", "PASS", "Interceptor triggered for non-fatal stderr.") + assert qa_callback.called + assert "QA ANALYSIS:\nWARNING: Check permissions" in result + vlogger.finalize("Tier 4 Non-Fatal Interceptor", "PASS", "Interceptor triggered for non-fatal stderr.") def test_run_powershell_no_qa_callback_on_success() -> None: - qa_callback = MagicMock() - with patch("subprocess.Popen") as mock_popen: - mock_process = MagicMock() - mock_process.communicate.return_value = ("ok", "") - mock_process.returncode = 0 - mock_popen.return_value = mock_process + qa_callback = MagicMock() + with patch("subprocess.Popen") as mock_popen: + mock_process = MagicMock() + mock_process.communicate.return_value = ("ok", "") + mock_process.returncode = 0 + mock_popen.return_value = mock_process - result = run_powershell("success_cmd", ".", qa_callback=qa_callback) - assert not qa_callback.called - assert "QA ANALYSIS" not in result + result = run_powershell("success_cmd", ".", qa_callback=qa_callback) + assert not qa_callback.called + assert "QA ANALYSIS" not in result def test_run_powershell_optional_qa_callback() -> None: - # Should not crash if qa_callback is None - with patch("subprocess.Popen") as mock_popen: - mock_process = MagicMock() - mock_process.communicate.return_value = ("error", "error") - mock_process.returncode = 1 - mock_popen.return_value = mock_process + # Should not crash if qa_callback is None + with patch("subprocess.Popen") as mock_popen: + mock_process = MagicMock() + mock_process.communicate.return_value = ("error", "error") + mock_process.returncode = 1 + mock_popen.return_value = mock_process - result = run_powershell("fail_no_cb", ".", qa_callback=None) - assert "EXIT CODE: 1" in result + result = run_powershell("fail_no_cb", ".", qa_callback=None) + assert "EXIT CODE: 1" in result def test_end_to_end_tier4_integration(vlogger) -> None: - """ + """ - 1. Start a task that triggers a tool failure. - 2. Ensure Tier 4 QA analysis is run. - 3. Verify the analysis is merged into the next turn's prompt. - """ - # Trigger a send that results in a tool failure - # (In reality, the tool loop handles this) - # For unit testing, we just check if ai_client.send passes the qa_callback - # to the underlying provider function. - pass - vlogger.finalize("E2E Tier 4 Integration", "PASS", "ai_client.run_tier4_analysis correctly called and results merged.") + 1. Start a task that triggers a tool failure. + 2. Ensure Tier 4 QA analysis is run. + 3. Verify the analysis is merged into the next turn's prompt. + """ + # Trigger a send that results in a tool failure + # (In reality, the tool loop handles this) + # For unit testing, we just check if ai_client.send passes the qa_callback + # to the underlying provider function. + pass + vlogger.finalize("E2E Tier 4 Integration", "PASS", "ai_client.run_tier4_analysis correctly called and results merged.") def test_ai_client_passes_qa_callback() -> None: - """Verifies that ai_client.send passes the qa_callback down to the provider function.""" - qa_callback = lambda x: "analysis" + """Verifies that ai_client.send passes the qa_callback down to the provider function.""" + qa_callback = lambda x: "analysis" - with patch("src.ai_client._send_gemini") as mock_send: - ai_client.set_provider("gemini", "gemini-2.5-flash-lite") - ai_client.send("ctx", "msg", qa_callback=qa_callback) - args, kwargs = mock_send.call_args - # It might be passed as positional or keyword depending on how 'send' calls it - # send() calls _send_gemini(md_content, user_message, base_dir, ..., qa_callback, ...) - # In current impl of send(), it is the 7th argument after md_content, user_msg, base_dir, file_items, disc_hist, pre_tool - assert args[6] == qa_callback or kwargs.get("qa_callback") == qa_callback + with patch("src.ai_client._send_gemini") as mock_send: + ai_client.set_provider("gemini", "gemini-2.5-flash-lite") + ai_client.send("ctx", "msg", qa_callback=qa_callback) + args, kwargs = mock_send.call_args + # It might be passed as positional or keyword depending on how 'send' calls it + # send() calls _send_gemini(md_content, user_message, base_dir, ..., qa_callback, ...) + # In current impl of send(), it is the 7th argument after md_content, user_msg, base_dir, file_items, disc_hist, pre_tool + assert args[6] == qa_callback or kwargs.get("qa_callback") == qa_callback def test_gemini_provider_passes_qa_callback_to_run_script() -> None: - """Verifies that _send_gemini passes the qa_callback to _run_script.""" - qa_callback = MagicMock() + """Verifies that _send_gemini passes the qa_callback to _run_script.""" + qa_callback = MagicMock() - # Mock the tool loop behavior - with patch("src.ai_client._run_script", return_value="output") as mock_run_script, \ - patch("src.ai_client._ensure_gemini_client"), \ - patch("src.ai_client._gemini_client") as mock_gen_client: + # Mock the tool loop behavior + with patch("src.ai_client._run_script", return_value="output") as mock_run_script, \ + patch("src.ai_client._ensure_gemini_client"), \ + patch("src.ai_client._gemini_client") as mock_gen_client: - mock_chat = MagicMock() - mock_gen_client.chats.create.return_value = mock_chat + mock_chat = MagicMock() + mock_gen_client.chats.create.return_value = mock_chat - # 1st round: tool call - mock_fc = MagicMock() - mock_fc.name = "run_powershell" - mock_fc.args = {"script": "dir"} - mock_part = MagicMock() - mock_part.function_call = mock_fc - mock_part.text = "" - mock_candidate = MagicMock() - mock_candidate.content.parts = [mock_part] - mock_candidate.finish_reason.name = "STOP" + # 1st round: tool call + mock_fc = MagicMock() + mock_fc.name = "run_powershell" + mock_fc.args = {"script": "dir"} + mock_part = MagicMock() + mock_part.function_call = mock_fc + mock_part.text = "" + mock_candidate = MagicMock() + mock_candidate.content.parts = [mock_part] + mock_candidate.finish_reason.name = "STOP" - mock_resp1 = MagicMock() - mock_resp1.candidates = [mock_candidate] - mock_resp1.usage_metadata.prompt_token_count = 10 - mock_resp1.usage_metadata.candidates_token_count = 5 - mock_resp1.text = "" + mock_resp1 = MagicMock() + mock_resp1.candidates = [mock_candidate] + mock_resp1.usage_metadata.prompt_token_count = 10 + mock_resp1.usage_metadata.candidates_token_count = 5 + mock_resp1.text = "" - # 2nd round: final text - mock_resp2 = MagicMock() - mock_resp2.candidates = [] - mock_resp2.usage_metadata.prompt_token_count = 20 - mock_resp2.usage_metadata.candidates_token_count = 10 - mock_resp2.text = "done" + # 2nd round: final text + mock_resp2 = MagicMock() + mock_resp2.candidates = [] + mock_resp2.usage_metadata.prompt_token_count = 20 + mock_resp2.usage_metadata.candidates_token_count = 10 + mock_resp2.text = "done" - mock_chat.send_message.side_effect = [mock_resp1, mock_resp2] + mock_chat.send_message.side_effect = [mock_resp1, mock_resp2] - ai_client.set_provider("gemini", "gemini-2.5-flash-lite") - ai_client._send_gemini( - md_content="Context", - user_message="Run dir", - base_dir=".", - qa_callback=qa_callback - ) - # Verify _run_script received the qa_callback and patch_callback - mock_run_script.assert_called_with("dir", ".", qa_callback, None) \ No newline at end of file + ai_client.set_provider("gemini", "gemini-2.5-flash-lite") + ai_client._send_gemini( + md_content="Context", + user_message="Run dir", + base_dir=".", + qa_callback=qa_callback + ) + # Verify _run_script received the qa_callback and patch_callback + mock_run_script.assert_called_with("dir", ".", qa_callback, None) \ No newline at end of file diff --git a/tests/test_tiered_aggregation.py b/tests/test_tiered_aggregation.py index c23ac60f..e987490b 100644 --- a/tests/test_tiered_aggregation.py +++ b/tests/test_tiered_aggregation.py @@ -7,56 +7,56 @@ from pathlib import Path from unittest.mock import MagicMock, patch def test_persona_aggregation_strategy(): - p = Persona(name="test_persona", aggregation_strategy="summarize") - assert p.aggregation_strategy == "summarize" + p = Persona(name="test_persona", aggregation_strategy="summarize") + assert p.aggregation_strategy == "summarize" - d = p.to_dict() - assert d.get("aggregation_strategy") == "summarize" + d = p.to_dict() + assert d.get("aggregation_strategy") == "summarize" - p2 = Persona.from_dict("test2", d) - assert p2.aggregation_strategy == "summarize" + p2 = Persona.from_dict("test2", d) + assert p2.aggregation_strategy == "summarize" @patch("src.aggregate.build_markdown_from_items") def test_app_controller_do_generate_uses_persona_strategy(mock_build): - mock_build.return_value = "fake_md" - app = app_controller.AppController() - app.personas = {} - p = Persona(name="p1", aggregation_strategy="full") - app.personas["p1"] = p - app.ui_active_persona = "p1" + mock_build.return_value = "fake_md" + app = app_controller.AppController() + app.personas = {} + p = Persona(name="p1", aggregation_strategy="full") + app.personas["p1"] = p + app.ui_active_persona = "p1" - with patch("src.project_manager.flat_config", return_value={"project": {"name": "test"}, "output": {"output_dir": "."}, "files": {"paths": [], "base_dir": "."}}): - with patch("src.aggregate.build_file_items", return_value=[]): - with patch("pathlib.Path.mkdir"): - with patch("pathlib.Path.write_text"): - with patch.object(app, "_flush_to_project"): - with patch.object(app, "_flush_to_config"): - with patch("src.models.save_config"): - full_md, path, file_items, stable_md, disc = app._do_generate() + with patch("src.project_manager.flat_config", return_value={"project": {"name": "test"}, "output": {"output_dir": "."}, "files": {"paths": [], "base_dir": "."}}): + with patch("src.aggregate.build_file_items", return_value=[]): + with patch("pathlib.Path.mkdir"): + with patch("pathlib.Path.write_text"): + with patch.object(app, "_flush_to_project"): + with patch.object(app, "_flush_to_config"): + with patch("src.models.save_config"): + full_md, path, file_items, stable_md, disc = app._do_generate() - # Verify aggregate.run and build_markdown_no_history received aggregation_strategy="full" - # Actually mock_build captures the inner calls of build_markdown_no_history - call_kwargs = mock_build.call_args[1] - assert call_kwargs.get("aggregation_strategy") == "full" + # Verify aggregate.run and build_markdown_no_history received aggregation_strategy="full" + # Actually mock_build captures the inner calls of build_markdown_no_history + call_kwargs = mock_build.call_args[1] + assert call_kwargs.get("aggregation_strategy") == "full" @patch("src.summarize.summarise_file") @patch("src.multi_agent_conductor.ai_client.send") def test_run_worker_lifecycle_uses_strategy(mock_send, mock_summarise, tmp_path): - mock_send.return_value = "fake response" - mock_summarise.return_value = "fake summary" + mock_send.return_value = "fake response" + mock_summarise.return_value = "fake summary" - test_file = tmp_path / "test.py" - test_file.write_text("def test():\n pass") + test_file = tmp_path / "test.py" + test_file.write_text("def test():\n pass") - ticket = Ticket(id="1", description="test") - context = WorkerContext(ticket_id="1", model_name="test", persona_id="test_persona") + ticket = Ticket(id="1", description="test") + context = WorkerContext(ticket_id="1", model_name="test", persona_id="test_persona") - p = Persona(name="test_persona", aggregation_strategy="summarize") + p = Persona(name="test_persona", aggregation_strategy="summarize") - with patch("src.personas.PersonaManager.load_all", return_value={"test_persona": p}): - multi_agent_conductor.run_worker_lifecycle( - ticket, context, context_files=[str(test_file)] - ) + with patch("src.personas.PersonaManager.load_all", return_value={"test_persona": p}): + multi_agent_conductor.run_worker_lifecycle( + ticket, context, context_files=[str(test_file)] + ) - # Should have called summarise_file because of the "summarize" strategy - mock_summarise.assert_called_once() + # Should have called summarise_file because of the "summarize" strategy + mock_summarise.assert_called_once() diff --git a/tests/test_visual_orchestration.py b/tests/test_visual_orchestration.py index 5464860b..88901f73 100644 --- a/tests/test_visual_orchestration.py +++ b/tests/test_visual_orchestration.py @@ -11,40 +11,40 @@ from src import api_hook_client @pytest.mark.integration def test_mma_epic_lifecycle(live_gui) -> None: - client = api_hook_client.ApiHookClient() - assert client.wait_for_server(timeout=15) + client = api_hook_client.ApiHookClient() + assert client.wait_for_server(timeout=15) - # Reset - client.click("btn_reset") - time.sleep(2) + # Reset + client.click("btn_reset") + time.sleep(2) - # Set provider and path - client.set_value("current_provider", "gemini_cli") - time.sleep(2) - mock_path = os.path.abspath("tests/mock_gemini_cli.py") - client.set_value("gcli_path", f'"{sys.executable}" "{mock_path}"') - time.sleep(2) + # Set provider and path + client.set_value("current_provider", "gemini_cli") + time.sleep(2) + mock_path = os.path.abspath("tests/mock_gemini_cli.py") + client.set_value("gcli_path", f'"{sys.executable}" "{mock_path}"') + time.sleep(2) - # Set epic and click - client.set_value("mma_epic_input", "Add timestamps") - time.sleep(1) - client.click("btn_mma_plan_epic") + # Set epic and click + client.set_value("mma_epic_input", "Add timestamps") + time.sleep(1) + client.click("btn_mma_plan_epic") - # Wait and check - for i in range(30): - time.sleep(1) - status = client.get_mma_status() - proposed = status.get("proposed_tracks", []) - usage = status.get("mma_tier_usage", {}) - t1 = usage.get("Tier 1", {}) + # Wait and check + for i in range(30): + time.sleep(1) + status = client.get_mma_status() + proposed = status.get("proposed_tracks", []) + usage = status.get("mma_tier_usage", {}) + t1 = usage.get("Tier 1", {}) - print( - f"[{i}] Tier1: in={t1.get('input')}, out={t1.get('output')}, proposed={len(proposed)}", - flush=True, - ) + print( + f"[{i}] Tier1: in={t1.get('input')}, out={t1.get('output')}, proposed={len(proposed)}", + flush=True, + ) - if proposed: - print(f"SUCCESS: {proposed}", flush=True) - break + if proposed: + print(f"SUCCESS: {proposed}", flush=True) + break - assert len(proposed) > 0, f"No tracks: {proposed}" + assert len(proposed) > 0, f"No tracks: {proposed}"