Module agents.event_emitter
Classes
class EventEmitter
Expand source code
class EventEmitter(Generic[T]): def __init__(self) -> None: self._handlers: Dict[T, List[Callable[..., Any]]] = {} def on( self, event: T, callback: Callable[..., Any] | None = None ) -> Callable[..., Any]: def register(handler: Callable[..., Any]) -> Callable[..., Any]: if asyncio.iscoroutinefunction(handler): raise ValueError( "Async handlers are not supported. Use a sync wrapper." ) handlers = self._handlers.setdefault(event, []) if handler not in handlers: handlers.append(handler) return handler return register if callback is None else register(callback) def off(self, event: T, callback: Callable[..., Any]) -> None: if event in self._handlers: try: self._handlers[event].remove(callback) except ValueError: pass if not self._handlers[event]: del self._handlers[event] def emit(self, event: T, *args: Any) -> None: callbacks = self._handlers.get(event) if not callbacks: return arguments = args if args else ({},) for cb in callbacks[:]: try: self._invoke(cb, arguments) except Exception as ex: logger.error(f"Handler raised exception on event '{event}': {ex}") def _invoke(self, func: Callable[..., Any], args: tuple[Any, ...]) -> None: code = func.__code__ argcount = code.co_argcount flags = code.co_flags has_varargs = flags & 0x04 != 0 # If the function expects no arguments (only self), don't pass any if argcount == 1 and hasattr(func, "__self__"): # Only self parameter func() elif has_varargs: func(*args) else: func(*args[:argcount])Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
    class Mapping[KT, VT]:
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
    def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

Ancestors
- typing.Generic
Subclasses
- Agent
- AgentSession
- CascadingPipeline
- ConversationFlow
- Denoise
- EOU
- EventBus
- LLM
- Pipeline
- RealtimeBaseModel
- RealTimePipeline
- STT
- TTS
- VAD
Methods
def emit(self, event: T, *args: Any) -> None
Expand source code
def emit(self, event: T, *args: Any) -> None:
    """Dispatch ``event`` to each handler registered for it.

    Handlers receive ``args``; when no arguments are supplied they receive a
    single empty dict instead. A handler that raises is logged and skipped so
    the remaining handlers still run.
    """
    registered = self._handlers.get(event)
    if registered:
        payload = args or ({},)
        # Work on a snapshot so the handler list may change during dispatch.
        for handler in list(registered):
            try:
                self._invoke(handler, payload)
            except Exception as ex:
                logger.error(f"Handler raised exception on event '{event}': {ex}")
# def off(self, event: -T, callback: Callable[..., Any]) ‑> None-
Expand source code
def off(self, event: T, callback: Callable[..., Any]) -> None:
    """Unregister ``callback`` from ``event``.

    Unknown events and handlers that were never registered are ignored. The
    event entry is deleted once no handlers remain for it.
    """
    registered = self._handlers.get(event)
    if registered is None:
        return
    if callback in registered:
        registered.remove(callback)
    # Drop the key so events without handlers do not linger in the mapping.
    if not registered:
        del self._handlers[event]
# def on(self, event: -T, callback: Callable[..., Any] | None = None) ‑> Callable[..., Any]-
Expand source code
def on( self, event: T, callback: Callable[..., Any] | None = None ) -> Callable[..., Any]: def register(handler: Callable[..., Any]) -> Callable[..., Any]: if asyncio.iscoroutinefunction(handler): raise ValueError( "Async handlers are not supported. Use a sync wrapper." ) handlers = self._handlers.setdefault(event, []) if handler not in handlers: handlers.append(handler) return handler return register if callback is None else register(callback)