Module agents.transports.base
Classes
class BaseTransportHandler (loop, pipeline)
Expand source code
class BaseTransportHandler(ABC):
    """
    Abstract base class for all transport layers (VideoSDK, WebSocket, WebRTC).

    Concrete transports implement the abstract coroutines declared here
    (``connect``, ``disconnect``, ``wait_for_participant``,
    ``publish_to_pubsub`` and ``cleanup``). The base class only stores the
    constructor arguments and the optional session-end callback.
    """

    # Class-level default so the attribute can always be read, even when
    # setup_session_end_callback() was never called. Kept at class level
    # (not assigned in __init__) so it cannot overwrite a value a subclass
    # sets before calling super().__init__(), nor shadow a subclass member
    # of the same name.
    _on_session_end: Optional[Any] = None

    def __init__(self, loop, pipeline):
        """Store the collaborators shared by every transport.

        Args:
            loop: Stored unchanged on ``self.loop`` (presumably the asyncio
                event loop the transport runs on -- confirm against callers).
            pipeline: Stored unchanged on ``self.pipeline``.
        """
        self.loop = loop
        self.pipeline = pipeline
        # No audio track yet; nothing in this base class assigns one.
        self.audio_track = None

    @abstractmethod
    async def connect(self):
        """Establish the connection (join room or start server)"""

    @abstractmethod
    async def disconnect(self):
        """Close the connection"""

    @abstractmethod
    async def wait_for_participant(self, participant_id: Optional[str] = None) -> str:
        """Wait for a user to connect/join"""

    @abstractmethod
    async def publish_to_pubsub(self, pubsub_config: Any):
        """Publish a message to the pubsub topic"""

    @abstractmethod
    async def cleanup(self):
        """Free resources"""

    def setup_session_end_callback(self, callback):
        """Set up the callback to be called when the session ends.

        Args:
            callback: Stored on ``self._on_session_end``. This method only
                records it; it is never invoked here.
        """
        self._on_session_end = callback
Ancestors
- abc.ABC
Subclasses
Methods
async def cleanup(self)
Expand source code
@abstractmethod
async def cleanup(self):
    """Free resources.

    Abstract hook: the base body does nothing and returns ``None``.
    """
async def connect(self)
Expand source code
@abstractmethod
async def connect(self):
    """Establish the connection (join room or start server).

    Abstract hook: the base body does nothing and returns ``None``.
    """
async def disconnect(self)
Expand source code
@abstractmethod
async def disconnect(self):
    """Close the connection.

    Abstract hook: the base body does nothing and returns ``None``.
    """
async def publish_to_pubsub(self, pubsub_config: Any)
Expand source code
@abstractmethod
async def publish_to_pubsub(self, pubsub_config: Any):
    """Publish a message to the pubsub topic.

    Abstract hook: the base body ignores ``pubsub_config`` and returns
    ``None``.
    """
def setup_session_end_callback(self, callback)
Expand source code
def setup_session_end_callback(self, callback):
    """Set up the callback to be called when the session ends.

    The callback is stored on ``self._on_session_end``; this method only
    records it and never invokes it.
    """
    self._on_session_end = callback
async def wait_for_participant(self, participant_id: str | None = None) -> str
Expand source code
@abstractmethod
async def wait_for_participant(self, participant_id: Optional[str] = None) -> str:
    """Wait for a user to connect/join.

    Abstract hook: the base body ignores ``participant_id`` and returns
    ``None``; implementations are annotated to return a ``str``.
    """