e9fa69ddc1
Phase 5 of any_type_componentization_20260621. Promotes the WebSocket
broadcast signature in src/api_hooks.py from (channel, payload: dict) to
a typed WebSocketMessage dataclass (16 Any sites):
NEW dataclass (inline in src/api_hooks.py):
- WebSocketMessage (frozen=True): channel: str, payload: JsonValue
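
For orientation, a minimal sketch of what the new dataclass might look like. Only the field names, their types and frozen=True come from this commit. The JsonValue alias and the to_json() body are assumptions: to_json() is inferred from the test name test_websocket_message_to_json, and the channel/payload wire shape from the assertions in tests/test_websocket_server.py.

```python
import json
from dataclasses import FrozenInstanceError, dataclass
from typing import Union

# Assumed recursive alias; the real JsonValue definition is not shown in this commit.
JsonValue = Union[None, bool, int, float, str, list["JsonValue"], dict[str, "JsonValue"]]


@dataclass(frozen=True)
class WebSocketMessage:
    channel: str
    payload: JsonValue

    def to_json(self) -> str:
        # Hypothetical wire format: one JSON object with "channel" and "payload" keys.
        return json.dumps({"channel": self.channel, "payload": self.payload})


msg = WebSocketMessage(channel="events", payload={"event": "test_event", "data": "hello"})

# frozen=True makes attribute assignment raise FrozenInstanceError.
try:
    msg.channel = "other"  # type: ignore[misc]
    frozen = False
except FrozenInstanceError:
    frozen = True
```

Note that frozen=True only blocks rebinding the fields; a dict or list payload can still be mutated in place.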
MODIFIED:
- _serialize_for_api(obj: Any) -> JsonValue (typed return)
- broadcast(channel: str, payload: dict[str, Any]) is now broadcast(message: WebSocketMessage)
- _get_app_attr / _set_app_attr signatures UNCHANGED (Pattern 4 preserved)
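
A rough sketch of how a serializer with a typed JsonValue return could behave, based only on the cases named in the new tests (to_dict objects, nested lists, PurePath, primitives, mixed nesting). The function name serialize_for_api_sketch, the JsonValue alias, the str() conversion for paths and the str() fallback are all assumptions, not the actual _serialize_for_api implementation.

```python
from pathlib import PurePath, PurePosixPath
from typing import Any, Union

# Assumed alias; the real JsonValue definition is not shown in this commit.
JsonValue = Union[None, bool, int, float, str, list["JsonValue"], dict[str, "JsonValue"]]


def serialize_for_api_sketch(obj: Any) -> JsonValue:
    """Hypothetical stand-in for _serialize_for_api."""
    # Primitives pass through unchanged.
    if obj is None or isinstance(obj, (bool, int, float, str)):
        return obj
    # Assumption: paths are rendered as plain strings.
    if isinstance(obj, PurePath):
        return str(obj)
    # Objects exposing to_dict() are converted, then serialized recursively.
    if hasattr(obj, "to_dict"):
        return serialize_for_api_sketch(obj.to_dict())
    if isinstance(obj, dict):
        return {str(k): serialize_for_api_sketch(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [serialize_for_api_sketch(v) for v in obj]
    # Assumption: anything else falls back to its string form.
    return str(obj)


class Hook:
    """Illustrative object with a to_dict() method."""

    def to_dict(self) -> dict:
        return {"path": PurePosixPath("src/api_hooks.py"), "tags": ("a", 1)}


result = serialize_for_api_sketch([Hook(), None])
```

Here result is a plain list of JSON-compatible values, so it can be passed as a WebSocketMessage payload without further conversion.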
NEW tests/test_api_hooks_dataclasses.py (12 tests, all pass):
- test_websocket_message_construction
- test_websocket_message_with_list_payload
- test_websocket_message_with_nested_payload
- test_websocket_message_is_frozen
- test_websocket_message_to_json
- test_serialize_for_api_returns_dict_for_to_dict_object
- test_serialize_for_api_handles_nested_lists
- test_serialize_for_api_handles_purepath
- test_serialize_for_api_passthrough_for_primitives
- test_serialize_for_api_handles_mixed_nesting
- test_get_app_attr_signature_preserved (Pattern 4 invariant)
- test_set_app_attr_signature_preserved (Pattern 4 invariant)
MODIFIED tests/test_websocket_server.py:
- Updated broadcast() call site to use WebSocketMessage(channel=..., payload=...)
- Added WebSocketMessage import
Verified:
uv run pytest tests/test_api_hooks_dataclasses.py tests/test_api_hooks_warmup.py tests/test_websocket_server.py --timeout=30
23 passed in 5.03s (12 new + 10 existing + 1 websocket)
45 lines
1.3 KiB
Python
import pytest
import asyncio
import json
import websockets
from src.api_hooks import WebSocketMessage, WebSocketServer

@pytest.mark.asyncio
async def test_websocket_subscription_and_broadcast():
    # Mock app
    app = type("MockApp", (), {"test_hooks_enabled": True})()

    # Start server on a specific port
    port = 9005
    server = WebSocketServer(app, port=port)
    server.start()

    # Wait for server to start
    await asyncio.sleep(0.5)

    try:
        uri = f"ws://127.0.0.1:{server.port}"
        async with websockets.connect(uri) as websocket:
            # Subscribe to events channel
            subscribe_msg = {"action": "subscribe", "channel": "events"}
            await websocket.send(json.dumps(subscribe_msg))

            # Receive confirmation
            response = await websocket.recv()
            data = json.loads(response)
            assert data["type"] == "subscription_confirmed"
            assert data["channel"] == "events"

            # Broadcast an event from the server
            event_payload = {"event": "test_event", "data": "hello"}
            server.broadcast(WebSocketMessage(channel="events", payload=event_payload))

            # Receive the broadcast
            broadcast_response = await websocket.recv()
            broadcast_data = json.loads(broadcast_response)
            assert broadcast_data["channel"] == "events"
            assert broadcast_data["payload"] == event_payload

    finally:
        server.stop()