Files
manual_slop/tests/test_gui_streaming.py
Ed_ d1ce0eaaeb feat(gui): implement Phases 2-5 of Comprehensive GUI UX track
- Add cost tracking with new cost_tracker.py module
- Enhance Track Proposal modal with editable titles and goals
- Add Conductor Setup summary and New Track creation form to MMA Dashboard
- Implement Task DAG editing (add/delete tickets) and track-scoped discussion
- Add visual polish: color-coded statuses, tinted progress bars, and node indicators
- Support live worker streaming from AI providers to GUI panels
- Fix numerous integration test regressions and stabilize headless service
2026-03-01 20:17:31 -05:00

105 lines
3.4 KiB
Python

import pytest
import asyncio
from unittest.mock import patch, MagicMock
from gui_2 import App
import events
@pytest.fixture
def app_instance():
    """Yield an ``App`` built with its ``gui_2`` collaborators patched out.

    ``gui_2.load_config`` is replaced so that it returns a minimal config with
    empty ``ai`` and ``projects`` sections; ``save_config``, ``project_manager``,
    ``session_logger`` and ``immapp.run`` are replaced with mocks. The ``App``
    methods ``_load_active_project``, ``_fetch_models``, ``_load_fonts`` and
    ``_post_init`` are stubbed as well.

    The instance is yielded from inside the ``with`` block, so every patch
    remains active for the duration of the test that uses the fixture.
    """
    stub_config = {'ai': {}, 'projects': {}}
    with (
        patch('gui_2.load_config', return_value=stub_config),
        patch('gui_2.save_config'),
        patch('gui_2.project_manager'),
        patch('gui_2.session_logger'),
        patch('gui_2.immapp.run'),
        patch.object(App, '_load_active_project'),
        patch.object(App, '_fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
    ):
        yield App()
@pytest.mark.asyncio
async def test_mma_stream_event_routing(app_instance: App):
    """Check that queued 'mma_stream_append' tasks accumulate in ``mma_streams``.

    The chunks are pushed straight onto ``_pending_gui_tasks`` (the step the
    event-queue worker would normally perform) so the test does not have to
    manage the background thread's lifecycle.
    """
    worker_stream = "Tier 3 (Worker): T-001"
    pieces = ["Thinking... ", "I will ", "list files."]

    # Step 1: queue one append task per chunk, as if each had arrived as an
    # 'mma_stream' event from a Tier 3 worker.
    for piece in pieces:
        task = {
            "action": "mma_stream_append",
            "payload": {"stream_id": worker_stream, "text": piece},
        }
        with app_instance._pending_gui_tasks_lock:
            app_instance._pending_gui_tasks.append(task)

    # Step 2: let the GUI drain its pending tasks, as a frame would.
    app_instance._process_pending_gui_tasks()

    # Step 3: the stream must hold every chunk, concatenated in arrival order.
    assert app_instance.mma_streams.get(worker_stream) == "".join(pieces)
@pytest.mark.asyncio
async def test_mma_stream_multiple_workers(app_instance: App):
    """Check that interleaved chunks from two workers stay in separate streams."""
    first, second = "Tier 3 (Worker): T-001", "Tier 3 (Worker): T-002"

    # Chunks alternate between the two workers to mimic concurrent output.
    interleaved = [
        (first, "T1 start. "),
        (second, "T2 start. "),
        (first, "T1 middle. "),
        (second, "T2 middle. "),
        (first, "T1 end."),
        (second, "T2 end."),
    ]
    for stream_id, text in interleaved:
        task = {
            "action": "mma_stream_append",
            "payload": {"stream_id": stream_id, "text": text},
        }
        with app_instance._pending_gui_tasks_lock:
            app_instance._pending_gui_tasks.append(task)

    app_instance._process_pending_gui_tasks()

    # Each stream must contain only its own chunks, in order.
    streams = app_instance.mma_streams
    assert streams[first] == "T1 start. T1 middle. T1 end."
    assert streams[second] == "T2 start. T2 middle. T2 end."
def test_handle_ai_response_resets_stream(app_instance: App):
    """Check that a final 'handle_ai_response' (status=done) replaces streamed text."""

    def submit(action, payload):
        # Queue a single GUI task under the lock, then process it immediately.
        with app_instance._pending_gui_tasks_lock:
            app_instance._pending_gui_tasks.append({
                "action": action,
                "payload": payload,
            })
        app_instance._process_pending_gui_tasks()

    worker_stream = "Tier 3 (Worker): T-001"

    # Phase 1: partial output arrives through streaming.
    submit(
        "mma_stream_append",
        {"stream_id": worker_stream, "text": "Partially streamed..."},
    )
    assert app_instance.mma_streams[worker_stream] == "Partially streamed..."

    # Phase 2: the complete response arrives with the full text.
    submit(
        "handle_ai_response",
        {
            "stream_id": worker_stream,
            "text": "Final complete response.",
            "status": "done",
        },
    )

    # The current implementation OVERWRITES the stream on handle_ai_response.
    # That is desirable: it guarantees the exact final text from the model,
    # since streamed chunks do not always match the final text (tool calls or
    # SDK-specific behaviors can cause differences).
    assert app_instance.mma_streams[worker_stream] == "Final complete response."