Module agents.transports.base

Classes

class BaseTransportHandler (loop, pipeline)
Expand source code
class BaseTransportHandler(ABC):
    """
    Abstract base class for all transport layers (VideoSDK, WebSocket, WebRTC).
    """
    def __init__(self, loop, pipeline):
        self.loop = loop
        self.pipeline = pipeline
        self.audio_track = None 
    @abstractmethod
    async def connect(self):
        """Establish the connection (join room or start server)"""
        pass

    @abstractmethod
    async def disconnect(self):
        """Close the connection"""
        pass

    @abstractmethod
    async def wait_for_participant(self, participant_id: Optional[str] = None) -> str:
        """Wait for a user to connect/join"""
        pass

    @abstractmethod
    async def publish_to_pubsub(self, pubsub_config: Any):
        """Publish a message to the pubsub topic"""
        pass

    @abstractmethod
    async def cleanup(self):
        """Free resources"""
        pass

    def setup_session_end_callback(self, callback):
        """Set up the callback to be called when the session ends."""
        self._on_session_end = callback

Abstract base class for all transport layers (VideoSDK, WebSocket, WebRTC).

Ancestors

  • abc.ABC

Subclasses

Methods

async def cleanup(self)
Expand source code
@abstractmethod
async def cleanup(self):
    """Free resources"""
    pass

Free resources

async def connect(self)
Expand source code
@abstractmethod
async def connect(self):
    """Establish the connection (join room or start server)"""
    pass

Establish the connection (join room or start server)

async def disconnect(self)
Expand source code
@abstractmethod
async def disconnect(self):
    """Close the connection"""
    pass

Close the connection

async def publish_to_pubsub(self, pubsub_config: Any)
Expand source code
@abstractmethod
async def publish_to_pubsub(self, pubsub_config: Any):
    """Publish a message to the pubsub topic"""
    pass

Publish a message to the pubsub topic

def setup_session_end_callback(self, callback)
Expand source code
def setup_session_end_callback(self, callback):
    """Set up the callback to be called when the session ends."""
    self._on_session_end = callback

Set up the callback to be called when the session ends.

async def wait_for_participant(self, participant_id: str | None = None) ‑> str
Expand source code
@abstractmethod
async def wait_for_participant(self, participant_id: Optional[str] = None) -> str:
    """Wait for a user to connect/join"""
    pass

Wait for a user to connect/join