Module agents.realtime_base_model

Classes

class ErrorEvent (message: str, code: str | None = None)
Expand source code
@dataclass
class ErrorEvent:
    """Event data for errors"""
    message: str
    code: str | None = None

Event data for errors

Instance variables

var code : str | None
var message : str
class InputTranscriptionCompleted (item_id: str, transcript: str)
Expand source code
@dataclass
class InputTranscriptionCompleted:
    """Event data for transcription completion"""
    item_id: str
    transcript: str

Event data for transcription completion

Instance variables

var item_id : str
var transcript : str
class RealtimeBaseModel
Expand source code
class RealtimeBaseModel(EventEmitter[Union[BaseEventTypes, TEvent]], Generic[TEvent], ABC):
    """
    Base class for realtime models with event emission capabilities.
    Allows for extension with additional event types through TEvent.
    """
    
    def __init__(self) -> None:
        """Initialize the realtime model"""
        super().__init__()

    @abstractmethod
    async def aclose(self) -> None:
        """Cleanup resources - must be implemented by subclasses"""
        pass
    
    async def cleanup(self) -> None:
        """Cleanup resources - calls aclose for compatibility"""
        await self.aclose()
    
    async def __aenter__(self) -> RealtimeBaseModel:
        """Async context manager entry"""
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit"""
        await self.aclose()

Base class for realtime models with event emission capabilities. Allows for extension with additional event types through TEvent.

Initialize the realtime model

Ancestors

Methods

async def aclose(self) ‑> None
Expand source code
@abstractmethod
async def aclose(self) -> None:
    """Cleanup resources - must be implemented by subclasses"""
    pass

Cleanup resources - must be implemented by subclasses

async def cleanup(self) ‑> None
Expand source code
async def cleanup(self) -> None:
    """Cleanup resources - calls aclose for compatibility"""
    await self.aclose()

Cleanup resources - calls aclose for compatibility