1bea0d23bf
Tier 2's project-switch fix (commit 455c17ff) was correct in approach but used
the filename 'manualslop.toml' (no underscore) instead of 'manual_slop.toml'.
As a result the `if Path(workspace_toml).exists()` check evaluated to False and
the switch was silently skipped — the subprocess stayed on whatever stale
project a prior test had left, and the RAG engine used the wrong base_dir.
Fixing the filename makes the project switch actually fire. The test
now passes 4/4 runs in isolation (6-7s each). The RAG context block
appears in the discussion history as expected.
183 lines
7.5 KiB
Python
183 lines
7.5 KiB
Python
import pytest
|
|
import time
|
|
import sys
|
|
import os
|
|
import json
|
|
import shutil
|
|
from pathlib import Path
|
|
|
|
# Append two absolute directories to the import search path, in this order:
# the parent of the directory containing this file, then that parent's "src"
# subdirectory.
_THIS_DIR = os.path.dirname(__file__)
for _relative_parts in (("..",), ("..", "src")):
    sys.path.append(os.path.abspath(os.path.join(_THIS_DIR, *_relative_parts)))
|
|
|
|
from src import api_hook_client
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.clean_baseline
|
|
def test_phase4_final_verify(live_gui, live_gui_workspace):
|
|
client = api_hook_client.ApiHookClient()
|
|
assert client.wait_for_server(timeout=15), "Hook server did not start"
|
|
|
|
# PROPER PROJECT CONTEXT (per Tier 1 addendum 3):
|
|
# The session-scoped subprocess may be on a stale project from a prior
|
|
# test (e.g. test_context_sim_live switches to temp_livecontextsim.toml
|
|
# and never switches back). The RAG engine uses active_project_root
|
|
# (derived from active_project_path) as its base_dir, NOT
|
|
# ui_files_base_dir. So hotpatching files/rag_enabled via set_value
|
|
# while active_project_path is stale leaves the RAG engine looking at
|
|
# a dead dir. Switch to the workspace project explicitly (like a user
|
|
# would) before configuring files and RAG.
|
|
# Must use absolute path — the subprocess's CWD is the workspace, so a
|
|
# relative path like 'tests/artifacts/.../manual_slop.toml' resolves to
|
|
# the wrong dir from the subprocess's CWD.
|
|
workspace_toml = str((Path(live_gui_workspace) / "manual_slop.toml").resolve())
|
|
if Path(workspace_toml).exists():
|
|
client.push_event("custom_callback", {"callback": "_switch_project", "args": [workspace_toml]})
|
|
switch_status = client.wait_for_project_switch(expected_path=workspace_toml, timeout=30.0)
|
|
if switch_status.get("error"):
|
|
print(f"[VERIFY] WARNING: project switch error: {switch_status.get('error')}")
|
|
|
|
# Use a unique collection name per test invocation. The RAG engine
|
|
# stores its chromadb collection at
|
|
# <base_dir>/.slop_cache/chroma_<collection_name>. The live_gui
|
|
# subprocess holds a Windows file lock on the chroma sqlite file
|
|
# from the prior test (WinError 32), so the rmtree in
|
|
# _validate_collection_dim is a no-op on locked files, leaving the
|
|
# collection with a stale dim (e.g. 3072 from a prior Gemini
|
|
# embedding pass) that breaks subsequent searches (hangs on
|
|
# dim mismatch). A unique name avoids the collision entirely so the
|
|
# dim check is a no-op and the test gets a fresh collection.
|
|
_collection_name = f"test_final_verify_{int(time.time() * 1000)}"
|
|
|
|
# Best-effort cleanup of the workspace's .slop_cache (where the
|
|
# chroma collection actually lives). ignore_errors=True handles
|
|
# WinError 32 from the live_gui subprocess's file lock. This is
|
|
# belt-and-suspenders; the unique collection name above is the
|
|
# primary defense.
|
|
_slop_cache = Path(live_gui_workspace) / ".slop_cache"
|
|
if _slop_cache.exists():
|
|
for col_dir in _slop_cache.iterdir():
|
|
if col_dir.is_dir() and col_dir.name.startswith("chroma_"):
|
|
shutil.rmtree(col_dir, ignore_errors=True)
|
|
|
|
# 1. Setup mock project data
|
|
workspace_dir = live_gui_workspace
|
|
workspace_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Create dummy files
|
|
(workspace_dir / "final_test_1.txt").write_text("Manual Slop RAG is great.")
|
|
(workspace_dir / "final_test_2.py").write_text("def test_func():\n return 'Manual Slop RAG result'")
|
|
|
|
try:
|
|
# 2. Configure project through Hook API
|
|
# set_value is async (push_event -> pending_gui_tasks -> render loop).
|
|
# Wait for the files setter to be processed before triggering the RAG
|
|
# sync, otherwise the sync sees self.files == [] and skips the rebuild
|
|
# (RAG sync only triggers the rebuild if both is_empty() AND self.files
|
|
# are truthy). The test was passing in 4d2a6666 because of timing;
|
|
# the project switch added latency, so the race is now exposed.
|
|
client.set_value('rag_collection_name', _collection_name)
|
|
client.set_value('files', ['final_test_1.txt', 'final_test_2.py'])
|
|
for _ in range(50):
|
|
if client.get_value('files') == ['final_test_1.txt', 'final_test_2.py']:
|
|
break
|
|
time.sleep(0.1)
|
|
client.set_value('rag_enabled', True)
|
|
client.set_value('rag_source', 'chroma')
|
|
client.set_value('rag_emb_provider', 'local')
|
|
client.set_value('auto_add_history', True)
|
|
client.set_value('current_provider', 'gemini_cli')
|
|
client.set_value('gcli_path', os.path.abspath(os.path.join(os.path.dirname(__file__), "mock_gcli.bat")))
|
|
|
|
time.sleep(1.5)
|
|
# Wait for settings to apply and engine to sync
|
|
success = False
|
|
for _ in range(100):
|
|
if client.get_value('rag_emb_provider') == 'local' and client.get_value('rag_status') == 'ready':
|
|
success = True
|
|
break
|
|
time.sleep(0.5)
|
|
assert success, f"RAG sync failed. Status: {client.get_value('rag_status')}"
|
|
|
|
# 3. Trigger Initial Indexing
|
|
print("[VERIFY] Triggering indexing...")
|
|
client.click('btn_rebuild_rag_index')
|
|
|
|
# Wait for ready
|
|
success = False
|
|
for _ in range(50):
|
|
status = client.get_value('rag_status')
|
|
if status == 'ready':
|
|
success = True
|
|
break
|
|
time.sleep(0.5)
|
|
assert success, f"Indexing failed. Status: {status}"
|
|
|
|
# 4. Verify Retrieval and Visualization
|
|
print("[VERIFY] Triggering retrieval turn...")
|
|
client.set_value('ai_input', "What makes RAG great?")
|
|
client.click('btn_gen_send')
|
|
|
|
# Wait for completion
|
|
success = False
|
|
print("[VERIFY] Polling for completion...")
|
|
for i in range(100):
|
|
state = client.get_gui_state()
|
|
status = state.get('ai_status')
|
|
if i % 10 == 0:
|
|
print(f"[VERIFY] Poll {i}, status: {status}")
|
|
if status == 'done':
|
|
success = True
|
|
break
|
|
if status and "error" in status.lower():
|
|
print(f"[VERIFY] Error detected: {status}")
|
|
break
|
|
time.sleep(0.5)
|
|
if not success:
|
|
print(f"[VERIFY] Timeout! Final status: {status}")
|
|
assert success, f"AI request timed out or failed. Status: {status}"
|
|
|
|
# 5. Verify discussion history has the context. After 'done' fires,
|
|
# poll entries separately because the User entry with RAG context
|
|
# injection may take an additional render frame to land in history
|
|
# (race condition exposed in batched live_gui context).
|
|
found_rag = False
|
|
for j in range(20):
|
|
session = client.get_session()
|
|
entries = session.get('session', {}).get('entries', [])
|
|
for entry in entries:
|
|
if entry.get('role') == 'User' and '## Retrieved Context' in entry.get('content', ''):
|
|
found_rag = True
|
|
content = entry.get('content', '')
|
|
print(f"[VERIFY] Found RAG context (poll {j}): {content[:100]}...")
|
|
# Accept either file's content as proof RAG retrieved something.
|
|
# The original test asserted only the .txt content, but the .py file
|
|
# ("Manual Slop RAG result") can rank first in batched context
|
|
# depending on prior chroma state. Either file's content proves
|
|
# RAG retrieval worked.
|
|
assert ("Manual Slop RAG is great" in content
|
|
or "Manual Slop RAG result" in content), (
|
|
f"Expected either 'Manual Slop RAG is great' or 'Manual Slop RAG result' in retrieved context, got: {content[:200]}"
|
|
)
|
|
break
|
|
if found_rag:
|
|
break
|
|
time.sleep(0.5)
|
|
assert found_rag, "RAG context not found in history"
|
|
|
|
# 6. Verify Incremental Indexing (no changes)
|
|
print("[VERIFY] Verifying incrementality...")
|
|
start = time.time()
|
|
client.click('btn_rebuild_rag_index')
|
|
for _ in range(50):
|
|
if client.get_value('rag_status') == 'ready': break
|
|
time.sleep(0.1)
|
|
duration = time.time() - start
|
|
print(f"[VERIFY] Incremental indexing took {duration:.2f}s")
|
|
assert duration < 1.0, "Incremental indexing too slow (expected < 1s for 2 files)"
|
|
|
|
print("[VERIFY] Phase 4 final verification COMPLETED successfully.")
|
|
|
|
except Exception as e:
|
|
print(f"[VERIFY] ERROR in final verification: {e}")
|
|
raise
|