Module agents.room.room

Classes

class VideoSDKHandler (*,
meeting_id: str,
auth_token: str | None = None,
name: str,
agent_participant_id: str,
agent_id: str,
pipeline: Pipeline,
loop: asyncio.events.AbstractEventLoop,
vision: bool = False,
recording: bool = False,
record_audio: bool | None = None,
record_screen_share: bool = True,
custom_camera_video_track=None,
custom_microphone_audio_track=None,
audio_sinks=None,
background_audio: bool = False,
on_room_error: Callable[[Any], None] | None = None,
auto_end_session: bool = True,
session_timeout_seconds: int | None = None,
no_participant_timeout_seconds: int | None = 90,
on_session_end: Callable[[str], None] | None = None,
signaling_base_url: str | None = None,
job_logger=None,
traces_options=None,
metrics_options=None,
logs_options=None,
avatar_participant_id: str | None = None)
Expand source code
class VideoSDKHandler(BaseTransportHandler):
    """
    Handles VideoSDK meeting operations and participant management.
    """

    def __init__(
        self,
        *,
        meeting_id: str,
        auth_token: str | None = None,
        name: str,
        agent_participant_id: str,
        agent_id: str,
        pipeline: "Pipeline",
        loop: AbstractEventLoop,
        vision: bool = False,
        recording: bool = False,
        record_audio: Optional[bool] = None,
        record_screen_share: bool = True,
        custom_camera_video_track=None,
        custom_microphone_audio_track=None,
        audio_sinks=None,
        background_audio: bool = False,
        on_room_error: Optional[Callable[[Any], None]] = None,
        # Session management options
        auto_end_session: bool = True,
        session_timeout_seconds: Optional[int] = None,
        no_participant_timeout_seconds: Optional[int] = 90,
        on_session_end: Optional[Callable[[str], None]] = None,
        # VideoSDK connection options
        signaling_base_url: Optional[str] = None,
        job_logger=None,
        traces_options=None,
        metrics_options=None,
        logs_options=None,
        # Avatar options
        avatar_participant_id: Optional[str] = None,
    ):
        """
        Initialize the VideoSDK handler.

        All arguments are keyword-only. The constructor only prepares state
        (managers, audio tracks and the meeting configuration dict); the
        meeting itself is created later by ``init_meeting()``.

        Args:
            meeting_id (str): Unique identifier for the meeting.
            auth_token (str | None, optional): Authentication token. Falls back to the
                VIDEOSDK_AUTH_TOKEN environment variable if not provided.
            name (str): Display name of the agent in the meeting.
            agent_participant_id (str): Participant ID of the agent in the meeting.
            agent_id (str): Agent identifier; sent as ``agent_id`` in the meeting metadata
                built by ``init_meeting()``.
            pipeline (Pipeline): Audio/video processing pipeline.
            loop (AbstractEventLoop): Event loop for async operations.
            vision (bool, optional): Whether video processing is enabled. Defaults to False.
            recording (bool, optional): Whether recording is enabled. Defaults to False.
            record_audio (Optional[bool], optional): Recording mode selector. ``None`` uses
                participant recording; ``True`` records the "audio" track and ``False`` the
                "video" track. Defaults to None.
            record_screen_share (bool, optional): Whether screen-share track recordings are
                started. Defaults to True.
            custom_camera_video_track: Custom video track for camera input. The webcam is
                enabled in the meeting config only when this is not None.
            custom_microphone_audio_track: Custom audio track for microphone input. When
                given it is used as the meeting audio track instead of a tee track.
            audio_sinks: List of audio sinks for processing.
            background_audio (bool, optional): Whether to use background audio (selects the
                mixing tee track). Defaults to False.
            on_room_error (Optional[Callable[[Any], None]], optional): Error callback function.
            auto_end_session (bool, optional): Whether to automatically end sessions. Defaults to True.
            session_timeout_seconds (Optional[int], optional): Timeout for session auto-end after participants leave.
            no_participant_timeout_seconds (Optional[int], optional): Timeout to end session if no participant joins after agent connects.
            on_session_end (Optional[Callable[[str], None]], optional): Session end callback function.
            signaling_base_url (Optional[str], optional): Custom signaling server URL. Added to
                the meeting config only when not None.
            job_logger: Stored on the instance as ``_job_logger``; not used in this constructor.
            traces_options: Stored on the instance; not used in this constructor.
            metrics_options: Stored on the instance; not used in this constructor.
            logs_options: Stored on the instance; not used in this constructor.
            avatar_participant_id (Optional[str], optional): Participant ID that
                ``_is_agent_participant`` treats as an agent (Avatar Server).

        Raises:
            ValueError: If VIDEOSDK_AUTH_TOKEN is not set in environment or parameters.
        """
        self.meeting_id = meeting_id
        # An explicit token wins; otherwise fall back to the environment.
        self.auth_token = auth_token or os.getenv("VIDEOSDK_AUTH_TOKEN")
        if not self.auth_token:
            raise ValueError("VIDEOSDK_AUTH_TOKEN is not set")
            
        self.name = name
        self.agent_id = agent_id
        self.agent_participant_id = agent_participant_id
        self.pipeline = pipeline
        self.loop = loop
        self.vision = vision
        self.custom_camera_video_track = custom_camera_video_track
        self.custom_microphone_audio_track = custom_microphone_audio_track
        self.audio_sinks = audio_sinks or []
        self.background_audio = background_audio
        self._avatar_participant_id = avatar_participant_id

        # Managers
        self.input_stream_manager = InputStreamManager(pipeline=pipeline)
        self.sip_manager = SIPManager(room_id=meeting_id, auth_token=self.auth_token)
        self.recording_manager = RecordingManager(room_id=meeting_id, auth_token=self.auth_token)
        # Created lazily in on_participant_joined when a participant enables agent events.
        self.transport_event_sender = None

        # Session management
        self.auto_end_session = auto_end_session
        self.session_timeout_seconds = session_timeout_seconds
        self.no_participant_timeout_seconds = no_participant_timeout_seconds
        self.on_session_end = on_session_end
        self._session_ended = False
        self._session_end_task = None
        self._no_participant_timeout_task = None

        # VideoSDK connection
        self.signaling_base_url = signaling_base_url
        self.traces_options = traces_options
        self.metrics_options = metrics_options
        self.logs_options = logs_options

        # The base handler is initialised after the attributes above have been set.
        super().__init__(loop, pipeline)

        # Participant tracking
        self._non_agent_participant_count = 0
        self._first_participant_event = asyncio.Event()
        self._participant_joined_events = {}

        # Meeting and event handling
        self.meeting = None
        self.participants_data = {}

        # Same objects as the input stream manager's registries (references, not copies).
        self.audio_listener_tasks = self.input_stream_manager.audio_listener_tasks
        self.video_listener_tasks = self.input_stream_manager.video_listener_tasks

        self._meeting_joined_data = None
        self.agent_meeting = None
        self._session_id: Optional[str] = None
        self._session_id_collected = False
        self.recording = recording
        self.record_audio = record_audio
        self.record_screen_share = record_screen_share
        # participant_id -> set of track kinds for which recording was already started
        self._track_recordings_kinds_by_participant: dict[str, set[str]] = {}
        # participant_id -> whether a share stream existed when participant recording began
        self._participant_recording_has_share_at_start: dict[str, bool] = {}
        self.recorded_participants: set = set()

        self.traces_flow_manager = TracesFlowManager(room_id=self.meeting_id)
        metrics_collector.set_traces_flow_manager(
            self.traces_flow_manager)

        # Audio track selection:
        # - custom microphone track given: it becomes the meeting audio track, and a
        #   separate agent tee track is created only when audio sinks were supplied;
        # - otherwise a tee track (mixing variant when background_audio is set) is
        #   the meeting audio track and no separate agent track exists.
        # Note that the raw ``audio_sinks`` argument (possibly None) is passed to the
        # tee tracks, not the normalised ``self.audio_sinks`` list.
        if custom_microphone_audio_track:
            self.audio_track = custom_microphone_audio_track
            if audio_sinks:
                if self.background_audio:
                    self.agent_audio_track = TeeMixingCustomAudioStreamTrack(
                        loop=self.loop, sinks=audio_sinks, pipeline=pipeline
                    )
                else:
                    self.agent_audio_track = TeeCustomAudioStreamTrack(
                        loop=self.loop, sinks=audio_sinks, pipeline=pipeline
                    )
            else:
                self.agent_audio_track = None
        else:
            if self.background_audio:
                self.audio_track = TeeMixingCustomAudioStreamTrack(
                    loop=self.loop, sinks=audio_sinks, pipeline=pipeline
                )
            else:
                self.audio_track = TeeCustomAudioStreamTrack(
                    loop=self.loop, sinks=audio_sinks, pipeline=pipeline
                )
            self.agent_audio_track = None

        # Create meeting config
        self.meeting_config = {
            "name": self.name,
            "participant_id": self.agent_participant_id,
            "meeting_id": self.meeting_id,
            "token": self.auth_token,
            "mic_enabled": True,
            "webcam_enabled": custom_camera_video_track is not None,
            "custom_microphone_audio_track": self.audio_track,
            "custom_camera_video_track": custom_camera_video_track,
            "peer_type": "agent",
        }
        if self.signaling_base_url is not None:
            self.meeting_config["signaling_base_url"] = self.signaling_base_url

        self.attributes = {}
        self.on_room_error = on_room_error
        # NOTE(review): re-initialises the dict already created under "Participant tracking".
        self._participant_joined_events: dict[str, asyncio.Event] = {}
        self._left: bool = False
        self._job_logger = job_logger

    async def connect(self):
        """
        Connect to the VideoSDK meeting.
        """
        self.init_meeting()
        await self.join()

    async def disconnect(self):
        """
        Disconnect from the VideoSDK meeting.
        """
        await self.leave()

    def init_meeting(self):
        """
        Build the VideoSDK meeting object and attach this handler's callbacks.

        Resets the "already left" flag, prepares the SDK and agent metadata,
        creates the meeting from ``self.meeting_config`` and registers a
        ``MeetingHandler`` that routes meeting events to this instance.
        """
        self._left: bool = False

        self.sdk_metadata = dict(sdk="agents", sdk_version="1.0.0")
        self.videosdk_meeting_meta_data = dict(
            agent_id=self.agent_id,
            agent_name=self.name,
            is_videosdk_agent=True,
        )

        self.meeting = VideoSDK.init_meeting(
            **self.meeting_config,
            sdk_metadata=self.sdk_metadata,
            meta_data=self.videosdk_meeting_meta_data,
        )

        # Resolve the registration method first, then build the listener.
        register_listener = self.meeting.add_event_listener
        event_listener = MeetingHandler(
            on_meeting_joined=self.on_meeting_joined,
            on_meeting_left=self.on_meeting_left,
            on_participant_joined=self.on_participant_joined,
            on_participant_left=self.on_participant_left,
            on_error=self.on_error,
            on_agent_joined=self.on_agent_joined,
            on_agent_left=self.on_agent_left,
        )
        register_listener(event_listener)

    async def join(self):
        """
        Join the meeting.
        """
        await self.meeting.async_join()

    async def leave(self):
        """
        Leave the meeting and release resources (at most once).

        A second call is a no-op. Recording shutdown and the SDK leave call
        are best-effort: their errors are logged and ``cleanup()`` still runs.
        """
        already_left = self._left
        if already_left:
            logger.info("Meeting already left")
            return

        logger.info("Leaving meeting and cleaning up resources")
        self._left = True

        # Finalise recordings before the meeting connection goes away.
        if self.recording:
            try:
                await self.stop_and_merge_recordings()
            except Exception as recording_err:
                logger.error(f"Error stopping/merging recordings: {recording_err}")

        try:
            active_meeting = self.meeting
            if active_meeting:
                active_meeting.leave()
        except Exception as leave_err:
            logger.error(f"Error leaving meeting: {leave_err}")

        await self.cleanup()

    def on_error(self, data):
        """
        Handle room errors.

        This method is called when VideoSDK encounters an error and
        forwards it to the configured error callback if provided.

        Args:
            data: Error data from VideoSDK.
        """
        if self.on_room_error:
            self.on_room_error(data)
            asyncio.create_task(self._end_session("error_in_meeting"))

    def on_meeting_joined(self, data):
        """
        Handle meeting join event.

        Stores the join payload, notifies the current job context (if any),
        starts background collection of the session id and meeting
        attributes, arms the no-participant timeout when configured, and
        starts recording of the local participant when recording is enabled.

        Args:
            data: Meeting join event data from VideoSDK.
        """
        logger.info("Agent joined the meeting")
        self._meeting_joined_data = data

        # Imported here rather than at module level.
        from ..job import get_current_job_context

        current_job = get_current_job_context()
        if current_job:
            current_job.notify_meeting_joined()

        asyncio.create_task(self._collect_session_id())
        asyncio.create_task(self._collect_meeting_attributes())

        # Arm the timeout only for a positive, non-None value.
        timeout = self.no_participant_timeout_seconds
        if timeout is not None and timeout > 0:
            self._no_participant_timeout_task = asyncio.create_task(
                self._no_participant_timeout_handler()
            )
            logger.info(
                f"No-participant timeout started: {self.no_participant_timeout_seconds}s"
            )

        if not self.recording:
            return

        local = self.meeting.local_participant
        self.recorded_participants.add(local.id)
        asyncio.create_task(
            self._start_base_recording_for_participant(local.id, local)
        )

    def on_meeting_left(self, data):
        """
        Handle meeting leave event.

        Drops the cached participant data and schedules the session to end
        with reason ``"meeting_left"``.

        Args:
            data: Meeting leave event data from VideoSDK.
        """
        logger.info(f"Meeting Left: {data}")

        known_participants = getattr(self, "participants_data", None)
        if known_participants:
            known_participants.clear()

        asyncio.create_task(self._end_session("meeting_left"))

    def _is_agent_participant(self, participant: Participant) -> bool:
        """
        Internal method: Check if a participant is an agent (or Avatar Server).
        """
        # Avatar Server participant — identified by pre-registered participant_id
        if self._avatar_participant_id and participant.id == self._avatar_participant_id:
            return True
        
        participant_name = participant.display_name.lower()
        return (
            "agent" in participant_name
            or participant_name == self.name.lower()
            or participant.id == self.meeting.local_participant.id
            if self.meeting and self.meeting.local_participant
            else False
        )

    def _participant_has_share_stream(self, participant: Participant) -> bool:
        """True if participant already has a share stream (avoids duplicate screen track recording)."""
        streams = getattr(participant, "streams", None)
        if not streams:
            return False
        try:
            vals = streams.values() if isinstance(streams, dict) else streams
            return any(getattr(s, "kind", None) == "share" for s in vals)
        except Exception:
            return False

    async def _start_track_recording_kind(self, participant_id: str, kind: str) -> None:
        kinds = self._track_recordings_kinds_by_participant.setdefault(participant_id, set())
        if kind in kinds:
            return
        await self.recording_manager.start_track_recording(participant_id=participant_id, kind=kind)
        kinds.add(kind)

    async def _start_screen_track_recordings(self, participant_id: str) -> None:
        if not self.record_screen_share:
            return
        for kind in _SCREEN_RECORDING_KINDS:
            await self._start_track_recording_kind(participant_id, kind)

    async def _start_base_recording_for_participant(
        self, participant_id: str, participant_obj: Participant | None = None
    ) -> None:
        """Participant composite API (record_audio is None) or track API (audio/video) + optional screen at join."""
        if self.record_audio is None:
            has_share = (
                self._participant_has_share_stream(participant_obj)
                if participant_obj is not None
                else False
            )
            self._participant_recording_has_share_at_start[participant_id] = has_share
            await self.recording_manager.start_participant_recording(participant_id)
            return

        await self._start_track_recording_kind(
            participant_id, "audio" if self.record_audio else "video"
        )
        if participant_obj and self._participant_has_share_stream(participant_obj):
            await self._start_screen_track_recordings(participant_id)

    async def _maybe_start_screen_track_recording(self, participant: Participant) -> None:
        """When share starts later: skip if participant recording already included share at start."""
        pid = participant.id
        if self.record_audio is None and self._participant_recording_has_share_at_start.get(pid, False):
            return
        await self._start_screen_track_recordings(pid)

    def _update_non_agent_participant_count(self):
        """
        Internal method: Update the count of non-agent participants.
        """
        if not self.meeting:
            return

        count = 0
        for participant in self.meeting.participants.values():
            if not self._is_agent_participant(participant):
                count += 1

        self._non_agent_participant_count = count
        logger.debug(f"Non-agent participant count: {count}")

    def _cancel_session_end_task(self):
        """
        Internal method: Cancel the session end task if it exists.
        """
        if self._session_end_task and not self._session_end_task.done():
            self._session_end_task.cancel()
            self._session_end_task = None

    async def _end_session(self, reason: str = "session_ended"):
        """
        Internal method: End the current session (at most once).

        Steps: cancel any pending delayed session end, invoke the
        ``on_session_end`` callback with ``reason`` (callback errors are
        logged, not raised), leave the meeting, and finally wake every
        coroutine blocked in ``wait_for_participant``.

        The session is marked as ended *before* the first await so that
        overlapping calls (several meeting callbacks schedule this coroutine)
        cannot run the callback and teardown twice. The per-participant
        events are snapshotted before ``leave()`` because cleanup clears
        ``_participant_joined_events``; without the snapshot the waiters
        would never be released.

        Args:
            reason: Reason string passed to the session end callback.
        """
        if self._session_ended:
            return
        self._session_ended = True

        # Capture waiters now; cleanup() (via leave()) empties the dict.
        waiting_events = list(self._participant_joined_events.values())

        self._cancel_session_end_task()

        logger.info(f"Ending session: {reason}")

        if self.on_session_end:
            try:
                self.on_session_end(reason)
            except Exception as e:
                logger.error(f"Error in session end callback: {e}")

        try:
            await self.leave()
        finally:
            # Release waiters even if leaving failed, so they do not hang.
            if not self._first_participant_event.is_set():
                self._first_participant_event.set()
            for evt in waiting_events:
                if not evt.is_set():
                    evt.set()

    async def force_end_session(self, reason: str = "manual_hangup") -> None:
        """
        Public helper: forcefully end the session, bypassing participant checks.

        Args:
            reason: Reason string to propagate to session end callbacks.
        """
        await self._end_session(reason)

    async def call_transfer(self,transfer_to: str) -> None:
        """
        Transfer the call to a provided Phone number or SIP endpoint.
        """
        await self.sip_manager.call_transfer(session_id=self._session_id, transfer_to=transfer_to)

    def fetch_call_info(self,session_id: str):
        """
        Fetch SIP call information. Forward to sip_manager.
        """
        return self.sip_manager.fetch_call_info(session_id=session_id)

    def transfer_call(self,call_id: str, transfer_to: str):
        """
        Transfer the call. Forward to sip_manager.
        """
        return self.sip_manager.transfer_call(call_id=call_id, transfer_to=transfer_to)

    def setup_session_end_callback(self, callback):
        """
        Register ``callback`` to run when the session ends.

        If a callback is already installed, the two are chained: the existing
        one runs first, then the new one. Each is isolated, so an error in
        one is logged and does not prevent the other from running.
        """
        previous = self.on_session_end

        if not previous:
            self.on_session_end = callback
            logger.debug("Session end callback set up")
            return

        def chained_callback(reason: str):
            try:
                previous(reason)
            except Exception as previous_err:
                logger.error(f"Error in existing session end callback: {previous_err}")
            try:
                callback(reason)
            except Exception as new_err:
                logger.error(f"Error in new session end callback: {new_err}")

        self.on_session_end = chained_callback
        logger.debug("Session end callback chained with existing callback")

    def _schedule_session_end(self, timeout_seconds: int):
        """
        Internal method: (Re)schedule the session to end after ``timeout_seconds``.

        A still-pending earlier schedule is cancelled and replaced.
        """
        previous_task = self._session_end_task
        if previous_task and not previous_task.done():
            previous_task.cancel()

        delayed_end = self._delayed_session_end(timeout_seconds)
        self._session_end_task = asyncio.create_task(delayed_end)
        logger.info(f"Session end scheduled in {timeout_seconds} seconds")

    async def _delayed_session_end(self, timeout_seconds: int):
        """
        Internal method: Delayed session end after timeout.
        """
        await asyncio.sleep(timeout_seconds)
        await self._end_session("no_participants")

    async def _no_participant_timeout_handler(self):
        """
        Internal method: End session if no participant joins within the configured timeout.
        """
        await asyncio.sleep(self.no_participant_timeout_seconds)
        if self._non_agent_participant_count == 0:
            logger.info(
                f"No participant joined within {self.no_participant_timeout_seconds}s, ending session"
            )
            await self._end_session("no_participant_joined")

    def on_participant_joined(self, participant: Participant):
        """
        Handle participant join event.

        Records the participant's name and metadata flags, lazily creates the
        transport event sender when the participant asks for agent events,
        and stops there for agent/avatar participants. For everyone else it
        reports join metrics, optionally starts recording, wakes coroutines
        waiting for a participant, cancels the no-participant timeout and any
        pending session end, and subscribes to the participant's stream
        events.

        Args:
            participant (Participant): The participant that joined.
        """
        peer_name = participant.display_name
        meta = participant.meta_data or {}

        # Every flag defaults to False when metadata is missing or lacks the key.
        record = {
            "name": peer_name,
            "sipUser": meta.get("sipUser", False),
            "sipCallType": meta.get("callType", False),
            "enable_agent_events": meta.get("enableAgentEvents", False),
        }
        self.participants_data[participant.id] = record

        if record["enable_agent_events"] and self.transport_event_sender is None:
            logger.info("Initializing transport_event_sender because participant has enableAgentEvents=True")
            self.transport_event_sender = TransportEventSender(self)

        logger.info(f"Participant joined: {peer_name}")

        if self._is_agent_participant(participant):
            logger.info(f"Skipping agent/avatar participant in on_participant_joined: {participant.id}")
            return

        metrics_collector.add_participant_metrics(
            participant_id=participant.id,
            kind="user",
            sip_user=record["sipUser"],
            join_time=time.time(),
            meta=record,
        )

        # Recording starts only while this is the sole entry in participants_data.
        if self.recording and len(self.participants_data) == 1:
            self.recorded_participants.add(participant.id)
            asyncio.create_task(
                self._start_base_recording_for_participant(participant.id, participant)
            )

        # Wake waiters: first the one for this specific id, then the generic one.
        specific_waiter = self._participant_joined_events.get(participant.id)
        if specific_waiter is not None:
            specific_waiter.set()

        if not self._first_participant_event.is_set():
            self._first_participant_event.set()

        timeout_task = self._no_participant_timeout_task
        if timeout_task and not timeout_task.done():
            timeout_task.cancel()
            self._no_participant_timeout_task = None
            logger.info("No-participant timeout cancelled: participant joined")

        # Update participant count and cancel session end if participants are present
        self._update_non_agent_participant_count()
        if self._non_agent_participant_count > 0:
            self._cancel_session_end_task()

        def on_stream_enabled(stream: Stream):
            """
            Internal method: Handle stream enabled event.
            """
            stream_kind = stream.kind

            if stream_kind == "audio":
                global_event_emitter.emit(
                    "AUDIO_STREAM_ENABLED",
                    {"stream": stream, "participant": participant},
                )
                logger.info(f"Audio stream enabled for participant: {peer_name}")
                try:
                    listener = asyncio.create_task(self.add_audio_listener(stream))
                    self.audio_listener_tasks[stream.id] = listener
                except Exception as listener_err:
                    logger.error(f"Error creating audio listener task: {listener_err}")

            if stream_kind == "share" and self.recording:
                asyncio.create_task(self._maybe_start_screen_track_recording(participant))

            if stream_kind in ("video", "share") and self.vision:
                logger.info(f"{stream.kind} stream enabled for participant: {peer_name}")
                self.video_listener_tasks[stream.id] = asyncio.create_task(
                    self.add_video_listener(stream)
                )

        def on_stream_disabled(stream: Stream):
            """
            Internal method: Handle stream disabled event.
            """
            # Audio listeners and video/share listeners live in separate registries.
            registries = (
                (("audio",), self.audio_listener_tasks),
                (("video", "share"), self.video_listener_tasks),
            )
            for kinds, registry in registries:
                if stream.kind not in kinds:
                    continue
                listener = registry.get(stream.id)
                if listener is not None:
                    listener.cancel()
                    del registry[stream.id]

        if participant.id != self.meeting.local_participant.id:
            stream_handler = ParticipantHandler(
                participant_id=participant.id,
                on_stream_enabled=on_stream_enabled,
                on_stream_disabled=on_stream_disabled,
            )
            participant.add_event_listener(stream_handler)

    def on_participant_left(self, participant: Participant):
        """
        Handle participant leave event.

        Cancels listener tasks registered under the participant's id, emits
        ``PARTICIPANT_LEFT``, refreshes the non-agent participant count and
        decides whether the session should end: immediately when a recorded
        participant left, otherwise (with ``auto_end_session``) once no
        non-agent participants remain, either after
        ``session_timeout_seconds`` or right away.

        NOTE(review): ``on_participant_joined`` registers listener tasks
        under ``stream.id``, so the participant-id lookups below presumably
        rarely match. Confirm the intended key.

        Args:
            participant (Participant): The participant that left.
        """
        logger.info(f"Participant left: {participant.display_name}")

        leaving_id = participant.id

        audio_tasks = self.audio_listener_tasks
        if leaving_id in audio_tasks:
            try:
                audio_tasks[leaving_id].cancel()
                del audio_tasks[leaving_id]
            except Exception as audio_err:
                logger.error(
                    f"Error cancelling audio listener task for participant {participant.id}: {audio_err}"
                )

        video_tasks = self.video_listener_tasks
        if leaving_id in video_tasks:
            try:
                video_tasks[leaving_id].cancel()
                del video_tasks[leaving_id]
            except Exception as video_err:
                logger.error(
                    f"Error cancelling video listener task for participant {participant.id}: {video_err}"
                )

        global_event_emitter.emit("PARTICIPANT_LEFT", {"participant": participant})

        # Update participant count and check if session should end
        self._update_non_agent_participant_count()

        if leaving_id in self.recorded_participants:
            logger.info(
                f"Recorded participant {participant.display_name} left, ending session"
            )
            asyncio.create_task(self._end_session("recorded_participant_left"))
            return

        if self._non_agent_participant_count != 0 or not self.auto_end_session:
            return

        grace_period = self.session_timeout_seconds
        if grace_period is not None and grace_period > 0:
            logger.info(
                f"All non-agent participants have left, scheduling session end in {self.session_timeout_seconds} seconds"
            )
            self._schedule_session_end(self.session_timeout_seconds)
            return

        logger.info(
            "All non-agent participants have left, ending session immediately"
        )
        asyncio.create_task(self._end_session("all_participants_left"))

    def on_agent_joined(self, agent: TransportAgent):
        """
        Handle remote agent join event (logging only).

        Args:
            agent (TransportAgent): The agent that joined.
        """
        agent_name = agent.display_name
        agent_ref = agent.id
        logger.info(f"Remote agent joined: {agent_name} ({agent_ref})")

    def on_agent_left(self, agent: TransportAgent):
        """
        Handle remote agent leave event (logging only).

        Args:
            agent (TransportAgent): The agent that left.
        """
        agent_name = agent.display_name
        agent_ref = agent.id
        logger.info(f"Remote agent left: {agent_name} ({agent_ref})")

    async def add_audio_listener(self, stream: Stream):
        """
        Forward to input_stream_manager.
        """
        await self.input_stream_manager.add_audio_listener(stream)

    async def add_video_listener(self, stream: Stream):
        """
        Forward to input_stream_manager.
        """
        await self.input_stream_manager.add_video_listener(stream)

    async def wait_for_participant(self, participant_id: str | None = None) -> str:
        """
        Wait for a specific participant to join.
        """
        if participant_id:
            if participant_id in self.participants_data:
                return participant_id

            if participant_id not in self._participant_joined_events:
                self._participant_joined_events[participant_id] = asyncio.Event()

            await self._participant_joined_events[participant_id].wait()

            if self._session_ended:
                return None

            return participant_id
        else:
            if self.participants_data:
                return next(iter(self.participants_data.keys()))

            await self._first_participant_event.wait()

            if self._session_ended or not self.participants_data:
                return None

            return next(iter(self.participants_data.keys()))

    async def subscribe_to_pubsub(self, pubsub_config: PubSubSubscribeConfig):
        """
        Subscribe to pubsub messages.

        Args:
            pubsub_config (PubSubSubscribeConfig): Configuration for pubsub subscription.

        Returns:
            List of existing messages from the subscription.
        """
        old_messages = await self.meeting.pubsub.subscribe(pubsub_config)
        return old_messages

    async def publish_to_pubsub(self, pubsub_config: PubSubPublishConfig):
        """
        Publish message to pubsub.

        Args:
            pubsub_config (PubSubPublishConfig): Configuration for pubsub publishing.
        """
        if self.meeting:
            await self.meeting.pubsub.publish(pubsub_config)

    async def upload_file(self, base64_data, file_name):
        """
        Upload a file to the temporary storage.

        Args:
            base64_data: Base64-encoded file data.
            file_name (str): Name of the file to upload.

        Returns:
            Upload response from VideoSDK API.
        """
        return self.meeting.upload_base64(base64_data, self.auth_token, file_name)

    async def fetch_file(self, url):
        """
        Fetch a file from a URL.

        Args:
            url (str): URL of the file to fetch.

        Returns:
            Base64-encoded file data.
        """
        return self.meeting.fetch_base64(url, self.auth_token)

    async def cleanup(self):
        """
        Clean up resources.

        Cancels pending timers and stream listener tasks, cleans up the
        transport event sender and the audio tracks, closes the traces flow,
        and resets the handler's state. Errors from the individual audio
        track / traces steps are logged so the remaining steps still run.

        This coroutine can be reached from the no-participant timeout task
        itself (timeout handler -> ``_end_session`` -> ``leave`` ->
        ``cleanup``). Cancelling the task that is currently executing would
        raise ``CancelledError`` at the next await below and abort the
        cleanup half-way, so the running task is never cancelled here; only
        its stored reference is dropped.
        """
        logger.info("Starting room cleanup")

        self._cancel_session_end_task()

        timeout_task = self._no_participant_timeout_task
        if timeout_task and not timeout_task.done():
            if timeout_task is not asyncio.current_task():
                timeout_task.cancel()
            self._no_participant_timeout_task = None
        
        self.input_stream_manager.cancel_tasks()
        
        if hasattr(self, "transport_event_sender") and self.transport_event_sender:
            self.transport_event_sender.cleanup()
        
        if hasattr(self, "audio_track") and self.audio_track:
            try:
                await self.audio_track.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up audio track: {e}")
            self.audio_track = None

        if hasattr(self, "agent_audio_track") and self.agent_audio_track:
            try:
                await self.agent_audio_track.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up agent audio track: {e}")
            self.agent_audio_track = None

        if hasattr(self, "traces_flow_manager") and self.traces_flow_manager:
            try:
                self.traces_flow_manager.agent_meeting_end()
            except Exception as e:
                logger.error(f"Error ending traces flow manager: {e}")
            self.traces_flow_manager = None
            metrics_collector.set_traces_flow_manager(None)
        
        # Reset state and drop references held by this handler.
        self.participants_data.clear()
        self.recorded_participants.clear()
        self._track_recordings_kinds_by_participant.clear()
        self._participant_recording_has_share_at_start.clear()
        self._participant_joined_events.clear()
        self.meeting = None
        self.pipeline = None
        self.custom_camera_video_track = None
        self.custom_microphone_audio_track = None
        self.audio_sinks = None
        self.on_room_error = None
        self.on_session_end = None
        self._job_logger = None
        self._session_ended = True
        self._session_id = None
        self._session_id_collected = False
        self._non_agent_participant_count = 0

        logger.info("Room cleanup completed")

    async def _collect_session_id(self) -> None:
        """
        Internal method: read the session ID from the meeting once and publish it.

        When the meeting exposes a non-empty ``session_id`` that has not been
        collected yet, it is stored on the handler and passed to the metrics
        collector (together with the agent's participant metrics), the traces
        flow manager and the job logger, where those are set. Any error is
        logged and swallowed.
        """
        if not self.meeting or self._session_id_collected:
            return

        try:
            session_id = getattr(self.meeting, "session_id", None)
            if not session_id:
                return

            self._session_id = session_id
            logger.info(f"Session ID collected: {session_id}")
            metrics_collector.set_session_id(session_id)
            metrics_collector.add_participant_metrics(
                participant_id=self.meeting.local_participant.id,
                kind="agent",
                sip_user=False,
                join_time=time.time(),
                meta={"name": self.name},
            )
            self._session_id_collected = True

            flow_manager = self.traces_flow_manager
            if flow_manager:
                flow_manager.set_session_id(session_id)

            job_logger = self._job_logger
            if job_logger:
                job_logger.update_context(sessionId=session_id)
        except Exception as e:
            logger.error(f"Error collecting session ID: {e}")

    async def _collect_meeting_attributes(self) -> None:
        """
        Internal method: Collect meeting attributes and initialize telemetry.

        Reads the meeting attributes (when the meeting object offers
        ``get_attributes``), applies the handler's ``traces_options`` and
        ``logs_options`` overrides on top of them, initializes telemetry/logs,
        configures the job logger endpoint, and finally starts the
        "agent joined meeting" trace when join data and a traces flow manager
        are available. Every failure is logged and swallowed.
        """
        if not self.meeting:
            logger.error("Meeting not initialized")
            return

        try:
            if hasattr(self.meeting, "get_attributes"):
                attributes = self.meeting.get_attributes()

                if attributes:
                    # Falls back to the literal "agent" when the meeting has no participant_id.
                    peer_id = getattr(self.meeting, "participant_id", "agent")

                    # When "traces" is present this is the same dict object held by
                    # `attributes`, so the overrides below modify it in place.
                    traces_config = attributes.get("traces", {})
                    if self.traces_options:
                        if not self.traces_options.enabled:
                            # Explicit opt-out overrides the room setting.
                            traces_config["enabled"] = False
                        elif self.traces_options.enabled and self.traces_options.export_url:
                            # Custom export target: force-enable and redirect traces.
                            traces_config["enabled"] = True
                            traces_config["pbEndPoint"] = self.traces_options.export_url
                            traces_config["export_headers"] = self.traces_options.export_headers

                    auto_initialize_telemetry_and_logs(
                        room_id=self.meeting_id,
                        peer_id=peer_id,
                        room_attributes=attributes,
                        session_id=self._session_id,
                        sdk_metadata=self.sdk_metadata,
                        custom_traces_config=traces_config,
                    )

                    if self._job_logger:
                        logs_config = attributes.get("logs", {})
                        observability_jwt = attributes.get("observability", "")
                        
                        # Room-provided defaults, possibly overridden by logs_options below.
                        is_logs_enabled = logs_config.get("enabled", False)
                        log_endpoint = logs_config.get("endPoint", "")
                        custom_headers = None
                        
                        if self.logs_options:
                            if not self.logs_options.enabled:
                                is_logs_enabled = False
                            elif self.logs_options.enabled and self.logs_options.export_url:
                                is_logs_enabled = True
                                log_endpoint = self.logs_options.export_url
                                custom_headers = self.logs_options.export_headers

                        # The endpoint is only configured when logging is on AND a target exists.
                        if is_logs_enabled and log_endpoint:
                            self._job_logger.set_endpoint(log_endpoint, observability_jwt, custom_headers)
                            logger.debug(f"Log endpoint configured: {log_endpoint}")
                else:
                    logger.error("No meeting attributes found")
            else:
                logger.error("Meeting object does not have 'get_attributes' method")

            # Runs whether or not attributes were found above.
            if self._meeting_joined_data and self.traces_flow_manager:
                start_time = time.perf_counter()
                agent_joined_attributes = {
                    "roomId": self.meeting_id,
                    "agent_ParticipantId": self.agent_participant_id,
                    "sessionId": self._session_id,
                    "agent_name": self.name,
                    "peerId": self.meeting.local_participant.id,
                    "sdk_metadata": self.sdk_metadata,
                    "start_time": start_time,
                }
                self.traces_flow_manager.start_agent_joined_meeting(
                    agent_joined_attributes
                )
        except Exception as e:
            logger.error(f"Error collecting meeting attributes and creating spans: {e}")

    async def stop_participants_recording(self):
        """
        Stop recording for the local participant and every known participant.

        Each stop request is forwarded to ``recording_manager``. The participant
        ids are snapshotted before the loop: ``participants_data`` is modified by
        meeting event handlers (entries are added on participant join and the
        dict is cleared on meeting left), and those can run while a stop request
        is awaited. Iterating the live dict across awaits would then raise
        ``RuntimeError: dictionary changed size during iteration``.
        """
        manager = self.recording_manager
        await manager.stop_participant_recording(self.meeting.local_participant.id)
        # Snapshot the keys; the dict may change while we await below.
        for participant_id in list(self.participants_data):
            await manager.stop_participant_recording(participant_id)

    async def start_participant_recording(self, id: str):
        """
        Begin recording one participant by delegating to recording_manager.

        Args:
            id (str): ID of the participant to record.
        """
        manager = self.recording_manager
        await manager.start_participant_recording(id)

    async def stop_participant_recording(self, id: str):
        """
        End recording of one participant by delegating to recording_manager.

        Args:
            id (str): ID of the participant whose recording should stop.
        """
        manager = self.recording_manager
        await manager.stop_participant_recording(id)

    async def merge_participant_recordings(self):
        """
        Ask recording_manager to merge the participants' recordings.

        The current session ID, the local participant's ID and the tracked
        participant data are passed along as keyword arguments.
        """
        merge = self.recording_manager.merge_participant_recordings
        session_id = self._session_id
        local_id = self.meeting.local_participant.id
        await merge(
            session_id=session_id,
            local_participant_id=local_id,
            participants_data=self.participants_data,
        )

    async def stop_and_merge_recordings(self):
        """
        Ask recording_manager to stop and then merge the recordings.

        Passes the session ID, the local participant's ID, the tracked
        participant data and the per-participant track kinds. The
        ``stop_participants_recording`` flag is true only when ``record_audio``
        was left as ``None``.
        """
        stop_and_merge = self.recording_manager.stop_and_merge_recordings
        session_id = self._session_id
        local_id = self.meeting.local_participant.id
        participants = self.participants_data
        track_kinds = self._track_recordings_kinds_by_participant
        audio_unset = self.record_audio is None
        await stop_and_merge(
            session_id=session_id,
            local_participant_id=local_id,
            participants_data=participants,
            track_kinds_by_participant=track_kinds,
            stop_participants_recording=audio_unset,
        )

Handles VideoSDK meeting operations and participant management.

Initialize the VideoSDK handler.

Args

meeting_id : str
Unique identifier for the meeting.
auth_token : str | None, optional
Authentication token. Uses environment variable if not provided.
name : str
Display name of the agent in the meeting.
agent_participant_id : str
Participant ID of the agent in the meeting.
agent_id : str
Identifier of the agent; sent as `agent_id` in the meeting metadata.
pipeline : Pipeline
Audio/video processing pipeline.
loop : AbstractEventLoop
Event loop for async operations.
vision : bool, optional
Whether video processing is enabled. Defaults to False.
recording : bool, optional
Whether recording is enabled. Defaults to False.
custom_camera_video_track
Custom video track for camera input.
custom_microphone_audio_track
Custom audio track for microphone input.
audio_sinks
List of audio sinks for processing.
background_audio : bool, optional
Whether to use background audio. Defaults to False.
on_room_error : Optional[Callable[[Any], None]], optional
Error callback function.
auto_end_session : bool, optional
Whether to automatically end sessions. Defaults to True.
session_timeout_seconds : Optional[int], optional
Timeout for session auto-end after participants leave.
no_participant_timeout_seconds : Optional[int], optional
Timeout to end session if no participant joins after agent connects.
on_session_end : Optional[Callable[[str], None]], optional
Session end callback function.
signaling_base_url : Optional[str], optional
Custom signaling server URL.

Raises

ValueError
If VIDEOSDK_AUTH_TOKEN is not set in environment or parameters.

Ancestors

Methods

async def add_audio_listener(self, stream: videosdk.stream.Stream)
Expand source code
async def add_audio_listener(self, stream: Stream):
    """
    Attach an audio listener for the stream by delegating to input_stream_manager.

    Args:
        stream (Stream): Stream handed to the manager's ``add_audio_listener``.
    """
    manager = self.input_stream_manager
    await manager.add_audio_listener(stream)

Forward to input_stream_manager.

async def add_video_listener(self, stream: videosdk.stream.Stream)
Expand source code
async def add_video_listener(self, stream: Stream):
    """
    Attach a video listener for the stream by delegating to input_stream_manager.

    Args:
        stream (Stream): Stream handed to the manager's ``add_video_listener``.
    """
    manager = self.input_stream_manager
    await manager.add_video_listener(stream)

Forward to input_stream_manager.

async def call_transfer(self, transfer_to: str) ‑> None
Expand source code
async def call_transfer(self, transfer_to: str) -> None:
    """
    Transfer the call to a provided Phone number or SIP endpoint.

    Delegates to sip_manager, passing the handler's current session ID.

    Args:
        transfer_to (str): Transfer destination.
    """
    transfer = self.sip_manager.call_transfer
    await transfer(session_id=self._session_id, transfer_to=transfer_to)

Transfer the call to a provided Phone number or SIP endpoint.

async def cleanup(self)
Expand source code
async def cleanup(self):
    """
    Release everything the handler holds once the room is finished.

    Cancels the pending session-end and no-participant timers and the input
    stream tasks, shuts down the transport event sender, both audio tracks
    and the traces flow manager, then empties the participant/recording
    bookkeeping and drops references to collaborators and callbacks.
    Failures while cleaning an audio track or ending the traces flow are
    logged rather than raised.
    """
    logger.info("Starting room cleanup")

    self._cancel_session_end_task()

    pending_timeout = self._no_participant_timeout_task
    if pending_timeout and not pending_timeout.done():
        pending_timeout.cancel()
        self._no_participant_timeout_task = None

    self.input_stream_manager.cancel_tasks()

    # These attributes may never have been created, hence getattr defaults.
    event_sender = getattr(self, "transport_event_sender", None)
    if event_sender:
        event_sender.cleanup()

    user_track = getattr(self, "audio_track", None)
    if user_track:
        try:
            await user_track.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up audio track: {e}")
        self.audio_track = None

    agent_track = getattr(self, "agent_audio_track", None)
    if agent_track:
        try:
            await agent_track.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up agent audio track: {e}")
        self.agent_audio_track = None

    flow_manager = getattr(self, "traces_flow_manager", None)
    if flow_manager:
        try:
            flow_manager.agent_meeting_end()
        except Exception as e:
            logger.error(f"Error ending traces flow manager: {e}")
        self.traces_flow_manager = None
        metrics_collector.set_traces_flow_manager(None)

    # Empty the per-session bookkeeping containers in place.
    for container_name in (
        "participants_data",
        "recorded_participants",
        "_track_recordings_kinds_by_participant",
        "_participant_recording_has_share_at_start",
        "_participant_joined_events",
    ):
        getattr(self, container_name).clear()

    # Drop references to collaborators and callbacks.
    for reference_name in (
        "meeting",
        "pipeline",
        "custom_camera_video_track",
        "custom_microphone_audio_track",
        "audio_sinks",
        "on_room_error",
        "on_session_end",
        "_job_logger",
    ):
        setattr(self, reference_name, None)

    # Reset session state; the session is flagged as ended.
    self._session_ended = True
    self._session_id = None
    self._session_id_collected = False
    self._non_agent_participant_count = 0

    logger.info("Room cleanup completed")

Clean up resources.

async def connect(self)
Expand source code
async def connect(self):
    """
    Connect to the VideoSDK meeting.

    Creates the meeting instance via ``init_meeting`` and then joins it.
    """
    self.init_meeting()
    join_coro = self.join()
    await join_coro

Connect to the VideoSDK meeting.

async def disconnect(self)
Expand source code
async def disconnect(self):
    """
    Disconnect from the VideoSDK meeting.

    Thin wrapper that awaits ``leave``.
    """
    leave_coro = self.leave()
    await leave_coro

Disconnect from the VideoSDK meeting.

def fetch_call_info(self, session_id: str)
Expand source code
def fetch_call_info(self, session_id: str):
    """
    Look up SIP call information by delegating to sip_manager.

    Args:
        session_id (str): Session whose call information is requested.

    Returns:
        Whatever ``sip_manager.fetch_call_info`` returns.
    """
    lookup = self.sip_manager.fetch_call_info
    return lookup(session_id=session_id)

Fetch SIP call information. Forward to sip_manager.

async def fetch_file(self, url)
Expand source code
async def fetch_file(self, url):
    """
    Retrieve a file through the current meeting.

    Args:
        url (str): URL of the file to fetch.

    Returns:
        Whatever the meeting's ``fetch_base64`` call returns; the value is
        passed through without being awaited or inspected here.
    """
    fetcher = self.meeting.fetch_base64
    return fetcher(url, self.auth_token)

Fetch a file from a URL.

Args

url : str
URL of the file to fetch.

Returns

Base64-encoded file data.

async def force_end_session(self, reason: str = 'manual_hangup') ‑> None
Expand source code
async def force_end_session(self, reason: str = "manual_hangup") -> None:
    """
    Public helper: end the session right away by awaiting ``_end_session``.

    Args:
        reason: Reason string handed to ``_end_session``.
    """
    end_coro = self._end_session(reason)
    await end_coro

Public helper: forcefully end the session, bypassing participant checks.

Args

reason
Reason string to propagate to session end callbacks.
def init_meeting(self)
Expand source code
def init_meeting(self):
    """
    Create the VideoSDK meeting instance and register the event callbacks.

    Resets the "already left" flag, records the SDK and agent metadata on the
    handler, builds the meeting from ``meeting_config`` plus that metadata,
    and subscribes the handler's ``on_*`` methods to meeting events.
    """
    self._left = False

    sdk_metadata = {
        "sdk": "agents",
        "sdk_version": "1.0.0"
    }
    self.sdk_metadata = sdk_metadata

    agent_meta = {
        "agent_id": self.agent_id,
        "agent_name": self.name,
        "is_videosdk_agent": True,
    }
    self.videosdk_meeting_meta_data = agent_meta

    self.meeting = VideoSDK.init_meeting(
        **self.meeting_config,
        sdk_metadata=self.sdk_metadata,
        meta_data=self.videosdk_meeting_meta_data,
    )

    # Route every meeting-level event to the matching handler method.
    callbacks = MeetingHandler(
        on_meeting_joined=self.on_meeting_joined,
        on_meeting_left=self.on_meeting_left,
        on_participant_joined=self.on_participant_joined,
        on_participant_left=self.on_participant_left,
        on_error=self.on_error,
        on_agent_joined=self.on_agent_joined,
        on_agent_left=self.on_agent_left,
    )
    self.meeting.add_event_listener(callbacks)

Initialize the VideoSDK meeting instance.

async def join(self)
Expand source code
async def join(self):
    """
    Join the meeting by awaiting the meeting object's ``async_join``.
    """
    meeting = self.meeting
    await meeting.async_join()

Join the meeting.

async def leave(self)
Expand source code
async def leave(self):
    """
    Leave the meeting once and release the handler's resources.

    Repeated calls are ignored after the first. When recording is enabled
    the recordings are stopped and merged first. Errors from that step or
    from leaving the meeting are logged, and ``cleanup`` still runs.
    """
    if self._left:
        logger.info("Meeting already left")
        return

    logger.info("Leaving meeting and cleaning up resources")
    # Flag first so a re-entrant call during the awaits below becomes a no-op.
    self._left = True

    if self.recording:
        try:
            await self.stop_and_merge_recordings()
        except Exception as e:
            logger.error(f"Error stopping/merging recordings: {e}")

    try:
        meeting = self.meeting
        if meeting:
            meeting.leave()
    except Exception as e:
        logger.error(f"Error leaving meeting: {e}")

    await self.cleanup()

Leave the meeting and clean up resources.

async def merge_participant_recordings(self)
Expand source code
async def merge_participant_recordings(self):
    """
    Ask recording_manager to merge the participants' recordings.

    The current session ID, the local participant's ID and the tracked
    participant data are passed along as keyword arguments.
    """
    merge = self.recording_manager.merge_participant_recordings
    session_id = self._session_id
    local_id = self.meeting.local_participant.id
    await merge(
        session_id=session_id,
        local_participant_id=local_id,
        participants_data=self.participants_data,
    )

Merge recordings. Forward to recording_manager.

def on_agent_joined(self, agent: videosdk.agent.Agent)
Expand source code
def on_agent_joined(self, agent: TransportAgent):
    """
    Log that a remote agent has joined.

    Args:
        agent (TransportAgent): The agent that joined; its display name and
            id are written to the log.
    """
    message = f"Remote agent joined: {agent.display_name} ({agent.id})"
    logger.info(message)

Handle remote agent join event.

Args

agent : TransportAgent
The agent that joined.
def on_agent_left(self, agent: videosdk.agent.Agent)
Expand source code
def on_agent_left(self, agent: TransportAgent):
    """
    Log that a remote agent has left.

    Args:
        agent (TransportAgent): The agent that left; its display name and
            id are written to the log.
    """
    message = f"Remote agent left: {agent.display_name} ({agent.id})"
    logger.info(message)

Handle remote agent leave event.

Args

agent : TransportAgent
The agent that left.
def on_error(self, data)
Expand source code
def on_error(self, data):
    """
    Handle room errors reported by VideoSDK.

    When an error callback is configured, it is invoked with the error data
    and a task is scheduled to end the session with the reason
    ``"error_in_meeting"``. Without a callback, nothing happens.

    Args:
        data: Error data from VideoSDK.
    """
    error_callback = self.on_room_error
    if not error_callback:
        return

    error_callback(data)
    asyncio.create_task(self._end_session("error_in_meeting"))

Handle room errors.

This method is called when VideoSDK encounters an error and forwards it to the configured error callback if provided.

Args

data
Error data from VideoSDK.
def on_meeting_joined(self, data)
Expand source code
def on_meeting_joined(self, data):
    """
    React to the agent having joined the meeting.

    Stores the join data, notifies the current job context (if any),
    schedules collection of the session ID and meeting attributes, starts
    the no-participant timeout when one is configured with a positive
    value, and, when recording is enabled, starts base recording for the
    local participant.

    Args:
        data: Meeting join event data from VideoSDK.
    """
    logger.info("Agent joined the meeting")
    self._meeting_joined_data = data

    # Imported here rather than at module level — presumably to avoid a circular import.
    from ..job import get_current_job_context
    current_job = get_current_job_context()
    if current_job:
        current_job.notify_meeting_joined()

    for collector in (self._collect_session_id, self._collect_meeting_attributes):
        asyncio.create_task(collector())

    idle_limit = self.no_participant_timeout_seconds
    if idle_limit is not None and idle_limit > 0:
        self._no_participant_timeout_task = asyncio.create_task(
            self._no_participant_timeout_handler()
        )
        logger.info(
            f"No-participant timeout started: {self.no_participant_timeout_seconds}s"
        )

    if not self.recording:
        return

    # The local participant counts as recorded too.
    self.recorded_participants.add(self.meeting.local_participant.id)
    asyncio.create_task(
        self._start_base_recording_for_participant(
            self.meeting.local_participant.id,
            self.meeting.local_participant,
        )
    )

Handle meeting join event.

Args

data
Meeting join event data from VideoSDK.
def on_meeting_left(self, data)
Expand source code
def on_meeting_left(self, data):
    """
    React to the meeting having been left.

    Logs the event, empties the tracked participant data when there is any,
    and schedules the session to end with the reason ``"meeting_left"``.

    Args:
        data: Meeting leave event data from VideoSDK.
    """
    logger.info(f"Meeting Left: {data}")

    tracked = getattr(self, "participants_data", None)
    if tracked:
        tracked.clear()

    asyncio.create_task(self._end_session("meeting_left"))

Handle meeting leave event.

Args

data
Meeting leave event data from VideoSDK.
def on_participant_joined(self, participant: videosdk.participant.Participant)
Expand source code
def on_participant_joined(self, participant: Participant):
    """
    Handle participant join event.

    Records the participant's name and metadata flags, lazily creates the
    transport event sender when the participant asks for agent events, and,
    for non-agent participants, registers metrics, optionally starts
    recording, wakes any waiters, cancels pending timeouts and attaches
    stream enabled/disabled listeners.

    Args:
        participant (Participant): The participant that joined.
    """
    peer_name = participant.display_name
    # Every joiner is recorded here, before the agent/avatar check below.
    self.participants_data[participant.id] = {"name": peer_name}
    # Each flag defaults to False when the participant carries no metadata.
    self.participants_data[participant.id]["sipUser"] = (
        participant.meta_data.get("sipUser", False)
        if participant.meta_data
        else False
    )
    self.participants_data[participant.id]["sipCallType"] = (
        participant.meta_data.get("callType", False)
        if participant.meta_data
        else False
    )
    
    self.participants_data[participant.id]["enable_agent_events"] = (
        participant.meta_data.get("enableAgentEvents", False)
        if participant.meta_data
        else False
    )
    
    # The event sender is created on demand, the first time a participant requests agent events.
    if self.participants_data[participant.id]["enable_agent_events"] and self.transport_event_sender is None:
        logger.info("Initializing transport_event_sender because participant has enableAgentEvents=True")
        self.transport_event_sender = TransportEventSender(self)
        
    logger.info(f"Participant joined: {peer_name}")

    # Agent/avatar participants get no metrics, recording or stream listeners.
    if self._is_agent_participant(participant):
        logger.info(f"Skipping agent/avatar participant in on_participant_joined: {participant.id}")
        return

    sip_user_flag = self.participants_data[participant.id]["sipUser"]
    metrics_collector.add_participant_metrics(
        participant_id=participant.id,
        kind="user",
        sip_user=sip_user_flag,
        join_time=time.time(),
        meta=self.participants_data[participant.id],
    )

    # Base recording starts only while this is the sole entry in participants_data.
    if self.recording and len(self.participants_data) == 1:
        self.recorded_participants.add(participant.id)
        asyncio.create_task(
            self._start_base_recording_for_participant(participant.id, participant)
        )

    # Wake anyone waiting for this specific participant ...
    if participant.id in self._participant_joined_events:
        self._participant_joined_events[participant.id].set()

    # ... and anyone waiting for the first participant of any identity.
    if not self._first_participant_event.is_set():
        self._first_participant_event.set()

    if self._no_participant_timeout_task and not self._no_participant_timeout_task.done():
        self._no_participant_timeout_task.cancel()
        self._no_participant_timeout_task = None
        logger.info("No-participant timeout cancelled: participant joined")

    # Update participant count and cancel session end if participants are present
    self._update_non_agent_participant_count()
    if self._non_agent_participant_count > 0:
        self._cancel_session_end_task()

    def on_stream_enabled(stream: Stream):
        """
        Internal method: Handle stream enabled event.

        Audio streams are announced on the global event emitter and get an
        audio listener task; share streams may start screen recording;
        video/share streams get a video listener task when vision is on.
        """
        if stream.kind == "audio":
            global_event_emitter.emit(
                "AUDIO_STREAM_ENABLED",
                {"stream": stream, "participant": participant},
            )
            logger.info(f"Audio stream enabled for participant: {peer_name}")
            try:
                task = asyncio.create_task(self.add_audio_listener(stream))
                # Listener tasks are keyed by stream id.
                self.audio_listener_tasks[stream.id] = task
            except Exception as e:
                logger.error(f"Error creating audio listener task: {e}")

        if stream.kind == "share" and self.recording:
            asyncio.create_task(self._maybe_start_screen_track_recording(participant))
            
        if stream.kind in ("video", "share") and self.vision:
            logger.info(f"{stream.kind} stream enabled for participant: {peer_name}")
            self.video_listener_tasks[stream.id] = asyncio.create_task(
                self.add_video_listener(stream)
            )

    def on_stream_disabled(stream: Stream):
        """
        Internal method: Handle stream disabled event.

        Cancels and forgets the listener task registered for the stream id.
        """
        if stream.kind == "audio":
            audio_task = self.audio_listener_tasks.get(stream.id)
            if audio_task is not None:
                audio_task.cancel()
                del self.audio_listener_tasks[stream.id]
        if stream.kind in ("video", "share"):
            video_task = self.video_listener_tasks.get(stream.id)
            if video_task is not None:
                video_task.cancel()
                del self.video_listener_tasks[stream.id]

    # Stream listeners are attached to remote participants only.
    if participant.id != self.meeting.local_participant.id:
        participant.add_event_listener(
            ParticipantHandler(
                participant_id=participant.id,
                on_stream_enabled=on_stream_enabled,
                on_stream_disabled=on_stream_disabled,
            )
        )

Handle participant join event.

Args

participant : Participant
The participant that joined.
def on_participant_left(self, participant: videosdk.participant.Participant)
Expand source code
def on_participant_left(self, participant: Participant):
    """
    React to a participant leaving the meeting.

    Cancels listener tasks registered under the participant's id, emits
    ``PARTICIPANT_LEFT``, refreshes the non-agent participant count and then
    decides whether the session should end: immediately when a recorded
    participant left, or, with ``auto_end_session`` enabled and no non-agent
    participants remaining, either after ``session_timeout_seconds`` or
    right away when no positive timeout is configured.

    Args:
        participant (Participant): The participant that left.
    """
    logger.info(f"Participant left: {participant.display_name}")

    # NOTE(review): on_participant_joined stores listener tasks under stream
    # ids; confirm that looking them up by participant id here is intended.
    audio_tasks = self.audio_listener_tasks
    if participant.id in audio_tasks:
        try:
            audio_tasks[participant.id].cancel()
            del audio_tasks[participant.id]
        except Exception as e:
            logger.error(
                f"Error cancelling audio listener task for participant {participant.id}: {e}"
            )

    video_tasks = self.video_listener_tasks
    if participant.id in video_tasks:
        try:
            video_tasks[participant.id].cancel()
            del video_tasks[participant.id]
        except Exception as e:
            logger.error(
                f"Error cancelling video listener task for participant {participant.id}: {e}"
            )

    global_event_emitter.emit("PARTICIPANT_LEFT", {"participant": participant})

    # Refresh the count before deciding whether the session should end.
    self._update_non_agent_participant_count()

    if participant.id in self.recorded_participants:
        logger.info(
            f"Recorded participant {participant.display_name} left, ending session"
        )
        asyncio.create_task(self._end_session("recorded_participant_left"))
        return

    if self._non_agent_participant_count != 0 or not self.auto_end_session:
        return

    grace_period = self.session_timeout_seconds
    if grace_period is not None and grace_period > 0:
        logger.info(
            f"All non-agent participants have left, scheduling session end in {self.session_timeout_seconds} seconds"
        )
        self._schedule_session_end(self.session_timeout_seconds)
        return

    logger.info(
        "All non-agent participants have left, ending session immediately"
    )
    asyncio.create_task(self._end_session("all_participants_left"))

Handle participant leave event.

Args

participant : Participant
The participant that left.
async def publish_to_pubsub(self, pubsub_config: videosdk.utils.PubSubPublishConfig)
Expand source code
async def publish_to_pubsub(self, pubsub_config: PubSubPublishConfig):
    """
    Send a message over the meeting's pubsub channel.

    Nothing is published when the handler has no meeting attached.

    Args:
        pubsub_config (PubSubPublishConfig): Publish configuration, handed
            unchanged to the meeting's pubsub ``publish`` call.
    """
    meeting = self.meeting
    if not meeting:
        return
    await meeting.pubsub.publish(pubsub_config)

Publish message to pubsub.

Args

pubsub_config : PubSubPublishConfig
Configuration for pubsub publishing.
def setup_session_end_callback(self, callback)
Expand source code
def setup_session_end_callback(self, callback):
    """
    Register ``callback`` to run when the session ends.

    If a session-end callback is already set, the two are chained: the
    existing one runs first, then the new one, and an exception in either
    is logged without stopping the other. Otherwise ``callback`` is
    installed directly.

    Args:
        callback: Callable invoked with the session-end reason string.
    """
    previous = self.on_session_end

    if not previous:
        self.on_session_end = callback
        logger.debug("Session end callback set up")
        return

    def chained_callback(reason: str):
        # Run the earlier callback first; a failure must not block the new one.
        try:
            previous(reason)
        except Exception as e:
            logger.error(f"Error in existing session end callback: {e}")
        try:
            callback(reason)
        except Exception as e:
            logger.error(f"Error in new session end callback: {e}")

    self.on_session_end = chained_callback
    logger.debug("Session end callback chained with existing callback")

Set up the session end callback.

async def start_participant_recording(self, id: str)
Expand source code
async def start_participant_recording(self, id: str):
    """
    Begin recording one participant by delegating to recording_manager.

    Args:
        id (str): ID of the participant to record.
    """
    manager = self.recording_manager
    await manager.start_participant_recording(id)

Start recording. Forward to recording_manager.

async def stop_and_merge_recordings(self)
Expand source code
async def stop_and_merge_recordings(self):
    """
    Ask recording_manager to stop and then merge the recordings.

    Passes the session ID, the local participant's ID, the tracked
    participant data and the per-participant track kinds. The
    ``stop_participants_recording`` flag is true only when ``record_audio``
    was left as ``None``.
    """
    stop_and_merge = self.recording_manager.stop_and_merge_recordings
    session_id = self._session_id
    local_id = self.meeting.local_participant.id
    participants = self.participants_data
    track_kinds = self._track_recordings_kinds_by_participant
    audio_unset = self.record_audio is None
    await stop_and_merge(
        session_id=session_id,
        local_participant_id=local_id,
        participants_data=participants,
        track_kinds_by_participant=track_kinds,
        stop_participants_recording=audio_unset,
    )

Stop and merge recordings. Forward to recording_manager.

async def stop_participant_recording(self, id: str)
Expand source code
async def stop_participant_recording(self, id: str):
    """
    End recording of one participant by delegating to recording_manager.

    Args:
        id (str): ID of the participant whose recording should stop.
    """
    manager = self.recording_manager
    await manager.stop_participant_recording(id)

Stop recording. Forward to recording_manager.

async def stop_participants_recording(self)
Expand source code
async def stop_participants_recording(self):
    """
    Stop recording for the local participant and every known participant.

    Each stop request is forwarded to ``recording_manager``. The participant
    ids are snapshotted before the loop: ``participants_data`` is modified by
    meeting event handlers (entries are added on participant join and the
    dict is cleared on meeting left), and those can run while a stop request
    is awaited. Iterating the live dict across awaits would then raise
    ``RuntimeError: dictionary changed size during iteration``.
    """
    manager = self.recording_manager
    await manager.stop_participant_recording(self.meeting.local_participant.id)
    # Snapshot the keys; the dict may change while we await below.
    for participant_id in list(self.participants_data):
        await manager.stop_participant_recording(participant_id)

Stop recording for all participants. Forward to recording_manager.

async def subscribe_to_pubsub(self, pubsub_config: videosdk.utils.PubSubSubscribeConfig)
Expand source code
async def subscribe_to_pubsub(self, pubsub_config: PubSubSubscribeConfig):
    """
    Subscribe to the meeting's pubsub messages.

    Args:
        pubsub_config (PubSubSubscribeConfig): Subscription configuration,
            handed unchanged to the meeting's pubsub ``subscribe`` call.

    Returns:
        The value produced by the pubsub ``subscribe`` call (the existing
        messages from the subscription).
    """
    return await self.meeting.pubsub.subscribe(pubsub_config)

Subscribe to pubsub messages.

Args

pubsub_config : PubSubSubscribeConfig
Configuration for pubsub subscription.

Returns

List of existing messages from the subscription.

def transfer_call(self, call_id: str, transfer_to: str)
Expand source code
def transfer_call(self, call_id: str, transfer_to: str):
    """
    Transfer a call by delegating to sip_manager.

    Args:
        call_id (str): Identifier of the call to transfer.
        transfer_to (str): Transfer destination.

    Returns:
        Whatever ``sip_manager.transfer_call`` returns.
    """
    transfer = self.sip_manager.transfer_call
    return transfer(call_id=call_id, transfer_to=transfer_to)

Transfer the call. Forward to sip_manager.

async def upload_file(self, base64_data, file_name)
Expand source code
async def upload_file(self, base64_data, file_name):
    """
    Upload base64 data as a file through the current meeting.

    Args:
        base64_data: Base64-encoded file data.
        file_name (str): Name to give the uploaded file.

    Returns:
        Whatever the meeting's ``upload_base64`` call returns; the value is
        passed through without being awaited or inspected here.
    """
    uploader = self.meeting.upload_base64
    return uploader(base64_data, self.auth_token, file_name)

Upload a file to the temporary storage.

Args

base64_data
Base64-encoded file data.
file_name : str
Name of the file to upload.

Returns

Upload response from VideoSDK API.

async def wait_for_participant(self, participant_id: str | None = None) ‑> str
Expand source code
async def wait_for_participant(self, participant_id: str | None = None) -> str:
    """
    Wait for a specific participant to join.
    """
    if participant_id:
        if participant_id in self.participants_data:
            return participant_id

        if participant_id not in self._participant_joined_events:
            self._participant_joined_events[participant_id] = asyncio.Event()

        await self._participant_joined_events[participant_id].wait()

        if self._session_ended:
            return None

        return participant_id
    else:
        if self.participants_data:
            return next(iter(self.participants_data.keys()))

        await self._first_participant_event.wait()

        if self._session_ended or not self.participants_data:
            return None

        return next(iter(self.participants_data.keys()))

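Wait for a specific participant to join, or for the first participant when no participant ID is given. Returns the participant's ID, or None if the session ended first.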