fix(conductor): Apply review suggestions for track 'event_driven_metrics_20260223'

This commit is contained in:
2026-02-23 16:45:34 -05:00
parent eaaf09dc3c
commit 66f728e7a3
4 changed files with 55 additions and 23 deletions

View File

@@ -1,14 +1,37 @@
"""
Decoupled event emission system for cross-module communication.
"""
from typing import Callable, Any, Dict, List
class EventEmitter:
"""
Simple event emitter for decoupled communication between modules.
"""
def __init__(self):
self._listeners = {}
"""Initializes the EventEmitter with an empty listener map."""
self._listeners: Dict[str, List[Callable]] = {}
def on(self, event_name, callback):
def on(self, event_name: str, callback: Callable):
"""
Registers a callback for a specific event.
Args:
event_name: The name of the event to listen for.
callback: The function to call when the event is emitted.
"""
if event_name not in self._listeners:
self._listeners[event_name] = []
self._listeners[event_name].append(callback)
def emit(self, event_name, *args, **kwargs):
def emit(self, event_name: str, *args: Any, **kwargs: Any):
"""
Emits an event, calling all registered callbacks.
Args:
event_name: The name of the event to emit.
*args: Positional arguments to pass to callbacks.
**kwargs: Keyword arguments to pass to callbacks.
"""
if event_name in self._listeners:
for callback in self._listeners[event_name]:
callback(*args, **kwargs)