Module agents.event_emitter
Classes
class EventEmitter-
Expand source code
class EventEmitter(Generic[T]): """A generic synchronous event emitter that supports registering, removing, and invoking event handlers.""" def __init__(self) -> None: self._handlers: Dict[T, List[Callable[..., Any]]] = {} def on( self, event: T, callback: Callable[..., Any] | None = None ) -> Callable[..., Any]: """Register a synchronous handler for an event. Can be used directly or as a decorator.""" def register(handler: Callable[..., Any]) -> Callable[..., Any]: if asyncio.iscoroutinefunction(handler): raise ValueError( "Async handlers are not supported. Use a sync wrapper." ) handlers = self._handlers.setdefault(event, []) if handler not in handlers: handlers.append(handler) return handler return register if callback is None else register(callback) def off(self, event: T, callback: Callable[..., Any]) -> None: """Remove a previously registered handler for an event.""" if event in self._handlers: try: self._handlers[event].remove(callback) except ValueError: pass if not self._handlers[event]: del self._handlers[event] def emit(self, event: T, *args: Any) -> None: """Emit an event, invoking all registered handlers with the provided arguments.""" callbacks = self._handlers.get(event) if not callbacks: return arguments = args if args else ({},) for cb in callbacks[:]: try: self._invoke(cb, arguments) except Exception as ex: logger.error(f"Handler raised exception on event '{event}': {ex}") def _invoke(self, func: Callable[..., Any], args: tuple[Any, ...]) -> None: """Invoke a handler, adapting the argument count to match the handler's signature.""" code = func.__code__ argcount = code.co_argcount flags = code.co_flags has_varargs = flags & 0x04 != 0 # If the function expects no arguments (only self), don't pass any if argcount == 1 and hasattr(func, "__self__"): # Only self parameter func() elif has_varargs: func(*args) else: func(*args[:argcount])A generic synchronous event emitter that supports registering, removing, and invoking event handlers.
Ancestors
- typing.Generic
Subclasses
- Agent
- AgentSession
- AvatarInput
- ContentGeneration
- Denoise
- EOU
- EventBus
- LLM
- Pipeline
- PipelineOrchestrator
- RealtimeBaseModel
- RealtimeLLMAdapter
- SpeechGeneration
- SpeechUnderstanding
- STT
- TTS
- VAD
Methods
def emit(self, event: -T, *args: Any) ‑> None-
Expand source code
def emit(self, event: T, *args: Any) -> None: """Emit an event, invoking all registered handlers with the provided arguments.""" callbacks = self._handlers.get(event) if not callbacks: return arguments = args if args else ({},) for cb in callbacks[:]: try: self._invoke(cb, arguments) except Exception as ex: logger.error(f"Handler raised exception on event '{event}': {ex}")Emit an event, invoking all registered handlers with the provided arguments.
def off(self, event: -T, callback: Callable[..., Any]) ‑> None-
Expand source code
def off(self, event: T, callback: Callable[..., Any]) -> None: """Remove a previously registered handler for an event.""" if event in self._handlers: try: self._handlers[event].remove(callback) except ValueError: pass if not self._handlers[event]: del self._handlers[event]Remove a previously registered handler for an event.
def on(self, event: -T, callback: Callable[..., Any] | None = None) ‑> Callable[..., Any]-
Expand source code
def on( self, event: T, callback: Callable[..., Any] | None = None ) -> Callable[..., Any]: """Register a synchronous handler for an event. Can be used directly or as a decorator.""" def register(handler: Callable[..., Any]) -> Callable[..., Any]: if asyncio.iscoroutinefunction(handler): raise ValueError( "Async handlers are not supported. Use a sync wrapper." ) handlers = self._handlers.setdefault(event, []) if handler not in handlers: handlers.append(handler) return handler return register if callback is None else register(callback)Register a synchronous handler for an event. Can be used directly or as a decorator.