""" Decoupled event emission system for cross-module communication. """ import asyncio from typing import Callable, Any, Dict, List, Tuple class EventEmitter: """ Simple event emitter for decoupled communication between modules. """ def __init__(self): """Initializes the EventEmitter with an empty listener map.""" self._listeners: Dict[str, List[Callable]] = {} def on(self, event_name: str, callback: Callable): """ Registers a callback for a specific event. Args: event_name: The name of the event to listen for. callback: The function to call when the event is emitted. """ if event_name not in self._listeners: self._listeners[event_name] = [] self._listeners[event_name].append(callback) def emit(self, event_name: str, *args: Any, **kwargs: Any): """ Emits an event, calling all registered callbacks. Args: event_name: The name of the event to emit. *args: Positional arguments to pass to callbacks. **kwargs: Keyword arguments to pass to callbacks. """ if event_name in self._listeners: for callback in self._listeners[event_name]: callback(*args, **kwargs) class AsyncEventQueue: """ Asynchronous event queue for decoupled communication using asyncio.Queue. """ def __init__(self): """Initializes the AsyncEventQueue with an internal asyncio.Queue.""" self._queue: asyncio.Queue = asyncio.Queue() async def put(self, event_name: str, payload: Any = None): """ Puts an event into the queue. Args: event_name: The name of the event. payload: Optional data associated with the event. """ await self._queue.put((event_name, payload)) async def get(self) -> Tuple[str, Any]: """ Gets an event from the queue. Returns: A tuple containing (event_name, payload). """ return await self._queue.get()