Compare commits
2 Commits
4da88a4274
...
2c5476dc5d
| Author | SHA1 | Date | |
|---|---|---|---|
| 2c5476dc5d | |||
| e02ebf7a65 |
@@ -15,7 +15,7 @@ paths = [
|
||||
"C:\\projects\\manual_slop\\tests\\artifacts\\temp_livetoolssim.toml",
|
||||
"C:\\projects\\manual_slop\\tests\\artifacts\\temp_liveexecutionsim.toml",
|
||||
]
|
||||
active = "C:\\projects\\manual_slop\\tests\\artifacts\\temp_project.toml"
|
||||
active = "C:\\projects\\manual_slop\\tests\\artifacts\\temp_livetoolssim.toml"
|
||||
|
||||
[gui.show_windows]
|
||||
"Context Hub" = true
|
||||
|
||||
@@ -8,5 +8,5 @@ active = "main"
|
||||
|
||||
[discussions.main]
|
||||
git_commit = ""
|
||||
last_updated = "2026-03-05T19:22:57"
|
||||
last_updated = "2026-03-05T20:14:45"
|
||||
history = []
|
||||
|
||||
@@ -53,6 +53,10 @@ class ApiHookClient:
|
||||
return {}
|
||||
return res
|
||||
|
||||
|
||||
|
||||
def post_project(self, project_data: dict) -> dict[str, Any]:
|
||||
return self._make_request('POST', '/api/project', data=project_data) or {}
|
||||
def get_project(self) -> dict[str, Any]:
|
||||
"""Retrieves the current project state."""
|
||||
return self._make_request('GET', '/api/project') or {}
|
||||
@@ -63,7 +67,15 @@ class ApiHookClient:
|
||||
|
||||
def post_session(self, session_entries: list[dict]) -> dict[str, Any]:
|
||||
"""Updates the session history."""
|
||||
return self._make_request('POST', '/api/session', data={"entries": session_entries}) or {}
|
||||
return self._make_request('POST', '/api/session', data={"session": {"entries": session_entries}}) or {}
|
||||
|
||||
|
||||
def get_events(self) -> list[dict[str, Any]]:
|
||||
res = self._make_request('GET', '/api/events')
|
||||
return res.get("events", []) if res else []
|
||||
|
||||
def clear_events(self) -> list[dict[str, Any]]:
|
||||
return self.get_events()
|
||||
|
||||
def post_gui(self, payload: dict) -> dict[str, Any]:
|
||||
"""Pushes an event to the GUI's SyncEventQueue via the /api/gui endpoint."""
|
||||
|
||||
@@ -6,25 +6,20 @@ from src.gemini_cli_adapter import GeminiCliAdapter
|
||||
|
||||
class TestGeminiCliAdapterParity(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
"""Set up a fresh adapter instance and reset session state for each test."""
|
||||
self.adapter = GeminiCliAdapter(binary_path="gemini")
|
||||
|
||||
def tearDown(self) -> None:
|
||||
pass
|
||||
|
||||
def test_count_tokens_fallback(self) -> None:
|
||||
"""Test the character-based token estimation fallback."""
|
||||
contents = ["Hello", "world!"]
|
||||
estimated = self.adapter.count_tokens(contents)
|
||||
# "Hello\nworld!" is 12 chars. 12 // 4 = 3
|
||||
self.assertEqual(estimated, 3)
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
@patch('src.gemini_cli_adapter.subprocess.Popen')
|
||||
def test_send_starts_subprocess_with_model(self, mock_popen: MagicMock) -> None:
|
||||
"""Test that the send method correctly adds the -m <model> flag when a model is specified."""
|
||||
mock_process = MagicMock()
|
||||
mock_process.stdout = [b'{"kind": "message", "payload": "hi"}']
|
||||
mock_process.stderr = []
|
||||
mock_process.communicate.return_value = ('{"type": "message", "content": "hi"}', '')
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
@@ -35,24 +30,21 @@ class TestGeminiCliAdapterParity(unittest.TestCase):
|
||||
self.assertIn("-m", cmd_list)
|
||||
self.assertIn("gemini-2.0-flash", cmd_list)
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
@patch('src.gemini_cli_adapter.subprocess.Popen')
|
||||
def test_send_parses_tool_calls_from_streaming_json(self, mock_popen: MagicMock) -> None:
|
||||
"""Test that tool_use messages in the streaming JSON are correctly parsed."""
|
||||
tool_call_json = {
|
||||
"kind": "tool_use",
|
||||
"payload": {
|
||||
"id": "call_abc",
|
||||
"name": "list_directory",
|
||||
"input": {"path": "."}
|
||||
}
|
||||
"type": "tool_use",
|
||||
"tool_name": "list_directory",
|
||||
"parameters": {"path": "."},
|
||||
"tool_id": "call_abc"
|
||||
}
|
||||
|
||||
mock_process = MagicMock()
|
||||
mock_process.stdout = [
|
||||
(json.dumps(tool_call_json) + "\n").encode('utf-8'),
|
||||
b'{"kind": "message", "payload": "I listed the files."}'
|
||||
]
|
||||
mock_process.stderr = []
|
||||
stdout_output = (
|
||||
json.dumps(tool_call_json) + "\n" +
|
||||
'{"type": "message", "content": "I listed the files."}'
|
||||
)
|
||||
mock_process.communicate.return_value = (stdout_output, '')
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
|
||||
@@ -4,32 +4,28 @@ from unittest.mock import patch, MagicMock
|
||||
from src.gemini_cli_adapter import GeminiCliAdapter
|
||||
from src import mcp_client
|
||||
|
||||
def test_gemini_cli_context_bleed_prevention(monkeypatch) -> None:
|
||||
"""Test that the GeminiCliAdapter correctly filters out echoed 'user' messages
|
||||
from the streaming JSON if they were to occur (safety check)."""
|
||||
adapter = GeminiCliAdapter()
|
||||
def test_gemini_cli_context_bleed_prevention() -> None:
|
||||
import src.ai_client as ai_client
|
||||
ai_client._gemini_cli_adapter = None
|
||||
|
||||
mock_process = MagicMock()
|
||||
# Simulate a stream that includes a message from 'user' (should be ignored)
|
||||
# and a message from 'model'.
|
||||
mock_process.stdout = [
|
||||
b'{"kind": "message", "role": "user", "payload": "Echoed user prompt"}\n',
|
||||
b'{"kind": "message", "role": "model", "payload": "Model response"}\n'
|
||||
]
|
||||
mock_process.stderr = []
|
||||
mock_process.returncode = 0
|
||||
|
||||
with patch('subprocess.Popen', return_value=mock_process):
|
||||
with patch('src.gemini_cli_adapter.subprocess.Popen') as mock_popen:
|
||||
adapter = GeminiCliAdapter()
|
||||
|
||||
mock_process = MagicMock()
|
||||
stdout_output = (
|
||||
'{"type": "message", "role": "user", "content": "Echoed user prompt"}' + "\n" +
|
||||
'{"type": "message", "role": "model", "content": "Model response"}'
|
||||
)
|
||||
mock_process.communicate.return_value = (stdout_output, '')
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
result = adapter.send("msg")
|
||||
# Should only contain the model response
|
||||
assert result["text"] == "Model response"
|
||||
|
||||
def test_gemini_cli_parameter_resilience() -> None:
|
||||
"""Test that mcp_client correctly handles 'file_path' and 'dir_path' aliases
|
||||
if the AI provides them instead of 'path'."""
|
||||
from src import mcp_client
|
||||
|
||||
# Mock dispatch to see what it receives
|
||||
with patch('src.mcp_client.read_file', return_value="content") as mock_read:
|
||||
mcp_client.dispatch("read_file", {"file_path": "aliased.txt"})
|
||||
mock_read.assert_called_once_with("aliased.txt")
|
||||
@@ -39,26 +35,16 @@ def test_gemini_cli_parameter_resilience() -> None:
|
||||
mock_list.assert_called_once_with("aliased_dir")
|
||||
|
||||
def test_gemini_cli_loop_termination() -> None:
|
||||
"""Test that multi-round tool calling correctly terminates and preserves
|
||||
the final text."""
|
||||
from src import ai_client
|
||||
import src.ai_client as ai_client
|
||||
ai_client._gemini_cli_adapter = None
|
||||
|
||||
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")
|
||||
|
||||
# Round 1: Tool call
|
||||
mock_resp1 = {"text": "Calling tool", "tool_calls": [{"name": "read_file", "args": {"path": "f.txt"}}]}
|
||||
# Round 2: Final response
|
||||
mock_resp2 = {"text": "Final answer", "tool_calls": []}
|
||||
|
||||
with patch('src.ai_client.GeminiCliAdapter') as MockAdapter:
|
||||
instance = MockAdapter.return_value
|
||||
instance.send.side_effect = [mock_resp1, mock_resp2]
|
||||
instance.last_usage = {"total_tokens": 10}
|
||||
instance.last_latency = 0.1
|
||||
instance.session_id = "s1"
|
||||
with patch('src.gemini_cli_adapter.subprocess.Popen') as mock_popen:
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate.return_value = ('{"type": "message", "content": "Final answer", "tool_calls": []}', "")
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
# We need to mock mcp_client.dispatch too
|
||||
with patch('src.mcp_client.dispatch', return_value="content"):
|
||||
result = ai_client.send("context", "prompt")
|
||||
assert result == "Final answer"
|
||||
assert instance.send.call_count == 2
|
||||
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")
|
||||
|
||||
result = ai_client.send("context", "prompt")
|
||||
assert result == "Final answer"
|
||||
|
||||
@@ -1,34 +1,31 @@
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
from src import ai_client
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
@patch('src.ai_client.GeminiCliAdapter')
|
||||
def test_send_invokes_adapter_send(mock_adapter_class: Any) -> None:
|
||||
mock_instance = mock_adapter_class.return_value
|
||||
mock_instance.send.return_value = {"text": "Hello from mock adapter", "tool_calls": []}
|
||||
mock_instance.last_usage = {"total_tokens": 100}
|
||||
mock_instance.last_latency = 0.5
|
||||
mock_instance.session_id = None
|
||||
def test_send_invokes_adapter_send() -> None:
|
||||
import src.ai_client as ai_client
|
||||
ai_client._gemini_cli_adapter = None
|
||||
|
||||
# Force reset to ensure our mock is used
|
||||
with patch('src.ai_client._gemini_cli_adapter', mock_instance):
|
||||
with patch('src.gemini_cli_adapter.subprocess.Popen') as mock_popen:
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate.return_value = ('{"type": "message", "content": "Hello from mock adapter"}', '')
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")
|
||||
res = ai_client.send("context", "msg")
|
||||
assert res == "Hello from mock adapter"
|
||||
mock_instance.send.assert_called()
|
||||
|
||||
@patch('src.ai_client.GeminiCliAdapter')
|
||||
def test_get_history_bleed_stats(mock_adapter_class: Any) -> None:
|
||||
mock_instance = mock_adapter_class.return_value
|
||||
mock_instance.send.return_value = {"text": "txt", "tool_calls": []}
|
||||
mock_instance.last_usage = {"input_tokens": 1500}
|
||||
mock_instance.last_latency = 0.5
|
||||
mock_instance.session_id = "sess"
|
||||
def test_get_history_bleed_stats() -> None:
|
||||
import src.ai_client as ai_client
|
||||
ai_client._gemini_cli_adapter = None
|
||||
|
||||
with patch('src.ai_client._gemini_cli_adapter', mock_instance):
|
||||
with patch('src.gemini_cli_adapter.subprocess.Popen') as mock_popen:
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate.return_value = ('{"type": "message", "content": "txt"}', '')
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")
|
||||
# Initialize by sending a message
|
||||
ai_client.send("context", "msg")
|
||||
stats = ai_client.get_history_bleed_stats()
|
||||
assert stats["provider"] == "gemini_cli"
|
||||
assert stats["current"] == 1500
|
||||
|
||||
@@ -2,46 +2,39 @@ import time
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Ensure project root is in path
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
|
||||
|
||||
from src.api_hook_client import ApiHookClient
|
||||
|
||||
|
||||
def test_comms_volume_stress_performance(live_gui) -> None:
|
||||
"""
|
||||
Stress test: Inject many session entries and verify performance doesn't degrade.
|
||||
"""
|
||||
# 0. Warmup
|
||||
time.sleep(5.0)
|
||||
client = ApiHookClient()
|
||||
# 1. Capture baseline
|
||||
time.sleep(2.0) # Wait for stability
|
||||
baseline_resp = client.get_performance()
|
||||
baseline = baseline_resp.get('performance', {})
|
||||
baseline_ft = baseline.get('last_frame_time_ms', 0.0)
|
||||
# 2. Inject 50 "dummy" session entries
|
||||
# Role must match DISC_ROLES in gui_2.py (User, AI, Vendor API, System)
|
||||
large_session = []
|
||||
for i in range(50):
|
||||
large_session.append({
|
||||
"role": "User",
|
||||
"content": f"Stress test entry {i} " * 5,
|
||||
"ts": time.time(),
|
||||
"collapsed": False
|
||||
})
|
||||
client.post_session(large_session)
|
||||
# Give it a moment to process UI updates
|
||||
time.sleep(1.0)
|
||||
# 3. Capture stress performance
|
||||
stress_resp = client.get_performance()
|
||||
stress = stress_resp.get('performance', {})
|
||||
stress_ft = stress.get('last_frame_time_ms', 0.0)
|
||||
print(f"Baseline FT: {baseline_ft:.2f}ms, Stress FT: {stress_ft:.2f}ms")
|
||||
# If we got valid timing, assert it's within reason
|
||||
if stress_ft > 0:
|
||||
assert stress_ft < 100.0, f"Stress frame time {stress_ft:.2f}ms exceeds 10fps threshold"
|
||||
# Ensure the session actually updated
|
||||
session_data = client.get_session()
|
||||
entries = session_data.get('session', {}).get('entries', [])
|
||||
assert len(entries) >= 50, f"Expected at least 50 entries, got {len(entries)}"
|
||||
time.sleep(5.0)
|
||||
client = ApiHookClient()
|
||||
time.sleep(2.0)
|
||||
baseline_resp = client.get_performance()
|
||||
baseline = baseline_resp.get("performance", {})
|
||||
baseline_ft = baseline.get("last_frame_time_ms", 0.0)
|
||||
large_session = []
|
||||
for i in range(50):
|
||||
large_session.append(
|
||||
{
|
||||
"role": "User",
|
||||
"content": f"Stress test entry {i} " * 5,
|
||||
"ts": time.time(),
|
||||
"collapsed": False,
|
||||
}
|
||||
)
|
||||
client.post_session(large_session)
|
||||
time.sleep(1.0)
|
||||
stress_resp = client.get_performance()
|
||||
stress = stress_resp.get("performance", {})
|
||||
stress_ft = stress.get("last_frame_time_ms", 0.0)
|
||||
print(f"Baseline FT: {baseline_ft:.2f}ms, Stress FT: {stress_ft:.2f}ms")
|
||||
if stress_ft > 0:
|
||||
assert stress_ft < 100.0, (
|
||||
f"Stress frame time {stress_ft:.2f}ms exceeds 10fps threshold"
|
||||
)
|
||||
session_data = client.get_session()
|
||||
entries = session_data.get("session", {}).get("entries", [])
|
||||
assert len(entries) >= 50, f"Expected at least 50 entries, got {len(entries)}"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from typing import Any
|
||||
from pathlib import Path
|
||||
from aggregate import build_tier1_context, build_tier2_context, build_tier3_context
|
||||
from src.aggregate import build_tier1_context, build_tier2_context, build_tier3_context
|
||||
|
||||
def test_build_tier1_context_exists() -> None:
|
||||
file_items = [
|
||||
@@ -28,7 +28,7 @@ def test_build_tier3_context_ast_skeleton(monkeypatch: Any) -> None:
|
||||
mock_parser_instance.get_skeleton.return_value = "def other():\n ..."
|
||||
mock_parser_class = MagicMock(return_value=mock_parser_instance)
|
||||
# Mock file_cache.ASTParser in aggregate module
|
||||
monkeypatch.setattr("aggregate.ASTParser", mock_parser_class)
|
||||
monkeypatch.setattr("src.aggregate.ASTParser", mock_parser_class)
|
||||
file_items = [
|
||||
{"path": Path("other.py"), "entry": "other.py", "content": "def other():\n pass", "error": False}
|
||||
]
|
||||
@@ -57,7 +57,7 @@ def test_build_tier3_context_exists() -> None:
|
||||
assert "AST Skeleton" in result
|
||||
|
||||
def test_build_file_items_with_tiers(tmp_path: Any) -> None:
|
||||
from aggregate import build_file_items
|
||||
from src.aggregate import build_file_items
|
||||
# Create some dummy files
|
||||
file1 = tmp_path / "file1.txt"
|
||||
file1.write_text("content1")
|
||||
@@ -78,7 +78,7 @@ def test_build_file_items_with_tiers(tmp_path: Any) -> None:
|
||||
assert item2["tier"] == 3
|
||||
|
||||
def test_build_files_section_with_dicts(tmp_path: Any) -> None:
|
||||
from aggregate import build_files_section
|
||||
from src.aggregate import build_files_section
|
||||
file1 = tmp_path / "file1.txt"
|
||||
file1.write_text("content1")
|
||||
files_config = [
|
||||
|
||||
@@ -5,60 +5,48 @@ from unittest.mock import patch, MagicMock
|
||||
from types import SimpleNamespace
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure project root is in path
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
from src import ai_client
|
||||
|
||||
def test_token_usage_tracking() -> None:
|
||||
"""
|
||||
Verify that ai_client.send() correctly extracts and logs token usage
|
||||
from the Gemini API response.
|
||||
"""
|
||||
ai_client.reset_session()
|
||||
|
||||
# Mock the google-genai Client and chats.create
|
||||
with patch("src.ai_client._ensure_gemini_client"), \
|
||||
patch("src.ai_client._gemini_client") as mock_client:
|
||||
|
||||
mock_chat = MagicMock()
|
||||
mock_client.chats.create.return_value = mock_chat
|
||||
|
||||
# Create a mock response with usage metadata (genai 1.0.0 names)
|
||||
mock_usage = SimpleNamespace(
|
||||
prompt_token_count=100,
|
||||
candidates_token_count=50,
|
||||
total_token_count=150,
|
||||
cached_content_token_count=20
|
||||
)
|
||||
|
||||
mock_candidate = MagicNamespace()
|
||||
mock_candidate.content = SimpleNamespace(parts=[SimpleNamespace(text="Mock Response", function_call=None)])
|
||||
mock_candidate.finish_reason = MagicMock()
|
||||
mock_candidate.finish_reason.name = "STOP"
|
||||
|
||||
mock_response = SimpleNamespace(
|
||||
candidates=[mock_candidate],
|
||||
usage_metadata=mock_usage
|
||||
)
|
||||
|
||||
mock_chat.send_message.return_value = mock_response
|
||||
|
||||
# Set provider to gemini
|
||||
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
|
||||
|
||||
# Send a message
|
||||
ai_client.send("Context", "Hello")
|
||||
|
||||
# Verify usage was logged in the comms log
|
||||
comms = ai_client.get_comms_log()
|
||||
response_entries = [e for e in comms if e.get("direction") == "IN" and e["kind"] == "response"]
|
||||
assert len(response_entries) > 0
|
||||
usage = response_entries[0]["payload"]["usage"]
|
||||
assert usage["input_tokens"] == 100
|
||||
assert usage["output_tokens"] == 50
|
||||
assert usage["cache_read_input_tokens"] == 20
|
||||
|
||||
class MagicNamespace(SimpleNamespace):
|
||||
def __getattr__(self, name):
|
||||
return MagicMock()
|
||||
ai_client.reset_session()
|
||||
|
||||
with patch("src.ai_client._ensure_gemini_client"), \
|
||||
patch("src.ai_client._gemini_client") as mock_client:
|
||||
|
||||
mock_chat = MagicMock()
|
||||
mock_client.chats.create.return_value = mock_chat
|
||||
|
||||
mock_usage = SimpleNamespace(
|
||||
prompt_token_count=100,
|
||||
candidates_token_count=50,
|
||||
total_token_count=150,
|
||||
cached_content_token_count=20
|
||||
)
|
||||
|
||||
mock_part = SimpleNamespace(text="Mock Response", function_call=None)
|
||||
mock_content = SimpleNamespace(parts=[mock_part])
|
||||
|
||||
mock_candidate = SimpleNamespace()
|
||||
mock_candidate.content = mock_content
|
||||
mock_candidate.finish_reason = SimpleNamespace(name="STOP")
|
||||
|
||||
mock_response = SimpleNamespace()
|
||||
mock_response.candidates = [mock_candidate]
|
||||
mock_response.usage_metadata = mock_usage
|
||||
mock_response.text = "Mock Response"
|
||||
|
||||
mock_chat.send_message.return_value = mock_response
|
||||
|
||||
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
|
||||
|
||||
ai_client.send("Context", "Hello")
|
||||
|
||||
comms = ai_client.get_comms_log()
|
||||
response_entries = [e for e in comms if e.get("direction") == "IN" and e["kind"] == "response"]
|
||||
assert len(response_entries) > 0
|
||||
usage = response_entries[0]["payload"]["usage"]
|
||||
assert usage["input_tokens"] == 100
|
||||
assert usage["output_tokens"] == 50
|
||||
assert usage["cache_read_input_tokens"] == 20
|
||||
|
||||
Reference in New Issue
Block a user