feat(ai-server): Add AIProxyClient queue communication layer
This commit is contained in:
+10
-3
@@ -11,10 +11,17 @@ from pathlib import Path
|
||||
from typing import Generator, Any
|
||||
from unittest.mock import patch
|
||||
|
||||
# Ensure project root is in path for imports
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
thirdparty_dir = os.path.join(os.path.dirname(__file__), "..", "thirdparty")
|
||||
if thirdparty_dir not in sys.path:
|
||||
sys.path.insert(0, thirdparty_dir)
|
||||
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||||
if project_root not in sys.path:
|
||||
sys.path.insert(0, project_root)
|
||||
|
||||
from defer.sugar import install
|
||||
install()
|
||||
|
||||
# Import the App class after patching if necessary, but here we just need the type hint
|
||||
from src.gui_2 import App
|
||||
|
||||
class VerificationLogger:
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
import pytest
|
||||
import threading
|
||||
from unittest.mock import MagicMock, patch
|
||||
from src.ai_client_proxy import AIProxyClient
|
||||
|
||||
|
||||
def test_proxy_initialization():
|
||||
proxy = AIProxyClient()
|
||||
assert proxy._status == "disconnected"
|
||||
assert proxy._pending == {}
|
||||
|
||||
|
||||
def test_proxy_status_property():
|
||||
proxy = AIProxyClient()
|
||||
assert proxy.status in ("disconnected", "init", "ready", "busy", "error")
|
||||
|
||||
|
||||
def test_proxy_status_reflects_internal_state():
|
||||
proxy = AIProxyClient()
|
||||
assert proxy.status == "disconnected"
|
||||
proxy._status = "ready"
|
||||
assert proxy.status == "ready"
|
||||
|
||||
|
||||
def test_send_command_without_server_returns_error():
|
||||
proxy = AIProxyClient()
|
||||
proxy._status = "ready"
|
||||
result = proxy.send_command("list_models", {"provider": "gemini"})
|
||||
assert "error" in result
|
||||
|
||||
|
||||
def test_pending_dict_structure():
|
||||
proxy = AIProxyClient()
|
||||
assert isinstance(proxy._pending, dict)
|
||||
assert len(proxy._pending) == 0
|
||||
|
||||
|
||||
def test_stop_when_not_started():
|
||||
proxy = AIProxyClient()
|
||||
proxy.stop()
|
||||
assert proxy._status == "disconnected"
|
||||
assert proxy._process is None
|
||||
Reference in New Issue
Block a user