test(audit): fix critical test suite deadlocks and write exhaustive architectural report

- Fix 'Triple Bingo' history synchronization explosion during streaming

- Implement stateless event buffering in ApiHookClient to prevent dropped events

- Ensure 'tool_execution' events emit consistently across all LLM providers

- Add hard timeouts to all background thread wait() conditions

- Add thorough teardown cleanup to conftest.py's reset_ai_client fixture

- Write highly detailed report_gemini.md exposing asyncio lifecycle flaws
This commit is contained in:
2026-03-05 01:42:47 -05:00
parent bfdbd43785
commit 35480a26dc
15 changed files with 715 additions and 481 deletions

View File

@@ -66,8 +66,11 @@ class ConfirmDialog:
self._approved = False
def wait(self, timeout: float = 120.0) -> tuple[bool, str]:
    """Block until the dialog is resolved or ``timeout`` seconds elapse.

    Args:
        timeout: Maximum number of seconds to wait for a decision. Defaults
            to 120, the limit that used to be hard-coded here.

    Returns:
        ``(approved, script)``. If the deadline passes before the dialog is
        marked done, the dialog counts as rejected: ``(False, script)``.
    """
    # Monotonic clock: a wall-clock adjustment (NTP step, manual change) must
    # neither cut the wait short nor stretch it.
    deadline = time.monotonic() + timeout
    with self._condition:
        while not self._done:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False, self._script
            # Keep the short 100 ms poll of the original loop, but never sleep
            # past the deadline.
            self._condition.wait(timeout=min(0.1, remaining))
        return self._approved, self._script
@@ -79,8 +82,11 @@ class MMAApprovalDialog:
self._approved = False
def wait(self, timeout: float = 120.0) -> tuple[bool, str]:
    """Block until the approval dialog is resolved or ``timeout`` seconds elapse.

    Args:
        timeout: Maximum number of seconds to wait for a decision. Defaults
            to 120, the limit that used to be hard-coded here.

    Returns:
        ``(approved, payload)``. If the deadline passes before the dialog is
        marked done, the step counts as rejected: ``(False, payload)``.
    """
    # Monotonic clock: a wall-clock adjustment (NTP step, manual change) must
    # neither cut the wait short nor stretch it.
    deadline = time.monotonic() + timeout
    with self._condition:
        while not self._done:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False, self._payload
            # Keep the short 100 ms poll of the original loop, but never sleep
            # past the deadline.
            self._condition.wait(timeout=min(0.1, remaining))
        return self._approved, self._payload
