Files
manual_slop/tests/test_async_events.py
Ed_ 60396f03f8 refactor(types): auto -> None sweep across entire codebase
Applied 236 return type annotations to functions with no return values
across 100+ files (core modules, tests, scripts, simulations).
Added Phase 4 to python_style_refactor track for remaining 597 items
(untyped params, vars, and functions with return values).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 11:16:56 -05:00

43 lines
1.2 KiB
Python

import asyncio
import pytest
from events import AsyncEventQueue
def test_async_event_queue_put_get() -> None:
    """Check that one event put on the queue is handed back unchanged by get()."""

    async def scenario() -> None:
        # The queue is created inside the coroutine so it lives on the
        # event loop that asyncio.run() starts for this test.
        events = AsyncEventQueue()
        expected_name = "test_event"
        expected_payload = {"data": "hello"}

        await events.put(expected_name, expected_payload)
        got_name, got_payload = await events.get()

        assert got_name == expected_name
        assert got_payload == expected_payload

    asyncio.run(scenario())
def test_async_event_queue_multiple() -> None:
    """Check that two queued events are returned by get() in the order they were put."""

    async def scenario() -> None:
        events = AsyncEventQueue()

        # Enqueue both events before reading anything back.
        for label, value in (("event1", 1), ("event2", 2)):
            await events.put(label, value)

        first_name, first_value = await events.get()
        second_name, second_value = await events.get()

        # First in, first out.
        assert first_name == "event1"
        assert first_value == 1
        assert second_name == "event2"
        assert second_value == 2

    asyncio.run(scenario())
def test_async_event_queue_none_payload() -> None:
    """Check that an event put with only a name comes back with a None payload."""

    async def scenario() -> None:
        events = AsyncEventQueue()

        # No payload argument is supplied to put().
        await events.put("no_payload")
        got_name, got_payload = await events.get()

        assert got_name == "no_payload"
        assert got_payload is None

    asyncio.run(scenario())