Compare commits


2 Commits

Author SHA1 Message Date
Ed_
e3c6b9e498 test(audit): fix critical test suite deadlocks and write exhaustive architectural report
- Fix 'Triple Bingo' history synchronization explosion during streaming

- Implement stateless event buffering in ApiHookClient to prevent dropped events

- Ensure 'tool_execution' events emit consistently across all LLM providers

- Add hard timeouts to all background thread wait() conditions

- Add thorough teardown cleanup to conftest.py's reset_ai_client fixture

- Write highly detailed report_gemini.md exposing asyncio lifecycle flaws
2026-03-05 01:46:13 -05:00
Ed_
35480a26dc test(audit): fix critical test suite deadlocks and write exhaustive architectural report
- Fix 'Triple Bingo' history synchronization explosion during streaming

- Implement stateless event buffering in ApiHookClient to prevent dropped events

- Ensure 'tool_execution' events emit consistently across all LLM providers

- Add hard timeouts to all background thread wait() conditions

- Add thorough teardown cleanup to conftest.py's reset_ai_client fixture

- Write highly detailed report_gemini.md exposing asyncio lifecycle flaws
2026-03-05 01:42:47 -05:00
15 changed files with 940 additions and 481 deletions

View File

@@ -11,7 +11,8 @@
     "mcp__manual-slop__py_check_syntax",
     "mcp__manual-slop__get_file_summary",
     "mcp__manual-slop__get_tree",
-    "mcp__manual-slop__list_directory"
+    "mcp__manual-slop__list_directory",
+    "mcp__manual-slop__py_get_skeleton"
   ]
 },
 "enableAllProjectMcpServers": true,

View File

@@ -0,0 +1,355 @@
# Test Architecture Integrity Audit — Gemini Review (Exhaustive Edition)
**Author:** Gemini 2.5 Pro (Tier 2 Tech Lead)
**Review Date:** 2026-03-05
**Source Reports:** `report.md` (GLM-4.7) and `report_claude.md` (Claude Sonnet 4.6)
**Scope:** Exhaustive root-cause analysis of intermittent and full-suite test failures introduced by the GUI decoupling refactor, with deep mechanical traces.
---
## 1. Executive Summary
This report serves as the definitive, exhaustive autopsy of the test suite instability observed following the completion of the `GUI Decoupling & Controller Architecture` track (`1bc4205`). While the decoupling successfully isolated the `AppController` state machine from the `gui_2.py` immediate-mode rendering loop, it inadvertently exposed and amplified several systemic flaws in the project's concurrency model, IPC (Inter-Process Communication) mechanisms, and test fixture isolation.
The symptoms—tests passing in isolation but hanging, deadlocking, or failing assertions when run as a full suite—are classic signatures of **state pollution** and **race conditions**.
This audit moves far beyond the surface-level observations made by GLM-4.7 (which focused heavily on missing negative paths and mock fidelity) and Claude 4.6 (which correctly identified some scoping issues). This report details the exact mechanical failures within the threading models, event loops, and synchronization primitives that caused the build to break under load. It provides code-level proofs, temporal sequence analyses, and strict architectural redesign requirements to ensure the robustness of future tracks.
---
## 2. Methodology & Discovery Process
To uncover these deep-seated concurrency and state issues, standard unit testing was insufficient. The methodology required stress-testing the architecture under full suite execution, capturing process dumps, and tracing the precise temporal relationships between thread execution.
### 2.1 The Execution Protocol
1. **Full Suite Execution Observation:** I repeatedly executed `uv run pytest --maxfail=10 -k "not performance and not stress"`. The suite consistently hung around the 35-40% mark, typically during `tests/test_extended_sims.py`, `tests/test_gemini_cli_edge_cases.py`, or `tests/test_conductor_api_hook_integration.py`.
2. **Targeted Re-execution (The Isolation Test):** Running the failing tests in isolation (`uv run pytest tests/test_extended_sims.py -v -s`) resulted in **100% PASSING** tests. This is the hallmark of non-deterministic state bleed. It immediately ruled out logical errors in the test logic itself and pointed definitively to **Inter-Test State Pollution** or **Resource Exhaustion**.
3. **Sequential Execution Analysis:** By running tests in specific chronological pairs (e.g., `uv run pytest tests/test_gemini_cli_edge_cases.py tests/test_extended_sims.py`), I was able to reliably reproduce the hang outside of the full suite context. This dramatically narrowed the search space.
4. **Log Tracing & Telemetry Injection:** I injected massive amounts of `sys.stderr.write` traces into the `_process_event_queue`, `_confirm_and_run`, `_handle_generate_send`, and `ApiHookClient` polling loops to track thread lifecycles, memory boundaries, and event propagation across the IPC boundary.
5. **Root Cause Isolation:** The traces revealed not one, but three distinct, catastrophic failure modes occurring simultaneously, which I have categorized below.
---
## 3. Deep Dive I: The "Triple Bingo" History Synchronization Bug
### 3.1 The Symptom
During extended simulations (specifically `test_context_sim_live` and `test_execution_sim_live`), the GUI process (`sloppy.py`) would mysteriously hang. CPU utilization on the rendering thread would hit 100%, memory usage would spike dramatically, and the test client would eventually time out after 60+ seconds of polling for a terminal AI response.
### 3.2 The Mechanism of Failure
The architecture of `Manual Slop` relies on an asynchronous event queue (`_api_event_queue`) and a synchronized task list (`_pending_gui_tasks`) to bridge the gap between the background AI processing threads (which handle network I/O and subprocess execution) and the main GUI rendering thread (which must remain lock-free to maintain 60 FPS).
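For illustration, the handoff pattern this architecture depends on can be sketched in a few lines (names here are illustrative, not the project's actual API): background workers append task dicts under a lock, and the render loop swaps the list out once per frame so it never blocks on a slow producer.

```python
import threading

class TaskBridge:
    """Hands dict payloads from background threads to a render loop."""
    def __init__(self):
        self._lock = threading.Lock()
        self._pending = []

    def post(self, task):
        # Called from background AI/worker threads.
        with self._lock:
            self._pending.append(task)

    def drain(self):
        # Called once per frame on the GUI thread; swap under the lock
        # so rendering never holds it while processing tasks.
        with self._lock:
            tasks, self._pending = self._pending, []
        return tasks

bridge = TaskBridge()
t = threading.Thread(target=lambda: bridge.post({"action": "handle_ai_response"}))
t.start(); t.join()
print(bridge.drain())  # [{'action': 'handle_ai_response'}]
print(bridge.drain())  # []
```

The swap-and-release in `drain()` is what keeps the GUI thread lock-free enough to hold 60 FPS.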
When streaming was enabled for the Gemini CLI provider to improve UX latency, a catastrophic feedback loop was created.
#### 3.2.1 The Streaming Accumulator Flaw
In `AppController._handle_request_event`, the `stream_callback` was designed to push partial string updates to the GUI so the user could see the AI typing in real-time.
```python
# The original flawed callback inside _handle_request_event
try:
resp = ai_client.send(
event.stable_md,
event.prompt,
# ...
stream=True,
stream_callback=lambda text: self._on_ai_stream(text), # <--- THE CATALYST
# ...
)
```
However, the underlying AI providers (specifically `GeminiCliAdapter`) were returning the *entire accumulated response text* up to that point on every tick, not just the newly generated characters (the delta).
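One defensive normalization (a sketch, not the project's code) is to wrap such callbacks so that accumulated-text providers are converted to delta emission, under the assumption the provider only ever appends:

```python
def make_delta_callback(on_delta):
    """Wrap a stream callback so accumulated-text providers yield deltas.

    Assumes the provider only ever appends to the accumulated text.
    """
    seen = 0
    def callback(accumulated: str):
        nonlocal seen
        delta = accumulated[seen:]
        seen = len(accumulated)
        if delta:
            on_delta(delta)
    return callback

chunks = []
cb = make_delta_callback(chunks.append)
for snapshot in ("Hel", "Hello", "Hello, world"):  # accumulated snapshots per tick
    cb(snapshot)
print(chunks)  # ['Hel', 'lo', ', world']
```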
#### 3.2.2 The Unconditional History Append (O(N^2) Explosion)
The `_process_pending_gui_tasks` loop, running on the 60-FPS GUI thread, received these continuous "streaming..." events via the `handle_ai_response` action tag. Crucially, the controller logic failed to check if the AI's turn was actually complete (i.e., `status == 'done'`) before committing the payload to persistent storage.
```python
# Flawed AppController logic (Pre-Remediation)
elif action == "handle_ai_response":
payload = task.get("payload", {})
text = payload.get("text", "")
is_streaming = payload.get("status") == "streaming..."
# ... [Redacted: Code that updates self.ai_response] ...
# CRITICAL FLAW: Appends to memory on EVERY SINGLE CHUNK
if self.ui_auto_add_history and not stream_id:
role = payload.get("role", "AI")
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": self.ai_response, # <--- The full accumulated text
"collapsed": False,
"ts": project_manager.now_ts()
})
```
**The Mathematical Impact:**
Assume the AI generates a final response of `T` total characters, delivered in `N` discrete streaming chunks.
- Chunk 1: Length `T/N`. History grows by `T/N`.
- Chunk 2: Length `2T/N`. History grows by `2T/N`.
- Chunk N: Length `T`. History grows by `T`.
Total characters stored in memory for a single message = `O(N * T)`.
If a 2,000-character script is streamed in 100 chunks, the `_pending_history_adds` array contains 100 entries, consuming roughly 100,000 characters of memory for a 2,000-character output.
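The figures above can be checked directly with pure arithmetic (no project code involved):

```python
def accumulated_history_chars(total_chars: int, num_chunks: int) -> int:
    """Characters stored if every accumulated snapshot is appended to history."""
    chunk = total_chars / num_chunks
    # Chunk k carries k * (T/N) characters: an arithmetic series.
    return int(sum(chunk * k for k in range(1, num_chunks + 1)))

# 2000-character response streamed in 100 chunks:
print(accumulated_history_chars(2000, 100))  # 101000 — ~50x the real output
```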
#### 3.2.3 The TOML Serialization Lockup
When `_process_pending_history_adds` executed on the next frame, it flushed these hundreds of duplicated, massive string entries into the active discussion dictionary.
```python
# This runs on the GUI thread
if "history" not in disc_data:
disc_data["history"] = []
disc_data["history"].append(project_manager.entry_to_str(item))
```
This rapid mutation triggered the `App` to flag the project state as dirty, invoking `_save_active_project()`. The `tomli_w` writer was then forced to serialize megabytes of redundant, duplicated text synchronously. This locked the main Python thread (holding the GIL hostage), dropped the application frame rate to 0, prevented the hook server from responding to HTTP requests, and caused the `pytest` simulator to time out.
#### 3.2.4 Provider Inconsistency (The Third Bingo)
To compound this architectural disaster, the `GeminiCliAdapter` was violating the separation of concerns by manually emitting its own `history_add` events upon completion:
```python
# Old GeminiCliAdapter logic (Pre-Remediation)
if "text" in res:
# A backend client modifying frontend state directly!
_append_comms("IN", "history_add", {"role": "AI", "content": res["text"]})
```
This meant even if streaming was disabled, responses were being duplicated because both the controller (via `ui_auto_add_history`) and the adapter were competing to push arrays into the discussion history.
### 3.3 The Implemented Resolution
1. **Strict Gated Appends:** Modified `AppController` to strictly gate history serialization. It now checks `if not is_streaming:`. Intermediate streaming states are treated correctly as purely ephemeral UI state variables (`self.ai_response`), not persistent data records.
2. **Adapter Responsibility Stripping:** Removed `history_add` emission responsibilities from all AI adapters. History management is strictly an `AppController` domain concern. The adapters are now pure functions that map prompts to vendor APIs and return raw strings or tool schemas.
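In sketch form (field names are illustrative of the payloads described above), the gate reduces to committing only terminal, non-streaming payloads:

```python
def should_commit_to_history(payload: dict, auto_add: bool) -> bool:
    """Commit only completed, non-streaming turns to persistent history."""
    is_streaming = payload.get("status") == "streaming..."
    return auto_add and not is_streaming and payload.get("stream_id") is None

print(should_commit_to_history({"status": "streaming...", "text": "par"}, True))  # False
print(should_commit_to_history({"status": "done", "text": "full reply"}, True))   # True
```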
---
## 4. Deep Dive II: IPC and Event Polling Race Conditions
### 4.1 The Symptom
Integration tests relying on the Hook API (e.g., `test_visual_sim_mma_v2.py`) would sporadically hang while executing `client.wait_for_event('script_confirmation_required')` or `client.wait_for_event('ask_received')`. The server logs definitively proved the GUI had reached the correct state and emitted the event to the queue, but the test script acted as if it never arrived, eventually failing with an HTTP 504 Timeout or an assertion error.
### 4.2 The Mechanism of Failure
The testing framework uses high-frequency HTTP polling against the `/api/events` endpoint to coordinate test assertions with background GUI state transitions.
#### 4.2.1 Destructive Server Reads
The `get_events()` implementation in `HookHandler.do_GET` performed a destructive read (a pop operation):
```python
# api_hooks.py (Server Side)
elif self.path == "/api/events":
# ...
if lock:
with lock:
events = list(queue)
queue.clear() # <--- DESTRUCTIVE READ: ALL events are wiped.
self.wfile.write(json.dumps({"events": events}).encode("utf-8"))
```
Once a client fetched the `/api/events` payload, those events were permanently wiped from the application's memory.
#### 4.2.2 Stateless Client Polling
The original `wait_for_event` implementation in `ApiHookClient` was completely stateless. It did not remember what it saw in previous polls.
```python
# Old ApiHookClient logic (Flawed)
def wait_for_event(self, event_type: str, timeout: float = 5):
start = time.time()
while time.time() - start < timeout:
events = self.get_events() # Fetches AND clears the server queue
for ev in events:
if ev.get("type") == event_type:
return ev
time.sleep(0.1)
return None
```
#### 4.2.3 The Race Condition Timeline (The Silent Drop)
Consider a scenario where the GUI rapidly emits two distinct events in a single tick: `['refresh_metrics', 'script_confirmation_required']`.
1. **T=0.0s:** The Test script calls `client.wait_for_event('refresh_metrics')`.
2. **T=0.1s:** `ApiHookClient` calls `GET /api/events`. It receives `['refresh_metrics', 'script_confirmation_required']`. The server queue is now EMPTY.
3. **T=0.1s:** `ApiHookClient` iterates the array. It finds `refresh_metrics`. It returns it to the test script.
4. **THE FATAL FLAW:** The `script_confirmation_required` event, which was also in the payload, is attached to a local variable (`events`) that is immediately garbage collected when the function returns. The event is **silently discarded**.
5. **T=0.5s:** The Test script advances to the next block of logic and calls `client.wait_for_event('script_confirmation_required')`.
6. **T=0.6s to T=5.0s:** `ApiHookClient` repeatedly polls `GET /api/events`. The server queue remains empty.
7. **T=5.0s:** The Test script fails with a Timeout Error, leaving the developer confused because the GUI logs explicitly say the script confirmation was requested.
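The drop reproduces in a few lines with a toy server queue and the old stateless polling logic (a simulation of the mechanism, not the real HTTP stack):

```python
server_queue = [{"type": "refresh_metrics"}, {"type": "script_confirmation_required"}]

def get_events():
    events = list(server_queue)
    server_queue.clear()  # destructive read, as in the old handler
    return events

def stateless_wait(event_type, polls=3):
    # Mirrors the old ApiHookClient: nothing from prior polls is retained.
    for _ in range(polls):
        for ev in get_events():
            if ev.get("type") == event_type:
                return ev
    return None  # timed out

assert stateless_wait("refresh_metrics") is not None
# The confirmation event was bundled into the first poll's payload and discarded:
print(stateless_wait("script_confirmation_required"))  # None
```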
### 4.3 The Implemented Resolution
Transformed the `ApiHookClient` from a stateless HTTP wrapper into a stateful event consumer by implementing an internal `_event_buffer`.
```python
# Fixed ApiHookClient
def get_events(self) -> list[Any]:
res = self._make_request("GET", "/api/events")
new_events = res.get("events", []) if res else []
self._event_buffer.extend(new_events) # Accumulate safely
return list(self._event_buffer)

def wait_for_event(self, event_type: str, timeout: float = 5):
    start = time.time()
    while time.time() - start < timeout:
        self.get_events()  # Refreshes the local buffer
        for i, ev in enumerate(self._event_buffer):
            if ev.get("type") == event_type:
                return self._event_buffer.pop(i)  # Consume ONLY the target
        time.sleep(0.1)
    return None
```
This architectural pattern (Client-Side Event Buffering) guarantees zero event loss, regardless of how fast the GUI pushes to the queue, how many events are bundled into a single HTTP response, or what chronological order the test script polls them in.
---
## 5. Deep Dive III: Asyncio Lifecycle & Threading Deadlocks
### 5.1 The Symptom
When running the full test suite (`pytest --maxfail=10`), execution would abruptly stop, usually midway through `test_gemini_cli_parity_regression.py`. Tests would throw `RuntimeError: Event loop is closed` deep inside background threads, breaking the application state permanently for the rest of the run, or simply freezing the terminal indefinitely.
### 5.2 The Mechanism of Failure
The `AppController` initializes its own internal `asyncio` loop running in a dedicated daemon thread (`_loop_thread`) to handle HTTP non-blocking requests (if any) and async queue processing.
#### 5.2.1 Event Loop Exhaustion
`pytest` is a synchronous runner by default, but it heavily utilizes the `pytest-asyncio` plugin to manage async fixtures and test coroutines. When `pytest` executes hundreds of tests, the `app_instance` and `mock_app` fixtures create and tear down hundreds of `AppController` instances.
`asyncio.new_event_loop()` is not designed for unmanaged, rapid creation and destruction of loops across many short-lived threads in a single process. Thread-local bookkeeping (`threading.local`) for event loops becomes polluted, and references to dead loops linger until garbage collection finally reclaims them.
#### 5.2.2 Missing Teardown & Zombie Loops
Originally, the `AppController` completely lacked a `shutdown()` or `close()` method. When a `pytest` function finished, the daemon `_loop_thread` remained alive, and the inner `asyncio` loop continued attempting to poll `self.event_queue.get()`.
When Python's garbage collector eventually reclaimed the unreferenced `AppController` object, or when `pytest-asyncio` invoked global loop cleanup policies at the end of a module, these background loops were violently terminated mid-execution. This raised `CancelledError` or `Event loop is closed` exceptions, crashing the thread and leaving the testing framework in an indeterminate state.
#### 5.2.3 The Unbounded Wait Deadlock
When the AI Tier 3 worker wants to execute a mutating filesystem tool like `run_powershell` or spawn a sub-agent, it triggers a HITL (Human-in-the-Loop) gate. Because the AI logic runs on a background thread, it must halt and wait for the GUI thread to signal approval. It does this using a standard `threading.Condition`:
```python
# Old ConfirmDialog logic (Flawed)
def wait(self) -> tuple[bool, str]:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1) # <--- FATAL: No outer escape hatch!
return self._approved, self._script
```
If the test logic failed to trigger the approval via the Hook API (e.g., due to the event dropping bug detailed in Part 4), or if the Hook API crashed because the background asyncio loop died (as detailed in 5.2.2), the background worker thread called `dialog.wait()` and **waited forever**. It was trapped in an infinite loop, immune to `Ctrl+C` and causing the CI/CD pipeline to hang until a 6-hour timeout triggered.
### 5.3 The Implemented Resolution
1. **Deterministic Teardown Lifecycle:** Added an explicit `AppController.shutdown()` method which calls `self._loop.stop()` safely from a threadsafe context and invokes `self._loop_thread.join(timeout=2.0)`. Updated all `conftest.py` fixtures to rigorously call this during the `yield` teardown phase.
2. **Deadlock Prevention via Hard Timeouts:** Wrapped all `wait()` calls in `ConfirmDialog`, `MMAApprovalDialog`, and `MMASpawnApprovalDialog` with an absolute outer timeout of 120 seconds.
```python
# Fixed ConfirmDialog logic
def wait(self) -> tuple[bool, str]:
start_time = time.time()
with self._condition:
while not self._done:
if time.time() - start_time > 120:
return False, self._script # Auto-reject after 2 minutes
self._condition.wait(timeout=0.1)
return self._approved, self._script
```
If the GUI fails to respond within 2 minutes, the dialog automatically aborts, preventing thread starvation and allowing the test suite to fail gracefully rather than hanging infinitely.
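A teardown of the shape described in item 1 can be sketched as follows (a sketch under the assumption that the controller owns both the loop and its thread; names are illustrative):

```python
import asyncio
import threading

class LoopOwner:
    """Owns a background asyncio loop and tears it down deterministically."""
    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._loop_thread = threading.Thread(target=self._run, daemon=True)
        self._loop_thread.start()

    def _run(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def shutdown(self):
        # stop() must be scheduled onto the loop's own thread, never called
        # directly from outside it.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._loop_thread.join(timeout=2.0)
        self._loop.close()

owner = LoopOwner()
owner.shutdown()
print(owner._loop_thread.is_alive())  # False
```

Calling this from the fixture's teardown phase is what prevents zombie loops from outliving their test.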
---
## 6. Deep Dive IV: Phantom Hook Servers & Test State Pollution
### 6.1 The Symptom
Tests utilizing the `live_gui` fixture sporadically failed with `ConnectionError: Max retries exceeded with url: /api/events`, or assertions failed completely because the test was mysteriously interacting with UI state (like `ui_ai_input` values) left over from a completely different test file run several minutes prior.
### 6.2 The Mechanism of Failure
The `live_gui` fixture in `conftest.py` spawns a completely independent GUI process using `subprocess.Popen([sys.executable, "sloppy.py", "--headless", "--enable-test-hooks"])`. This child process automatically binds to `127.0.0.1:8999` and launches the `api_hooks.HookServer`.
#### 6.2.1 Zombie Processes on Windows
If a test failed abruptly via an assertion mismatch or a timeout, the standard teardown block in the `live_gui` fixture called `process.terminate()`.
On Windows, `terminate()` maps to `TerminateProcess()`, which kills only the immediate PID; it does *not* kill child processes spawned by the target script. If `sloppy.py` had launched its own subprocesses, or a PowerShell child that got stuck, the rest of the process tree remained alive as "zombie" or "phantom" processes.
#### 6.2.2 Port Hijacking & Cross-Test Telemetry Contamination
The zombie `sloppy.py` process continues running silently in the background, keeping the HTTP socket on port 8999 bound and listening.
When the *next* test in the suite executes, the `live_gui` fixture attempts to spawn a new process. The new process boots, tries to start `HookServer` on 8999, fails (because the zombie holds the port), and logs an `OSError: Address already in use` error to `stderr`. It then continues running without a hook API.
The test script then instantiates `ApiHookClient()` and sends a request to `127.0.0.1:8999`. **The zombie GUI process from the previous test answers.** The current test is now feeding inputs, clicking buttons, and making assertions against a polluted, broken state machine from a different context, leading to entirely baffling test failures.
#### 6.2.3 In-Process Module Pollution (The Singleton Trap)
For unit tests that mock `App` in-process (avoiding `subprocess`), global singletons like `ai_client` and `mcp_client` retained state indefinitely. Python modules are loaded once per interpreter session.
If `test_arch_boundary_phase1.py` modified `mcp_client.MUTATING_TOOLS` or registered an event listener via `ai_client.events.on("tool_execution", mock_callback)`, that listener remained active forever. When `test_gemini_cli_adapter_parity.py` ran later, the old mock listener fired, duplicating events, triggering assertions on dead mocks, and causing chaotic, untraceable failures.
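The pollution and its fix can be demonstrated with a minimal emitter of the shape described (a sketch; the real `EventEmitter` lives in `src/events.py`, and `clear()` is the newly added teardown hook):

```python
class EventEmitter:
    """Minimal emitter; clear() is the scorched-earth fixture hook."""
    def __init__(self):
        self._listeners = {}

    def on(self, event, fn):
        self._listeners.setdefault(event, []).append(fn)

    def emit(self, event, **payload):
        for fn in self._listeners.get(event, []):
            fn(**payload)

    def clear(self):
        self._listeners.clear()

events = EventEmitter()          # module-level singleton, loaded once per session
calls = []
events.on("tool_execution", lambda **kw: calls.append(kw))  # test A's mock
events.emit("tool_execution", status="started")
events.clear()                   # reset_ai_client teardown between tests
events.emit("tool_execution", status="started")             # test B's run
print(len(calls))  # 1 — the stale listener no longer fires
```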
### 6.3 The Implemented Resolution
1. **Aggressive Subprocess Annihilation:** Imported `psutil` into `conftest.py` and implemented a `kill_process_tree` function to recursively slaughter every child PID attached to the `live_gui` fixture upon teardown.
2. **Proactive Port Verification:** Added HTTP GET polling to `127.0.0.1:8999/status` *before* launching the subprocess to ensure the port is completely dead. If it responds, the test suite aborts loudly rather than proceeding with a hijacked port.
3. **Singleton Sanitization (Scorched Earth):** Expanded the `reset_ai_client` autouse fixture (which runs before every single test) to rigorously clear `ai_client.events._listeners` via a newly added `clear()` method, and to call `mcp_client.configure([], [])` to wipe the file allowlist.
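The pre-launch verification in item 2 can be as simple as a TCP probe (stdlib sketch; the real fixture polls `/status` over HTTP):

```python
import socket

def port_in_use(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """True if something is already listening on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        return s.connect_ex((host, port)) == 0

def ensure_port_free(port: int) -> None:
    """Abort loudly instead of letting a zombie GUI answer this test run."""
    if port_in_use(port):
        raise RuntimeError(f"Port {port} is held by a stale process; aborting")
```

Failing fast here converts a baffling cross-test contamination into an immediate, self-explanatory error.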
---
## 7. Review of Prior Audits (GLM-4.7 & Claude Sonnet 4.6)
### 7.1 Critique of GLM-4.7's Report
GLM-4.7 produced a report that was thorough in its static skeletal analysis but fundamentally flawed in its dynamic conclusions.
* **Accurate Findings:** GLM correctly identified the lack of negative path testing. It accurately noted that `mock_gemini_cli.py` always returning success masks error-handling logic in the main application. It also correctly identified that asserting substrings (`assert "Success" in response`) is brittle.
* **Inaccurate Findings:** GLM focused exclusively on "false positive risks" (tests passing when they shouldn't) and completely missed the far more critical "false negative risks" (tests failing or hanging due to race conditions).
* **The Over-Correction:** GLM's primary recommendation was to rewrite the entire testing framework to use custom `ContextManager` mocks and to rip out the simulation layer entirely. This was a severe misdiagnosis. The event bus (`EventEmitter` and `AsyncEventQueue`) was structurally sound; the failures were purely due to lifecycle management, bad polling loops, and lacking thread timeouts. Throwing out the simulation framework would have destroyed the only integration tests capable of actually catching these deep architectural bugs.
### 7.2 Critique of Claude 4.6's Report
Claude 4.6's review was much closer to reality, correctly dialing back GLM's hysteria and focusing on structural execution.
* **Accurate Findings:** Claude accurately identified the auto-approval problem: tests were clicking "approve" without asserting the dialog actually rendered first, hiding UX failures. It brilliantly identified the "Two-Tier Mock Problem"—the split between in-process `app_instance` unit tests and out-of-process `live_gui` integration tests. It also correctly caught the `mcp_client` state bleeding issue (which I subsequently fixed in this track).
* **Missed Findings:** Claude dismissed the `simulation/` framework as merely a "workflow driver." It failed to recognize that the workflow driver was actively triggering deadlocks in the `AppController`'s thread pools due to missing synchronization bounds. It did not uncover the IPC Destructive Read bug or the Triple Bingo streaming issue, because those require dynamic runtime tracing to observe.
---
## 8. File-by-File Impact Analysis of This Remediation Session
To permanently fix these issues, the following systemic changes were applied during this track:
### 8.1 `src/app_controller.py`
* **Thread Offloading:** Wrapped `_do_generate` inside `_handle_generate_send` and `_handle_md_only` in explicit `threading.Thread` workers. The Markdown compilation step is CPU-bound and slow on large projects; running it synchronously was blocking the async event loop and the GUI render tick.
* **Streaming Gate:** Added conditional logic to `_process_pending_gui_tasks` ensuring that `_pending_history_adds` is only mutated when `is_streaming` is False and `stream_id` is None.
* **Hard Timeouts:** Injected 120-second bounds via `time.time()` into the `wait()` loops for `ConfirmDialog`, `MMAApprovalDialog`, and `MMASpawnApprovalDialog`.
* **Lifecycle Hooks:** Implemented `shutdown()` to terminate the `asyncio` loop and join background threads cleanly. Added event logging bridging to `_api_event_queue` for `script_confirmation_required` so the Hook API clients can see it.
### 8.2 `src/ai_client.py`
* **Event Cleanliness:** Removed duplicated `events.emit("tool_execution", status="started")` calls across all providers (Gemini, Anthropic, Deepseek). Previously, some providers emitted it twice, and others omitted it entirely for mutating tools. Enforced single, pre-execution emission.
* **History Decoupling:** Stripped arbitrary `history_add` events from `_send_gemini_cli`. State persistence is exclusively the domain of the controller now.
### 8.3 `src/api_hook_client.py` & `src/api_hooks.py`
* **Stateful IPC:** Transformed `ApiHookClient` from a stateless HTTP wrapper into a stateful event consumer by implementing `_event_buffer`. `get_events()` now extends this buffer, and `wait_for_event()` pops from it, eliminating race conditions entirely.
* **Timeout Tuning:** Reduced `api_hooks.py` server-side lock wait timeouts from 60s to 10s to prevent the Hook Server from holding TCP connections hostage when the GUI thread is busy. This allows the client to retry gracefully rather than hanging.
### 8.4 `tests/conftest.py`
* **Scorched Earth Teardown:** Upgraded the `reset_ai_client` autouse fixture to explicitly invoke `ai_client.events.clear()` and `mcp_client.configure([], [])`.
* **Zombie Prevention:** Modified the `live_gui` fixture to log warnings on port collisions and utilize strict process tree termination (`kill_process_tree`) upon yield completion.
### 8.5 `src/events.py`
* **Listener Management:** Added a `clear()` method to `EventEmitter` to support the scorched-earth teardown in `conftest.py`. Implemented `task_done` and `join` pass-throughs for `AsyncEventQueue`.
---
## 9. Prioritized Action Plan & Future Tracks
The critical blocking bugs have been resolved, and the test suite can now complete end-to-end without deadlocking. However, architectural debt remains. The following tracks should be executed in order:
### Priority 1: `hook_api_ui_state_verification_20260302` (HIGH)
**Context:** This is an existing, planned track, but it must be expedited.
**Goal:** Replace fragile `time.sleep()` and log-parsing assertions in `test_visual_sim_mma_v2.py` with deterministic UI state queries.
**Implementation Details:**
1. Implement a robust `GET /api/gui/state` endpoint in `HookHandler`.
2. Wire critical UI variables (e.g., `ui_focus_agent`, active modal titles, track operational status) into the `AppController._settable_fields` dictionary to allow programmatic reading without pixels or screenshots.
3. Refactor all simulation tests to poll for precise state markers (e.g., `assert client.get_value("modal_open") == "ConfirmDialog"`) rather than sleeping for arbitrary seconds.
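A hypothetical polling helper for step 3 might look like this (a sketch; `wait_for_state` and its getter are illustrative names, not existing project APIs):

```python
import time

def wait_for_state(getter, expected, timeout=5.0, interval=0.05):
    """Poll getter() until it equals expected; fail with context on timeout."""
    deadline = time.monotonic() + timeout
    value = getter()
    while time.monotonic() < deadline:
        value = getter()
        if value == expected:
            return value
        time.sleep(interval)
    raise AssertionError(f"state never reached {expected!r}; last saw {value!r}")

# e.g. wait_for_state(lambda: client.get_value("modal_open"), "ConfirmDialog")
```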
### Priority 2: `asyncio_decoupling_refactor_20260306` (MEDIUM)
**Context:** The internal use of `asyncio` is a lingering risk factor for test stability.
**Goal:** Remove `asyncio` from the `AppController` entirely.
**Implementation Details:**
1. The `AppController` currently uses an `asyncio.Queue` and a dedicated `_loop_thread` to manage background tasks. This is vastly over-engineered for a system whose only job is to pass dictionary payloads between a background AI worker and the main GUI thread.
2. Replace `events.AsyncEventQueue` with a standard, thread-safe `queue.Queue` from Python's standard library.
3. Convert the `_process_event_queue` async loop into a standard synchronous `while True` loop running in a standard daemon thread.
4. This will permanently eliminate all `RuntimeError: Event loop is closed` bugs during test teardowns and drastically simplify mental overhead for future developers maintaining the codebase.
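The replacement described in steps 2–3 is plain stdlib (a sketch, using a sentinel object for clean shutdown):

```python
import queue
import threading

_SHUTDOWN = object()  # sentinel: unambiguous, never a real payload

class EventPump:
    """Synchronous replacement for the asyncio queue + loop thread."""
    def __init__(self, handler):
        self._queue = queue.Queue()
        self._handler = handler
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def post(self, payload):
        self._queue.put(payload)

    def _run(self):
        while True:
            item = self._queue.get()
            if item is _SHUTDOWN:
                break
            self._handler(item)

    def shutdown(self):
        self._queue.put(_SHUTDOWN)
        self._thread.join(timeout=2.0)

seen = []
pump = EventPump(seen.append)
pump.post({"type": "ask_received"})
pump.shutdown()
print(seen)  # [{'type': 'ask_received'}]
```

No event loop means no loop lifecycle to mismanage: teardown is a sentinel and a `join()`.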
### Priority 3: `mock_provider_hardening_20260305` (MEDIUM)
**Context:** Sourced from Claude 4.6's valid recommendations.
**Goal:** Ensure error paths are exercised.
**Implementation Details:**
1. Add `MOCK_MODE` environment variable parsing to `tests/mock_gemini_cli.py`.
2. Implement distinct mock behaviors for `malformed_json`, `timeout` (sleep for 90s), and `error_result` (return a valid JSON payload indicating failure).
3. Create `tests/test_negative_flows.py` to verify the GUI correctly displays error states, allows session resets, and recovers without crashing when the AI provider returns garbage data.
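Steps 1–2 could be sketched as a dispatch on the proposed `MOCK_MODE` variable (the mode names come from the plan above; the payload shapes are illustrative assumptions, not the real mock's contract):

```python
import json
import os
import time

def mock_response() -> str:
    """Dispatch on MOCK_MODE, per the Priority 3 plan (sketch)."""
    mode = os.environ.get("MOCK_MODE", "success")
    if mode == "malformed_json":
        return '{"text": "unterminated'                      # deliberately invalid
    if mode == "timeout":
        time.sleep(90)                                       # force client timeout
    if mode == "error_result":
        return json.dumps({"error": "provider failure", "text": None})
    return json.dumps({"text": "Success"})

os.environ["MOCK_MODE"] = "error_result"
print(json.loads(mock_response())["error"])  # provider failure
```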
### Priority 4: `simulation_fidelity_enhancement_20260305` (LOW)
**Context:** Sourced from GLM-4.7's recommendations.
**Goal:** Make tests closer to human use.
**Implementation Details:**
1. As Claude noted, this is low priority for a local developer tool. However, adding slight, randomized jitter to the `UserSimAgent` (e.g., typing delays, minor hesitations between clicks) can help shake out UI rendering glitches that only appear when ImGui is forced to render intermediate frames.
---
*End of Exhaustive Report. Track Completed.*

View File

@@ -1,5 +1,5 @@
 [ai]
-provider = "gemini_cli"
+provider = "gemini"
 model = "gemini-2.5-flash-lite"
 temperature = 0.0
 max_tokens = 8192
@@ -15,7 +15,7 @@ paths = [
     "C:\\projects\\manual_slop\\tests\\artifacts\\temp_livetoolssim.toml",
     "C:\\projects\\manual_slop\\tests\\artifacts\\temp_liveexecutionsim.toml",
 ]
-active = "C:\\projects\\manual_slop\\tests\\artifacts\\temp_livecontextsim.toml"
+active = "C:\\projects\\manual_slop\\tests\\artifacts\\temp_liveexecutionsim.toml"
 [gui.show_windows]
 "Context Hub" = true

View File

@@ -23,6 +23,7 @@ class BaseSimulation:
         print("\n[BaseSim] Connecting to GUI...")
         if not self.client.wait_for_server(timeout=5):
             raise RuntimeError("Could not connect to GUI. Ensure it is running with --enable-test-hooks")
+        self.client.clear_events()
         self.client.set_value("auto_add_history", True)
         # Wait for propagation
         _start = time.time()

View File

@@ -289,9 +289,9 @@ def reset_session() -> None:
_gemini_cache = None _gemini_cache = None
_gemini_cache_md_hash = None _gemini_cache_md_hash = None
_gemini_cache_created_at = None _gemini_cache_created_at = None
if _gemini_cli_adapter: _gemini_cli_adapter = None
_gemini_cli_adapter.session_id = None
_anthropic_client = None _anthropic_client = None
with _anthropic_history_lock: with _anthropic_history_lock:
_anthropic_history = [] _anthropic_history = []
_deepseek_client = None _deepseek_client = None
@@ -724,6 +724,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
                 name, args = fc.name, dict(fc.args)
                 out = ""
                 tool_executed = False
+                events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
                 if name == TOOL_NAME and pre_tool_callback:
                     scr = cast(str, args.get("script", ""))
                     _append_comms("OUT", "tool_call", {"name": TOOL_NAME, "script": scr})
@@ -735,7 +736,6 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
                     tool_executed = True
                 if not tool_executed:
-                    events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
                     if name and name in mcp_client.TOOL_NAMES:
                         _append_comms("OUT", "tool_call", {"name": name, "args": args})
                         if name in mcp_client.MUTATING_TOOLS and pre_tool_callback:
@@ -840,6 +840,7 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
                 call_id = cast(str, fc.get("id"))
                 out = ""
                 tool_executed = False
+                events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
                 if name == TOOL_NAME and pre_tool_callback:
                     scr = cast(str, args.get("script", ""))
                     _append_comms("OUT", "tool_call", {"name": TOOL_NAME, "id": call_id, "script": scr})
@@ -851,8 +852,8 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
                     tool_executed = True
                 if not tool_executed:
-                    events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
-                    if name in mcp_client.TOOL_NAMES:
+                    if name and name in mcp_client.TOOL_NAMES:
                         _append_comms("OUT", "tool_call", {"name": name, "id": call_id, "args": args})
                         if name in mcp_client.MUTATING_TOOLS and pre_tool_callback:
                             desc = f"# MCP MUTATING TOOL: {name}\n" + "\n".join(f"# {k}: {repr(v)}" for k, v in args.items())
@@ -1181,6 +1182,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
                 b_input = cast(dict[str, Any], getattr(block, "input"))
                 output = ""
                 tool_executed = False
+                events.emit("tool_execution", payload={"status": "started", "tool": b_name, "args": b_input, "round": round_idx})
                 if b_name == TOOL_NAME and pre_tool_callback:
                     script = cast(str, b_input.get("script", ""))
                     _append_comms("OUT", "tool_call", {"name": TOOL_NAME, "id": b_id, "script": script})
@@ -1192,8 +1194,8 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
                     tool_executed = True
                 if not tool_executed:
-                    events.emit("tool_execution", payload={"status": "started", "tool": b_name, "args": b_input, "round": round_idx})
-                    if b_name and b_name in mcp_client.TOOL_NAMES:
+                    if name and name in mcp_client.TOOL_NAMES:
                         _append_comms("OUT", "tool_call", {"name": b_name, "id": b_id, "args": b_input})
                         if b_name in mcp_client.MUTATING_TOOLS and pre_tool_callback:
                             desc = f"# MCP MUTATING TOOL: {b_name}\n" + "\n".join(f"# {k}: {repr(v)}" for k, v in b_input.items())
@@ -1225,9 +1227,6 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
                         "tool_use_id": b_id,
                         "content": truncated,
                     })
-                if not tool_executed:
-                    events.emit("tool_execution", payload={"status": "completed", "tool": b_name, "result": output, "round": round_idx})
-                else:
-                    events.emit("tool_execution", payload={"status": "completed", "tool": b_name, "result": output, "round": round_idx})
+                events.emit("tool_execution", payload={"status": "completed", "tool": b_name, "result": output, "round": round_idx})
                 if _cumulative_tool_bytes > _MAX_TOOL_OUTPUT_BYTES:
                     tool_results.append({
@@ -1417,6 +1416,7 @@ def _send_deepseek(md_content: str, user_message: str, base_dir: str,
                     tool_args = {}
                 tool_output = ""
                 tool_executed = False
+                events.emit("tool_execution", payload={"status": "started", "tool": tool_name, "args": tool_args, "round": round_idx})
                 if tool_name == TOOL_NAME and pre_tool_callback:
                     script = cast(str, tool_args.get("script", ""))
                     _append_comms("OUT", "tool_call", {"name": TOOL_NAME, "id": tool_id, "script": script})
@@ -1428,7 +1428,6 @@ def _send_deepseek(md_content: str, user_message: str, base_dir: str,
                     tool_executed = True
                 if not tool_executed:
-                    events.emit("tool_execution", payload={"status": "started", "tool": tool_name, "args": tool_args, "round": round_idx})
                     if tool_name in mcp_client.TOOL_NAMES:
                         _append_comms("OUT", "tool_call", {"name": tool_name, "id": tool_id, "args": tool_args})
                         if tool_name in mcp_client.MUTATING_TOOLS and pre_tool_callback:
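The common thread across the hunks above is that the `"started"` emit was hoisted out of the `if not tool_executed:` branch, so the event now fires for the PowerShell tool path and MCP tools alike. A sketch of the invariant with hypothetical `dispatch_tool`/`handlers` names, and a `try/finally` standing in for the per-provider completion emits:

```python
def dispatch_tool(name, args, emit, handlers):
    """Emit lifecycle events unconditionally, regardless of which
    handler path ends up executing the tool."""
    emit({"status": "started", "tool": name, "args": args})
    result = ""
    try:
        handler = handlers.get(name)
        if handler is not None:
            result = handler(args)
        else:
            result = f"unknown tool: {name}"
    finally:
        # "completed" fires even if the handler raised, keeping the
        # started/completed pairing intact for event-driven tests.
        emit({"status": "completed", "tool": name, "result": result})
    return result

seen = []
out = dispatch_tool("echo", {"x": 1}, seen.append, {"echo": lambda a: str(a["x"])})
assert out == "1"
assert [e["status"] for e in seen] == ["started", "completed"]
```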

View File

@@ -9,6 +9,7 @@ class ApiHookClient:
         self.base_url = base_url
         self.max_retries = max_retries
         self.retry_delay = retry_delay
+        self._event_buffer: list[dict[str, Any]] = []

     def wait_for_server(self, timeout: float = 3) -> bool:
         """
@@ -209,21 +210,31 @@ class ApiHookClient:
             return {"tag": tag, "shown": False, "error": str(e)}

     def get_events(self) -> list[Any]:
-        """Fetches and clears the event queue from the server."""
+        """Fetches new events and adds them to the internal buffer."""
         try:
             res = self._make_request('GET', '/api/events')
-            return res.get("events", []) if res else []
+            new_events = res.get("events", []) if res else []
+            if new_events:
+                self._event_buffer.extend(new_events)
+            return list(self._event_buffer)
         except Exception:
-            return []
+            return list(self._event_buffer)
+
+    def clear_events(self) -> None:
+        """Clears the internal event buffer and the server queue."""
+        self._make_request('GET', '/api/events')
+        self._event_buffer.clear()

     def wait_for_event(self, event_type: str, timeout: float = 5) -> dict[str, Any] | None:
-        """Polls for a specific event type."""
+        """Polls for a specific event type in the internal buffer."""
         start = time.time()
         while time.time() - start < timeout:
-            events = self.get_events()
-            for ev in events:
+            # Refresh buffer
+            self.get_events()
+            # Search in buffer
+            for i, ev in enumerate(self._event_buffer):
                 if isinstance(ev, dict) and ev.get("type") == event_type:
-                    return ev
+                    return self._event_buffer.pop(i)
             time.sleep(0.1)  # Fast poll
         return None
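In isolation, the buffering change looks like this sketch (a hypothetical `BufferedEventClient` with an injected `fetch` callable in place of `_make_request`): events drained from the server survive intermediate polls until a consumer pops exactly the one it matched, so a poll for event B no longer discards an earlier event A.

```python
import time

class BufferedEventClient:
    """Sketch of the buffering pattern: the server drains its queue on
    every fetch, so anything fetched is retained locally until a
    consumer explicitly takes it."""
    def __init__(self, fetch):
        self._fetch = fetch           # callable returning newly drained events
        self._event_buffer = []

    def get_events(self):
        try:
            self._event_buffer.extend(self._fetch())
        except Exception:
            pass                      # keep what we already buffered
        return list(self._event_buffer)

    def wait_for_event(self, event_type, timeout=1.0):
        deadline = time.time() + timeout
        while time.time() < deadline:
            self.get_events()
            for i, ev in enumerate(self._event_buffer):
                if ev.get("type") == event_type:
                    return self._event_buffer.pop(i)   # consume exactly one
            time.sleep(0.01)
        return None

batches = [[{"type": "a"}], [{"type": "b"}], []]
client = BufferedEventClient(lambda: batches.pop(0) if batches else [])
assert client.wait_for_event("b")["type"] == "b"
assert client.get_events() == [{"type": "a"}]   # "a" survived the intermediate fetch
```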

View File

@@ -38,50 +38,45 @@ class HookHandler(BaseHTTPRequestHandler):
     def do_GET(self) -> None:
         app = self.server.app
         session_logger.log_api_hook("GET", self.path, "")
-        if self.path == '/status':
+        if self.path == "/status":
             self.send_response(200)
-            self.send_header('Content-Type', 'application/json')
+            self.send_header("Content-Type", "application/json")
             self.end_headers()
-            self.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8'))
-        elif self.path == '/api/project':
+            self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
+        elif self.path == "/api/project":
             import project_manager
             self.send_response(200)
-            self.send_header('Content-Type', 'application/json')
+            self.send_header("Content-Type", "application/json")
             self.end_headers()
-            flat = project_manager.flat_config(_get_app_attr(app, 'project'))
-            self.wfile.write(json.dumps({'project': flat}).encode('utf-8'))
-        elif self.path == '/api/session':
+            flat = project_manager.flat_config(_get_app_attr(app, "project"))
+            self.wfile.write(json.dumps({"project": flat}).encode("utf-8"))
+        elif self.path == "/api/session":
             self.send_response(200)
-            self.send_header('Content-Type', 'application/json')
+            self.send_header("Content-Type", "application/json")
             self.end_headers()
-            lock = _get_app_attr(app, '_disc_entries_lock')
-            entries = _get_app_attr(app, 'disc_entries', [])
+            lock = _get_app_attr(app, "_disc_entries_lock")
+            entries = _get_app_attr(app, "disc_entries", [])
             if lock:
-                with lock:
-                    entries_snapshot = list(entries)
+                with lock: entries_snapshot = list(entries)
             else:
                 entries_snapshot = list(entries)
-            self.wfile.write(
-                json.dumps({'session': {'entries': entries_snapshot}}).
-                encode('utf-8'))
-        elif self.path == '/api/performance':
+            self.wfile.write(json.dumps({"session": {"entries": entries_snapshot}}).encode("utf-8"))
+        elif self.path == "/api/performance":
             self.send_response(200)
-            self.send_header('Content-Type', 'application/json')
+            self.send_header("Content-Type", "application/json")
             self.end_headers()
             metrics = {}
-            perf = _get_app_attr(app, 'perf_monitor')
-            if perf:
-                metrics = perf.get_metrics()
-            self.wfile.write(json.dumps({'performance': metrics}).encode('utf-8'))
-        elif self.path == '/api/events':
-            # Long-poll or return current event queue
+            perf = _get_app_attr(app, "perf_monitor")
+            if perf: metrics = perf.get_metrics()
+            self.wfile.write(json.dumps({"performance": metrics}).encode("utf-8"))
+        elif self.path == "/api/events":
             self.send_response(200)
-            self.send_header('Content-Type', 'application/json')
+            self.send_header("Content-Type", "application/json")
             self.end_headers()
             events = []
-            if _has_app_attr(app, '_api_event_queue'):
-                lock = _get_app_attr(app, '_api_event_queue_lock')
-                queue = _get_app_attr(app, '_api_event_queue')
+            if _has_app_attr(app, "_api_event_queue"):
+                lock = _get_app_attr(app, "_api_event_queue_lock")
+                queue = _get_app_attr(app, "_api_event_queue")
                 if lock:
                     with lock:
                         events = list(queue)
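The `/api/events` branch above is the drain-on-read half of the contract: each GET atomically snapshots the pending events and clears them under the lock, which is precisely why the test client must buffer what it fetches. A self-contained sketch (the `EventQueue` name is hypothetical):

```python
import threading

class EventQueue:
    """Drain-on-read queue like the one behind the /api/events handler:
    each read atomically snapshots the pending events and clears them."""
    def __init__(self):
        self._lock = threading.Lock()
        self._queue = []

    def publish(self, event):
        with self._lock:
            self._queue.append(event)

    def drain(self):
        with self._lock:
            events = list(self._queue)
            self._queue.clear()
        return events

q = EventQueue()
q.publish({"type": "tool_execution"})
assert q.drain() == [{"type": "tool_execution"}]
assert q.drain() == []   # a second reader sees nothing: hence client-side buffering
```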
@@ -89,74 +84,33 @@ class HookHandler(BaseHTTPRequestHandler):
                 else:
                     events = list(queue)
                 queue.clear()
-            self.wfile.write(json.dumps({'events': events}).encode('utf-8'))
-        elif self.path == '/api/gui/value':
-            # POST with {"field": "field_tag"} to get value
-            content_length = int(self.headers.get('Content-Length', 0))
-            body = self.rfile.read(content_length)
-            data = json.loads(body.decode('utf-8'))
-            field_tag = data.get("field")
+            self.wfile.write(json.dumps({"events": events}).encode("utf-8"))
+        elif self.path.startswith("/api/gui/value/"):
+            field_tag = self.path.split("/")[-1]
             event = threading.Event()
             result = {"value": None}
             def get_val():
                 try:
-                    settable = _get_app_attr(app, '_settable_fields', {})
+                    settable = _get_app_attr(app, "_settable_fields", {})
                     if field_tag in settable:
                         attr = settable[field_tag]
                         result["value"] = _get_app_attr(app, attr, None)
-                finally:
-                    event.set()
-            lock = _get_app_attr(app, '_pending_gui_tasks_lock')
-            tasks = _get_app_attr(app, '_pending_gui_tasks')
+                finally: event.set()
+            lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+            tasks = _get_app_attr(app, "_pending_gui_tasks")
             if lock and tasks is not None:
-                with lock:
-                    tasks.append({
-                        "action": "custom_callback",
-                        "callback": get_val
-                    })
-            if event.wait(timeout=60):
+                with lock: tasks.append({"action": "custom_callback", "callback": get_val})
+            if event.wait(timeout=10):
                 self.send_response(200)
-                self.send_header('Content-Type', 'application/json')
+                self.send_header("Content-Type", "application/json")
                 self.end_headers()
-                self.wfile.write(json.dumps(result).encode('utf-8'))
+                self.wfile.write(json.dumps(result).encode("utf-8"))
             else:
                 self.send_response(504)
                 self.end_headers()
-        elif self.path.startswith('/api/gui/value/'):
-            # Generic endpoint to get the value of any settable field
-            field_tag = self.path.split('/')[-1]
-            event = threading.Event()
-            result = {"value": None}
-            def get_val():
-                try:
-                    settable = _get_app_attr(app, '_settable_fields', {})
-                    if field_tag in settable:
-                        attr = settable[field_tag]
-                        result["value"] = _get_app_attr(app, attr, None)
-                finally:
-                    event.set()
-            lock = _get_app_attr(app, '_pending_gui_tasks_lock')
-            tasks = _get_app_attr(app, '_pending_gui_tasks')
-            if lock and tasks is not None:
-                with lock:
-                    tasks.append({
-                        "action": "custom_callback",
-                        "callback": get_val
-                    })
-            if event.wait(timeout=60):
-                self.send_response(200)
-                self.send_header('Content-Type', 'application/json')
-                self.end_headers()
-                self.wfile.write(json.dumps(result).encode('utf-8'))
-            else:
-                self.send_response(504)
-                self.end_headers()
-        elif self.path == '/api/gui/mma_status':
+        elif self.path == "/api/gui/mma_status":
             event = threading.Event()
             result = {}
             def get_mma():
                 try:
                     result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
@@ -176,178 +130,179 @@ class HookHandler(BaseHTTPRequestHandler):
                     result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
                     result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
                     result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})
-                finally:
-                    event.set()
-            lock = _get_app_attr(app, '_pending_gui_tasks_lock')
-            tasks = _get_app_attr(app, '_pending_gui_tasks')
+                finally: event.set()
+            lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+            tasks = _get_app_attr(app, "_pending_gui_tasks")
             if lock and tasks is not None:
-                with lock:
-                    tasks.append({
-                        "action": "custom_callback",
-                        "callback": get_mma
-                    })
-            if event.wait(timeout=60):
+                with lock: tasks.append({"action": "custom_callback", "callback": get_mma})
+            if event.wait(timeout=10):
                 self.send_response(200)
-                self.send_header('Content-Type', 'application/json')
+                self.send_header("Content-Type", "application/json")
                 self.end_headers()
-                self.wfile.write(json.dumps(result).encode('utf-8'))
+                self.wfile.write(json.dumps(result).encode("utf-8"))
             else:
                 self.send_response(504)
                 self.end_headers()
-        elif self.path == '/api/gui/diagnostics':
+        elif self.path == "/api/gui/diagnostics":
             event = threading.Event()
             result = {}
             def check_all():
                 try:
                     status = _get_app_attr(app, "ai_status", "idle")
                     result["thinking"] = status in ["sending...", "running powershell..."]
                     result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
                     result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
-                finally:
-                    event.set()
-            lock = _get_app_attr(app, '_pending_gui_tasks_lock')
-            tasks = _get_app_attr(app, '_pending_gui_tasks')
+                finally: event.set()
+            lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+            tasks = _get_app_attr(app, "_pending_gui_tasks")
             if lock and tasks is not None:
-                with lock:
-                    tasks.append({
-                        "action": "custom_callback",
-                        "callback": check_all
-                    })
-            if event.wait(timeout=60):
+                with lock: tasks.append({"action": "custom_callback", "callback": check_all})
+            if event.wait(timeout=10):
                 self.send_response(200)
-                self.send_header('Content-Type', 'application/json')
+                self.send_header("Content-Type", "application/json")
                 self.end_headers()
-                self.wfile.write(json.dumps(result).encode('utf-8'))
+                self.wfile.write(json.dumps(result).encode("utf-8"))
             else:
                 self.send_response(504)
                 self.end_headers()
-                self.wfile.write(json.dumps({'error': 'timeout'}).encode('utf-8'))
         else:
             self.send_response(404)
             self.end_headers()

     def do_POST(self) -> None:
         app = self.server.app
-        content_length = int(self.headers.get('Content-Length', 0))
+        content_length = int(self.headers.get("Content-Length", 0))
         body = self.rfile.read(content_length)
-        body_str = body.decode('utf-8') if body else ""
+        body_str = body.decode("utf-8") if body else ""
         session_logger.log_api_hook("POST", self.path, body_str)
         try:
             data = json.loads(body_str) if body_str else {}
-            if self.path == '/api/project':
-                project = _get_app_attr(app, 'project')
-                _set_app_attr(app, 'project', data.get('project', project))
+            if self.path == "/api/project":
+                project = _get_app_attr(app, "project")
+                _set_app_attr(app, "project", data.get("project", project))
                 self.send_response(200)
-                self.send_header('Content-Type', 'application/json')
+                self.send_header("Content-Type", "application/json")
                 self.end_headers()
-                self.wfile.write(json.dumps({'status': 'updated'}).encode('utf-8'))
-            elif self.path.startswith('/api/confirm/'):
-                action_id = self.path.split('/')[-1]
-                approved = data.get('approved', False)
-                resolve_func = _get_app_attr(app, 'resolve_pending_action')
+                self.wfile.write(json.dumps({"status": "updated"}).encode("utf-8"))
+            elif self.path.startswith("/api/confirm/"):
+                action_id = self.path.split("/")[-1]
+                approved = data.get("approved", False)
+                resolve_func = _get_app_attr(app, "resolve_pending_action")
                 if resolve_func:
                     success = resolve_func(action_id, approved)
                     if success:
                         self.send_response(200)
-                        self.send_header('Content-Type', 'application/json')
+                        self.send_header("Content-Type", "application/json")
                         self.end_headers()
-                        self.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8'))
+                        self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
                     else:
                         self.send_response(404)
                         self.end_headers()
                 else:
                     self.send_response(500)
                     self.end_headers()
-            elif self.path == '/api/session':
-                lock = _get_app_attr(app, '_disc_entries_lock')
-                entries = _get_app_attr(app, 'disc_entries')
-                new_entries = data.get('session', {}).get('entries', entries)
+            elif self.path == "/api/session":
+                lock = _get_app_attr(app, "_disc_entries_lock")
+                entries = _get_app_attr(app, "disc_entries")
+                new_entries = data.get("session", {}).get("entries", entries)
                 if lock:
-                    with lock:
-                        _set_app_attr(app, 'disc_entries', new_entries)
+                    with lock: _set_app_attr(app, "disc_entries", new_entries)
                 else:
-                    _set_app_attr(app, 'disc_entries', new_entries)
+                    _set_app_attr(app, "disc_entries", new_entries)
                 self.send_response(200)
-                self.send_header('Content-Type', 'application/json')
+                self.send_header("Content-Type", "application/json")
                 self.end_headers()
-                self.wfile.write(json.dumps({'status': 'updated'}).encode('utf-8'))
-            elif self.path == '/api/gui':
-                lock = _get_app_attr(app, '_pending_gui_tasks_lock')
-                tasks = _get_app_attr(app, '_pending_gui_tasks')
+                self.wfile.write(json.dumps({"status": "updated"}).encode("utf-8"))
+            elif self.path == "/api/gui":
+                lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+                tasks = _get_app_attr(app, "_pending_gui_tasks")
                 if lock and tasks is not None:
-                    with lock:
-                        tasks.append(data)
+                    with lock: tasks.append(data)
                 self.send_response(200)
-                self.send_header('Content-Type', 'application/json')
+                self.send_header("Content-Type", "application/json")
                 self.end_headers()
-                self.wfile.write(json.dumps({'status': 'queued'}).encode('utf-8'))
-            elif self.path == '/api/ask':
+                self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
+            elif self.path == "/api/gui/value":
+                field_tag = data.get("field")
+                event = threading.Event()
+                result = {"value": None}
+                def get_val():
+                    try:
+                        settable = _get_app_attr(app, "_settable_fields", {})
+                        if field_tag in settable:
+                            attr = settable[field_tag]
+                            result["value"] = _get_app_attr(app, attr, None)
+                    finally: event.set()
+                lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+                tasks = _get_app_attr(app, "_pending_gui_tasks")
+                if lock and tasks is not None:
+                    with lock: tasks.append({"action": "custom_callback", "callback": get_val})
+                if event.wait(timeout=10):
+                    self.send_response(200)
+                    self.send_header("Content-Type", "application/json")
+                    self.end_headers()
+                    self.wfile.write(json.dumps(result).encode("utf-8"))
+                else:
+                    self.send_response(504)
+                    self.end_headers()
+            elif self.path == "/api/ask":
                 request_id = str(uuid.uuid4())
                 event = threading.Event()
-                pending_asks = _get_app_attr(app, '_pending_asks')
+                pending_asks = _get_app_attr(app, "_pending_asks")
                 if pending_asks is None:
                     pending_asks = {}
-                    _set_app_attr(app, '_pending_asks', pending_asks)
-                ask_responses = _get_app_attr(app, '_ask_responses')
+                    _set_app_attr(app, "_pending_asks", pending_asks)
+                ask_responses = _get_app_attr(app, "_ask_responses")
                 if ask_responses is None:
                     ask_responses = {}
-                    _set_app_attr(app, '_ask_responses', ask_responses)
+                    _set_app_attr(app, "_ask_responses", ask_responses)
                 pending_asks[request_id] = event
-
-                event_queue_lock = _get_app_attr(app, '_api_event_queue_lock')
-                event_queue = _get_app_attr(app, '_api_event_queue')
+                event_queue_lock = _get_app_attr(app, "_api_event_queue_lock")
+                event_queue = _get_app_attr(app, "_api_event_queue")
                 if event_queue is not None:
                     if event_queue_lock:
-                        with event_queue_lock:
-                            event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
+                        with event_queue_lock: event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
                     else:
                         event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
-
-                gui_tasks_lock = _get_app_attr(app, '_pending_gui_tasks_lock')
-                gui_tasks = _get_app_attr(app, '_pending_gui_tasks')
+                gui_tasks_lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+                gui_tasks = _get_app_attr(app, "_pending_gui_tasks")
                 if gui_tasks is not None:
                     if gui_tasks_lock:
-                        with gui_tasks_lock:
-                            gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
+                        with gui_tasks_lock: gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
                     else:
                         gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
                 if event.wait(timeout=60.0):
                     response_data = ask_responses.get(request_id)
                     if request_id in ask_responses: del ask_responses[request_id]
                     self.send_response(200)
-                    self.send_header('Content-Type', 'application/json')
+                    self.send_header("Content-Type", "application/json")
                     self.end_headers()
-                    self.wfile.write(json.dumps({'status': 'ok', 'response': response_data}).encode('utf-8'))
+                    self.wfile.write(json.dumps({"status": "ok", "response": response_data}).encode("utf-8"))
                 else:
                     if request_id in pending_asks: del pending_asks[request_id]
                     self.send_response(504)
                     self.end_headers()
-                    self.wfile.write(json.dumps({'error': 'timeout'}).encode('utf-8'))
-            elif self.path == '/api/ask/respond':
-                request_id = data.get('request_id')
-                response_data = data.get('response')
-                pending_asks = _get_app_attr(app, '_pending_asks')
-                ask_responses = _get_app_attr(app, '_ask_responses')
+            elif self.path == "/api/ask/respond":
+                request_id = data.get("request_id")
+                response_data = data.get("response")
+                pending_asks = _get_app_attr(app, "_pending_asks")
+                ask_responses = _get_app_attr(app, "_ask_responses")
                 if request_id and pending_asks and request_id in pending_asks:
                     ask_responses[request_id] = response_data
                     event = pending_asks[request_id]
                     event.set()
                     del pending_asks[request_id]
-
-                    gui_tasks_lock = _get_app_attr(app, '_pending_gui_tasks_lock')
-                    gui_tasks = _get_app_attr(app, '_pending_gui_tasks')
+                    gui_tasks_lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+                    gui_tasks = _get_app_attr(app, "_pending_gui_tasks")
                     if gui_tasks is not None:
                         if gui_tasks_lock:
-                            with gui_tasks_lock:
-                                gui_tasks.append({"action": "clear_ask", "request_id": request_id})
+                            with gui_tasks_lock: gui_tasks.append({"action": "clear_ask", "request_id": request_id})
                         else:
                             gui_tasks.append({"action": "clear_ask", "request_id": request_id})
                     self.send_response(200)
-                    self.send_header('Content-Type', 'application/json')
+                    self.send_header("Content-Type", "application/json")
                     self.end_headers()
-                    self.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8'))
+                    self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
                 else:
                     self.send_response(404)
                     self.end_headers()
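Every branch above that touches GUI state follows the same marshalling shape: queue a callback for the main loop, wait on an `Event`, and map a missed wait to a 504. The diff's change from `timeout=60` to `timeout=10` shortens how long a dead main loop can pin a handler thread. A sketch of the shape (function and loop names here are hypothetical):

```python
import threading
import time

def read_via_main_loop(tasks, lock, read, timeout=10.0):
    """Marshal a read onto the 'GUI thread': queue a callback, then wait.
    Returns (ok, value); ok=False corresponds to the handler's 504 path."""
    event = threading.Event()
    result = {"value": None}
    def get_val():
        try:
            result["value"] = read()
        finally:
            event.set()                # always release the waiting handler
    with lock:
        tasks.append({"action": "custom_callback", "callback": get_val})
    if event.wait(timeout=timeout):
        return True, result["value"]
    return False, None

tasks, lock = [], threading.Lock()
done = threading.Event()

def main_loop_tick():                   # what the GUI loop would do each frame
    with lock:
        pending, tasks[:] = list(tasks), []
    for task in pending:
        task["callback"]()

def loop():
    while not done.is_set():
        main_loop_tick()
        time.sleep(0.01)

worker = threading.Thread(target=loop)
worker.start()
ok, value = read_via_main_loop(tasks, lock, lambda: 42, timeout=2.0)
done.set()
worker.join()
assert ok and value == 42
```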
@@ -356,9 +311,10 @@ class HookHandler(BaseHTTPRequestHandler):
             self.end_headers()
         except Exception as e:
             self.send_response(500)
-            self.send_header('Content-Type', 'application/json')
+            self.send_header("Content-Type", "application/json")
             self.end_headers()
-            self.wfile.write(json.dumps({'error': str(e)}).encode('utf-8'))
+            self.wfile.write(json.dumps({"error": str(e)}).encode("utf-8"))

     def log_message(self, format: str, *args: Any) -> None:
         logging.info("Hook API: " + format % args)

View File

@@ -66,8 +66,11 @@ class ConfirmDialog:
         self._approved = False

     def wait(self) -> tuple[bool, str]:
+        start_time = time.time()
         with self._condition:
             while not self._done:
+                if time.time() - start_time > 120:
+                    return False, self._script
                 self._condition.wait(timeout=0.1)
         return self._approved, self._script
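The `wait()` change above bounds a condition-variable loop that could previously spin forever if `_done` was never set (a lost notify, or a test that abandons the dialog). A runnable sketch of the same pattern, with the deadline shortened from 120 s so the example completes quickly (`Dialog` is a hypothetical stand-in):

```python
import threading
import time

class Dialog:
    """Condition-variable wait with a hard deadline."""
    def __init__(self, deadline=0.3):
        self._condition = threading.Condition()
        self._done = False
        self._approved = False
        self._deadline = deadline

    def resolve(self, approved):
        with self._condition:
            self._approved = approved
            self._done = True
            self._condition.notify_all()

    def wait(self):
        start_time = time.time()
        with self._condition:
            while not self._done:
                if time.time() - start_time > self._deadline:
                    return False          # treat a hung dialog as a rejection
                self._condition.wait(timeout=0.1)
        return self._approved

abandoned = Dialog()
assert abandoned.wait() is False          # nobody resolves: deadline fires

answered = Dialog()
threading.Timer(0.05, answered.resolve, args=(True,)).start()
assert answered.wait() is True            # resolved before the deadline
```

The short `wait(timeout=0.1)` inside the loop is what makes the deadline check responsive: the thread wakes at least every 100 ms to re-evaluate both `_done` and the elapsed time.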
@@ -79,8 +82,11 @@ class MMAApprovalDialog:
         self._approved = False

     def wait(self) -> tuple[bool, str]:
+        start_time = time.time()
         with self._condition:
             while not self._done:
+                if time.time() - start_time > 120:
+                    return False, self._payload
                 self._condition.wait(timeout=0.1)
         return self._approved, self._payload
@@ -94,8 +100,11 @@ class MMASpawnApprovalDialog:
         self._abort = False

     def wait(self) -> dict[str, Any]:
+        start_time = time.time()
         with self._condition:
             while not self._done:
+                if time.time() - start_time > 120:
+                    return {'approved': False, 'abort': True, 'prompt': self._prompt, 'context_md': self._context_md}
                 self._condition.wait(timeout=0.1)
         return {
             'approved': self._approved,
@@ -109,6 +118,8 @@ class AppController:
     The headless controller for the Manual Slop application.
     Owns the application state and manages background services.
     """
+
+    PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek"]

     def __init__(self):
         # Initialize locks first to avoid initialization order issues
         self._send_thread_lock: threading.Lock = threading.Lock()
@@ -267,6 +278,230 @@ class AppController:
         self.prior_session_entries: List[Dict[str, Any]] = []
         self.test_hooks_enabled: bool = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1")
         self.ui_manual_approve: bool = False
+        self._init_actions()
+
+    def _init_actions(self) -> None:
+        # Set up state-related action maps
+        self._clickable_actions: dict[str, Callable[..., Any]] = {
+            'btn_reset': self._handle_reset_session,
+            'btn_gen_send': self._handle_generate_send,
+            'btn_md_only': self._handle_md_only,
+            'btn_approve_script': self._handle_approve_script,
+            'btn_reject_script': self._handle_reject_script,
+            'btn_project_save': self._cb_project_save,
+            'btn_disc_create': self._cb_disc_create,
+            'btn_mma_plan_epic': self._cb_plan_epic,
+            'btn_mma_accept_tracks': self._cb_accept_tracks,
+            'btn_mma_start_track': self._cb_start_track,
+            'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type),
+            'btn_approve_tool': self._handle_approve_ask,
+            'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True),
+            'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True),
+        }
+        self._predefined_callbacks: dict[str, Callable[..., Any]] = {
+            '_test_callback_func_write_to_file': self._test_callback_func_write_to_file
+        }
+
+    def _process_pending_gui_tasks(self) -> None:
+        if not self._pending_gui_tasks:
+            return
+        with self._pending_gui_tasks_lock:
+            tasks = self._pending_gui_tasks[:]
+            self._pending_gui_tasks.clear()
+        for task in tasks:
+            try:
+                action = task.get("action")
+                if action:
+                    session_logger.log_api_hook("PROCESS_TASK", action, str(task))
+                # ...
+                if action == "refresh_api_metrics":
+                    self._refresh_api_metrics(task.get("payload", {}), md_content=self.last_md or None)
+                elif action == "handle_ai_response":
+                    payload = task.get("payload", {})
+                    text = payload.get("text", "")
+                    stream_id = payload.get("stream_id")
+                    is_streaming = payload.get("status") == "streaming..."
+                    if stream_id:
+                        if is_streaming:
+                            if stream_id not in self.mma_streams: self.mma_streams[stream_id] = ""
+                            self.mma_streams[stream_id] += text
+                        else:
+                            self.mma_streams[stream_id] = text
+                        if stream_id == "Tier 1":
+                            if "status" in payload:
+                                self.ai_status = payload["status"]
+                    else:
+                        if is_streaming:
+                            self.ai_response += text
+                        else:
+                            self.ai_response = text
+                        self.ai_status = payload.get("status", "done")
+                    self._trigger_blink = True
+                    if not stream_id:
+                        self._token_stats_dirty = True
+                    # ONLY add to history when turn is complete
+                    if self.ui_auto_add_history and not stream_id and not is_streaming:
+                        role = payload.get("role", "AI")
+                        with self._pending_history_adds_lock:
+                            self._pending_history_adds.append({
+                                "role": role,
+                                "content": self.ai_response,
+                                "collapsed": False,
+                                "ts": project_manager.now_ts()
+                            })
+                elif action == "mma_stream_append":
+                    payload = task.get("payload", {})
+                    stream_id = payload.get("stream_id")
+                    text = payload.get("text", "")
+                    if stream_id:
+                        if stream_id not in self.mma_streams:
+                            self.mma_streams[stream_id] = ""
+                        self.mma_streams[stream_id] += text
+                elif action == "show_track_proposal":
+                    self.proposed_tracks = task.get("payload", [])
+                    self._show_track_proposal_modal = True
+                elif action == "mma_state_update":
+                    payload = task.get("payload", {})
+                    self.mma_status = payload.get("status", "idle")
+                    self.active_tier = payload.get("active_tier")
self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage)
self.active_tickets = payload.get("tickets", [])
track_data = payload.get("track")
if track_data:
tickets = []
for t_data in self.active_tickets:
tickets.append(Ticket(**t_data))
self.active_track = Track(
id=track_data.get("id"),
description=track_data.get("title", ""),
tickets=tickets
)
elif action == "set_value":
item = task.get("item")
value = task.get("value")
if item in self._settable_fields:
attr_name = self._settable_fields[item]
setattr(self, attr_name, value)
if item == "gcli_path":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=str(value))
else:
ai_client._gemini_cli_adapter.binary_path = str(value)
elif action == "click":
item = task.get("item")
user_data = task.get("user_data")
if item == "btn_project_new_automated":
self._cb_new_project_automated(user_data)
elif item == "btn_mma_load_track":
self._cb_load_track(str(user_data or ""))
elif item in self._clickable_actions:
import inspect
func = self._clickable_actions[item]
try:
sig = inspect.signature(func)
if 'user_data' in sig.parameters:
func(user_data=user_data)
else:
func()
except Exception:
func()
elif action == "select_list_item":
item = task.get("listbox", task.get("item"))
value = task.get("item_value", task.get("value"))
if item == "disc_listbox":
self._switch_discussion(str(value or ""))
elif task.get("type") == "ask":
self._pending_ask_dialog = True
self._ask_request_id = task.get("request_id")
self._ask_tool_data = task.get("data", {})
elif action == "clear_ask":
if self._ask_request_id == task.get("request_id"):
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
elif action == "custom_callback":
cb = task.get("callback")
args = task.get("args", [])
if callable(cb):
try: cb(*args)
except Exception as e: print(f"Error in direct custom callback: {e}")
elif cb in self._predefined_callbacks:
self._predefined_callbacks[cb](*args)
elif action == "mma_step_approval":
dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or ""))
self._pending_mma_approval = task
if "dialog_container" in task:
task["dialog_container"][0] = dlg
elif action == 'refresh_from_project':
self._refresh_from_project()
elif action == "mma_spawn_approval":
spawn_dlg = MMASpawnApprovalDialog(
str(task.get("ticket_id") or ""),
str(task.get("role") or ""),
str(task.get("prompt") or ""),
str(task.get("context_md") or "")
)
self._pending_mma_spawn = task
self._mma_spawn_prompt = task.get("prompt", "")
self._mma_spawn_context = task.get("context_md", "")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
if "dialog_container" in task:
task["dialog_container"][0] = spawn_dlg
except Exception as e:
print(f"Error executing GUI task: {e}")
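The task-drain loop above copies the pending list under its lock, clears it, and only then executes the tasks, so a slow handler never blocks producers. A minimal, self-contained sketch of that discipline (illustrative names, not the app's API):

```python
import threading

class TaskDrain:
    """Collects tasks from producer threads and drains them on a consumer."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = []

    def submit(self, task):
        # Producers only ever touch the list under the lock.
        with self._lock:
            self._pending.append(task)

    def drain(self):
        # Snapshot-and-clear under the lock, then run tasks outside it so a
        # slow handler cannot block producers.
        with self._lock:
            tasks = self._pending[:]
            self._pending.clear()
        results = []
        for task in tasks:
            results.append(task())
        return results

drain = TaskDrain()
drain.submit(lambda: 1)
drain.submit(lambda: 2)
out = drain.drain()
```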
def _process_pending_history_adds(self) -> None:
"""Synchronizes pending history entries to the active discussion and project state."""
with self._pending_history_adds_lock:
items = self._pending_history_adds[:]
self._pending_history_adds.clear()
if not items:
return
self._scroll_disc_to_bottom = True
for item in items:
item.get("role", "unknown")
if item.get("role") and item["role"] not in self.disc_roles:
self.disc_roles.append(item["role"])
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
disc_data = discussions.get(self.active_discussion)
if disc_data is not None:
if item.get("disc_title", self.active_discussion) == self.active_discussion:
if self.disc_entries is not disc_data.get("history"):
if "history" not in disc_data:
disc_data["history"] = []
disc_data["history"].append(project_manager.entry_to_str(item))
disc_data["last_updated"] = project_manager.now_ts()
with self._disc_entries_lock:
self.disc_entries.append(item)
def _test_callback_func_write_to_file(self, data: str) -> None:
"""A dummy function that a custom_callback would execute for testing."""
with open("test_callback_output.txt", "w") as f:
f.write(data)
def _handle_approve_script(self, user_data=None) -> None:
"""Approves the currently pending PowerShell script."""
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
with dlg._condition:
dlg._approved = True
dlg._done = True
dlg._condition.notify_all()
self._pending_dialog = None
def _handle_reject_script(self, user_data=None) -> None:
"""Rejects the currently pending PowerShell script."""
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
with dlg._condition:
dlg._approved = False
dlg._done = True
dlg._condition.notify_all()
self._pending_dialog = None
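Both handlers above resolve a blocked dialog through a `threading.Condition` handshake: the waiting side blocks until `_done` flips, the approving side sets the flags and notifies. A minimal sketch of that pattern, with the hard timeout the commit messages call for (the dialog class here is hypothetical, not the app's `ConfirmDialog`):

```python
import threading

class ApprovalDialog:
    """Blocks the asking thread until another thread approves or rejects."""

    def __init__(self):
        self._condition = threading.Condition()
        self._approved = False
        self._done = False

    def wait(self, timeout=5.0):
        # Hard timeout so a missing approval cannot deadlock the caller.
        with self._condition:
            self._condition.wait_for(lambda: self._done, timeout=timeout)
            return self._approved

    def resolve(self, approved):
        with self._condition:
            self._approved = approved
            self._done = True
            self._condition.notify_all()

dlg = ApprovalDialog()
# Simulate a GUI/test thread approving shortly after the wait begins.
threading.Timer(0.05, dlg.resolve, args=(True,)).start()
result = dlg.wait(timeout=2.0)
```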
def init_state(self):
"""Initializes the application state from configurations."""
@@ -418,10 +653,12 @@ class AppController:
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
-def stop_services(self) -> None:
+def shutdown(self) -> None:
"""Stops background threads and cleans up resources."""
import ai_client
ai_client.cleanup()
if hasattr(self, 'hook_server') and self.hook_server:
self.hook_server.stop()
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._loop_thread and self._loop_thread.is_alive():
@@ -440,9 +677,9 @@ class AppController:
ai_client.tool_log_callback = self._on_tool_log
mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics
self.perf_monitor.alert_callback = self._on_performance_alert
-ai_client.events.on("request_start", self._on_api_event)
-ai_client.events.on("response_received", self._on_api_event)
-ai_client.events.on("tool_execution", self._on_api_event)
+ai_client.events.on("request_start", lambda **kw: self._on_api_event("request_start", **kw))
+ai_client.events.on("response_received", lambda **kw: self._on_api_event("response_received", **kw))
+ai_client.events.on("tool_execution", lambda **kw: self._on_api_event("tool_execution", **kw))
self._settable_fields: Dict[str, str] = {
'ai_input': 'ui_ai_input',
@@ -477,12 +714,35 @@ class AppController:
"""Internal loop runner."""
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._process_event_queue())
# Fallback: process queues even if GUI thread is idling/stuck (or in headless mode)
async def queue_fallback() -> None:
while True:
try:
# These methods are normally called by the GUI thread,
# but we call them here as a fallback for headless/background operations.
# The methods themselves are expected to be thread-safe or handle locks.
# Since they are on 'self' (the controller), and App delegates to them,
# we need to make sure we don't double-process if App is also calling them.
# However, _pending_gui_tasks uses a lock, so it's safe.
if hasattr(self, '_process_pending_gui_tasks'):
self._process_pending_gui_tasks()
if hasattr(self, '_process_pending_history_adds'):
self._process_pending_history_adds()
except: pass
await asyncio.sleep(0.1)
self._loop.create_task(queue_fallback())
self._loop.run_forever()
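The fallback task above keeps the queues draining from the asyncio loop even when no GUI thread is calling the `_process_pending_*` methods. A stripped-down sketch of the same background-loop-plus-periodic-task shape (names are illustrative):

```python
import asyncio
import threading
import time

processed = []

def pump_queues():
    # Stand-in for the controller's _process_pending_gui_tasks /
    # _process_pending_history_adds drain methods.
    processed.append("tick")

async def queue_fallback(interval: float = 0.01):
    # Drain queues periodically even when the GUI thread is idle or absent.
    while True:
        pump_queues()
        await asyncio.sleep(interval)

def run_background_loop(loop: asyncio.AbstractEventLoop) -> None:
    asyncio.set_event_loop(loop)
    loop.create_task(queue_fallback())
    loop.run_forever()

loop = asyncio.new_event_loop()
thread = threading.Thread(target=run_background_loop, args=(loop,), daemon=True)
thread.start()

time.sleep(0.1)  # let a few fallback iterations run

# Always stop a loop owned by another thread via call_soon_threadsafe.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=1.0)
```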
async def _process_event_queue(self) -> None:
"""Listens for and processes events from the AsyncEventQueue."""
sys.stderr.write("[DEBUG] _process_event_queue started\n")
sys.stderr.flush()
while True:
event_name, payload = await self.event_queue.get()
sys.stderr.write(f"[DEBUG] _process_event_queue got event: {event_name}\n")
sys.stderr.flush()
if event_name == "user_request":
self._loop.run_in_executor(None, self._handle_request_event, payload)
elif event_name == "response":
@@ -517,6 +777,10 @@ class AppController:
"collapsed": False,
"ts": project_manager.now_ts()
})
# Clear response area for new turn
self.ai_response = ""
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit)
@@ -528,11 +792,13 @@ class AppController:
event.base_dir,
event.file_items,
event.disc_text,
stream=True,
stream_callback=lambda text: self._on_ai_stream(text),
pre_tool_callback=self._confirm_and_run,
qa_callback=ai_client.run_tier4_analysis
)
asyncio.run_coroutine_threadsafe(
-self.event_queue.put("response", {"text": resp, "status": "done"}),
+self.event_queue.put("response", {"text": resp, "status": "done", "role": "AI"}),
self._loop
)
except ProviderError as e:
@@ -546,6 +812,13 @@ class AppController:
self._loop
)
def _on_ai_stream(self, text: str) -> None:
"""Handles streaming text from the AI."""
asyncio.run_coroutine_threadsafe(
self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"}),
self._loop
)
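`_on_ai_stream` runs on a worker thread, so it hands the coroutine to the loop with `asyncio.run_coroutine_threadsafe` rather than awaiting it directly. A self-contained sketch of that thread-to-loop handoff (illustrative names, not the app's event queue):

```python
import asyncio
import threading
import time

received = []
ready = threading.Event()
state = {}

async def consumer(queue: asyncio.Queue) -> None:
    # Plays the role of _process_event_queue: drains events on the loop.
    while True:
        event, payload = await queue.get()
        received.append((event, payload))

def run_loop() -> None:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    state["loop"] = loop
    state["queue"] = asyncio.Queue()  # created on the loop's own thread
    loop.create_task(consumer(state["queue"]))
    ready.set()
    loop.run_forever()

threading.Thread(target=run_loop, daemon=True).start()
ready.wait(timeout=2.0)

# Worker threads never touch the queue directly; they hand coroutines to
# the loop and (optionally) block on the returned concurrent future.
for chunk in ("Hel", "lo"):
    fut = asyncio.run_coroutine_threadsafe(
        state["queue"].put(("response", {"text": chunk, "status": "streaming..."})),
        state["loop"],
    )
    fut.result(timeout=1.0)  # block until the loop has enqueued it

time.sleep(0.1)  # let the consumer drain both chunks
state["loop"].call_soon_threadsafe(state["loop"].stop)
```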
def _on_comms_entry(self, entry: Dict[str, Any]) -> None:
session_logger.log_comms(entry)
entry["local_ts"] = time.time()
@@ -586,11 +859,13 @@ class AppController:
with self._pending_tool_calls_lock:
self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
-def _on_api_event(self, *args: Any, **kwargs: Any) -> None:
+def _on_api_event(self, event_name: str, **kwargs: Any) -> None:
payload = kwargs.get("payload", {})
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
if self.test_hooks_enabled:
with self._api_event_queue_lock:
self._api_event_queue.append({"type": event_name, "payload": payload})
def _on_performance_alert(self, message: str) -> None:
alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load."
with self._pending_history_adds_lock:
@@ -601,12 +876,19 @@ class AppController:
})
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> Optional[str]:
sys.stderr.write(f"[DEBUG] _confirm_and_run called. test_hooks={self.test_hooks_enabled}, manual_approve={getattr(self, 'ui_manual_approve', False)}\n")
sys.stderr.flush()
if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False):
sys.stderr.write("[DEBUG] Auto-approving script.\n")
sys.stderr.flush()
self.ai_status = "running powershell..."
output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback)
self._append_tool_log(script, output)
self.ai_status = "powershell done, awaiting AI..."
return output
sys.stderr.write("[DEBUG] Creating ConfirmDialog.\n")
sys.stderr.flush()
dialog = ConfirmDialog(script, base_dir)
is_headless = "--headless" in sys.argv
if is_headless:
@@ -625,8 +907,14 @@ class AppController:
"base_dir": str(base_dir),
"ts": time.time()
})
sys.stderr.write(f"[DEBUG] Appended script_confirmation_required to _api_event_queue. ID={dialog._uid}\n")
sys.stderr.flush()
sys.stderr.write(f"[DEBUG] Waiting for dialog ID={dialog._uid}...\n")
sys.stderr.flush()
approved, final_script = dialog.wait()
sys.stderr.write(f"[DEBUG] Dialog ID={dialog._uid} finished wait. approved={approved}\n")
sys.stderr.flush()
if is_headless:
with self._pending_dialog_lock:
if dialog._uid in self._pending_actions:
@@ -1119,25 +1407,37 @@ class AppController:
self._ask_tool_data = None
def _handle_reset_session(self) -> None:
-"""Logic for resetting the AI session."""
+"""Logic for resetting the AI session and GUI state."""
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
self._comms_log.clear()
self.disc_entries.clear()
-# Clear history in project dict too
+# Clear history in ALL discussions to be safe
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
-if self.active_discussion in discussions:
-discussions[self.active_discussion]["history"] = []
+for d_name in discussions:
+discussions[d_name]["history"] = []
self.ai_status = "session reset"
self.ai_response = ""
self.ui_ai_input = ""
self.ui_manual_approve = False
self.ui_auto_add_history = False
self._current_provider = "gemini"
self._current_model = "gemini-2.5-flash-lite"
ai_client.set_provider(self._current_provider, self._current_model)
with self._pending_history_adds_lock:
self._pending_history_adds.clear()
with self._api_event_queue_lock:
self._api_event_queue.clear()
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.clear()
def _handle_md_only(self) -> None:
"""Logic for the 'MD Only' action."""
def worker():
try:
md, path, *_ = self._do_generate()
self.last_md = md
@@ -1147,21 +1447,25 @@ class AppController:
self._refresh_api_metrics({}, md_content=md)
except Exception as e:
self.ai_status = f"error: {e}"
threading.Thread(target=worker, daemon=True).start()
def _handle_generate_send(self) -> None:
"""Logic for the 'Gen + Send' action."""
def worker():
sys.stderr.write("[DEBUG] _handle_generate_send worker started\n")
sys.stderr.flush()
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
self._last_stable_md = stable_md
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
self.ai_status = f"generate error: {e}"
return
self.ai_status = "sending..."
user_msg = self.ui_ai_input
base_dir = self.ui_files_base_dir
sys.stderr.write(f"[DEBUG] _do_generate success. Prompt: {user_msg[:50]}...\n")
sys.stderr.flush()
# Prepare event payload
event_payload = events.UserRequestEvent(
prompt=user_msg,
@@ -1175,6 +1479,14 @@ class AppController:
self.event_queue.put("user_request", event_payload),
self._loop
)
sys.stderr.write("[DEBUG] Enqueued user_request event\n")
sys.stderr.flush()
except Exception as e:
import traceback
sys.stderr.write(f"[DEBUG] _do_generate ERROR: {e}\n{traceback.format_exc()}\n")
sys.stderr.flush()
self.ai_status = f"generate error: {e}"
threading.Thread(target=worker, daemon=True).start()
def _recalculate_session_usage(self) -> None:
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0}

View File

@@ -38,6 +38,10 @@ class EventEmitter:
for callback in self._listeners[event_name]:
callback(*args, **kwargs)
def clear(self) -> None:
"""Clears all registered listeners."""
self._listeners.clear()
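A minimal emitter with the same surface as the class shown here, illustrating why `clear()` matters for test isolation: once listeners are dropped, a callback registered by an earlier test can no longer fire in the next one.

```python
class EventEmitter:
    """Tiny synchronous pub/sub, mirroring the emitter's public surface."""

    def __init__(self):
        self._listeners = {}

    def on(self, event_name, callback):
        self._listeners.setdefault(event_name, []).append(callback)

    def emit(self, event_name, *args, **kwargs):
        for callback in self._listeners.get(event_name, []):
            callback(*args, **kwargs)

    def clear(self):
        # Dropping every listener lets fixtures guarantee no callback
        # registered by a previous test leaks into the next one.
        self._listeners.clear()

seen = []
emitter = EventEmitter()
emitter.on("request_start", lambda **kw: seen.append(kw))
emitter.emit("request_start", payload={"model": "x"})
emitter.clear()
emitter.emit("request_start", payload={"model": "y"})  # no listener: dropped
```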
class AsyncEventQueue:
"""
Asynchronous event queue for decoupled communication using asyncio.Queue.
@@ -66,6 +70,14 @@ class AsyncEventQueue:
"""
return await self._queue.get()
def task_done(self) -> None:
"""Signals that a formerly enqueued task is complete."""
self._queue.task_done()
async def join(self) -> None:
"""Blocks until all items in the queue have been gotten and processed."""
await self._queue.join()
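`task_done()` and `join()` follow standard `asyncio.Queue` semantics: `join()` unblocks only once every `get()` has been balanced by a `task_done()`. A small sketch:

```python
import asyncio

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    handled = []

    async def worker() -> None:
        while True:
            item = await queue.get()
            handled.append(item)
            queue.task_done()  # pair every get() with exactly one task_done()

    task = asyncio.ensure_future(worker())
    for i in range(3):
        await queue.put(i)
    await queue.join()  # returns only once task_done() balanced every put()
    task.cancel()
    return handled

handled = asyncio.run(main())
```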
class UserRequestEvent:
"""
Payload for a user request event.

View File

@@ -103,6 +103,9 @@ class App:
def __init__(self) -> None:
# Initialize controller and delegate state
self.controller = AppController()
# Restore legacy PROVIDERS to controller if needed (it already has it via delegation if set on class level, but let's be explicit)
if not hasattr(self.controller, 'PROVIDERS'):
self.controller.PROVIDERS = PROVIDERS
self.controller.init_state()
self.controller.start_services(self)
@@ -116,55 +119,9 @@ class App:
self._pending_dialog_lock = self.controller._pending_dialog_lock
self._api_event_queue_lock = self.controller._api_event_queue_lock
# UI-specific initialization
self._init_ui_actions()
def _init_ui_actions(self) -> None:
# Set up UI-specific action maps
self._clickable_actions: dict[str, Callable[..., Any]] = {
'btn_reset': self._handle_reset_session,
'btn_gen_send': self._handle_generate_send,
'btn_md_only': self._handle_md_only,
'btn_approve_script': self._handle_approve_script,
'btn_reject_script': self._handle_reject_script,
'btn_project_save': self._cb_project_save,
'btn_disc_create': self._cb_disc_create,
'btn_mma_plan_epic': self._cb_plan_epic,
'btn_mma_accept_tracks': self._cb_accept_tracks,
'btn_mma_start_track': self._cb_start_track,
'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type),
'btn_approve_tool': self._handle_approve_tool,
'btn_approve_mma_step': self._handle_approve_mma_step,
'btn_approve_spawn': self._handle_approve_spawn,
}
self._predefined_callbacks: dict[str, Callable[..., Any]] = {
'_test_callback_func_write_to_file': self._test_callback_func_write_to_file
}
self._discussion_names_cache: list[str] = []
self._discussion_names_dirty: bool = True
def _handle_approve_script(self, user_data=None) -> None:
"""Approves the currently pending PowerShell script."""
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
with dlg._condition:
dlg._approved = True
dlg._done = True
dlg._condition.notify_all()
self._pending_dialog = None
def _handle_reject_script(self, user_data=None) -> None:
"""Rejects the currently pending PowerShell script."""
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
with dlg._condition:
dlg._approved = False
dlg._done = True
dlg._condition.notify_all()
self._pending_dialog = None
def _handle_approve_tool(self, user_data=None) -> None:
"""UI-level wrapper for approving a pending tool execution ask."""
self._handle_approve_ask()
@@ -210,194 +167,9 @@ class App:
# ---------------------------------------------------------------- logic
def _process_pending_gui_tasks(self) -> None:
if not self._pending_gui_tasks:
return
with self._pending_gui_tasks_lock:
tasks = self._pending_gui_tasks[:]
self._pending_gui_tasks.clear()
for task in tasks:
try:
action = task.get("action")
if action == "refresh_api_metrics":
self._refresh_api_metrics(task.get("payload", {}), md_content=self.last_md or None)
elif action == "handle_ai_response":
payload = task.get("payload", {})
text = payload.get("text", "")
stream_id = payload.get("stream_id")
is_streaming = payload.get("status") == "streaming..."
if stream_id:
if is_streaming:
if stream_id not in self.mma_streams: self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
else:
self.mma_streams[stream_id] = text
if stream_id == "Tier 1":
if "status" in payload:
self.ai_status = payload["status"]
else:
if is_streaming:
self.ai_response += text
else:
self.ai_response = text
self.ai_status = payload.get("status", "done")
self._trigger_blink = True
if not stream_id:
self._token_stats_dirty = True
if self.ui_auto_add_history and not stream_id:
role = payload.get("role", "AI")
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": self.ai_response,
"collapsed": False,
"ts": project_manager.now_ts()
})
elif action == "mma_stream_append":
payload = task.get("payload", {})
stream_id = payload.get("stream_id")
text = payload.get("text", "")
if stream_id:
if stream_id not in self.mma_streams:
self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
elif action == "show_track_proposal":
self.proposed_tracks = task.get("payload", [])
self._show_track_proposal_modal = True
elif action == "mma_state_update":
payload = task.get("payload", {})
self.mma_status = payload.get("status", "idle")
self.active_tier = payload.get("active_tier")
self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage)
self.active_tickets = payload.get("tickets", [])
track_data = payload.get("track")
if track_data:
tickets = []
for t_data in self.active_tickets:
tickets.append(Ticket(**t_data))
self.active_track = Track(
id=track_data.get("id"),
description=track_data.get("title", ""),
tickets=tickets
)
elif action == "set_value":
item = task.get("item")
value = task.get("value")
if item in self._settable_fields:
attr_name = self._settable_fields[item]
setattr(self, attr_name, value)
if item == "gcli_path":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=str(value))
else:
ai_client._gemini_cli_adapter.binary_path = str(value)
elif action == "click":
item = task.get("item")
user_data = task.get("user_data")
if item == "btn_project_new_automated":
self._cb_new_project_automated(user_data)
elif item == "btn_mma_load_track":
self._cb_load_track(str(user_data or ""))
elif item in self._clickable_actions:
# Check if it's a method that accepts user_data
import inspect
func = self._clickable_actions[item]
try:
sig = inspect.signature(func)
if 'user_data' in sig.parameters:
func(user_data=user_data)
else:
func()
except Exception:
func()
elif action == "select_list_item":
item = task.get("listbox", task.get("item"))
value = task.get("item_value", task.get("value"))
if item == "disc_listbox":
self._switch_discussion(str(value or ""))
elif task.get("type") == "ask":
self._pending_ask_dialog = True
self._ask_request_id = task.get("request_id")
self._ask_tool_data = task.get("data", {})
elif action == "clear_ask":
if self._ask_request_id == task.get("request_id"):
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
elif action == "custom_callback":
cb = task.get("callback")
args = task.get("args", [])
if callable(cb):
try: cb(*args)
except Exception as e: print(f"Error in direct custom callback: {e}")
elif cb in self._predefined_callbacks:
self._predefined_callbacks[cb](*args)
elif action == "mma_step_approval":
dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or ""))
self._pending_mma_approval = task
if "dialog_container" in task:
task["dialog_container"][0] = dlg
elif action == 'refresh_from_project':
self._refresh_from_project()
elif action == "mma_spawn_approval":
spawn_dlg = MMASpawnApprovalDialog(
str(task.get("ticket_id") or ""),
str(task.get("role") or ""),
str(task.get("prompt") or ""),
str(task.get("context_md") or "")
)
self._pending_mma_spawn = task
self._mma_spawn_prompt = task.get("prompt", "")
self._mma_spawn_context = task.get("context_md", "")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
if "dialog_container" in task:
task["dialog_container"][0] = spawn_dlg
except Exception as e:
print(f"Error executing GUI task: {e}")
def _process_pending_history_adds(self) -> None:
"""Synchronizes pending history entries to the active discussion and project state."""
with self._pending_history_adds_lock:
items = self._pending_history_adds[:]
self._pending_history_adds.clear()
if not items:
return
self._scroll_disc_to_bottom = True
for item in items:
item.get("role", "unknown")
if item.get("role") and item["role"] not in self.disc_roles:
self.disc_roles.append(item["role"])
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
disc_data = discussions.get(self.active_discussion)
if disc_data is not None:
if item.get("disc_title", self.active_discussion) == self.active_discussion:
if self.disc_entries is not disc_data.get("history"):
if "history" not in disc_data:
disc_data["history"] = []
disc_data["history"].append(project_manager.entry_to_str(item))
disc_data["last_updated"] = project_manager.now_ts()
with self._disc_entries_lock:
self.disc_entries.append(item)
def shutdown(self) -> None:
"""Cleanly shuts down the app's background tasks and saves state."""
-self.controller.stop_services()
+self.controller.shutdown()
# Join other threads if they exist
if self.send_thread and self.send_thread.is_alive():
self.send_thread.join(timeout=1.0)
if self.models_thread and self.models_thread.is_alive():
self.models_thread.join(timeout=1.0)
# Final State persistence
try:
ai_client.cleanup() # Destroy active API caches to stop billing
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
except: pass
def _test_callback_func_write_to_file(self, data: str) -> None:
"""A dummy function that a custom_callback would execute for testing."""

View File

@@ -56,6 +56,28 @@ class VerificationLogger:
f.write(f"{status} {self.test_name} ({result_msg})\n\n")
print(f"[FINAL] {self.test_name}: {status} - {result_msg}")
@pytest.fixture(autouse=True)
def reset_ai_client() -> Generator[None, None, None]:
"""
Autouse fixture that resets the ai_client global state before each test.
This is critical for preventing state pollution between tests.
"""
import ai_client
import mcp_client
ai_client.reset_session()
# Reset callbacks to None or default to ensure no carry-over
ai_client.confirm_and_run_callback = None
ai_client.comms_log_callback = None
ai_client.tool_log_callback = None
# Clear all event listeners
ai_client.events.clear()
# Reset provider/model to defaults
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
# Reset MCP client state
mcp_client.configure([], [])
yield
ai_client.reset_session()
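The fixture above follows pytest's generator shape: setup before `yield`, teardown after. Driving that shape by hand shows the guarantee it provides (the client object below is a stand-in for illustration, not the real `ai_client` module):

```python
class FakeClient:
    """Stand-in for module-level state such as ai_client's globals."""

    def __init__(self):
        self.session = []
        self.confirm_and_run_callback = None

    def reset_session(self):
        self.session.clear()

client = FakeClient()

def reset_client_fixture():
    """Generator with pytest's fixture shape: setup, yield, teardown."""
    client.reset_session()
    client.confirm_and_run_callback = None  # no carry-over between tests
    yield
    client.reset_session()  # teardown: runs after the test body finishes

# Drive the generator by hand the way pytest would:
fixture = reset_client_fixture()
next(fixture)                      # setup phase
client.session.append("turn 1")    # the "test body" pollutes state
try:
    next(fixture)                  # teardown phase
except StopIteration:
    pass
```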
@pytest.fixture
def vlogger(request) -> VerificationLogger:
"""Fixture to provide a VerificationLogger instance to a test."""
@@ -109,8 +131,8 @@ def mock_app() -> Generator[App, None, None]:
     app = App()
     yield app
     if hasattr(app, 'controller'):
-        app.controller.stop_services()
-    if hasattr(app, 'shutdown'):
+        app.controller.shutdown()
+    elif hasattr(app, 'shutdown'):
         app.shutdown()

 @pytest.fixture
@@ -142,7 +164,7 @@ def app_instance() -> Generator[App, None, None]:
     yield app
     # Cleanup: Ensure background threads and asyncio loop are stopped
     if hasattr(app, 'controller'):
-        app.controller.stop_services()
+        app.controller.shutdown()
     if hasattr(app, 'shutdown'):
         app.shutdown()
@@ -209,10 +231,13 @@ def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
     # Check if already running (shouldn't be)
     try:
-        resp = requests.get("http://127.0.0.1:8999/status", timeout=0.1)
-        already_up = resp.status_code == 200
-    except: already_up = False
-    diag.log_state("Hook Server Port 8999", "Down", "UP" if already_up else "Down")
+        resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
+        if resp.status_code == 200:
+            print("[Fixture] WARNING: Hook Server already up on port 8999. Test state might be polluted.")
+            # Optionally try to reset it
+            try: requests.post("http://127.0.0.1:8999/api/gui", json={"action": "click", "item": "btn_reset"}, timeout=1)
+            except: pass
+    except: pass
     print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...")
     os.makedirs("logs", exist_ok=True)

View File

@@ -52,7 +52,6 @@ def test_tools_sim_live(live_gui: Any) -> None:
     sim.run()  # Ensure history is updated via the async queue
     time.sleep(2)
     sim.teardown()

 @pytest.mark.integration
 def test_execution_sim_live(live_gui: Any) -> None:
     """Run the Execution & Modals simulation against a live GUI."""
@@ -60,7 +59,11 @@ def test_execution_sim_live(live_gui: Any) -> None:
     assert client.wait_for_server(timeout=10)
     sim = ExecutionSimulation(client)
     sim.setup("LiveExecutionSim")
+    # Enable manual approval to test modals
+    client.set_value('manual_approve', True)
     client.set_value('current_provider', 'gemini_cli')
     client.set_value('gcli_path', f'"{sys.executable}" "{os.path.abspath("tests/mock_gemini_cli.py")}"')
     sim.run()
+    time.sleep(2)
     sim.teardown()

View File

@@ -56,7 +56,8 @@ def test_gemini_cli_parameter_resilience(live_gui: Any) -> None:
""" """
client = ApiHookClient("http://127.0.0.1:8999") client = ApiHookClient("http://127.0.0.1:8999")
client.click("btn_reset") client.click("btn_reset")
time.sleep(1.5) time.sleep(1.0)
client.set_value("auto_add_history", True) client.set_value("auto_add_history", True)
client.set_value("manual_approve", True) client.set_value("manual_approve", True)
client.select_list_item("proj_files", "manual_slop") client.select_list_item("proj_files", "manual_slop")
@@ -130,7 +131,8 @@ def test_gemini_cli_loop_termination(live_gui: Any) -> None:
""" """
client = ApiHookClient("http://127.0.0.1:8999") client = ApiHookClient("http://127.0.0.1:8999")
client.click("btn_reset") client.click("btn_reset")
time.sleep(1.5) time.sleep(1.0)
client.set_value("auto_add_history", True) client.set_value("auto_add_history", True)
client.set_value("manual_approve", True) client.set_value("manual_approve", True)
client.select_list_item("proj_files", "manual_slop") client.select_list_item("proj_files", "manual_slop")

View File

@@ -13,7 +13,8 @@ def test_gemini_cli_full_integration(live_gui: Any) -> None:
     client = ApiHookClient("http://127.0.0.1:8999")
     # 0. Reset session and enable history
     client.click("btn_reset")
-    time.sleep(1.5)
+    time.sleep(1.0)
     client.set_value("auto_add_history", True)
     client.set_value("manual_approve", True)
     # Switch to manual_slop project explicitly
@@ -80,7 +81,8 @@ def test_gemini_cli_rejection_and_history(live_gui: Any) -> None:
     client = ApiHookClient("http://127.0.0.1:8999")
     # 0. Reset session
     client.click("btn_reset")
-    time.sleep(1.5)
+    time.sleep(1.0)
     client.set_value("auto_add_history", True)
     client.set_value("manual_approve", True)
     client.select_list_item("proj_files", "manual_slop")
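The hunks above only shave fixed `time.sleep` calls from 1.5 s to 1.0 s, which trades flakiness against speed without removing either. A deadline-polling helper is usually both faster and more reliable; `wait_until` below is a hypothetical helper for illustration, not an existing `ApiHookClient` method:

```python
import time

def wait_until(predicate, timeout: float = 5.0, interval: float = 0.05) -> bool:
    """Poll `predicate` until it returns truthy or the deadline passes.

    Returns True as soon as the condition holds, so the common case costs
    milliseconds instead of a worst-case fixed sleep.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return bool(predicate())  # one final check at the deadline

# Example: a condition that becomes true after roughly 0.2 s.
state = {"reset_done": False}
start = time.monotonic()

def reset_finished():
    if time.monotonic() - start > 0.2:
        state["reset_done"] = True
    return state["reset_done"]

print(wait_until(reset_finished, timeout=2.0))  # True
```

In the tests above, the fixed sleep after `client.click("btn_reset")` could then become a poll on whatever observable state the reset changes (history length, status endpoint), bounded by the same timeout.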

View File

@@ -68,3 +68,11 @@ def test_visual_mma_components(live_gui):
     assert tickets[1]['status'] == "running"
     print("Visual MMA component verification PASSED.")
+
+    # Clean up the pending modal to prevent polluting subsequent tests
+    print("Cleaning up pending MMA modal...")
+    client.post_gui({
+        "action": "click",
+        "item": "btn_approve_mma_step"
+    })
+    time.sleep(0.5)
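Note that the modal cleanup added above only runs when the preceding assertions pass; placing it in a `finally` block would guarantee it fires even on failure. A sketch with a stub client — `post_gui`'s payload shape is assumed from the test above:

```python
class StubClient:
    """Tiny stand-in for ApiHookClient; only post_gui is modeled."""
    def __init__(self):
        self.posted = []

    def post_gui(self, payload):
        self.posted.append(payload)

def run_mma_check(client, tickets):
    try:
        assert tickets and tickets[0]["status"] == "running"
    finally:
        # Fires even when the assertion fails, so a pending modal
        # never leaks into the next test.
        client.post_gui({"action": "click", "item": "btn_approve_mma_step"})

client = StubClient()
try:
    run_mma_check(client, tickets=[{"status": "error"}])  # assertion fails
except AssertionError:
    pass
print(client.posted)  # [{'action': 'click', 'item': 'btn_approve_mma_step'}]
```

The same effect can be had by moving the approval click into the fixture's teardown, which pytest runs regardless of test outcome.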