@@ -94,8 +100,11 @@ class MMASpawnApprovalDialog:
self._abort = False
def wait(self) -> dict[str, Any]:
start_time = time.time()
with self._condition:
while not self._done:
if time.time() - start_time > 120:
return {'approved': False, 'abort': True, 'prompt': self._prompt, 'context_md': self._context_md}
self._condition.wait(timeout=0.1)
return {
'approved': self._approved,
@@ -109,6 +118,8 @@ class AppController:
The headless controller for the Manual Slop application.
Owns the application state and manages background services.
"""
PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek"]
def __init__(self):
# Initialize locks first to avoid initialization order issues
self._send_thread_lock: threading.Lock = threading.Lock()
@@ -267,6 +278,230 @@ class AppController:
self.prior_session_entries: List[Dict[str, Any]] = []
self.test_hooks_enabled: bool = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1")
self.ui_manual_approve: bool = False
self._init_actions()
def _init_actions(self) -> None:
# Set up state-related action maps
self._clickable_actions: dict[str, Callable[..., Any]] = {
'btn_reset': self._handle_reset_session,
'btn_gen_send': self._handle_generate_send,
'btn_md_only': self._handle_md_only,
'btn_approve_script': self._handle_approve_script,
'btn_reject_script': self._handle_reject_script,
'btn_project_save': self._cb_project_save,
'btn_disc_create': self._cb_disc_create,
'btn_mma_plan_epic': self._cb_plan_epic,
'btn_mma_accept_tracks': self._cb_accept_tracks,
'btn_mma_start_track': self._cb_start_track,
'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type),
'btn_approve_tool': self._handle_approve_ask,
'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True),
'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True),
}
self._predefined_callbacks: dict[str, Callable[..., Any]] = {
'_test_callback_func_write_to_file': self._test_callback_func_write_to_file
}
def _process_pending_gui_tasks(self) -> None:
# Drains self._pending_gui_tasks and dispatches every queued task on its
# "action" key (one branch keys on "type" instead, see below).
# NOTE(review): leading indentation is missing in this view, so the exact
# nesting of some statements cannot be confirmed from here — check the real
# file before changing control flow.
# Unlocked emptiness check: cheap early exit when nothing is queued.
if not self._pending_gui_tasks:
return
# Snapshot and clear the queue under its lock; the tasks are then handled
# from the private copy.
with self._pending_gui_tasks_lock:
tasks = self._pending_gui_tasks[:]
self._pending_gui_tasks.clear()
for task in tasks:
# Each task runs in its own try/except (handler at the bottom), so one
# failing task is reported and the rest of the batch still runs.
try:
action = task.get("action")
if action:
session_logger.log_api_hook("PROCESS_TASK", action, str(task))
# ...
if action == "refresh_api_metrics":
self._refresh_api_metrics(task.get("payload", {}), md_content=self.last_md or None)
elif action == "handle_ai_response":
# Response text either accumulates (status == "streaming...") or replaces
# the previous value. Text carrying a stream_id goes to
# self.mma_streams[stream_id]; text without one goes to self.ai_response.
payload = task.get("payload", {})
text = payload.get("text", "")
stream_id = payload.get("stream_id")
is_streaming = payload.get("status") == "streaming..."
if stream_id:
if is_streaming:
if stream_id not in self.mma_streams: self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
else:
self.mma_streams[stream_id] = text
if stream_id == "Tier 1":
if "status" in payload:
self.ai_status = payload["status"]
else:
if is_streaming:
self.ai_response += text
else:
self.ai_response = text
# NOTE(review): whether the next four lines sit inside the `else` above or
# at the level of `if stream_id:` cannot be told from this view — confirm.
self.ai_status = payload.get("status", "done")
self._trigger_blink = True
if not stream_id:
self._token_stats_dirty = True
# ONLY add to history when turn is complete
if self.ui_auto_add_history and not stream_id and not is_streaming:
role = payload.get("role", "AI")
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": self.ai_response,
"collapsed": False,
"ts": project_manager.now_ts()
})
elif action == "mma_stream_append":
# Appends text to the named stream, creating the entry on first use.
payload = task.get("payload", {})
stream_id = payload.get("stream_id")
text = payload.get("text", "")
if stream_id:
if stream_id not in self.mma_streams:
self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
elif action == "show_track_proposal":
self.proposed_tracks = task.get("payload", [])
self._show_track_proposal_modal = True
elif action == "mma_state_update":
payload = task.get("payload", {})
self.mma_status = payload.get("status", "idle")
self.active_tier = payload.get("active_tier")
# Keeps the current usage figures when the payload carries none.
self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage)
self.active_tickets = payload.get("tickets", [])
track_data = payload.get("track")
if track_data:
# Rebuild Ticket objects from the ticket dicts and wrap them in a Track;
# the Track description is read from the payload's "title" key.
tickets = []
for t_data in self.active_tickets:
tickets.append(Ticket(**t_data))
self.active_track = Track(
id=track_data.get("id"),
description=track_data.get("title", ""),
tickets=tickets
)
elif action == "set_value":
# Only items listed in self._settable_fields are written, via the attribute
# name that table maps them to.
item = task.get("item")
value = task.get("value")
if item in self._settable_fields:
attr_name = self._settable_fields[item]
setattr(self, attr_name, value)
# "gcli_path" also creates the Gemini CLI adapter or updates its binary path.
if item == "gcli_path":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=str(value))
else:
ai_client._gemini_cli_adapter.binary_path = str(value)
elif action == "click":
item = task.get("item")
user_data = task.get("user_data")
# Two items are handled directly; everything else goes through the
# self._clickable_actions table.
if item == "btn_project_new_automated":
self._cb_new_project_automated(user_data)
elif item == "btn_mma_load_track":
self._cb_load_track(str(user_data or ""))
elif item in self._clickable_actions:
import inspect
func = self._clickable_actions[item]
# Pass user_data only to handlers whose signature declares it.
try:
sig = inspect.signature(func)
if 'user_data' in sig.parameters:
func(user_data=user_data)
else:
func()
# NOTE(review): the try body includes the handler call itself, so a handler
# that raises is invoked a second time by this fallback — confirm that is
# intended; it looks like the fallback was meant for signature() failures only.
except Exception:
func()
elif action == "select_list_item":
# Accepts either the "listbox"/"item_value" or the "item"/"value" key pair.
item = task.get("listbox", task.get("item"))
value = task.get("item_value", task.get("value"))
if item == "disc_listbox":
self._switch_discussion(str(value or ""))
# This branch is selected by the task's "type" key, not by "action".
elif task.get("type") == "ask":
self._pending_ask_dialog = True
self._ask_request_id = task.get("request_id")
self._ask_tool_data = task.get("data", {})
elif action == "clear_ask":
# Only clears the ask state when the request id matches the pending one.
if self._ask_request_id == task.get("request_id"):
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
elif action == "custom_callback":
# A callable is invoked directly (its errors are printed, not raised);
# otherwise the value is looked up by name in self._predefined_callbacks.
cb = task.get("callback")
args = task.get("args", [])
if callable(cb):
try: cb(*args)
except Exception as e: print(f"Error in direct custom callback: {e}")
elif cb in self._predefined_callbacks:
self._predefined_callbacks[cb](*args)
elif action == "mma_step_approval":
# The new dialog is handed back to the producer through slot 0 of the
# task's "dialog_container", when the task carries one.
dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or ""))
self._pending_mma_approval = task
if "dialog_container" in task:
task["dialog_container"][0] = dlg
elif action == 'refresh_from_project':
self._refresh_from_project()
elif action == "mma_spawn_approval":
spawn_dlg = MMASpawnApprovalDialog(
str(task.get("ticket_id") or ""),
str(task.get("role") or ""),
str(task.get("prompt") or ""),
str(task.get("context_md") or "")
)
# Record the request and open the spawn view with edit mode off.
self._pending_mma_spawn = task
self._mma_spawn_prompt = task.get("prompt", "")
self._mma_spawn_context = task.get("context_md", "")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
if "dialog_container" in task:
task["dialog_container"][0] = spawn_dlg
except Exception as e:
print(f"Error executing GUI task: {e}")
def _process_pending_history_adds(self) -> None:
"""Synchronizes pending history entries to the active discussion and project state."""
# NOTE(review): leading indentation is missing in this view, so the exact
# nesting of the statements near the end cannot be confirmed from here.
# Snapshot and clear the pending list under its lock.
with self._pending_history_adds_lock:
items = self._pending_history_adds[:]
self._pending_history_adds.clear()
if not items:
return
# Ask the discussion view to scroll down, since entries are about to be added.
self._scroll_disc_to_bottom = True
for item in items:
# NOTE(review): the result of this lookup is discarded — the statement has
# no effect as written.
item.get("role", "unknown")
# Register a role that has not been seen before.
if item.get("role") and item["role"] not in self.disc_roles:
self.disc_roles.append(item["role"])
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
disc_data = discussions.get(self.active_discussion)
if disc_data is not None:
# An item without a "disc_title" is treated as belonging to the active discussion.
if item.get("disc_title", self.active_discussion) == self.active_discussion:
# Identity check: the project history is written only when self.disc_entries
# is not the same list object as the stored history.
if self.disc_entries is not disc_data.get("history"):
if "history" not in disc_data:
disc_data["history"] = []
disc_data["history"].append(project_manager.entry_to_str(item))
disc_data["last_updated"] = project_manager.now_ts()
with self._disc_entries_lock:
self.disc_entries.append(item)
def _test_callback_func_write_to_file(self, data: str) -> None:
"""A dummy function that a custom_callback would execute for testing."""
with open("test_callback_output.txt", "w") as f:
f.write(data)
def _handle_approve_script(self, user_data=None) -> None:
"""Approves the currently pending PowerShell script."""
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
with dlg._condition:
dlg._approved = True
dlg._done = True
dlg._condition.notify_all()
self._pending_dialog = None
def _handle_reject_script(self, user_data=None) -> None:
"""Rejects the currently pending PowerShell script."""
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
with dlg._condition:
dlg._approved = False
dlg._done = True
dlg._condition.notify_all()
self._pending_dialog = None
def init_state(self):
"""Initializes the application state from configurations."""
@@ -418,10 +653,12 @@ class AppController:
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
def stop_services(self) -> None:
def shutdown(self) -> None:
"""Stops background threads and cleans up resources."""
import ai_client
ai_client.cleanup()
if hasattr(self, 'hook_server') and self.hook_server:
self.hook_server.stop()
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._loop_thread and self._loop_thread.is_alive():
@@ -440,9 +677,9 @@ class AppController:
ai_client.tool_log_callback = self._on_tool_log
mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics
self.perf_monitor.alert_callback = self._on_performance_alert
ai_client.events.on("request_start", self._on_api_event)
ai_client.events.on("response_received", self._on_api_event)
ai_client.events.on("tool_execution", self._on_api_event)
ai_client.events.on("request_start", lambda **kw: self._on_api_event("request_start", **kw))
ai_client.events.on("response_received", lambda **kw: self._on_api_event("response_received", **kw))
ai_client.events.on("tool_execution", lambda **kw: self._on_api_event("tool_execution", **kw))
self._settable_fields: Dict[str, str] = {
'ai_input': 'ui_ai_input',
@@ -477,12 +714,35 @@ class AppController:
"""Internal loop runner."""
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._process_event_queue())
# Fallback: process queues even if GUI thread is idling/stuck (or in headless mode)
async def queue_fallback() -> None:
    """Process the pending GUI-task and history queues roughly every 100 ms.

    These methods are normally called by the GUI thread; calling them here
    as well keeps the queues moving when the GUI is idle, stuck, or absent
    (headless mode). Each method takes a snapshot of its queue under a lock,
    so an item is handed to only one of the two callers.
    """
    while True:
        try:
            if hasattr(self, '_process_pending_gui_tasks'):
                self._process_pending_gui_tasks()
            if hasattr(self, '_process_pending_history_adds'):
                self._process_pending_history_adds()
        except Exception as exc:
            # Best-effort: one failed pass must not end the fallback loop, but
            # the failure is reported instead of vanishing. Catching Exception
            # rather than everything lets SystemExit/KeyboardInterrupt through.
            sys.stderr.write(f"[ERROR] queue_fallback pass failed: {exc!r}\n")
            sys.stderr.flush()
        await asyncio.sleep(0.1)
self._loop.create_task(queue_fallback())
self._loop.run_forever()
async def _process_event_queue(self) -> None:
"""Listens for and processes events from the AsyncEventQueue."""
sys.stderr.write("[DEBUG] _process_event_queue started\n")
sys.stderr.flush()
while True:
event_name, payload = await self.event_queue.get()
sys.stderr.write(f"[DEBUG] _process_event_queue got event: {event_name}\n")
sys.stderr.flush()
if event_name == "user_request":
self._loop.run_in_executor(None, self._handle_request_event, payload)
elif event_name == "response":
@@ -517,6 +777,10 @@ class AppController:
"collapsed": False,
"ts": project_manager.now_ts()
})
# Clear response area for new turn
self.ai_response = ""
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit)
@@ -528,11 +792,13 @@ class AppController:
event.base_dir,
event.file_items,
event.disc_text,
stream=True,
stream_callback=lambda text: self._on_ai_stream(text),
pre_tool_callback=self._confirm_and_run,
qa_callback=ai_client.run_tier4_analysis
)
asyncio.run_coroutine_threadsafe(
self.event_queue.put("response", {"text": resp, "status": "done"}),
self.event_queue.put("response", {"text": resp, "status": "done", "role": "AI"}),
self._loop
)
except ProviderError as e:
@@ -546,6 +812,13 @@ class AppController:
self._loop
)
def _on_ai_stream(self, text: str) -> None:
"""Handles streaming text from the AI."""
asyncio.run_coroutine_threadsafe(
self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"}),
self._loop
)
def _on_comms_entry(self, entry: Dict[str, Any]) -> None:
session_logger.log_comms(entry)
entry["local_ts"] = time.time()
@@ -586,11 +859,13 @@ class AppController:
with self._pending_tool_calls_lock:
self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
def _on_api_event(self, *args: Any, **kwargs: Any) -> None:
def _on_api_event(self, event_name: str, **kwargs: Any) -> None:
payload = kwargs.get("payload", {})
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
if self.test_hooks_enabled:
with self._api_event_queue_lock:
self._api_event_queue.append({"type": event_name, "payload": payload})
def _on_performance_alert(self, message: str) -> None:
alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load."
with self._pending_history_adds_lock:
@@ -601,12 +876,19 @@ class AppController:
})
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> Optional[str]:
sys.stderr.write(f"[DEBUG] _confirm_and_run called. test_hooks={self.test_hooks_enabled}, manual_approve={getattr(self, 'ui_manual_approve', False)}\n")
sys.stderr.flush()
if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False):
sys.stderr.write("[DEBUG] Auto-approving script.\n")
sys.stderr.flush()
self.ai_status = "running powershell..."
output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback)
self._append_tool_log(script, output)
self.ai_status = "powershell done, awaiting AI..."
return output
sys.stderr.write("[DEBUG] Creating ConfirmDialog.\n")
sys.stderr.flush()
dialog = ConfirmDialog(script, base_dir)
is_headless = "--headless" in sys.argv
if is_headless:
@@ -625,8 +907,14 @@ class AppController:
"base_dir": str(base_dir),
"ts": time.time()
})
sys.stderr.write(f"[DEBUG] Appended script_confirmation_required to _api_event_queue. ID={dialog._uid}\n")
sys.stderr.flush()
sys.stderr.write(f"[DEBUG] Waiting for dialog ID={dialog._uid}...\n")
sys.stderr.flush()
approved, final_script = dialog.wait()
sys.stderr.write(f"[DEBUG] Dialog ID={dialog._uid} finished wait. approved={approved}\n")
sys.stderr.flush()
if is_headless:
with self._pending_dialog_lock:
if dialog._uid in self._pending_actions:
@@ -1119,62 +1407,86 @@ class AppController:
self._ask_tool_data = None
def _handle_reset_session(self) -> None:
    """Logic for resetting the AI session and GUI state.

    Resets the ai_client session and comms log, empties the tool/comms logs
    and the discussion entries, wipes the history of every discussion in the
    project, restores the input/approval/provider settings to their reset
    values, and drops anything still waiting in the pending queues.
    """
    ai_client.reset_session()
    ai_client.clear_comms_log()
    self._tool_log.clear()
    self._comms_log.clear()
    self.disc_entries.clear()
    # Clear history in ALL discussions to be safe
    disc_sec = self.project.get("discussion", {})
    discussions = disc_sec.get("discussions", {})
    for disc_data in discussions.values():
        disc_data["history"] = []
    self.ai_status = "session reset"
    self.ai_response = ""
    self.ui_ai_input = ""
    self.ui_manual_approve = False
    self.ui_auto_add_history = False
    # The provider and model go back to these fixed values on every reset.
    self._current_provider = "gemini"
    self._current_model = "gemini-2.5-flash-lite"
    ai_client.set_provider(self._current_provider, self._current_model)
    # Discard work queued before the reset so it cannot be applied afterwards.
    with self._pending_history_adds_lock:
        self._pending_history_adds.clear()
    with self._api_event_queue_lock:
        self._api_event_queue.clear()
    with self._pending_gui_tasks_lock:
        self._pending_gui_tasks.clear()
def _handle_md_only(self) -> None:
"""Logic for the 'MD Only' action."""
try:
md, path, *_ = self._do_generate()
self.last_md = md
self.last_md_path = path
self.ai_status = f"md written: {path.name}"
# Refresh token budget metrics with CURRENT md
self._refresh_api_metrics({}, md_content=md)
except Exception as e:
self.ai_status = f"error: {e}"
def worker():
try:
md, path, *_ = self._do_generate()
self.last_md = md
self.last_md_path = path
self.ai_status = f"md written: {path.name}"
# Refresh token budget metrics with CURRENT md
self._refresh_api_metrics({}, md_content=md)
except Exception as e:
self.ai_status = f"error: {e}"
threading.Thread(target=worker, daemon=True).start()
def _handle_generate_send(self) -> None:
"""Logic for the 'Gen + Send' action."""
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
self._last_stable_md = stable_md
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
self.ai_status = f"generate error: {e}"
return
self.ai_status = "sending..."
user_msg = self.ui_ai_input
base_dir = self.ui_files_base_dir
# Prepare event payload
event_payload = events.UserRequestEvent(
prompt=user_msg,
stable_md=stable_md,
file_items=file_items,
disc_text=disc_text,
base_dir=base_dir
)
# Push to async queue
asyncio.run_coroutine_threadsafe(
self.event_queue.put("user_request", event_payload),
self._loop
)
def worker():
sys.stderr.write("[DEBUG] _handle_generate_send worker started\n")
sys.stderr.flush()
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
self._last_stable_md = stable_md
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
self.ai_status = "sending..."
user_msg = self.ui_ai_input
base_dir = self.ui_files_base_dir
sys.stderr.write(f"[DEBUG] _do_generate success. Prompt: {user_msg[:50]}...\n")
sys.stderr.flush()
# Prepare event payload
event_payload = events.UserRequestEvent(
prompt=user_msg,
stable_md=stable_md,
file_items=file_items,
disc_text=disc_text,
base_dir=base_dir
)
# Push to async queue
asyncio.run_coroutine_threadsafe(
self.event_queue.put("user_request", event_payload),
self._loop
)
sys.stderr.write("[DEBUG] Enqueued user_request event\n")
sys.stderr.flush()
except Exception as e:
import traceback
sys.stderr.write(f"[DEBUG] _do_generate ERROR: {e}\n{traceback.format_exc()}\n")
sys.stderr.flush()
self.ai_status = f"generate error: {e}"
threading.Thread(target=worker, daemon=True).start()
def _recalculate_session_usage(self) -> None:
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0}