Compare commits

...

7 Commits

50 changed files with 557 additions and 544 deletions

View File

@@ -1,5 +1,6 @@
import pytest import pytest
import os import os
from pathlib import Path
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
from scripts.mma_exec import create_parser, get_role_documents, execute_agent, get_model_for_role, get_dependencies from scripts.mma_exec import create_parser, get_role_documents, execute_agent, get_model_for_role, get_dependencies
@@ -80,7 +81,7 @@ def test_execute_agent() -> None:
assert kwargs.get("text") is True assert kwargs.get("text") is True
assert result == mock_stdout assert result == mock_stdout
def test_get_dependencies(tmp_path): def test_get_dependencies(tmp_path: Path) -> None:
content = ( content = (
"import os\n" "import os\n"
"import sys\n" "import sys\n"
@@ -94,7 +95,7 @@ def test_get_dependencies(tmp_path):
import re import re
def test_execute_agent_logging(tmp_path): def test_execute_agent_logging(tmp_path: Path) -> None:
log_file = tmp_path / "mma_delegation.log" log_file = tmp_path / "mma_delegation.log"
# mma_exec now uses logs/agents/ for individual logs and logs/mma_delegation.log for master # mma_exec now uses logs/agents/ for individual logs and logs/mma_delegation.log for master
# We will patch LOG_FILE to point to our temp location # We will patch LOG_FILE to point to our temp location
@@ -113,7 +114,7 @@ def test_execute_agent_logging(tmp_path):
assert test_prompt in log_content # Master log should now have the summary prompt assert test_prompt in log_content # Master log should now have the summary prompt
assert re.search(r"\d{4}-\d{2}-\d{2}", log_content) assert re.search(r"\d{4}-\d{2}-\d{2}", log_content)
def test_execute_agent_tier3_injection(tmp_path): def test_execute_agent_tier3_injection(tmp_path: Path) -> None:
main_content = "import dependency\n\ndef run():\n dependency.do_work()\n" main_content = "import dependency\n\ndef run():\n dependency.do_work()\n"
main_file = tmp_path / "main.py" main_file = tmp_path / "main.py"
main_file.write_text(main_content) main_file.write_text(main_content)

View File

@@ -1,4 +1,4 @@
# Implementation Plan: AI-Optimized Python Style Refactor # Implementation Plan: AI-Optimized Python Style Refactor
## Phase 1: Research and Pilot Tooling ## Phase 1: Research and Pilot Tooling
- [x] Task: Conductor - Define and Test Style Transformation Logic. (Develop or adapt a tool to perform 1-space indentation and newline reduction safely). [c75b926] - [x] Task: Conductor - Define and Test Style Transformation Logic. (Develop or adapt a tool to perform 1-space indentation and newline reduction safely). [c75b926]
@@ -24,11 +24,12 @@
## Phase 4: Codebase-Wide Type Hint Sweep ## Phase 4: Codebase-Wide Type Hint Sweep
- [x] Task: Conductor - Type hint pass on core modules (`api_hook_client.py`, `api_hooks.py`, `log_registry.py`, `performance_monitor.py`, `theme.py`, `theme_2.py`, `gemini_cli_adapter.py`, `multi_agent_conductor.py`, `dag_engine.py`, `events.py`, `file_cache.py`, `models.py`, `log_pruner.py`, `gemini.py`, `orchestrator_pm.py`, `conductor_tech_lead.py`, `outline_tool.py`, `summarize.py`). 46c2f9a - [x] Task: Conductor - Type hint pass on core modules (`api_hook_client.py`, `api_hooks.py`, `log_registry.py`, `performance_monitor.py`, `theme.py`, `theme_2.py`, `gemini_cli_adapter.py`, `multi_agent_conductor.py`, `dag_engine.py`, `events.py`, `file_cache.py`, `models.py`, `log_pruner.py`, `gemini.py`, `orchestrator_pm.py`, `conductor_tech_lead.py`, `outline_tool.py`, `summarize.py`). 46c2f9a
- [~] Task: Conductor - Type hint pass on remaining variable-only files (`ai_client.py` vars, `mcp_client.py` vars, `mma_prompts.py` vars) - [~] Task: Conductor - Type hint pass on remaining variable-only files (`ai_client.py` vars, `mcp_client.py` vars, `mma_prompts.py` vars)
- [ ] Task: Conductor - Type hint pass on scripts (`scripts/*.py`) - [x] Task: Conductor - Type hint pass on scripts (`scripts/*.py`) 53c2bbf
- [ ] Task: Conductor - Type hint pass on simulation modules (`simulation/*.py`) - [x] Task: Conductor - Type hint pass on simulation modules (`simulation/*.py`) ec91c90
- [ ] Task: Conductor - Type hint pass on test files (`tests/*.py`, `conductor/tests/*.py`) - [~] Task: Conductor - Type hint pass on test files (`tests/*.py`, `conductor/tests/*.py`)
- [ ] Task: Conductor - User Manual Verification 'Phase 4: Codebase-Wide Type Hint Sweep' (Protocol in workflow.md) - [ ] Task: Conductor - User Manual Verification 'Phase 4: Codebase-Wide Type Hint Sweep' (Protocol in workflow.md)
--- ---
**Protocol Note:** Each task will follow the Standard Task Workflow (Red/Green phases with Tier 3 Worker delegation). Phase completion will trigger the mandatory Verification and Checkpointing protocol. **Protocol Note:** Each task will follow the Standard Task Workflow (Red/Green phases with Tier 3 Worker delegation). Phase completion will trigger the mandatory Verification and Checkpointing protocol.

View File

@@ -9,9 +9,10 @@ import ast
import re import re
import sys import sys
import os import os
from typing import Any, Callable
BASE = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) BASE: str = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
stats = {"auto_none": 0, "manual_sig": 0, "vars": 0, "errors": []} stats: dict[str, Any] = {"auto_none": 0, "manual_sig": 0, "vars": 0, "errors": []}
def abs_path(filename: str) -> str: def abs_path(filename: str) -> str:
return os.path.join(BASE, filename) return os.path.join(BASE, filename)
@@ -167,7 +168,7 @@ def verify_syntax(filepath: str) -> str:
# ============================================================ # ============================================================
# gui_2.py manual signatures (Tier 3 items) # gui_2.py manual signatures (Tier 3 items)
# ============================================================ # ============================================================
GUI2_MANUAL_SIGS = [ GUI2_MANUAL_SIGS: list[tuple[str, str]] = [
(r'def resolve_pending_action\(self, action_id: str, approved: bool\):', (r'def resolve_pending_action\(self, action_id: str, approved: bool\):',
r'def resolve_pending_action(self, action_id: str, approved: bool) -> bool:'), r'def resolve_pending_action(self, action_id: str, approved: bool) -> bool:'),
(r'def _cb_start_track\(self, user_data=None\):', (r'def _cb_start_track\(self, user_data=None\):',
@@ -185,7 +186,7 @@ GUI2_MANUAL_SIGS = [
# ============================================================ # ============================================================
# gui_legacy.py manual signatures (Tier 3 items) # gui_legacy.py manual signatures (Tier 3 items)
# ============================================================ # ============================================================
LEGACY_MANUAL_SIGS = [ LEGACY_MANUAL_SIGS: list[tuple[str, str]] = [
(r'def _add_kv_row\(parent: str, key: str, val, val_color=None\):', (r'def _add_kv_row\(parent: str, key: str, val, val_color=None\):',
r'def _add_kv_row(parent: str, key: str, val: Any, val_color: tuple[int, int, int] | None = None) -> None:'), r'def _add_kv_row(parent: str, key: str, val: Any, val_color: tuple[int, int, int] | None = None) -> None:'),
(r'def _make_remove_file_cb\(self, idx: int\):', (r'def _make_remove_file_cb\(self, idx: int\):',
@@ -229,7 +230,7 @@ LEGACY_MANUAL_SIGS = [
# ============================================================ # ============================================================
# gui_2.py variable type annotations # gui_2.py variable type annotations
# ============================================================ # ============================================================
GUI2_VAR_REPLACEMENTS = [ GUI2_VAR_REPLACEMENTS: list[tuple[str, str]] = [
(r'^CONFIG_PATH = ', 'CONFIG_PATH: Path = '), (r'^CONFIG_PATH = ', 'CONFIG_PATH: Path = '),
(r'^PROVIDERS = ', 'PROVIDERS: list[str] = '), (r'^PROVIDERS = ', 'PROVIDERS: list[str] = '),
(r'^COMMS_CLAMP_CHARS = ', 'COMMS_CLAMP_CHARS: int = '), (r'^COMMS_CLAMP_CHARS = ', 'COMMS_CLAMP_CHARS: int = '),
@@ -255,7 +256,7 @@ GUI2_VAR_REPLACEMENTS = [
# ============================================================ # ============================================================
# gui_legacy.py variable type annotations # gui_legacy.py variable type annotations
# ============================================================ # ============================================================
LEGACY_VAR_REPLACEMENTS = [ LEGACY_VAR_REPLACEMENTS: list[tuple[str, str]] = [
(r'^CONFIG_PATH = ', 'CONFIG_PATH: Path = '), (r'^CONFIG_PATH = ', 'CONFIG_PATH: Path = '),
(r'^PROVIDERS = ', 'PROVIDERS: list[str] = '), (r'^PROVIDERS = ', 'PROVIDERS: list[str] = '),
(r'^COMMS_CLAMP_CHARS = ', 'COMMS_CLAMP_CHARS: int = '), (r'^COMMS_CLAMP_CHARS = ', 'COMMS_CLAMP_CHARS: int = '),

View File

@@ -8,9 +8,9 @@ import tomllib
import tree_sitter import tree_sitter
import tree_sitter_python import tree_sitter_python
LOG_FILE = 'logs/claude_mma_delegation.log' LOG_FILE: str = 'logs/claude_mma_delegation.log'
MODEL_MAP = { MODEL_MAP: dict[str, str] = {
'tier1-orchestrator': 'claude-opus-4-6', 'tier1-orchestrator': 'claude-opus-4-6',
'tier1': 'claude-opus-4-6', 'tier1': 'claude-opus-4-6',
'tier2-tech-lead': 'claude-sonnet-4-6', 'tier2-tech-lead': 'claude-sonnet-4-6',
@@ -86,7 +86,7 @@ def get_role_documents(role: str) -> list[str]:
return [] return []
def log_delegation(role, full_prompt, result=None, summary_prompt=None): def log_delegation(role: str, full_prompt: str, result: str | None = None, summary_prompt: str | None = None) -> str:
os.makedirs('logs/claude_agents', exist_ok=True) os.makedirs('logs/claude_agents', exist_ok=True)
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
log_file = f'logs/claude_agents/claude_{role}_task_{timestamp}.log' log_file = f'logs/claude_agents/claude_{role}_task_{timestamp}.log'
@@ -137,7 +137,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
# Advanced Context: Dependency skeletons for Tier 3 # Advanced Context: Dependency skeletons for Tier 3
injected_context = "" injected_context = ""
UNFETTERED_MODULES = ['mcp_client', 'project_manager', 'events', 'aggregate'] UNFETTERED_MODULES: list[str] = ['mcp_client', 'project_manager', 'events', 'aggregate']
if role in ['tier3', 'tier3-worker']: if role in ['tier3', 'tier3-worker']:
for doc in docs: for doc in docs:
@@ -231,7 +231,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
return err_msg return err_msg
def create_parser(): def create_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Claude MMA Execution Script") parser = argparse.ArgumentParser(description="Claude MMA Execution Script")
parser.add_argument( parser.add_argument(
"--role", "--role",
@@ -275,7 +275,7 @@ def main() -> None:
docs = get_role_documents(role) docs = get_role_documents(role)
# Extract @file references from the prompt # Extract @file references from the prompt
file_refs = re.findall(r"@([\w./\\]+)", prompt) file_refs: list[str] = re.findall(r"@([\w./\\]+)", prompt)
for ref in file_refs: for ref in file_refs:
if os.path.exists(ref) and ref not in docs: if os.path.exists(ref) and ref not in docs:
docs.append(ref) docs.append(ref)

View File

@@ -2,14 +2,14 @@ import os
import re import re
with open('mcp_client.py', 'r', encoding='utf-8') as f: with open('mcp_client.py', 'r', encoding='utf-8') as f:
content = f.read() content: str = f.read()
# 1. Add import os if not there # 1. Add import os if not there
if 'import os' not in content: if 'import os' not in content:
content = content.replace('import summarize', 'import os\nimport summarize') content: str = content.replace('import summarize', 'import os\nimport summarize')
# 2. Add the functions before "# ------------------------------------------------------------------ web tools" # 2. Add the functions before "# ------------------------------------------------------------------ web tools"
functions_code = r''' functions_code: str = r'''
def py_find_usages(path: str, name: str) -> str: def py_find_usages(path: str, name: str) -> str:
"""Finds exact string matches of a symbol in a given file or directory.""" """Finds exact string matches of a symbol in a given file or directory."""
p, err = _resolve_and_check(path) p, err = _resolve_and_check(path)
@@ -179,17 +179,17 @@ def get_tree(path: str, max_depth: int = 2) -> str:
# ------------------------------------------------------------------ web tools''' # ------------------------------------------------------------------ web tools'''
content = content.replace('# ------------------------------------------------------------------ web tools', functions_code) content: str = content.replace('# ------------------------------------------------------------------ web tools', functions_code)
# 3. Update TOOL_NAMES # 3. Update TOOL_NAMES
old_tool_names_match = re.search(r'TOOL_NAMES\s*=\s*\{([^}]*)\}', content) old_tool_names_match: re.Match | None = re.search(r'TOOL_NAMES\s*=\s*\{([^}]*)\}', content)
if old_tool_names_match: if old_tool_names_match:
old_names = old_tool_names_match.group(1) old_names: str = old_tool_names_match.group(1)
new_names = old_names + ', "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"' new_names: str = old_names + ', "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"'
content = content.replace(old_tool_names_match.group(0), f'TOOL_NAMES = {{{new_names}}}') content: str = content.replace(old_tool_names_match.group(0), f'TOOL_NAMES = {{{new_names}}}')
# 4. Update dispatch # 4. Update dispatch
dispatch_additions = r''' dispatch_additions: str = r'''
if tool_name == "py_find_usages": if tool_name == "py_find_usages":
return py_find_usages(tool_input.get("path", ""), tool_input.get("name", "")) return py_find_usages(tool_input.get("path", ""), tool_input.get("name", ""))
if tool_name == "py_get_imports": if tool_name == "py_get_imports":
@@ -204,10 +204,11 @@ dispatch_additions = r'''
return get_tree(tool_input.get("path", ""), tool_input.get("max_depth", 2)) return get_tree(tool_input.get("path", ""), tool_input.get("max_depth", 2))
return f"ERROR: unknown MCP tool '{tool_name}'" return f"ERROR: unknown MCP tool '{tool_name}'"
''' '''
content = re.sub(r' return f"ERROR: unknown MCP tool \'{tool_name}\'"', dispatch_additions.strip(), content) content: str = re.sub(
r' return f"ERROR: unknown MCP tool \'{tool_name}\'"', dispatch_additions.strip(), content)
# 5. Update MCP_TOOL_SPECS # 5. Update MCP_TOOL_SPECS
mcp_tool_specs_addition = r''' mcp_tool_specs_addition: str = r'''
{ {
"name": "py_find_usages", "name": "py_find_usages",
"description": "Finds exact string matches of a symbol in a given file or directory.", "description": "Finds exact string matches of a symbol in a given file or directory.",
@@ -281,7 +282,8 @@ mcp_tool_specs_addition = r'''
] ]
''' '''
content = re.sub(r'\]\s*$', mcp_tool_specs_addition.strip(), content) content: str = re.sub(
r'\]\s*$', mcp_tool_specs_addition.strip(), content)
with open('mcp_client.py', 'w', encoding='utf-8') as f: with open('mcp_client.py', 'w', encoding='utf-8') as f:
f.write(content) f.write(content)

View File

@@ -8,7 +8,7 @@ import tree_sitter_python
import ast import ast
import datetime import datetime
LOG_FILE = 'logs/mma_delegation.log' LOG_FILE: str = 'logs/mma_delegation.log'
def generate_skeleton(code: str) -> str: def generate_skeleton(code: str) -> str:
""" """
@@ -79,7 +79,7 @@ def get_role_documents(role: str) -> list[str]:
return ['conductor/workflow.md'] return ['conductor/workflow.md']
return [] return []
def log_delegation(role, full_prompt, result=None, summary_prompt=None): def log_delegation(role: str, full_prompt: str, result: str | None = None, summary_prompt: str | None = None) -> str:
os.makedirs('logs/agents', exist_ok=True) os.makedirs('logs/agents', exist_ok=True)
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
log_file = f'logs/agents/mma_{role}_task_{timestamp}.log' log_file = f'logs/agents/mma_{role}_task_{timestamp}.log'
@@ -130,7 +130,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
injected_context = "" injected_context = ""
# Whitelist of modules that sub-agents have "unfettered" (full) access to. # Whitelist of modules that sub-agents have "unfettered" (full) access to.
# These will be provided in full if imported, instead of just skeletons. # These will be provided in full if imported, instead of just skeletons.
UNFETTERED_MODULES = ['mcp_client', 'project_manager', 'events', 'aggregate'] UNFETTERED_MODULES: list[str] = ['mcp_client', 'project_manager', 'events', 'aggregate']
if role in ['tier3', 'tier3-worker']: if role in ['tier3', 'tier3-worker']:
for doc in docs: for doc in docs:
if doc.endswith('.py') and os.path.exists(doc): if doc.endswith('.py') and os.path.exists(doc):
@@ -219,7 +219,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
log_delegation(role, command_text, err_msg) log_delegation(role, command_text, err_msg)
return err_msg return err_msg
def create_parser(): def create_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="MMA Execution Script") parser = argparse.ArgumentParser(description="MMA Execution Script")
parser.add_argument( parser.add_argument(
"--role", "--role",

View File

@@ -1,24 +1,24 @@
"""Scan all .py files for missing type hints. Writes scan_report.txt.""" """Scan all .py files for missing type hints. Writes scan_report.txt."""
import ast, os import ast, os
SKIP = {'.git', '__pycache__', '.venv', 'venv', 'node_modules', '.claude', '.gemini'} SKIP: set[str] = {'.git', '__pycache__', '.venv', 'venv', 'node_modules', '.claude', '.gemini'}
BASE = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) BASE: str = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
os.chdir(BASE) os.chdir(BASE)
results = {} results: dict[str, tuple[int, int, int, int]] = {}
for root, dirs, files in os.walk('.'): for root, dirs, files in os.walk('.'):
dirs[:] = [d for d in dirs if d not in SKIP] dirs[:] = [d for d in dirs if d not in SKIP]
for f in files: for f in files:
if not f.endswith('.py'): if not f.endswith('.py'):
continue continue
path = os.path.join(root, f).replace('\\', '/') path: str = os.path.join(root, f).replace('\\', '/')
try: try:
with open(path, 'r', encoding='utf-8-sig') as fh: with open(path, 'r', encoding='utf-8-sig') as fh:
tree = ast.parse(fh.read()) tree = ast.parse(fh.read())
except Exception: except Exception:
continue continue
counts = [0, 0, 0] # nr, up, uv counts: list[int] = [0, 0, 0] # nr, up, uv
def scan(scope, prefix=''): def scan(scope: ast.AST, prefix: str = '') -> None:
for node in ast.iter_child_nodes(scope): for node in ast.iter_child_nodes(scope):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
if node.returns is None: if node.returns is None:
@@ -34,16 +34,16 @@ for root, dirs, files in os.walk('.'):
scan(node, prefix=f'{node.name}.') scan(node, prefix=f'{node.name}.')
scan(tree) scan(tree)
nr, up, uv = counts nr, up, uv = counts
total = nr + up + uv total: int = nr + up + uv
if total > 0: if total > 0:
results[path] = (nr, up, uv, total) results[path] = (nr, up, uv, total)
lines = [] lines: list[str] = []
lines.append(f'Files with untyped items: {len(results)}') lines.append(f'Files with untyped items: {len(results)}')
lines.append('') lines.append('')
lines.append(f'{"File":<58} {"NoRet":>6} {"Params":>7} {"Vars":>5} {"Total":>6}') lines.append(f'{"File":<58} {"NoRet":>6} {"Params":>7} {"Vars":>5} {"Total":>6}')
lines.append('-' * 85) lines.append('-' * 85)
gt = 0 gt: int = 0
for path in sorted(results, key=lambda x: results[x][3], reverse=True): for path in sorted(results, key=lambda x: results[x][3], reverse=True):
nr, up, uv, t = results[path] nr, up, uv, t = results[path]
lines.append(f'{path:<58} {nr:>6} {up:>7} {uv:>5} {t:>6}') lines.append(f'{path:<58} {nr:>6} {up:>7} {uv:>5} {t:>6}')
@@ -51,6 +51,6 @@ for path in sorted(results, key=lambda x: results[x][3], reverse=True):
lines.append('-' * 85) lines.append('-' * 85)
lines.append(f'{"TOTAL":<58} {"":>6} {"":>7} {"":>5} {gt:>6}') lines.append(f'{"TOTAL":<58} {"":>6} {"":>7} {"":>5} {gt:>6}')
report = '\n'.join(lines) report: str = '\n'.join(lines)
with open('scan_report.txt', 'w', encoding='utf-8') as f: with open('scan_report.txt', 'w', encoding='utf-8') as f:
f.write(report) f.write(report)

View File

@@ -1,14 +1,14 @@
import sys import sys
import ast import ast
def get_slice(filepath, start_line, end_line): def get_slice(filepath: str, start_line: int | str, end_line: int | str) -> str:
with open(filepath, 'r', encoding='utf-8') as f: with open(filepath, 'r', encoding='utf-8') as f:
lines = f.readlines() lines = f.readlines()
start_idx = int(start_line) - 1 start_idx = int(start_line) - 1
end_idx = int(end_line) end_idx = int(end_line)
return "".join(lines[start_idx:end_idx]) return "".join(lines[start_idx:end_idx])
def set_slice(filepath, start_line, end_line, new_content): def set_slice(filepath: str, start_line: int | str, end_line: int | str, new_content: str) -> None:
with open(filepath, 'r', encoding='utf-8') as f: with open(filepath, 'r', encoding='utf-8') as f:
lines = f.readlines() lines = f.readlines()
start_idx = int(start_line) - 1 start_idx = int(start_line) - 1
@@ -20,7 +20,7 @@ def set_slice(filepath, start_line, end_line, new_content):
with open(filepath, 'w', encoding='utf-8', newline='') as f: with open(filepath, 'w', encoding='utf-8', newline='') as f:
f.writelines(lines) f.writelines(lines)
def get_def(filepath, symbol_name): def get_def(filepath: str, symbol_name: str) -> str:
with open(filepath, 'r', encoding='utf-8') as f: with open(filepath, 'r', encoding='utf-8') as f:
content = f.read() content = f.read()
tree = ast.parse(content) tree = ast.parse(content)
@@ -35,7 +35,7 @@ def get_def(filepath, symbol_name):
return f"{start},{end}{chr(10)}{slice_content}" return f"{start},{end}{chr(10)}{slice_content}"
return "NOT_FOUND" return "NOT_FOUND"
def set_def(filepath, symbol_name, new_content): def set_def(filepath: str, symbol_name: str, new_content: str) -> None:
res = get_def(filepath, symbol_name) res = get_def(filepath, symbol_name)
if res == "NOT_FOUND": if res == "NOT_FOUND":
print(f"Error: Symbol '{symbol_name}' not found in {filepath}") print(f"Error: Symbol '{symbol_name}' not found in {filepath}")

View File

@@ -2,6 +2,7 @@ import sys
import os import os
import time import time
import pytest import pytest
from typing import Any, Optional
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
from simulation.workflow_sim import WorkflowSimulator from simulation.workflow_sim import WorkflowSimulator
@@ -17,7 +18,7 @@ class BaseSimulation:
self.sim = WorkflowSimulator(self.client) self.sim = WorkflowSimulator(self.client)
self.project_path = None self.project_path = None
def setup(self, project_name="SimProject"): def setup(self, project_name: str = "SimProject") -> None:
print(f"\n[BaseSim] Connecting to GUI...") print(f"\n[BaseSim] Connecting to GUI...")
if not self.client.wait_for_server(timeout=5): if not self.client.wait_for_server(timeout=5):
raise RuntimeError("Could not connect to GUI. Ensure it is running with --enable-test-hooks") raise RuntimeError("Could not connect to GUI. Ensure it is running with --enable-test-hooks")
@@ -43,20 +44,16 @@ class BaseSimulation:
pass pass
print("[BaseSim] Teardown complete.") print("[BaseSim] Teardown complete.")
def get_value(self, tag): def get_value(self, tag: str) -> Any:
return self.client.get_value(tag) return self.client.get_value(tag)
def wait_for_event(self, event_type, timeout=5): def wait_for_event(self, event_type: str, timeout: int = 5) -> Optional[dict]:
return self.client.wait_for_event(event_type, timeout) return self.client.wait_for_event(event_type, timeout)
def assert_panel_visible(self, panel_tag, msg=None): def assert_panel_visible(self, panel_tag: str, msg: str = None) -> None:
# This assumes we have a hook to check panel visibility or just check if an element in it exists
# For now, we'll check if we can get a value from an element that should be in that panel
# or use a specific hook if available.
# Actually, let's just check if get_indicator_state or similar works for generic tags.
pass pass
def wait_for_element(self, tag, timeout=2): def wait_for_element(self, tag: str, timeout: int = 2) -> bool:
start = time.time() start = time.time()
while time.time() - start < timeout: while time.time() - start < timeout:
try: try:
@@ -67,7 +64,7 @@ class BaseSimulation:
time.sleep(0.1) time.sleep(0.1)
return False return False
def run_sim(sim_class): def run_sim(sim_class: type) -> None:
"""Helper to run a simulation class standalone.""" """Helper to run a simulation class standalone."""
sim = sim_class() sim = sim_class()
try: try:

View File

@@ -4,7 +4,7 @@ import time
from simulation.sim_base import BaseSimulation, run_sim from simulation.sim_base import BaseSimulation, run_sim
class ExecutionSimulation(BaseSimulation): class ExecutionSimulation(BaseSimulation):
def setup(self, project_name="SimProject"): def setup(self, project_name: str = "SimProject") -> None:
super().setup(project_name) super().setup(project_name)
if os.path.exists("hello.ps1"): if os.path.exists("hello.ps1"):
os.remove("hello.ps1") os.remove("hello.ps1")

View File

@@ -1,9 +1,10 @@
import time import time
import random import random
from typing import Any, Callable
import ai_client import ai_client
class UserSimAgent: class UserSimAgent:
def __init__(self, hook_client, model="gemini-2.5-flash-lite"): def __init__(self, hook_client: Any, model: str = "gemini-2.5-flash-lite") -> None:
self.hook_client = hook_client self.hook_client = hook_client
self.model = model self.model = model
self.system_prompt = ( self.system_prompt = (
@@ -13,7 +14,7 @@ class UserSimAgent:
"Do not use markdown blocks for your main message unless you are providing code." "Do not use markdown blocks for your main message unless you are providing code."
) )
def generate_response(self, conversation_history): def generate_response(self, conversation_history: list[dict]) -> str:
""" """
Generates a human-like response based on the conversation history. Generates a human-like response based on the conversation history.
conversation_history: list of dicts with 'role' and 'content' conversation_history: list of dicts with 'role' and 'content'
@@ -38,7 +39,7 @@ class UserSimAgent:
ai_client.set_custom_system_prompt("") ai_client.set_custom_system_prompt("")
return response return response
def perform_action_with_delay(self, action_func, *args, **kwargs): def perform_action_with_delay(self, action_func: Callable, *args: Any, **kwargs: Any) -> Any:
""" """
Executes an action with a human-like delay. Executes an action with a human-like delay.
""" """

View File

@@ -8,7 +8,7 @@ class WorkflowSimulator:
self.client = hook_client self.client = hook_client
self.user_agent = UserSimAgent(hook_client) self.user_agent = UserSimAgent(hook_client)
def setup_new_project(self, name, git_dir, project_path=None): def setup_new_project(self, name: str, git_dir: str, project_path: str = None) -> None:
print(f"Setting up new project: {name}") print(f"Setting up new project: {name}")
if project_path: if project_path:
self.client.click("btn_project_new_automated", user_data=project_path) self.client.click("btn_project_new_automated", user_data=project_path)
@@ -19,13 +19,13 @@ class WorkflowSimulator:
self.client.click("btn_project_save") self.client.click("btn_project_save")
time.sleep(1) time.sleep(1)
def create_discussion(self, name): def create_discussion(self, name: str) -> None:
print(f"Creating discussion: {name}") print(f"Creating discussion: {name}")
self.client.set_value("disc_new_name_input", name) self.client.set_value("disc_new_name_input", name)
self.client.click("btn_disc_create") self.client.click("btn_disc_create")
time.sleep(1) time.sleep(1)
def switch_discussion(self, name): def switch_discussion(self, name: str) -> None:
print(f"Switching to discussion: {name}") print(f"Switching to discussion: {name}")
self.client.select_list_item("disc_listbox", name) self.client.select_list_item("disc_listbox", name)
time.sleep(1) time.sleep(1)
@@ -37,18 +37,18 @@ class WorkflowSimulator:
# without more hooks, but we can verify the button click. # without more hooks, but we can verify the button click.
time.sleep(1) time.sleep(1)
def truncate_history(self, pairs): def truncate_history(self, pairs: int) -> None:
print(f"Truncating history to {pairs} pairs") print(f"Truncating history to {pairs} pairs")
self.client.set_value("disc_truncate_pairs", pairs) self.client.set_value("disc_truncate_pairs", pairs)
self.client.click("btn_disc_truncate") self.client.click("btn_disc_truncate")
time.sleep(1) time.sleep(1)
def run_discussion_turn(self, user_message=None): def run_discussion_turn(self, user_message: str = None) -> dict | None:
self.run_discussion_turn_async(user_message) self.run_discussion_turn_async(user_message)
# Wait for AI # Wait for AI
return self.wait_for_ai_response() return self.wait_for_ai_response()
def run_discussion_turn_async(self, user_message=None): def run_discussion_turn_async(self, user_message: str = None) -> None:
if user_message is None: if user_message is None:
# Generate from AI history # Generate from AI history
session = self.client.get_session() session = self.client.get_session()
@@ -58,7 +58,7 @@ class WorkflowSimulator:
self.client.set_value("ai_input", user_message) self.client.set_value("ai_input", user_message)
self.client.click("btn_gen_send") self.client.click("btn_gen_send")
def wait_for_ai_response(self, timeout=60): def wait_for_ai_response(self, timeout: int = 60) -> dict | None:
print("Waiting for AI response...", end="", flush=True) print("Waiting for AI response...", end="", flush=True)
start_time = time.time() start_time = time.time()
last_count = len(self.client.get_session().get('session', {}).get('entries', [])) last_count = len(self.client.get_session().get('session', {}).get('entries', []))

View File

@@ -1,4 +1,5 @@
import pytest import pytest
from typing import Any
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import ai_client import ai_client
@@ -10,22 +11,22 @@ class MockUsage:
self.cached_content_token_count = 0 self.cached_content_token_count = 0
class MockPart: class MockPart:
def __init__(self, text, function_call): def __init__(self, text: Any, function_call: Any) -> None:
self.text = text self.text = text
self.function_call = function_call self.function_call = function_call
class MockContent: class MockContent:
def __init__(self, parts): def __init__(self, parts: Any) -> None:
self.parts = parts self.parts = parts
class MockCandidate: class MockCandidate:
def __init__(self, parts): def __init__(self, parts: Any) -> None:
self.content = MockContent(parts) self.content = MockContent(parts)
self.finish_reason = MagicMock() self.finish_reason = MagicMock()
self.finish_reason.name = "STOP" self.finish_reason.name = "STOP"
def test_ai_client_event_emitter_exists(): def test_ai_client_event_emitter_exists() -> None:
# This should fail initially because 'events' won't exist on ai_client # This should fail initially because 'events' won't exist on ai_client
assert hasattr(ai_client, 'events') assert hasattr(ai_client, 'events')
def test_event_emission() -> None: def test_event_emission() -> None:

View File

@@ -12,7 +12,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
def test_get_status_success(live_gui): def test_get_status_success(live_gui: tuple) -> None:
""" """
Test that get_status successfully retrieves the server status Test that get_status successfully retrieves the server status
when the live GUI is running. when the live GUI is running.
@@ -21,7 +21,7 @@ def test_get_status_success(live_gui):
status = client.get_status() status = client.get_status()
assert status == {'status': 'ok'} assert status == {'status': 'ok'}
def test_get_project_success(live_gui): def test_get_project_success(live_gui: tuple) -> None:
""" """
Test successful retrieval of project data from the live GUI. Test successful retrieval of project data from the live GUI.
""" """
@@ -30,7 +30,7 @@ def test_get_project_success(live_gui):
assert 'project' in response assert 'project' in response
# We don't assert specific content as it depends on the environment's active project # We don't assert specific content as it depends on the environment's active project
def test_get_session_success(live_gui): def test_get_session_success(live_gui: tuple) -> None:
""" """
Test successful retrieval of session data. Test successful retrieval of session data.
""" """
@@ -39,7 +39,7 @@ def test_get_session_success(live_gui):
assert 'session' in response assert 'session' in response
assert 'entries' in response['session'] assert 'entries' in response['session']
def test_post_gui_success(live_gui): def test_post_gui_success(live_gui: tuple) -> None:
""" """
Test successful posting of GUI data. Test successful posting of GUI data.
""" """
@@ -48,7 +48,7 @@ def test_post_gui_success(live_gui):
response = client.post_gui(gui_data) response = client.post_gui(gui_data)
assert response == {'status': 'queued'} assert response == {'status': 'queued'}
def test_get_performance_success(live_gui): def test_get_performance_success(live_gui: tuple) -> None:
""" """
Test successful retrieval of performance metrics. Test successful retrieval of performance metrics.
""" """

View File

@@ -1,6 +1,7 @@
import pytest import pytest
import sys import sys
import os import os
from typing import Any
# Ensure project root is in path for imports # Ensure project root is in path for imports
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
@@ -13,20 +14,20 @@ def test_api_client_has_extensions() -> None:
assert hasattr(client, 'select_tab') assert hasattr(client, 'select_tab')
assert hasattr(client, 'select_list_item') assert hasattr(client, 'select_list_item')
def test_select_tab_integration(live_gui): def test_select_tab_integration(live_gui: Any) -> None:
client = ApiHookClient() client = ApiHookClient()
# We'll need to make sure the tags exist in gui_legacy.py # We'll need to make sure the tags exist in gui_legacy.py
# For now, this is a placeholder for the integration test # For now, this is a placeholder for the integration test
response = client.select_tab("operations_tabs", "tab_tool") response = client.select_tab("operations_tabs", "tab_tool")
assert response == {'status': 'queued'} assert response == {'status': 'queued'}
def test_select_list_item_integration(live_gui): def test_select_list_item_integration(live_gui: Any) -> None:
client = ApiHookClient() client = ApiHookClient()
# Assuming 'Default' discussion exists or we can just test that it queues # Assuming 'Default' discussion exists or we can just test that it queues
response = client.select_list_item("disc_listbox", "Default") response = client.select_list_item("disc_listbox", "Default")
assert response == {'status': 'queued'} assert response == {'status': 'queued'}
def test_get_indicator_state_integration(live_gui): def test_get_indicator_state_integration(live_gui: Any) -> None:
client = ApiHookClient() client = ApiHookClient()
# thinking_indicator is usually hidden unless AI is running # thinking_indicator is usually hidden unless AI is running
response = client.get_indicator_state("thinking_indicator") response = client.get_indicator_state("thinking_indicator")

View File

@@ -1,17 +1,18 @@
import os import os
import pytest import pytest
from typing import Any
from datetime import datetime from datetime import datetime
from log_registry import LogRegistry from log_registry import LogRegistry
@pytest.fixture @pytest.fixture
def registry_setup(tmp_path): def registry_setup(tmp_path: Any) -> Any:
registry_path = tmp_path / "log_registry.toml" registry_path = tmp_path / "log_registry.toml"
logs_dir = tmp_path / "logs" logs_dir = tmp_path / "logs"
logs_dir.mkdir() logs_dir.mkdir()
registry = LogRegistry(str(registry_path)) registry = LogRegistry(str(registry_path))
return registry, logs_dir return registry, logs_dir
def test_auto_whitelist_keywords(registry_setup): def test_auto_whitelist_keywords(registry_setup: Any) -> None:
registry, logs_dir = registry_setup registry, logs_dir = registry_setup
session_id = "test_kw" session_id = "test_kw"
session_dir = logs_dir / session_id session_dir = logs_dir / session_id
@@ -24,7 +25,7 @@ def test_auto_whitelist_keywords(registry_setup):
assert registry.is_session_whitelisted(session_id) assert registry.is_session_whitelisted(session_id)
assert "ERROR" in registry.data[session_id]["metadata"]["reason"] assert "ERROR" in registry.data[session_id]["metadata"]["reason"]
def test_auto_whitelist_message_count(registry_setup): def test_auto_whitelist_message_count(registry_setup: Any) -> None:
registry, logs_dir = registry_setup registry, logs_dir = registry_setup
session_id = "test_msg_count" session_id = "test_msg_count"
session_dir = logs_dir / session_id session_dir = logs_dir / session_id
@@ -37,7 +38,7 @@ def test_auto_whitelist_message_count(registry_setup):
assert registry.is_session_whitelisted(session_id) assert registry.is_session_whitelisted(session_id)
assert registry.data[session_id]["metadata"]["message_count"] == 15 assert registry.data[session_id]["metadata"]["message_count"] == 15
def test_auto_whitelist_large_size(registry_setup): def test_auto_whitelist_large_size(registry_setup: Any) -> None:
registry, logs_dir = registry_setup registry, logs_dir = registry_setup
session_id = "test_large" session_id = "test_large"
session_dir = logs_dir / session_id session_dir = logs_dir / session_id
@@ -50,7 +51,7 @@ def test_auto_whitelist_large_size(registry_setup):
assert registry.is_session_whitelisted(session_id) assert registry.is_session_whitelisted(session_id)
assert "Large session size" in registry.data[session_id]["metadata"]["reason"] assert "Large session size" in registry.data[session_id]["metadata"]["reason"]
def test_no_auto_whitelist_insignificant(registry_setup): def test_no_auto_whitelist_insignificant(registry_setup: Any) -> None:
registry, logs_dir = registry_setup registry, logs_dir = registry_setup
session_id = "test_insignificant" session_id = "test_insignificant"
session_dir = logs_dir / session_id session_dir = logs_dir / session_id

View File

@@ -22,8 +22,7 @@ class TestCliToolBridge(unittest.TestCase):
@patch('sys.stdin', new_callable=io.StringIO) @patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO) @patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation') @patch('api_hook_client.ApiHookClient.request_confirmation')
def test_allow_decision(self, mock_request, mock_stdout, mock_stdin): def test_allow_decision(self, mock_request: MagicMock, mock_stdout: MagicMock, mock_stdin: MagicMock) -> None:
# 1. Mock stdin with a JSON string tool call
mock_stdin.write(json.dumps(self.tool_call)) mock_stdin.write(json.dumps(self.tool_call))
mock_stdin.seek(0) mock_stdin.seek(0)
# 2. Mock ApiHookClient to return approved # 2. Mock ApiHookClient to return approved
@@ -37,8 +36,7 @@ class TestCliToolBridge(unittest.TestCase):
@patch('sys.stdin', new_callable=io.StringIO) @patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO) @patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation') @patch('api_hook_client.ApiHookClient.request_confirmation')
def test_deny_decision(self, mock_request, mock_stdout, mock_stdin): def test_deny_decision(self, mock_request: MagicMock, mock_stdout: MagicMock, mock_stdin: MagicMock) -> None:
# Mock stdin
mock_stdin.write(json.dumps(self.tool_call)) mock_stdin.write(json.dumps(self.tool_call))
mock_stdin.seek(0) mock_stdin.seek(0)
# 4. Mock ApiHookClient to return denied # 4. Mock ApiHookClient to return denied
@@ -51,8 +49,7 @@ class TestCliToolBridge(unittest.TestCase):
@patch('sys.stdin', new_callable=io.StringIO) @patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO) @patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation') @patch('api_hook_client.ApiHookClient.request_confirmation')
def test_unreachable_hook_server(self, mock_request, mock_stdout, mock_stdin): def test_unreachable_hook_server(self, mock_request: MagicMock, mock_stdout: MagicMock, mock_stdin: MagicMock) -> None:
# Mock stdin
mock_stdin.write(json.dumps(self.tool_call)) mock_stdin.write(json.dumps(self.tool_call))
mock_stdin.seek(0) mock_stdin.seek(0)
# 5. Test case where hook server is unreachable (exception) # 5. Test case where hook server is unreachable (exception)

View File

@@ -18,7 +18,7 @@ class TestCliToolBridgeMapping(unittest.TestCase):
@patch('sys.stdin', new_callable=io.StringIO) @patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO) @patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation') @patch('api_hook_client.ApiHookClient.request_confirmation')
def test_mapping_from_api_format(self, mock_request, mock_stdout, mock_stdin): def test_mapping_from_api_format(self, mock_request: MagicMock, mock_stdout: MagicMock, mock_stdin: MagicMock) -> None:
""" """
Verify that bridge correctly maps 'id', 'name', 'input' (Gemini API format) Verify that bridge correctly maps 'id', 'name', 'input' (Gemini API format)
into tool_name and tool_input for the hook client. into tool_name and tool_input for the hook client.

View File

@@ -6,13 +6,14 @@ import time
import json import json
import requests import requests
import sys import sys
from typing import Any
# Ensure project root is in path # Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
def simulate_conductor_phase_completion(client: ApiHookClient): def simulate_conductor_phase_completion(client: ApiHookClient) -> dict[str, Any]:
""" """
Simulates the Conductor agent's logic for phase completion using ApiHookClient. Simulates the Conductor agent's logic for phase completion using ApiHookClient.
""" """
@@ -33,7 +34,7 @@ def simulate_conductor_phase_completion(client: ApiHookClient):
results["verification_message"] = f"Automated verification failed: {e}" results["verification_message"] = f"Automated verification failed: {e}"
return results return results
def test_conductor_integrates_api_hook_client_for_verification(live_gui): def test_conductor_integrates_api_hook_client_for_verification(live_gui: Any) -> None:
""" """
Verify that Conductor's simulated phase completion logic properly integrates Verify that Conductor's simulated phase completion logic properly integrates
and uses the ApiHookClient for verification against the live GUI. and uses the ApiHookClient for verification against the live GUI.
@@ -43,7 +44,7 @@ def test_conductor_integrates_api_hook_client_for_verification(live_gui):
assert results["verification_successful"] is True assert results["verification_successful"] is True
assert "successfully" in results["verification_message"] assert "successfully" in results["verification_message"]
def test_conductor_handles_api_hook_failure(live_gui): def test_conductor_handles_api_hook_failure(live_gui: Any) -> None:
""" """
Verify Conductor handles a simulated API hook verification failure. Verify Conductor handles a simulated API hook verification failure.
We patch the client's get_status to simulate failure even with live GUI. We patch the client's get_status to simulate failure even with live GUI.

View File

@@ -16,7 +16,7 @@ def test_conductor_engine_initialization() -> None:
assert engine.track == track assert engine.track == track
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_conductor_engine_run_linear_executes_tickets_in_order(monkeypatch): async def test_conductor_engine_run_linear_executes_tickets_in_order(monkeypatch: pytest.MonkeyPatch) -> None:
""" """
Test that run_linear iterates through executable tickets and calls the worker lifecycle. Test that run_linear iterates through executable tickets and calls the worker lifecycle.
""" """
@@ -48,7 +48,7 @@ async def test_conductor_engine_run_linear_executes_tickets_in_order(monkeypatch
assert calls[1][0][0].id == "T2" assert calls[1][0][0].id == "T2"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_run_worker_lifecycle_calls_ai_client_send(monkeypatch): async def test_run_worker_lifecycle_calls_ai_client_send(monkeypatch: pytest.MonkeyPatch) -> None:
""" """
Test that run_worker_lifecycle triggers the AI client and updates ticket status on success. Test that run_worker_lifecycle triggers the AI client and updates ticket status on success.
""" """
@@ -69,7 +69,7 @@ async def test_run_worker_lifecycle_calls_ai_client_send(monkeypatch):
assert ticket.description in kwargs["user_message"] assert ticket.description in kwargs["user_message"]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_run_worker_lifecycle_context_injection(monkeypatch): async def test_run_worker_lifecycle_context_injection(monkeypatch: pytest.MonkeyPatch) -> None:
""" """
Test that run_worker_lifecycle can take a context_files list and injects AST views into the prompt. Test that run_worker_lifecycle can take a context_files list and injects AST views into the prompt.
""" """
@@ -115,7 +115,7 @@ async def test_run_worker_lifecycle_context_injection(monkeypatch):
assert "secondary.py" in user_message assert "secondary.py" in user_message
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_run_worker_lifecycle_handles_blocked_response(monkeypatch): async def test_run_worker_lifecycle_handles_blocked_response(monkeypatch: pytest.MonkeyPatch) -> None:
""" """
Test that run_worker_lifecycle marks the ticket as blocked if the AI indicates it cannot proceed. Test that run_worker_lifecycle marks the ticket as blocked if the AI indicates it cannot proceed.
""" """
@@ -132,7 +132,7 @@ async def test_run_worker_lifecycle_handles_blocked_response(monkeypatch):
assert "BLOCKED" in ticket.blocked_reason assert "BLOCKED" in ticket.blocked_reason
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_run_worker_lifecycle_step_mode_confirmation(monkeypatch): async def test_run_worker_lifecycle_step_mode_confirmation(monkeypatch: pytest.MonkeyPatch) -> None:
""" """
Test that run_worker_lifecycle passes confirm_execution to ai_client.send when step_mode is True. Test that run_worker_lifecycle passes confirm_execution to ai_client.send when step_mode is True.
Verify that if confirm_execution is called (simulated by mocking ai_client.send to call its callback), Verify that if confirm_execution is called (simulated by mocking ai_client.send to call its callback),
@@ -162,7 +162,7 @@ async def test_run_worker_lifecycle_step_mode_confirmation(monkeypatch):
assert ticket.status == "completed" assert ticket.status == "completed"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_run_worker_lifecycle_step_mode_rejection(monkeypatch): async def test_run_worker_lifecycle_step_mode_rejection(monkeypatch: pytest.MonkeyPatch) -> None:
""" """
Verify that if confirm_execution returns False, the logic (in ai_client, which we simulate here) Verify that if confirm_execution returns False, the logic (in ai_client, which we simulate here)
would prevent execution. In run_worker_lifecycle, we just check if it's passed. would prevent execution. In run_worker_lifecycle, we just check if it's passed.
@@ -184,7 +184,7 @@ async def test_run_worker_lifecycle_step_mode_rejection(monkeypatch):
# here we just verify the wiring. # here we just verify the wiring.
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_conductor_engine_dynamic_parsing_and_execution(monkeypatch): async def test_conductor_engine_dynamic_parsing_and_execution(monkeypatch: pytest.MonkeyPatch) -> None:
""" """
Test that parse_json_tickets correctly populates the track and run_linear executes them in dependency order. Test that parse_json_tickets correctly populates the track and run_linear executes them in dependency order.
""" """

View File

@@ -1,4 +1,5 @@
import unittest import unittest
from typing import Any
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import json import json
import conductor_tech_lead import conductor_tech_lead
@@ -7,8 +8,7 @@ class TestConductorTechLead(unittest.TestCase):
@patch('ai_client.send') @patch('ai_client.send')
@patch('ai_client.set_provider') @patch('ai_client.set_provider')
@patch('ai_client.reset_session') @patch('ai_client.reset_session')
def test_generate_tickets_success(self, mock_reset_session, mock_set_provider, mock_send): def test_generate_tickets_success(self, mock_reset_session: Any, mock_set_provider: Any, mock_send: Any) -> None:
# Setup mock response
mock_tickets = [ mock_tickets = [
{ {
"id": "ticket_1", "id": "ticket_1",
@@ -39,7 +39,7 @@ class TestConductorTechLead(unittest.TestCase):
@patch('ai_client.send') @patch('ai_client.send')
@patch('ai_client.set_provider') @patch('ai_client.set_provider')
@patch('ai_client.reset_session') @patch('ai_client.reset_session')
def test_generate_tickets_parse_error(self, mock_reset_session, mock_set_provider, mock_send): def test_generate_tickets_parse_error(self, mock_reset_session: Any, mock_set_provider: Any, mock_send: Any) -> None:
# Setup mock invalid response # Setup mock invalid response
mock_send.return_value = "Invalid JSON" mock_send.return_value = "Invalid JSON"
# Call the function # Call the function
@@ -63,7 +63,7 @@ class TestTopologicalSort(unittest.TestCase):
ids = [t["id"] for t in sorted_tickets] ids = [t["id"] for t in sorted_tickets]
self.assertEqual(ids, ["t1", "t2", "t3"]) self.assertEqual(ids, ["t1", "t2", "t3"])
def test_topological_sort_complex(self): def test_topological_sort_complex(self) -> None:
# t1 # t1
# | \ # | \
# t2 t3 # t2 t3
@@ -91,7 +91,7 @@ class TestTopologicalSort(unittest.TestCase):
conductor_tech_lead.topological_sort(tickets) conductor_tech_lead.topological_sort(tickets)
self.assertIn("Circular dependency detected", str(cm.exception)) self.assertIn("Circular dependency detected", str(cm.exception))
def test_topological_sort_missing_dependency(self): def test_topological_sort_missing_dependency(self) -> None:
# If a ticket depends on something not in the list, we should probably handle it or let it fail. # If a ticket depends on something not in the list, we should probably handle it or let it fail.
# Usually in our context, we only care about dependencies within the same track. # Usually in our context, we only care about dependencies within the same track.
tickets = [ tickets = [

View File

@@ -1,3 +1,4 @@
from typing import Any
import pytest import pytest
import os import os
import tomllib import tomllib
@@ -11,7 +12,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import ai_client import ai_client
import project_manager import project_manager
def test_credentials_error_mentions_deepseek(monkeypatch): def test_credentials_error_mentions_deepseek(monkeypatch: pytest.MonkeyPatch) -> None:
""" """
Verify that the error message shown when credentials.toml is missing Verify that the error message shown when credentials.toml is missing
includes deepseek instructions. includes deepseek instructions.
@@ -48,7 +49,7 @@ def test_deepseek_model_listing() -> None:
assert "deepseek-chat" in models assert "deepseek-chat" in models
assert "deepseek-reasoner" in models assert "deepseek-reasoner" in models
def test_gui_provider_list_via_hooks(live_gui): def test_gui_provider_list_via_hooks(live_gui: Any) -> None:
""" """
Verify 'deepseek' is present in the GUI provider list using API hooks. Verify 'deepseek' is present in the GUI provider list using API hooks.
""" """

View File

@@ -1,4 +1,5 @@
import pytest import pytest
from typing import Any
import time import time
import sys import sys
import os import os
@@ -13,7 +14,7 @@ from simulation.sim_tools import ToolsSimulation
from simulation.sim_execution import ExecutionSimulation from simulation.sim_execution import ExecutionSimulation
@pytest.mark.integration @pytest.mark.integration
def test_context_sim_live(live_gui): def test_context_sim_live(live_gui: Any) -> None:
"""Run the Context & Chat simulation against a live GUI.""" """Run the Context & Chat simulation against a live GUI."""
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=10)
@@ -23,7 +24,7 @@ def test_context_sim_live(live_gui):
sim.teardown() sim.teardown()
@pytest.mark.integration @pytest.mark.integration
def test_ai_settings_sim_live(live_gui): def test_ai_settings_sim_live(live_gui: Any) -> None:
"""Run the AI Settings simulation against a live GUI.""" """Run the AI Settings simulation against a live GUI."""
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=10)
@@ -33,7 +34,7 @@ def test_ai_settings_sim_live(live_gui):
sim.teardown() sim.teardown()
@pytest.mark.integration @pytest.mark.integration
def test_tools_sim_live(live_gui): def test_tools_sim_live(live_gui: Any) -> None:
"""Run the Tools & Search simulation against a live GUI.""" """Run the Tools & Search simulation against a live GUI."""
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=10)
@@ -43,7 +44,7 @@ def test_tools_sim_live(live_gui):
sim.teardown() sim.teardown()
@pytest.mark.integration @pytest.mark.integration
def test_execution_sim_live(live_gui): def test_execution_sim_live(live_gui: Any) -> None:
"""Run the Execution & Modals simulation against a live GUI.""" """Run the Execution & Modals simulation against a live GUI."""
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=10)

View File

@@ -1,4 +1,5 @@
import unittest import unittest
from typing import Any
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import json import json
import subprocess import subprocess
@@ -16,7 +17,7 @@ class TestGeminiCliAdapter(unittest.TestCase):
self.adapter = GeminiCliAdapter(binary_path="gemini") self.adapter = GeminiCliAdapter(binary_path="gemini")
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_starts_subprocess_with_correct_args(self, mock_popen): def test_send_starts_subprocess_with_correct_args(self, mock_popen: Any) -> None:
""" """
Verify that send(message) correctly starts the subprocess with Verify that send(message) correctly starts the subprocess with
--output-format stream-json and the provided message via stdin using communicate. --output-format stream-json and the provided message via stdin using communicate.
@@ -48,7 +49,7 @@ class TestGeminiCliAdapter(unittest.TestCase):
self.assertEqual(kwargs.get('text'), True) self.assertEqual(kwargs.get('text'), True)
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_parses_jsonl_output(self, mock_popen): def test_send_parses_jsonl_output(self, mock_popen: Any) -> None:
""" """
Verify that it correctly parses multiple JSONL 'message' events Verify that it correctly parses multiple JSONL 'message' events
and returns the combined text. and returns the combined text.
@@ -69,7 +70,7 @@ class TestGeminiCliAdapter(unittest.TestCase):
self.assertEqual(result["tool_calls"], []) self.assertEqual(result["tool_calls"], [])
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_handles_tool_use_events(self, mock_popen): def test_send_handles_tool_use_events(self, mock_popen: Any) -> None:
""" """
Verify that it correctly handles 'tool_use' events in the stream Verify that it correctly handles 'tool_use' events in the stream
by continuing to read until the final 'result' event. by continuing to read until the final 'result' event.
@@ -93,7 +94,7 @@ class TestGeminiCliAdapter(unittest.TestCase):
self.assertEqual(result["tool_calls"][0]["name"], "read_file") self.assertEqual(result["tool_calls"][0]["name"], "read_file")
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_captures_usage_metadata(self, mock_popen): def test_send_captures_usage_metadata(self, mock_popen: Any) -> None:
""" """
Verify that usage data is extracted from the 'result' event. Verify that usage data is extracted from the 'result' event.
""" """

View File

@@ -29,7 +29,7 @@ class TestGeminiCliAdapterParity(unittest.TestCase):
self.session_logger_patcher.stop() self.session_logger_patcher.stop()
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_count_tokens_uses_estimation(self, mock_popen): def test_count_tokens_uses_estimation(self, mock_popen: MagicMock) -> None:
""" """
Test that count_tokens uses character-based estimation. Test that count_tokens uses character-based estimation.
""" """
@@ -42,7 +42,7 @@ class TestGeminiCliAdapterParity(unittest.TestCase):
mock_popen.assert_not_called() mock_popen.assert_not_called()
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_with_safety_settings_no_flags_added(self, mock_popen): def test_send_with_safety_settings_no_flags_added(self, mock_popen: MagicMock) -> None:
""" """
Test that the send method does NOT add --safety flags when safety_settings are provided, Test that the send method does NOT add --safety flags when safety_settings are provided,
as this functionality is no longer supported via CLI flags. as this functionality is no longer supported via CLI flags.
@@ -66,7 +66,7 @@ class TestGeminiCliAdapterParity(unittest.TestCase):
process_mock.communicate.assert_called_once_with(input=message_content) process_mock.communicate.assert_called_once_with(input=message_content)
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_without_safety_settings_no_flags(self, mock_popen): def test_send_without_safety_settings_no_flags(self, mock_popen: MagicMock) -> None:
""" """
Test that when safety_settings is None or an empty list, no --safety flags are added. Test that when safety_settings is None or an empty list, no --safety flags are added.
""" """
@@ -85,7 +85,7 @@ class TestGeminiCliAdapterParity(unittest.TestCase):
self.assertNotIn("--safety", args_empty[0]) self.assertNotIn("--safety", args_empty[0])
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_with_system_instruction_prepended_to_stdin(self, mock_popen): def test_send_with_system_instruction_prepended_to_stdin(self, mock_popen: MagicMock) -> None:
""" """
Test that the send method prepends the system instruction to the prompt Test that the send method prepends the system instruction to the prompt
sent via stdin, and does NOT add a --system flag to the command. sent via stdin, and does NOT add a --system flag to the command.
@@ -107,7 +107,7 @@ class TestGeminiCliAdapterParity(unittest.TestCase):
self.assertNotIn("--system", command) self.assertNotIn("--system", command)
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_with_model_parameter(self, mock_popen): def test_send_with_model_parameter(self, mock_popen: MagicMock) -> None:
""" """
Test that the send method correctly adds the -m <model> flag when a model is specified. Test that the send method correctly adds the -m <model> flag when a model is specified.
""" """
@@ -128,7 +128,7 @@ class TestGeminiCliAdapterParity(unittest.TestCase):
process_mock.communicate.assert_called_once_with(input=message_content) process_mock.communicate.assert_called_once_with(input=message_content)
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_kills_process_on_communicate_exception(self, mock_popen): def test_send_kills_process_on_communicate_exception(self, mock_popen: MagicMock) -> None:
""" """
Test that if subprocess.Popen().communicate() raises an exception, Test that if subprocess.Popen().communicate() raises an exception,
GeminiCliAdapter.send() kills the process and re-raises the exception. GeminiCliAdapter.send() kills the process and re-raises the exception.

View File

@@ -4,9 +4,10 @@ import os
import sys import sys
import requests import requests
import json import json
from typing import Any
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
def test_gemini_cli_context_bleed_prevention(live_gui): def test_gemini_cli_context_bleed_prevention(live_gui: Any) -> None:
""" """
Test that the GeminiCliAdapter correctly filters out echoed 'user' messages Test that the GeminiCliAdapter correctly filters out echoed 'user' messages
and only shows assistant content in the GUI history. and only shows assistant content in the GUI history.
@@ -39,7 +40,7 @@ print(json.dumps({"type": "result", "stats": {"total_tokens": 10}}), flush=True)
assert "echoing you" not in ai_entries[0].get("content") assert "echoing you" not in ai_entries[0].get("content")
os.remove(bleed_mock) os.remove(bleed_mock)
def test_gemini_cli_parameter_resilience(live_gui): def test_gemini_cli_parameter_resilience(live_gui: Any) -> None:
""" """
Test that mcp_client correctly handles 'file_path' and 'dir_path' aliases Test that mcp_client correctly handles 'file_path' and 'dir_path' aliases
sent by the AI instead of 'path'. sent by the AI instead of 'path'.
@@ -106,7 +107,7 @@ else:
assert found, "Tool result indicating success not found in history" assert found, "Tool result indicating success not found in history"
os.remove(alias_mock) os.remove(alias_mock)
def test_gemini_cli_loop_termination(live_gui): def test_gemini_cli_loop_termination(live_gui: Any) -> None:
""" """
Test that multi-round tool calling correctly terminates and preserves Test that multi-round tool calling correctly terminates and preserves
payload (session context) between rounds. payload (session context) between rounds.

View File

@@ -1,3 +1,4 @@
from typing import Any
import pytest import pytest
import time import time
import os import os
@@ -5,7 +6,7 @@ import sys
import requests import requests
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
def test_gemini_cli_full_integration(live_gui): def test_gemini_cli_full_integration(live_gui: Any) -> None:
""" """
Integration test for the Gemini CLI provider and tool bridge. Integration test for the Gemini CLI provider and tool bridge.
Handles 'ask_received' events from the bridge and any other approval requests. Handles 'ask_received' events from the bridge and any other approval requests.
@@ -70,7 +71,7 @@ def test_gemini_cli_full_integration(live_gui):
assert approved_count > 0, "No approval events were processed" assert approved_count > 0, "No approval events were processed"
assert found_final, "Final message from mock CLI was not found in the GUI history" assert found_final, "Final message from mock CLI was not found in the GUI history"
def test_gemini_cli_rejection_and_history(live_gui): def test_gemini_cli_rejection_and_history(live_gui: Any) -> None:
""" """
Integration test for the Gemini CLI provider: Rejection flow and history. Integration test for the Gemini CLI provider: Rejection flow and history.
""" """

View File

@@ -2,6 +2,7 @@ import pytest
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import sys import sys
import os import os
from typing import Any
# Add project root to sys.path # Add project root to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
@@ -19,7 +20,7 @@ def setup_ai_client() -> None:
@patch('ai_client.GeminiCliAdapter') @patch('ai_client.GeminiCliAdapter')
@patch('ai_client._get_combined_system_prompt') @patch('ai_client._get_combined_system_prompt')
def test_send_invokes_adapter_send(mock_prompt, mock_adapter_class): def test_send_invokes_adapter_send(mock_prompt: Any, mock_adapter_class: Any) -> None:
mock_prompt.return_value = "Mocked Prompt" mock_prompt.return_value = "Mocked Prompt"
mock_instance = mock_adapter_class.return_value mock_instance = mock_adapter_class.return_value
mock_instance.send.return_value = {"text": "Done", "tool_calls": []} mock_instance.send.return_value = {"text": "Done", "tool_calls": []}
@@ -34,7 +35,7 @@ def test_send_invokes_adapter_send(mock_prompt, mock_adapter_class):
assert kwargs['system_instruction'] == "Mocked Prompt\n\n<context>\ncontext\n</context>" assert kwargs['system_instruction'] == "Mocked Prompt\n\n<context>\ncontext\n</context>"
@patch('ai_client.GeminiCliAdapter') @patch('ai_client.GeminiCliAdapter')
def test_get_history_bleed_stats(mock_adapter_class): def test_get_history_bleed_stats(mock_adapter_class: Any) -> None:
mock_instance = mock_adapter_class.return_value mock_instance = mock_adapter_class.return_value
mock_instance.send.return_value = {"text": "txt", "tool_calls": []} mock_instance.send.return_value = {"text": "txt", "tool_calls": []}
mock_instance.last_usage = {"input_tokens": 1500} mock_instance.last_usage = {"input_tokens": 1500}

View File

@@ -1,9 +1,10 @@
from typing import Generator
import pytest import pytest
from unittest.mock import patch from unittest.mock import patch
from gui_2 import App from gui_2 import App
@pytest.fixture @pytest.fixture
def app_instance() -> None: def app_instance() -> Generator[App, None, None]:
with ( with (
patch('gui_2.load_config', return_value={'gui': {'show_windows': {}}}), patch('gui_2.load_config', return_value={'gui': {'show_windows': {}}}),
patch('gui_2.save_config'), patch('gui_2.save_config'),
@@ -17,7 +18,7 @@ def app_instance() -> None:
): ):
yield App() yield App()
def test_gui2_hubs_exist_in_show_windows(app_instance): def test_gui2_hubs_exist_in_show_windows(app_instance: App) -> None:
""" """
Verifies that the new consolidated Hub windows are defined in the App's show_windows. Verifies that the new consolidated Hub windows are defined in the App's show_windows.
This ensures they will be available in the 'Windows' menu. This ensures they will be available in the 'Windows' menu.
@@ -33,7 +34,7 @@ def test_gui2_hubs_exist_in_show_windows(app_instance):
for hub in expected_hubs: for hub in expected_hubs:
assert hub in app_instance.show_windows, f"Expected hub window '{hub}' not found in show_windows" assert hub in app_instance.show_windows, f"Expected hub window '{hub}' not found in show_windows"
def test_gui2_old_windows_removed_from_show_windows(app_instance): def test_gui2_old_windows_removed_from_show_windows(app_instance: App) -> None:
""" """
Verifies that the old fragmented windows are removed from show_windows. Verifies that the old fragmented windows are removed from show_windows.
""" """

View File

@@ -1,4 +1,5 @@
import pytest import pytest
from typing import Any
import time import time
import json import json
import os import os
@@ -22,7 +23,7 @@ def cleanup_callback_file() -> None:
if TEST_CALLBACK_FILE.exists(): if TEST_CALLBACK_FILE.exists():
TEST_CALLBACK_FILE.unlink() TEST_CALLBACK_FILE.unlink()
def test_gui2_set_value_hook_works(live_gui): def test_gui2_set_value_hook_works(live_gui: Any) -> None:
""" """
Tests that the 'set_value' GUI hook is correctly implemented. Tests that the 'set_value' GUI hook is correctly implemented.
""" """
@@ -37,7 +38,7 @@ def test_gui2_set_value_hook_works(live_gui):
current_value = client.get_value('ai_input') current_value = client.get_value('ai_input')
assert current_value == test_value assert current_value == test_value
def test_gui2_click_hook_works(live_gui): def test_gui2_click_hook_works(live_gui: Any) -> None:
""" """
Tests that the 'click' GUI hook for the 'Reset' button is implemented. Tests that the 'click' GUI hook for the 'Reset' button is implemented.
""" """
@@ -54,7 +55,7 @@ def test_gui2_click_hook_works(live_gui):
# Verify it was reset # Verify it was reset
assert client.get_value('ai_input') == "" assert client.get_value('ai_input') == ""
def test_gui2_custom_callback_hook_works(live_gui): def test_gui2_custom_callback_hook_works(live_gui: Any) -> None:
""" """
Tests that the 'custom_callback' GUI hook is correctly implemented. Tests that the 'custom_callback' GUI hook is correctly implemented.
""" """

View File

@@ -2,6 +2,7 @@ import pytest
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import importlib.util import importlib.util
import sys import sys
from typing import Any
import dearpygui.dearpygui as dpg import dearpygui.dearpygui as dpg
# Load gui.py as a module for testing # Load gui.py as a module for testing
@@ -29,14 +30,13 @@ def app_instance() -> None:
yield app yield app
dpg.destroy_context() dpg.destroy_context()
def test_diagnostics_panel_initialization(app_instance): def test_diagnostics_panel_initialization(app_instance: Any) -> None:
assert "Diagnostics" in app_instance.window_info assert "Diagnostics" in app_instance.window_info
assert app_instance.window_info["Diagnostics"] == "win_diagnostics" assert app_instance.window_info["Diagnostics"] == "win_diagnostics"
assert "frame_time" in app_instance.perf_history assert "frame_time" in app_instance.perf_history
assert len(app_instance.perf_history["frame_time"]) == 100 assert len(app_instance.perf_history["frame_time"]) == 100
def test_diagnostics_panel_updates(app_instance): def test_diagnostics_panel_updates(app_instance: Any) -> None:
# Mock dependencies
mock_metrics = { mock_metrics = {
'last_frame_time_ms': 10.0, 'last_frame_time_ms': 10.0,
'fps': 100.0, 'fps': 100.0,

View File

@@ -3,6 +3,7 @@ from unittest.mock import patch, MagicMock
import importlib.util import importlib.util
import sys import sys
import os import os
from typing import Any
import dearpygui.dearpygui as dpg import dearpygui.dearpygui as dpg
# Ensure project root is in path for imports # Ensure project root is in path for imports
@@ -40,7 +41,7 @@ def app_instance() -> None:
yield app yield app
dpg.destroy_context() dpg.destroy_context()
def test_telemetry_panel_updates_correctly(app_instance): def test_telemetry_panel_updates_correctly(app_instance: Any) -> None:
""" """
Tests that the _update_performance_diagnostics method correctly updates Tests that the _update_performance_diagnostics method correctly updates
DPG widgets based on the stats from ai_client. DPG widgets based on the stats from ai_client.
@@ -71,7 +72,7 @@ def test_telemetry_panel_updates_correctly(app_instance):
# Assert Gemini-specific widget was hidden # Assert Gemini-specific widget was hidden
mock_configure_item.assert_any_call("gemini_cache_label", show=False) mock_configure_item.assert_any_call("gemini_cache_label", show=False)
def test_cache_data_display_updates_correctly(app_instance): def test_cache_data_display_updates_correctly(app_instance: Any) -> None:
""" """
Tests that the _update_performance_diagnostics method correctly updates the Tests that the _update_performance_diagnostics method correctly updates the
GUI with Gemini cache statistics when the provider is set to Gemini. GUI with Gemini cache statistics when the provider is set to Gemini.

View File

@@ -8,8 +8,7 @@ from pathlib import Path
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
class TestHeadlessAPI(unittest.TestCase): class TestHeadlessAPI(unittest.TestCase):
def setUp(self): def setUp(self) -> None:
# We need an App instance to initialize the API, but we want to avoid GUI stuff
with patch('gui_2.session_logger.open_session'), \ with patch('gui_2.session_logger.open_session'), \
patch('gui_2.ai_client.set_provider'), \ patch('gui_2.ai_client.set_provider'), \
patch('gui_2.session_logger.close_session'): patch('gui_2.session_logger.close_session'):
@@ -29,14 +28,12 @@ class TestHeadlessAPI(unittest.TestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), {"status": "ok"}) self.assertEqual(response.json(), {"status": "ok"})
def test_status_endpoint_unauthorized(self): def test_status_endpoint_unauthorized(self) -> None:
# Ensure a key is required
with patch.dict(self.app_instance.config, {"headless": {"api_key": "some-required-key"}}): with patch.dict(self.app_instance.config, {"headless": {"api_key": "some-required-key"}}):
response = self.client.get("/status") response = self.client.get("/status")
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
def test_status_endpoint_authorized(self): def test_status_endpoint_authorized(self) -> None:
# We'll use a test key
headers = {"X-API-KEY": "test-secret-key"} headers = {"X-API-KEY": "test-secret-key"}
with patch.dict(self.app_instance.config, {"headless": {"api_key": "test-secret-key"}}): with patch.dict(self.app_instance.config, {"headless": {"api_key": "test-secret-key"}}):
response = self.client.get("/status", headers=headers) response = self.client.get("/status", headers=headers)
@@ -63,8 +60,7 @@ class TestHeadlessAPI(unittest.TestCase):
self.assertIn("metadata", data) self.assertIn("metadata", data)
self.assertEqual(data["usage"]["input_tokens"], 10) self.assertEqual(data["usage"]["input_tokens"], 10)
def test_pending_actions_endpoint(self): def test_pending_actions_endpoint(self) -> None:
# Manually add a pending action
with patch('gui_2.uuid.uuid4', return_value="test-action-id"): with patch('gui_2.uuid.uuid4', return_value="test-action-id"):
dialog = gui_2.ConfirmDialog("dir", ".") dialog = gui_2.ConfirmDialog("dir", ".")
self.app_instance._pending_actions[dialog._uid] = dialog self.app_instance._pending_actions[dialog._uid] = dialog
@@ -74,8 +70,7 @@ class TestHeadlessAPI(unittest.TestCase):
self.assertEqual(len(data), 1) self.assertEqual(len(data), 1)
self.assertEqual(data[0]["action_id"], "test-action-id") self.assertEqual(data[0]["action_id"], "test-action-id")
def test_confirm_action_endpoint(self): def test_confirm_action_endpoint(self) -> None:
# Manually add a pending action
with patch('gui_2.uuid.uuid4', return_value="test-confirm-id"): with patch('gui_2.uuid.uuid4', return_value="test-confirm-id"):
dialog = gui_2.ConfirmDialog("dir", ".") dialog = gui_2.ConfirmDialog("dir", ".")
self.app_instance._pending_actions[dialog._uid] = dialog self.app_instance._pending_actions[dialog._uid] = dialog
@@ -85,8 +80,7 @@ class TestHeadlessAPI(unittest.TestCase):
self.assertTrue(dialog._done) self.assertTrue(dialog._done)
self.assertTrue(dialog._approved) self.assertTrue(dialog._approved)
def test_list_sessions_endpoint(self): def test_list_sessions_endpoint(self) -> None:
# Ensure logs directory exists
Path("logs").mkdir(exist_ok=True) Path("logs").mkdir(exist_ok=True)
# Create a dummy log # Create a dummy log
dummy_log = Path("logs/test_session_api.log") dummy_log = Path("logs/test_session_api.log")
@@ -108,8 +102,7 @@ class TestHeadlessAPI(unittest.TestCase):
self.assertIn("screenshots", data) self.assertIn("screenshots", data)
self.assertIn("files_base_dir", data) self.assertIn("files_base_dir", data)
def test_endpoint_no_api_key_configured(self): def test_endpoint_no_api_key_configured(self) -> None:
# Test the security fix specifically
with patch.dict(self.app_instance.config, {"headless": {"api_key": ""}}): with patch.dict(self.app_instance.config, {"headless": {"api_key": ""}}):
response = self.client.get("/status", headers=self.headers) response = self.client.get("/status", headers=self.headers)
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
@@ -122,8 +115,7 @@ class TestHeadlessStartup(unittest.TestCase):
@patch('gui_2.save_config') @patch('gui_2.save_config')
@patch('gui_2.ai_client.cleanup') @patch('gui_2.ai_client.cleanup')
@patch('uvicorn.run') # Mock uvicorn.run to prevent hanging @patch('uvicorn.run') # Mock uvicorn.run to prevent hanging
def test_headless_flag_prevents_gui_run(self, mock_uvicorn_run, mock_cleanup, mock_save_config, mock_hook_server, mock_immapp_run): def test_headless_flag_prevents_gui_run(self, mock_uvicorn_run: MagicMock, mock_cleanup: MagicMock, mock_save_config: MagicMock, mock_hook_server: MagicMock, mock_immapp_run: MagicMock) -> None:
# Setup mock argv with --headless
test_args = ["gui_2.py", "--headless"] test_args = ["gui_2.py", "--headless"]
with patch.object(sys, 'argv', test_args): with patch.object(sys, 'argv', test_args):
with patch('gui_2.session_logger.close_session'), \ with patch('gui_2.session_logger.close_session'), \
@@ -138,7 +130,7 @@ class TestHeadlessStartup(unittest.TestCase):
mock_uvicorn_run.assert_called_once() mock_uvicorn_run.assert_called_once()
@patch('gui_2.immapp.run') @patch('gui_2.immapp.run')
def test_normal_startup_calls_gui_run(self, mock_immapp_run): def test_normal_startup_calls_gui_run(self, mock_immapp_run: MagicMock) -> None:
test_args = ["gui_2.py"] test_args = ["gui_2.py"]
with patch.object(sys, 'argv', test_args): with patch.object(sys, 'argv', test_args):
# In normal mode, it should still call immapp.run # In normal mode, it should still call immapp.run

View File

@@ -17,7 +17,7 @@ import ai_client
# --- Tests for Aggregate Module --- # --- Tests for Aggregate Module ---
def test_aggregate_includes_segregated_history(tmp_path): def test_aggregate_includes_segregated_history(tmp_path: Path) -> None:
""" """
Tests if the aggregate function correctly includes history Tests if the aggregate function correctly includes history
when it's segregated into a separate file. when it's segregated into a separate file.
@@ -38,7 +38,7 @@ def test_aggregate_includes_segregated_history(tmp_path):
assert "Show me history" in markdown assert "Show me history" in markdown
# --- Tests for MCP Client and Blacklisting --- # --- Tests for MCP Client and Blacklisting ---
def test_mcp_blacklist(tmp_path): def test_mcp_blacklist(tmp_path: Path) -> None:
""" """
Tests that the MCP client correctly blacklists specified files Tests that the MCP client correctly blacklists specified files
and prevents listing them. and prevents listing them.
@@ -57,7 +57,7 @@ def test_mcp_blacklist(tmp_path):
# The blacklisted file should not appear in the directory listing # The blacklisted file should not appear in the directory listing
assert "my_project_history.toml" not in result assert "my_project_history.toml" not in result
def test_aggregate_blacklist(tmp_path): def test_aggregate_blacklist(tmp_path: Path) -> None:
""" """
Tests that aggregate's path resolution respects blacklisting, Tests that aggregate's path resolution respects blacklisting,
ensuring history files are not included by default. ensuring history files are not included by default.
@@ -73,7 +73,7 @@ def test_aggregate_blacklist(tmp_path):
assert hist_file not in paths, "History file should be excluded even with a general glob" assert hist_file not in paths, "History file should be excluded even with a general glob"
# --- Tests for History Migration and Separation --- # --- Tests for History Migration and Separation ---
def test_migration_on_load(tmp_path): def test_migration_on_load(tmp_path: Path) -> None:
""" """
Tests that project loading migrates discussion history from manual_slop.toml Tests that project loading migrates discussion history from manual_slop.toml
to manual_slop_history.toml if it exists in the main config. to manual_slop_history.toml if it exists in the main config.
@@ -102,7 +102,7 @@ def test_migration_on_load(tmp_path):
on_disk_hist = tomllib.load(f) on_disk_hist = tomllib.load(f)
assert on_disk_hist["discussions"]["main"]["history"] == ["Hello", "World"] assert on_disk_hist["discussions"]["main"]["history"] == ["Hello", "World"]
def test_save_separation(tmp_path): def test_save_separation(tmp_path: Path) -> None:
""" """
Tests that saving project data correctly separates discussion history Tests that saving project data correctly separates discussion history
into manual_slop_history.toml. into manual_slop_history.toml.
@@ -128,7 +128,7 @@ def test_save_separation(tmp_path):
assert h_disk["discussions"]["main"]["history"] == ["Saved", "Separately"] assert h_disk["discussions"]["main"]["history"] == ["Saved", "Separately"]
# --- Tests for History Persistence Across Turns --- # --- Tests for History Persistence Across Turns ---
def test_history_persistence_across_turns(tmp_path): def test_history_persistence_across_turns(tmp_path: Path) -> None:
""" """
Tests that discussion history is correctly persisted across multiple save/load cycles. Tests that discussion history is correctly persisted across multiple save/load cycles.
""" """

View File

@@ -1,4 +1,5 @@
import pytest import pytest
from typing import Any
import sys import sys
import os import os
import importlib.util import importlib.util
@@ -40,7 +41,7 @@ def test_new_hubs_defined_in_window_info() -> None:
assert l == label or label in l, f"Label mismatch for {tag}: expected {label}, found {l}" assert l == label or label in l, f"Label mismatch for {tag}: expected {label}, found {l}"
assert found, f"Expected window label {label} not found in window_info" assert found, f"Expected window label {label} not found in window_info"
def test_old_windows_removed_from_window_info(app_instance_simple): def test_old_windows_removed_from_window_info(app_instance_simple: Any) -> None:
""" """
Verifies that the old fragmented windows are removed from window_info. Verifies that the old fragmented windows are removed from window_info.
""" """
@@ -54,14 +55,14 @@ def test_old_windows_removed_from_window_info(app_instance_simple):
assert tag not in app_instance_simple.window_info.values(), f"Old window tag {tag} should have been removed from window_info" assert tag not in app_instance_simple.window_info.values(), f"Old window tag {tag} should have been removed from window_info"
@pytest.fixture @pytest.fixture
def app_instance_simple(): def app_instance_simple() -> Any:
from unittest.mock import patch from unittest.mock import patch
from gui_legacy import App from gui_legacy import App
with patch('gui_legacy.load_config', return_value={}): with patch('gui_legacy.load_config', return_value={}):
app = App() app = App()
return app return app
def test_hub_windows_have_correct_flags(app_instance_simple): def test_hub_windows_have_correct_flags(app_instance_simple: Any) -> None:
""" """
Verifies that the new Hub windows have appropriate flags for a professional workspace. Verifies that the new Hub windows have appropriate flags for a professional workspace.
(e.g., no_collapse should be True for main hubs). (e.g., no_collapse should be True for main hubs).
@@ -80,7 +81,7 @@ def test_hub_windows_have_correct_flags(app_instance_simple):
# but we can check if it's been configured if we mock dpg.window or check it manually # but we can check if it's been configured if we mock dpg.window or check it manually
dpg.destroy_context() dpg.destroy_context()
def test_indicators_exist(app_instance_simple): def test_indicators_exist(app_instance_simple: Any) -> None:
""" """
Verifies that the new thinking and live indicators exist in the UI. Verifies that the new thinking and live indicators exist in the UI.
""" """

View File

@@ -1,3 +1,4 @@
from typing import Generator
import pytest import pytest
from unittest.mock import MagicMock, patch, AsyncMock from unittest.mock import MagicMock, patch, AsyncMock
import asyncio import asyncio
@@ -7,7 +8,7 @@ from events import UserRequestEvent
import ai_client import ai_client
@pytest.fixture @pytest.fixture
def mock_app() -> None: def mock_app() -> Generator[App, None, None]:
with ( with (
patch('gui_2.load_config', return_value={ patch('gui_2.load_config', return_value={
"ai": {"provider": "gemini", "model": "model-1", "temperature": 0.0, "max_tokens": 100, "history_trunc_limit": 1000}, "ai": {"provider": "gemini", "model": "model-1", "temperature": 0.0, "max_tokens": 100, "history_trunc_limit": 1000},
@@ -33,7 +34,7 @@ def mock_app() -> None:
# so we just let it daemon-exit. # so we just let it daemon-exit.
@pytest.mark.timeout(10) @pytest.mark.timeout(10)
def test_user_request_integration_flow(mock_app): def test_user_request_integration_flow(mock_app: App) -> None:
""" """
Verifies that pushing a UserRequestEvent to the event_queue: Verifies that pushing a UserRequestEvent to the event_queue:
1. Triggers ai_client.send 1. Triggers ai_client.send
@@ -83,7 +84,7 @@ def test_user_request_integration_flow(mock_app):
assert app.ai_status == "done" assert app.ai_status == "done"
@pytest.mark.timeout(10) @pytest.mark.timeout(10)
def test_user_request_error_handling(mock_app): def test_user_request_error_handling(mock_app: App) -> None:
""" """
Verifies that if ai_client.send raises an exception, the UI is updated with the error state. Verifies that if ai_client.send raises an exception, the UI is updated with the error state.
""" """

View File

@@ -8,7 +8,8 @@ import gui_2
from gui_2 import App from gui_2 import App
@pytest.fixture @pytest.fixture
def mock_config(tmp_path): @pytest.fixture
def mock_config(tmp_path: Path) -> Path:
config_path = tmp_path / "config.toml" config_path = tmp_path / "config.toml"
config_path.write_text("""[projects] config_path.write_text("""[projects]
paths = [] paths = []
@@ -20,7 +21,8 @@ model = "model"
return config_path return config_path
@pytest.fixture @pytest.fixture
def mock_project(tmp_path): @pytest.fixture
def mock_project(tmp_path: Path) -> Path:
project_path = tmp_path / "project.toml" project_path = tmp_path / "project.toml"
project_path.write_text("""[project] project_path.write_text("""[project]
name = "test" name = "test"
@@ -33,7 +35,8 @@ history = []
return project_path return project_path
@pytest.fixture @pytest.fixture
def app_instance(mock_config, mock_project, monkeypatch): @pytest.fixture
def app_instance(mock_config: Path, mock_project: Path, monkeypatch: pytest.MonkeyPatch) -> App:
monkeypatch.setattr("gui_2.CONFIG_PATH", mock_config) monkeypatch.setattr("gui_2.CONFIG_PATH", mock_config)
with patch("project_manager.load_project") as mock_load, \ with patch("project_manager.load_project") as mock_load, \
patch("session_logger.open_session"): patch("session_logger.open_session"):
@@ -54,14 +57,14 @@ def app_instance(mock_config, mock_project, monkeypatch):
# but python allows calling it directly. # but python allows calling it directly.
return app return app
def test_log_management_init(app_instance): def test_log_management_init(app_instance: App) -> None:
app = app_instance app = app_instance
assert "Log Management" in app.show_windows assert "Log Management" in app.show_windows
assert app.show_windows["Log Management"] is False assert app.show_windows["Log Management"] is False
assert hasattr(app, "_render_log_management") assert hasattr(app, "_render_log_management")
assert callable(app._render_log_management) assert callable(app._render_log_management)
def test_render_log_management_logic(app_instance): def test_render_log_management_logic(app_instance: App) -> None:
app = app_instance app = app_instance
app.show_windows["Log Management"] = True app.show_windows["Log Management"] = True
# Mock LogRegistry # Mock LogRegistry

View File

@@ -1,3 +1,4 @@
from typing import Tuple
import os import os
import shutil import shutil
import pytest import pytest
@@ -7,7 +8,7 @@ from log_registry import LogRegistry
from log_pruner import LogPruner from log_pruner import LogPruner
@pytest.fixture @pytest.fixture
def pruner_setup(tmp_path): def pruner_setup(tmp_path: Path) -> Tuple[LogPruner, LogRegistry, Path]:
logs_dir = tmp_path / "logs" logs_dir = tmp_path / "logs"
logs_dir.mkdir() logs_dir.mkdir()
registry_path = logs_dir / "log_registry.toml" registry_path = logs_dir / "log_registry.toml"
@@ -15,7 +16,7 @@ def pruner_setup(tmp_path):
pruner = LogPruner(registry, str(logs_dir)) pruner = LogPruner(registry, str(logs_dir))
return pruner, registry, logs_dir return pruner, registry, logs_dir
def test_prune_old_insignificant_logs(pruner_setup): def test_prune_old_insignificant_logs(pruner_setup: Tuple[LogPruner, LogRegistry, Path]) -> None:
pruner, registry, logs_dir = pruner_setup pruner, registry, logs_dir = pruner_setup
# 1. Old and small (insignificant) -> should be pruned # 1. Old and small (insignificant) -> should be pruned
session_id_old_small = "old_small" session_id_old_small = "old_small"

View File

@@ -1,6 +1,7 @@
import os import os
import shutil import shutil
import pytest import pytest
from typing import Any
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta from datetime import datetime, timedelta
from unittest.mock import patch from unittest.mock import patch
@@ -10,7 +11,7 @@ from log_registry import LogRegistry
from log_pruner import LogPruner from log_pruner import LogPruner
@pytest.fixture @pytest.fixture
def e2e_setup(tmp_path, monkeypatch): def e2e_setup(tmp_path: Path, monkeypatch: Any) -> Any:
# Ensure closed before starting # Ensure closed before starting
session_logger.close_session() session_logger.close_session()
monkeypatch.setattr(session_logger, "_comms_fh", None) monkeypatch.setattr(session_logger, "_comms_fh", None)
@@ -29,7 +30,7 @@ def e2e_setup(tmp_path, monkeypatch):
session_logger._LOG_DIR = original_log_dir session_logger._LOG_DIR = original_log_dir
session_logger._SCRIPTS_DIR = original_scripts_dir session_logger._SCRIPTS_DIR = original_scripts_dir
def test_logging_e2e(e2e_setup): def test_logging_e2e(e2e_setup: Any) -> None:
tmp_path = e2e_setup tmp_path = e2e_setup
logs_dir = tmp_path / "logs" logs_dir = tmp_path / "logs"
# Step 1: Initialize (open_session) # Step 1: Initialize (open_session)

View File

@@ -1,9 +1,10 @@
import pytest import pytest
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
from typing import Any
from gui_2 import App from gui_2 import App
@pytest.fixture @pytest.fixture
def app_instance(): def app_instance() -> Any:
# We patch the dependencies of App.__init__ to avoid side effects # We patch the dependencies of App.__init__ to avoid side effects
with ( with (
patch('gui_2.load_config', return_value={'ai': {}, 'projects': {}}), patch('gui_2.load_config', return_value={'ai': {}, 'projects': {}}),
@@ -23,7 +24,7 @@ def app_instance():
# Return the app and the mock_pm for use in tests # Return the app and the mock_pm for use in tests
yield app, mock_pm yield app, mock_pm
def test_mma_dashboard_refresh(app_instance): def test_mma_dashboard_refresh(app_instance: Any) -> None:
app, mock_pm = app_instance app, mock_pm = app_instance
# 1. Define mock tracks # 1. Define mock tracks
mock_tracks = [ mock_tracks = [
@@ -43,7 +44,7 @@ def test_mma_dashboard_refresh(app_instance):
# Verify get_all_tracks was called with the correct base_dir # Verify get_all_tracks was called with the correct base_dir
mock_pm.get_all_tracks.assert_called_with(app.ui_files_base_dir) mock_pm.get_all_tracks.assert_called_with(app.ui_files_base_dir)
def test_mma_dashboard_initialization_refresh(app_instance): def test_mma_dashboard_initialization_refresh(app_instance: Any) -> None:
""" """
Checks that _refresh_from_project is called during initialization if Checks that _refresh_from_project is called during initialization if
_load_active_project is NOT mocked to skip it (but here it IS mocked in fixture). _load_active_project is NOT mocked to skip it (but here it IS mocked in fixture).

View File

@@ -25,7 +25,7 @@ def app_instance() -> None:
if not hasattr(app, '_show_track_proposal_modal'): app._show_track_proposal_modal = False if not hasattr(app, '_show_track_proposal_modal'): app._show_track_proposal_modal = False
yield app yield app
def test_mma_ui_state_initialization(app_instance): def test_mma_ui_state_initialization(app_instance: App) -> None:
"""Verifies that the new MMA UI state variables are initialized correctly.""" """Verifies that the new MMA UI state variables are initialized correctly."""
assert hasattr(app_instance, 'ui_epic_input') assert hasattr(app_instance, 'ui_epic_input')
assert hasattr(app_instance, 'proposed_tracks') assert hasattr(app_instance, 'proposed_tracks')
@@ -36,7 +36,7 @@ def test_mma_ui_state_initialization(app_instance):
assert app_instance._show_track_proposal_modal is False assert app_instance._show_track_proposal_modal is False
assert app_instance.mma_streams == {} assert app_instance.mma_streams == {}
def test_process_pending_gui_tasks_show_track_proposal(app_instance): def test_process_pending_gui_tasks_show_track_proposal(app_instance: App) -> None:
"""Verifies that the 'show_track_proposal' action correctly updates the UI state.""" """Verifies that the 'show_track_proposal' action correctly updates the UI state."""
mock_tracks = [{"id": "track_1", "title": "Test Track"}] mock_tracks = [{"id": "track_1", "title": "Test Track"}]
task = { task = {
@@ -48,7 +48,7 @@ def test_process_pending_gui_tasks_show_track_proposal(app_instance):
assert app_instance.proposed_tracks == mock_tracks assert app_instance.proposed_tracks == mock_tracks
assert app_instance._show_track_proposal_modal is True assert app_instance._show_track_proposal_modal is True
def test_cb_plan_epic_launches_thread(app_instance): def test_cb_plan_epic_launches_thread(app_instance: App) -> None:
"""Verifies that _cb_plan_epic launches a thread and eventually queues a task.""" """Verifies that _cb_plan_epic launches a thread and eventually queues a task."""
app_instance.ui_epic_input = "Develop a new feature" app_instance.ui_epic_input = "Develop a new feature"
app_instance.active_project_path = "test_project.toml" app_instance.active_project_path = "test_project.toml"
@@ -80,7 +80,7 @@ def test_cb_plan_epic_launches_thread(app_instance):
mock_get_history.assert_called_once() mock_get_history.assert_called_once()
mock_gen_tracks.assert_called_once() mock_gen_tracks.assert_called_once()
def test_process_pending_gui_tasks_mma_spawn_approval(app_instance): def test_process_pending_gui_tasks_mma_spawn_approval(app_instance: App) -> None:
"""Verifies that the 'mma_spawn_approval' action correctly updates the UI state.""" """Verifies that the 'mma_spawn_approval' action correctly updates the UI state."""
task = { task = {
"action": "mma_spawn_approval", "action": "mma_spawn_approval",
@@ -100,7 +100,7 @@ def test_process_pending_gui_tasks_mma_spawn_approval(app_instance):
assert task["dialog_container"][0] is not None assert task["dialog_container"][0] is not None
assert task["dialog_container"][0]._ticket_id == "T1" assert task["dialog_container"][0]._ticket_id == "T1"
def test_handle_ai_response_with_stream_id(app_instance): def test_handle_ai_response_with_stream_id(app_instance: App) -> None:
"""Verifies routing to mma_streams.""" """Verifies routing to mma_streams."""
task = { task = {
"action": "handle_ai_response", "action": "handle_ai_response",
@@ -116,7 +116,7 @@ def test_handle_ai_response_with_stream_id(app_instance):
assert app_instance.ai_status == "Thinking..." assert app_instance.ai_status == "Thinking..."
assert app_instance.ai_response == "" assert app_instance.ai_response == ""
def test_handle_ai_response_fallback(app_instance): def test_handle_ai_response_fallback(app_instance: App) -> None:
"""Verifies fallback to ai_response when stream_id is missing.""" """Verifies fallback to ai_response when stream_id is missing."""
task = { task = {
"action": "handle_ai_response", "action": "handle_ai_response",

View File

@@ -1,10 +1,11 @@
from typing import Generator
import pytest import pytest
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import asyncio import asyncio
from gui_2 import App from gui_2 import App
@pytest.fixture @pytest.fixture
def app_instance() -> None: def app_instance() -> Generator[App, None, None]:
with ( with (
patch('gui_2.load_config', return_value={'ai': {}, 'projects': {}}), patch('gui_2.load_config', return_value={'ai': {}, 'projects': {}}),
patch('gui_2.save_config'), patch('gui_2.save_config'),
@@ -21,7 +22,7 @@ def app_instance() -> None:
app._loop = MagicMock() app._loop = MagicMock()
yield app yield app
def test_cb_ticket_retry(app_instance): def test_cb_ticket_retry(app_instance: App) -> None:
ticket_id = "test_ticket_1" ticket_id = "test_ticket_1"
app_instance.active_tickets = [{"id": ticket_id, "status": "failed"}] app_instance.active_tickets = [{"id": ticket_id, "status": "failed"}]
with patch('asyncio.run_coroutine_threadsafe') as mock_run_safe: with patch('asyncio.run_coroutine_threadsafe') as mock_run_safe:
@@ -34,7 +35,7 @@ def test_cb_ticket_retry(app_instance):
args, _ = mock_run_safe.call_args args, _ = mock_run_safe.call_args
assert args[1] == app_instance._loop assert args[1] == app_instance._loop
def test_cb_ticket_skip(app_instance): def test_cb_ticket_skip(app_instance: App) -> None:
ticket_id = "test_ticket_1" ticket_id = "test_ticket_1"
app_instance.active_tickets = [{"id": ticket_id, "status": "todo"}] app_instance.active_tickets = [{"id": ticket_id, "status": "todo"}]
with patch('asyncio.run_coroutine_threadsafe') as mock_run_safe: with patch('asyncio.run_coroutine_threadsafe') as mock_run_safe:

View File

@@ -1,17 +1,18 @@
import pytest import pytest
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import json import json
from typing import Any
import orchestrator_pm import orchestrator_pm
import conductor_tech_lead import conductor_tech_lead
import multi_agent_conductor import multi_agent_conductor
from models import Track, Ticket from models import Track, Ticket
@pytest.fixture @pytest.fixture
def mock_ai_client() -> None: def mock_ai_client() -> Any:
with patch("ai_client.send") as mock_send: with patch("ai_client.send") as mock_send:
yield mock_send yield mock_send
def test_generate_tracks(mock_ai_client): def test_generate_tracks(mock_ai_client: Any) -> None:
# Tier 1 (PM) response mock # Tier 1 (PM) response mock
mock_ai_client.return_value = json.dumps([ mock_ai_client.return_value = json.dumps([
{"id": "track_1", "title": "Infrastructure Setup", "description": "Setup basic project structure"}, {"id": "track_1", "title": "Infrastructure Setup", "description": "Setup basic project structure"},
@@ -26,8 +27,7 @@ def test_generate_tracks(mock_ai_client):
assert tracks[1]["id"] == "track_2" assert tracks[1]["id"] == "track_2"
mock_ai_client.assert_called_once() mock_ai_client.assert_called_once()
def test_generate_tickets(mock_ai_client): def test_generate_tickets(mock_ai_client: Any) -> None:
# Tier 2 (Tech Lead) response mock
mock_ai_client.return_value = json.dumps([ mock_ai_client.return_value = json.dumps([
{"id": "T-001", "description": "Define interfaces", "depends_on": []}, {"id": "T-001", "description": "Define interfaces", "depends_on": []},
{"id": "T-002", "description": "Implement interfaces", "depends_on": ["T-001"]} {"id": "T-002", "description": "Implement interfaces", "depends_on": ["T-001"]}
@@ -102,7 +102,7 @@ def test_conductor_engine_parse_json_tickets() -> None:
assert track.tickets[1].id == "T2" assert track.tickets[1].id == "T2"
assert track.tickets[1].depends_on == ["T1"] assert track.tickets[1].depends_on == ["T1"]
def test_run_worker_lifecycle_blocked(mock_ai_client): def test_run_worker_lifecycle_blocked(mock_ai_client: Any) -> None:
ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user") ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user")
context = multi_agent_conductor.WorkerContext(ticket_id="T1", model_name="model", messages=[]) context = multi_agent_conductor.WorkerContext(ticket_id="T1", model_name="model", messages=[])
mock_ai_client.return_value = "BLOCKED because of missing info" mock_ai_client.return_value = "BLOCKED because of missing info"

View File

@@ -1,4 +1,5 @@
import unittest import unittest
from typing import Any
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import json import json
import orchestrator_pm import orchestrator_pm
@@ -8,7 +9,7 @@ class TestOrchestratorPM(unittest.TestCase):
@patch('summarize.build_summary_markdown') @patch('summarize.build_summary_markdown')
@patch('ai_client.send') @patch('ai_client.send')
def test_generate_tracks_success(self, mock_send, mock_summarize): def test_generate_tracks_success(self, mock_send: Any, mock_summarize: Any) -> None:
# Setup mocks # Setup mocks
mock_summarize.return_value = "REPO_MAP_CONTENT" mock_summarize.return_value = "REPO_MAP_CONTENT"
mock_response_data = [ mock_response_data = [
@@ -44,7 +45,7 @@ class TestOrchestratorPM(unittest.TestCase):
@patch('summarize.build_summary_markdown') @patch('summarize.build_summary_markdown')
@patch('ai_client.send') @patch('ai_client.send')
def test_generate_tracks_markdown_wrapped(self, mock_send, mock_summarize): def test_generate_tracks_markdown_wrapped(self, mock_send: Any, mock_summarize: Any) -> None:
mock_summarize.return_value = "REPO_MAP" mock_summarize.return_value = "REPO_MAP"
mock_response_data = [{"id": "track_1"}] mock_response_data = [{"id": "track_1"}]
expected_result = [{"id": "track_1", "title": "Untitled Track"}] expected_result = [{"id": "track_1", "title": "Untitled Track"}]
@@ -59,7 +60,7 @@ class TestOrchestratorPM(unittest.TestCase):
@patch('summarize.build_summary_markdown') @patch('summarize.build_summary_markdown')
@patch('ai_client.send') @patch('ai_client.send')
def test_generate_tracks_malformed_json(self, mock_send, mock_summarize): def test_generate_tracks_malformed_json(self, mock_send: Any, mock_summarize: Any) -> None:
mock_summarize.return_value = "REPO_MAP" mock_summarize.return_value = "REPO_MAP"
mock_send.return_value = "NOT A JSON" mock_send.return_value = "NOT A JSON"
# Should return empty list and print error (we can mock print if we want to be thorough) # Should return empty list and print error (we can mock print if we want to be thorough)

View File

@@ -19,7 +19,7 @@ class TestOrchestratorPMHistory(unittest.TestCase):
if self.test_dir.exists(): if self.test_dir.exists():
shutil.rmtree(self.test_dir) shutil.rmtree(self.test_dir)
def create_track(self, parent_dir, track_id, title, status, overview): def create_track(self, parent_dir: Path, track_id: str, title: str, status: str, overview: str) -> None:
track_path = parent_dir / track_id track_path = parent_dir / track_id
track_path.mkdir(exist_ok=True) track_path.mkdir(exist_ok=True)
metadata = {"title": title, "status": status} metadata = {"title": title, "status": status}
@@ -30,8 +30,7 @@ class TestOrchestratorPMHistory(unittest.TestCase):
f.write(spec_content) f.write(spec_content)
@patch('orchestrator_pm.CONDUCTOR_PATH', Path("test_conductor")) @patch('orchestrator_pm.CONDUCTOR_PATH', Path("test_conductor"))
def test_get_track_history_summary(self): def test_get_track_history_summary(self) -> None:
# Setup mock tracks
self.create_track(self.archive_dir, "track_001", "Initial Setup", "completed", "Setting up the project structure.") self.create_track(self.archive_dir, "track_001", "Initial Setup", "completed", "Setting up the project structure.")
self.create_track(self.tracks_dir, "track_002", "Feature A", "in_progress", "Implementing Feature A.") self.create_track(self.tracks_dir, "track_002", "Feature A", "in_progress", "Implementing Feature A.")
summary = orchestrator_pm.get_track_history_summary() summary = orchestrator_pm.get_track_history_summary()
@@ -43,8 +42,7 @@ class TestOrchestratorPMHistory(unittest.TestCase):
self.assertIn("Implementing Feature A.", summary) self.assertIn("Implementing Feature A.", summary)
@patch('orchestrator_pm.CONDUCTOR_PATH', Path("test_conductor")) @patch('orchestrator_pm.CONDUCTOR_PATH', Path("test_conductor"))
def test_get_track_history_summary_missing_files(self): def test_get_track_history_summary_missing_files(self) -> None:
# Track with missing spec.md
track_path = self.tracks_dir / "track_003" track_path = self.tracks_dir / "track_003"
track_path.mkdir(exist_ok=True) track_path.mkdir(exist_ok=True)
with open(track_path / "metadata.json", "w") as f: with open(track_path / "metadata.json", "w") as f:
@@ -56,7 +54,7 @@ class TestOrchestratorPMHistory(unittest.TestCase):
@patch('orchestrator_pm.summarize.build_summary_markdown') @patch('orchestrator_pm.summarize.build_summary_markdown')
@patch('ai_client.send') @patch('ai_client.send')
def test_generate_tracks_with_history(self, mock_send, mock_summarize): def test_generate_tracks_with_history(self, mock_send: MagicMock, mock_summarize: MagicMock) -> None:
mock_summarize.return_value = "REPO_MAP" mock_summarize.return_value = "REPO_MAP"
mock_send.return_value = "[]" mock_send.return_value = "[]"
history_summary = "PAST_HISTORY_SUMMARY" history_summary = "PAST_HISTORY_SUMMARY"

View File

@@ -1,10 +1,11 @@
from typing import Generator
import pytest import pytest
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import ai_client import ai_client
from gui_2 import App from gui_2 import App
@pytest.fixture @pytest.fixture
def app_instance() -> None: def app_instance() -> Generator[App, None, None]:
with ( with (
patch('gui_2.load_config', return_value={'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'}, 'projects': {}}), patch('gui_2.load_config', return_value={'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'}, 'projects': {}}),
patch('gui_2.save_config'), patch('gui_2.save_config'),
@@ -21,8 +22,7 @@ def app_instance() -> None:
app = App() app = App()
yield app yield app
def test_redundant_calls_in_process_pending_gui_tasks(app_instance): def test_redundant_calls_in_process_pending_gui_tasks(app_instance: App) -> None:
# Setup
app_instance._pending_gui_tasks = [ app_instance._pending_gui_tasks = [
{'action': 'set_value', 'item': 'current_provider', 'value': 'anthropic'} {'action': 'set_value', 'item': 'current_provider', 'value': 'anthropic'}
] ]
@@ -40,8 +40,7 @@ def test_redundant_calls_in_process_pending_gui_tasks(app_instance):
assert mock_set_provider.call_count == 1 assert mock_set_provider.call_count == 1
assert mock_reset_session.call_count == 1 assert mock_reset_session.call_count == 1
def test_gcli_path_updates_adapter(app_instance): def test_gcli_path_updates_adapter(app_instance: App) -> None:
# Setup
app_instance.current_provider = 'gemini_cli' app_instance.current_provider = 'gemini_cli'
app_instance._pending_gui_tasks = [ app_instance._pending_gui_tasks = [
{'action': 'set_value', 'item': 'gcli_path', 'value': '/new/path/to/gemini'} {'action': 'set_value', 'item': 'gcli_path', 'value': '/new/path/to/gemini'}

View File

@@ -1,15 +1,15 @@
import pytest import pytest
from typing import Any
import json import json
from pathlib import Path from pathlib import Path
from project_manager import get_all_tracks, save_track_state from project_manager import get_all_tracks, save_track_state
from models import TrackState, Metadata, Ticket from models import TrackState, Metadata, Ticket
from datetime import datetime from datetime import datetime
def test_get_all_tracks_empty(tmp_path): def test_get_all_tracks_empty(tmp_path: Any) -> None:
# conductor/tracks directory doesn't exist
assert get_all_tracks(tmp_path) == [] assert get_all_tracks(tmp_path) == []
def test_get_all_tracks_with_state(tmp_path): def test_get_all_tracks_with_state(tmp_path: Any) -> None:
tracks_dir = tmp_path / "conductor" / "tracks" tracks_dir = tmp_path / "conductor" / "tracks"
tracks_dir.mkdir(parents=True) tracks_dir.mkdir(parents=True)
track_id = "test_track_1" track_id = "test_track_1"
@@ -34,7 +34,7 @@ def test_get_all_tracks_with_state(tmp_path):
assert track["total"] == 2 assert track["total"] == 2
assert track["progress"] == 0.5 assert track["progress"] == 0.5
def test_get_all_tracks_with_metadata_json(tmp_path): def test_get_all_tracks_with_metadata_json(tmp_path: Any) -> None:
tracks_dir = tmp_path / "conductor" / "tracks" tracks_dir = tmp_path / "conductor" / "tracks"
tracks_dir.mkdir(parents=True) tracks_dir.mkdir(parents=True)
track_id = "test_track_2" track_id = "test_track_2"
@@ -66,7 +66,7 @@ def test_get_all_tracks_with_metadata_json(tmp_path):
assert track["total"] == 3 assert track["total"] == 3
assert pytest.approx(track["progress"]) == 0.333333 assert pytest.approx(track["progress"]) == 0.333333
def test_get_all_tracks_malformed(tmp_path): def test_get_all_tracks_malformed(tmp_path: Any) -> None:
tracks_dir = tmp_path / "conductor" / "tracks" tracks_dir = tmp_path / "conductor" / "tracks"
tracks_dir.mkdir(parents=True) tracks_dir.mkdir(parents=True)
track_id = "malformed_track" track_id = "malformed_track"

View File

@@ -1,15 +1,16 @@
import os
import shutil
import pytest import pytest
import shutil
import os
import tomllib
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from unittest.mock import patch from unittest.mock import patch
from typing import Generator
import session_logger import session_logger
import tomllib
@pytest.fixture @pytest.fixture
def temp_logs(tmp_path, monkeypatch): def temp_logs(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[Path, None, None]:
# Ensure closed before starting # Ensure closed before starting
session_logger.close_session() session_logger.close_session()
monkeypatch.setattr(session_logger, "_comms_fh", None) monkeypatch.setattr(session_logger, "_comms_fh", None)
# Mock _LOG_DIR in session_logger # Mock _LOG_DIR in session_logger
@@ -28,7 +29,8 @@ def temp_logs(tmp_path, monkeypatch):
session_logger._LOG_DIR = original_log_dir session_logger._LOG_DIR = original_log_dir
session_logger._SCRIPTS_DIR = original_scripts_dir session_logger._SCRIPTS_DIR = original_scripts_dir
def test_open_session_creates_subdir_and_registry(temp_logs): def test_open_session_creates_subdir_and_registry(temp_logs: Path) -> None:
label = "test-label" label = "test-label"
# We can't easily mock datetime.datetime.now() because it's a built-in # We can't easily mock datetime.datetime.now() because it's a built-in
# but we can check the resulting directory name pattern # but we can check the resulting directory name pattern

View File

@@ -7,12 +7,11 @@ import asyncio
import concurrent.futures import concurrent.futures
class MockDialog: class MockDialog:
def __init__(self, approved, final_payload=None): def __init__(self, approved: bool, final_payload: dict | None = None) -> None:
self.approved = approved self.approved = approved
self.final_payload = final_payload self.final_payload = final_payload
def wait(self): def wait(self) -> dict:
# Match the new return format: a dictionary
res = {'approved': self.approved, 'abort': False} res = {'approved': self.approved, 'abort': False}
if self.final_payload: if self.final_payload:
res.update(self.final_payload) res.update(self.final_payload)
@@ -25,7 +24,7 @@ def mock_ai_client() -> None:
yield mock_send yield mock_send
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_confirm_spawn_pushed_to_queue(): async def test_confirm_spawn_pushed_to_queue() -> None:
event_queue = events.AsyncEventQueue() event_queue = events.AsyncEventQueue()
ticket_id = "T1" ticket_id = "T1"
role = "Tier 3 Worker" role = "Tier 3 Worker"
@@ -54,7 +53,7 @@ async def test_confirm_spawn_pushed_to_queue():
assert final_context == "Modified Context" assert final_context == "Modified Context"
@patch("multi_agent_conductor.confirm_spawn") @patch("multi_agent_conductor.confirm_spawn")
def test_run_worker_lifecycle_approved(mock_confirm, mock_ai_client): def test_run_worker_lifecycle_approved(mock_confirm: MagicMock, mock_ai_client: MagicMock) -> None:
ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user") ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user")
context = WorkerContext(ticket_id="T1", model_name="model", messages=[]) context = WorkerContext(ticket_id="T1", model_name="model", messages=[])
event_queue = events.AsyncEventQueue() event_queue = events.AsyncEventQueue()
@@ -68,7 +67,7 @@ def test_run_worker_lifecycle_approved(mock_confirm, mock_ai_client):
assert ticket.status == "completed" assert ticket.status == "completed"
@patch("multi_agent_conductor.confirm_spawn") @patch("multi_agent_conductor.confirm_spawn")
def test_run_worker_lifecycle_rejected(mock_confirm, mock_ai_client): def test_run_worker_lifecycle_rejected(mock_confirm: MagicMock, mock_ai_client: MagicMock) -> None:
ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user") ticket = Ticket(id="T1", description="desc", status="todo", assigned_to="user")
context = WorkerContext(ticket_id="T1", model_name="model", messages=[]) context = WorkerContext(ticket_id="T1", model_name="model", messages=[])
event_queue = events.AsyncEventQueue() event_queue = events.AsyncEventQueue()

View File

@@ -1,9 +1,9 @@
import pytest import pytest
from typing import Any
from pathlib import Path from pathlib import Path
from aggregate import build_tier1_context, build_tier2_context, build_tier3_context from aggregate import build_tier1_context, build_tier2_context, build_tier3_context
def test_build_tier1_context_exists(): def test_build_tier1_context_exists() -> None:
# This should fail if the function is not defined
file_items = [ file_items = [
{"path": Path("conductor/product.md"), "entry": "conductor/product.md", "content": "Product content", "error": False}, {"path": Path("conductor/product.md"), "entry": "conductor/product.md", "content": "Product content", "error": False},
{"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False} {"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False}
@@ -22,7 +22,7 @@ def test_build_tier2_context_exists() -> None:
result = build_tier2_context(file_items, Path("."), [], history) result = build_tier2_context(file_items, Path("."), [], history)
assert "Other content" in result assert "Other content" in result
def test_build_tier3_context_ast_skeleton(monkeypatch): def test_build_tier3_context_ast_skeleton(monkeypatch: Any) -> None:
from unittest.mock import MagicMock from unittest.mock import MagicMock
import aggregate import aggregate
import file_cache import file_cache
@@ -59,7 +59,7 @@ def test_build_tier3_context_exists() -> None:
assert "other.py" in result assert "other.py" in result
assert "AST Skeleton" in result assert "AST Skeleton" in result
def test_build_file_items_with_tiers(tmp_path): def test_build_file_items_with_tiers(tmp_path: Any) -> None:
from aggregate import build_file_items from aggregate import build_file_items
# Create some dummy files # Create some dummy files
file1 = tmp_path / "file1.txt" file1 = tmp_path / "file1.txt"
@@ -80,7 +80,7 @@ def test_build_file_items_with_tiers(tmp_path):
assert item2["content"] == "content2" assert item2["content"] == "content2"
assert item2["tier"] == 3 assert item2["tier"] == 3
def test_build_files_section_with_dicts(tmp_path): def test_build_files_section_with_dicts(tmp_path: Any) -> None:
from aggregate import build_files_section from aggregate import build_files_section
file1 = tmp_path / "file1.txt" file1 = tmp_path / "file1.txt"
file1.write_text("content1") file1.write_text("content1")