Files
manual_slop/src/events.py
Ed_ 35480a26dc test(audit): fix critical test suite deadlocks and write exhaustive architectural report
- Fix 'Triple Bingo' history synchronization explosion during streaming

- Implement stateless event buffering in ApiHookClient to prevent dropped events

- Ensure 'tool_execution' events emit consistently across all LLM providers

- Add hard timeouts to all background thread wait() conditions

- Add thorough teardown cleanup to conftest.py's reset_ai_client fixture

- Write highly detailed report_gemini.md exposing asyncio lifecycle flaws
2026-03-05 01:42:47 -05:00

101 lines
2.9 KiB
Python

"""
Decoupled event emission system for cross-module communication.
"""
import asyncio
from typing import Callable, Any, Dict, List, Tuple
class EventEmitter:
"""
Simple event emitter for decoupled communication between modules.
"""
def __init__(self) -> None:
"""Initializes the EventEmitter with an empty listener map."""
self._listeners: Dict[str, List[Callable[..., Any]]] = {}
def on(self, event_name: str, callback: Callable[..., Any]) -> None:
"""
Registers a callback for a specific event.
Args:
event_name: The name of the event to listen for.
callback: The function to call when the event is emitted.
"""
if event_name not in self._listeners:
self._listeners[event_name] = []
self._listeners[event_name].append(callback)
def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None:
"""
Emits an event, calling all registered callbacks.
Args:
event_name: The name of the event to emit.
*args: Positional arguments to pass to callbacks.
**kwargs: Keyword arguments to pass to callbacks.
"""
if event_name in self._listeners:
for callback in self._listeners[event_name]:
callback(*args, **kwargs)
def clear(self) -> None:
"""Clears all registered listeners."""
self._listeners.clear()
class AsyncEventQueue:
"""
Asynchronous event queue for decoupled communication using asyncio.Queue.
"""
def __init__(self) -> None:
"""Initializes the AsyncEventQueue with an internal asyncio.Queue."""
self._queue: asyncio.Queue[Tuple[str, Any]] = asyncio.Queue()
async def put(self, event_name: str, payload: Any = None) -> None:
"""
Puts an event into the queue.
Args:
event_name: The name of the event.
payload: Optional data associated with the event.
"""
await self._queue.put((event_name, payload))
async def get(self) -> Tuple[str, Any]:
"""
Gets an event from the queue.
Returns:
A tuple containing (event_name, payload).
"""
return await self._queue.get()
def task_done(self) -> None:
"""Signals that a formerly enqueued task is complete."""
self._queue.task_done()
async def join(self) -> None:
"""Blocks until all items in the queue have been gotten and processed."""
await self._queue.join()
class UserRequestEvent:
"""
Payload for a user request event.
"""
def __init__(self, prompt: str, stable_md: str, file_items: List[Any], disc_text: str, base_dir: str) -> None:
self.prompt = prompt
self.stable_md = stable_md
self.file_items = file_items
self.disc_text = disc_text
self.base_dir = base_dir
def to_dict(self) -> Dict[str, Any]:
return {
"prompt": self.prompt,
"stable_md": self.stable_md,
"file_items": self.file_items,
"disc_text": self.disc_text,
"base_dir": self.base_dir
}