Module agents.realtime_base_model
Classes
class ErrorEvent (message: str, code: str | None = None)

    @dataclass
    class ErrorEvent:
        """Event data for errors"""
        message: str
        code: str | None = None

Event data for errors
Instance variables
var code : str | None
var message : str
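A minimal sketch of constructing `ErrorEvent`, assuming only the two fields listed above. The dataclass is re-declared here so the snippet runs on its own; in real code it would presumably be imported from `agents.realtime_base_model`. The message and code values are made up for illustration.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


# Local copy of the dataclass shown above, so the example is self-contained.
@dataclass
class ErrorEvent:
    """Event data for errors"""
    message: str
    code: str | None = None


# `code` is optional and defaults to None when omitted.
err_without_code = ErrorEvent(message="connection lost")

# Both fields can be supplied; the values here are hypothetical.
err_with_code = ErrorEvent(message="rate limited", code="429")

# Dataclasses convert cleanly to plain dicts, e.g. for logging.
payload = asdict(err_with_code)
```

Because `ErrorEvent` is a plain dataclass, instances also compare by value, which can be convenient in tests.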
class InputTranscriptionCompleted (item_id: str, transcript: str)

    @dataclass
    class InputTranscriptionCompleted:
        """Event data for transcription completion"""
        item_id: str
        transcript: str

Event data for transcription completion
Instance variables
var item_id : str
var transcript : str
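As a rough illustration of how these events might be consumed, the sketch below groups finished transcripts by `item_id`. The dataclass is re-declared locally so the snippet is standalone, and `collect_transcripts` plus the sample item IDs are hypothetical helpers and values, not part of the module.

```python
from __future__ import annotations

from dataclasses import dataclass


# Local copy of the dataclass shown above, so the example is self-contained.
@dataclass
class InputTranscriptionCompleted:
    """Event data for transcription completion"""
    item_id: str
    transcript: str


def collect_transcripts(events: list[InputTranscriptionCompleted]) -> dict[str, str]:
    """Hypothetical helper: map each item_id to its completed transcript."""
    by_item: dict[str, str] = {}
    for event in events:
        by_item[event.item_id] = event.transcript
    return by_item


# Sample events with made-up identifiers and text.
events = [
    InputTranscriptionCompleted(item_id="item_1", transcript="hello there"),
    InputTranscriptionCompleted(item_id="item_2", transcript="what's the weather?"),
]
transcripts = collect_transcripts(events)
```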
class RealtimeBaseModel

    class RealtimeBaseModel(EventEmitter[Union[BaseEventTypes, TEvent]], Generic[TEvent], ABC):
        """
        Base class for realtime models with event emission capabilities.
        Allows for extension with additional event types through TEvent.
        """

        def __init__(self) -> None:
            """Initialize the realtime model"""
            super().__init__()

        @abstractmethod
        async def aclose(self) -> None:
            """Cleanup resources - must be implemented by subclasses"""
            pass

        async def cleanup(self) -> None:
            """Cleanup resources - calls aclose for compatibility"""
            await self.aclose()

        async def __aenter__(self) -> RealtimeBaseModel:
            """Async context manager entry"""
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
            """Async context manager exit"""
            await self.aclose()

Base class for realtime models with event emission capabilities. Allows for extension with additional event types through TEvent.
Initialize the realtime model
Ancestors
- EventEmitter
- typing.Generic
- abc.ABC
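The class is abstract and doubles as an async context manager, so a subclass only needs to supply `aclose`. The sketch below illustrates that lifecycle under one loud assumption: `EventEmitter` is not documented on this page, so `RealtimeBaseModelSketch` is a stand-in that copies only the lifecycle methods and omits event emission entirely. `EchoModel` is a hypothetical subclass.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod


class RealtimeBaseModelSketch(ABC):
    """Stand-in for RealtimeBaseModel: lifecycle methods only, no EventEmitter."""

    @abstractmethod
    async def aclose(self) -> None:
        """Subclasses release their resources here."""

    async def cleanup(self) -> None:
        await self.aclose()

    async def __aenter__(self) -> RealtimeBaseModelSketch:
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.aclose()


class EchoModel(RealtimeBaseModelSketch):
    """Hypothetical subclass that just records whether it was closed."""

    def __init__(self) -> None:
        super().__init__()
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def run_session() -> EchoModel:
    # __aenter__ returns the model itself; __aexit__ awaits aclose().
    async with EchoModel() as model:
        assert not model.closed  # still open inside the block
    return model


model = asyncio.run(run_session())
```

Since `__aexit__` awaits `aclose()` unconditionally, the model is closed even when the body of the `async with` block raises.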
Methods
async def aclose(self) -> None

    @abstractmethod
    async def aclose(self) -> None:
        """Cleanup resources - must be implemented by subclasses"""
        pass

Cleanup resources - must be implemented by subclasses
async def cleanup(self) -> None

    async def cleanup(self) -> None:
        """Cleanup resources - calls aclose for compatibility"""
        await self.aclose()

Cleanup resources - calls aclose for compatibility
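Because `cleanup` and the context-manager exit both end up in `aclose`, a caller who uses both will trigger `aclose` twice. One way a subclass might cope is to make `aclose` idempotent with a guard flag. This is a sketch of that design choice, not something the base class requires; `LifecycleSketch` is a stand-in without the `EventEmitter` base, and `GuardedModel` with its counters is hypothetical.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod


class LifecycleSketch(ABC):
    """Stand-in copying the lifecycle methods shown above (no EventEmitter)."""

    @abstractmethod
    async def aclose(self) -> None:
        """Subclasses release their resources here."""

    async def cleanup(self) -> None:
        # Compatibility alias: simply delegates to aclose().
        await self.aclose()

    async def __aenter__(self) -> LifecycleSketch:
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.aclose()


class GuardedModel(LifecycleSketch):
    """Hypothetical subclass whose aclose() is safe to call repeatedly."""

    def __init__(self) -> None:
        self.aclose_calls = 0  # how many times aclose() was entered
        self.releases = 0      # how many times resources were actually released
        self._closed = False

    async def aclose(self) -> None:
        self.aclose_calls += 1
        if self._closed:
            return  # already closed; nothing left to release
        self._closed = True
        self.releases += 1


async def main() -> GuardedModel:
    async with GuardedModel() as model:
        await model.cleanup()  # first aclose() call
    return model               # leaving the block made the second call


guarded = asyncio.run(main())
```

Here `aclose` is entered twice, once via `cleanup` and once via `__aexit__`, but the guarded release path runs only once.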