Module agents.room.room
Classes
class VideoSDKHandler (*,
meeting_id: str,
auth_token: str | None = None,
name: str,
agent_participant_id: str,
agent_id: str,
pipeline: Pipeline,
loop: asyncio.events.AbstractEventLoop,
vision: bool = False,
recording: bool = False,
custom_camera_video_track=None,
custom_microphone_audio_track=None,
audio_sinks=None,
background_audio: bool = False,
on_room_error: Callable[[Any], None] | None = None,
auto_end_session: bool = True,
session_timeout_seconds: int | None = None,
on_session_end: Callable[[str], None] | None = None,
signaling_base_url: str | None = None)-
Expand source code
class VideoSDKHandler(BaseTransportHandler): """ Handles VideoSDK meeting operations and participant management. """ def __init__( self, *, meeting_id: str, auth_token: str | None = None, name: str, agent_participant_id: str, agent_id: str, pipeline: Pipeline, loop: AbstractEventLoop, vision: bool = False, recording: bool = False, custom_camera_video_track=None, custom_microphone_audio_track=None, audio_sinks=None, background_audio: bool = False, on_room_error: Optional[Callable[[Any], None]] = None, # Session management options auto_end_session: bool = True, session_timeout_seconds: Optional[int] = None, on_session_end: Optional[Callable[[str], None]] = None, # VideoSDK connection options signaling_base_url: Optional[str] = None, ): """ Initialize the VideoSDK handler. Args: meeting_id (str): Unique identifier for the meeting. auth_token (str | None, optional): Authentication token. Uses environment variable if not provided. name (str): Display name of the agent in the meeting. agent_participant_id (str): Participant ID of the agent in the meeting. pipeline (Pipeline): Audio/video processing pipeline. loop (AbstractEventLoop): Event loop for async operations. vision (bool, optional): Whether video processing is enabled. Defaults to False. recording (bool, optional): Whether recording is enabled. Defaults to False. custom_camera_video_track: Custom video track for camera input. custom_microphone_audio_track: Custom audio track for microphone input. audio_sinks: List of audio sinks for processing. background_audio (bool, optional): Whether to use background audio. Defaults to False. on_room_error (Optional[Callable[[Any], None]], optional): Error callback function. auto_end_session (bool, optional): Whether to automatically end sessions. Defaults to True. session_timeout_seconds (Optional[int], optional): Timeout for session auto-end. on_session_end (Optional[Callable[[str], None]], optional): Session end callback function. signaling_base_url (Optional[str], optional): Custom signaling server URL. Raises: ValueError: If VIDEOSDK_AUTH_TOKEN is not set in environment or parameters. """ self.meeting_id = meeting_id self.auth_token = auth_token self.name = name self.agent_id = agent_id self.agent_participant_id = agent_participant_id self.pipeline = pipeline self.loop = loop self.vision = vision self.custom_camera_video_track = custom_camera_video_track self.custom_microphone_audio_track = custom_microphone_audio_track self.audio_sinks = audio_sinks or [] self.background_audio = background_audio # Session management self.auto_end_session = auto_end_session self.session_timeout_seconds = session_timeout_seconds self.on_session_end = on_session_end self._session_ended = False self._session_end_task = None # VideoSDK connection self.signaling_base_url = signaling_base_url super().__init__(loop, pipeline) # Participant tracking self._non_agent_participant_count = 0 self._first_participant_event = asyncio.Event() self._participant_joined_events = {} # Meeting and event handling self.meeting = None self.participants_data = {} self.recorded_participants = set() self.audio_listener_tasks = {} self.video_listener_tasks = {} self._meeting_joined_data = None self.agent_meeting = None self._session_id: Optional[str] = None self._session_id_collected = False self.recording = recording self.traces_flow_manager = TracesFlowManager(room_id=self.meeting_id) cascading_metrics_collector.set_traces_flow_manager( self.traces_flow_manager) if custom_microphone_audio_track: self.audio_track = custom_microphone_audio_track if audio_sinks: if self.background_audio: self.agent_audio_track = TeeMixingCustomAudioStreamTrack( loop=self.loop, sinks=audio_sinks, pipeline=pipeline ) else: self.agent_audio_track = TeeCustomAudioStreamTrack( loop=self.loop, sinks=audio_sinks, pipeline=pipeline ) else: self.agent_audio_track = None else: if self.background_audio: self.audio_track = TeeMixingCustomAudioStreamTrack( loop=self.loop, sinks=audio_sinks, pipeline=pipeline ) else: self.audio_track = TeeCustomAudioStreamTrack( loop=self.loop, sinks=audio_sinks, pipeline=pipeline ) self.agent_audio_track = None self.auth_token = auth_token or os.getenv("VIDEOSDK_AUTH_TOKEN") if not self.auth_token: raise ValueError("VIDEOSDK_AUTH_TOKEN is not set") # Create meeting config as a dictionary instead of using MeetingConfig self.meeting_config = { "name": self.name, "participant_id": self.agent_participant_id, "meeting_id": self.meeting_id, "token": self.auth_token, "mic_enabled": True, "webcam_enabled": custom_camera_video_track is not None, "custom_microphone_audio_track": self.audio_track, "custom_camera_video_track": custom_camera_video_track, } if self.signaling_base_url is not None: self.meeting_config["signaling_base_url"] = self.signaling_base_url self.attributes = {} self.on_room_error = on_room_error self._participant_joined_events: dict[str, asyncio.Event] = {} self._left: bool = False # Session management self.auto_end_session = auto_end_session async def connect(self): """ Connect to the VideoSDK meeting. """ self.init_meeting() await self.join() async def disconnect(self): """ Disconnect from the VideoSDK meeting. """ await self.leave() def init_meeting(self): """ Initialize the VideoSDK meeting instance. """ self._left: bool = False self.sdk_metadata = { "sdk": "agents", "sdk_version": "0.0.65" } self.videosdk_meeting_meta_data= { "agent_id": self.agent_id, "agent_name": self.name, "is_videosdk_agent": True } self.meeting = VideoSDK.init_meeting( **self.meeting_config, sdk_metadata=self.sdk_metadata, meta_data=self.videosdk_meeting_meta_data) self.meeting.add_event_listener( MeetingHandler( on_meeting_joined=self.on_meeting_joined, on_meeting_left=self.on_meeting_left, on_participant_joined=self.on_participant_joined, on_participant_left=self.on_participant_left, on_error=self.on_error, ) ) async def join(self): """ Join the meeting. """ await self.meeting.async_join() async def leave(self): """ Leave the meeting and clean up resources. """ if self._left: logger.info("Meeting already left") return logger.info("Leaving meeting and cleaning up resources") self._left = True if self.recording: try: await self.stop_and_merge_recordings() except Exception as e: logger.error(f"Error stopping/merging recordings: {e}") try: if self.meeting: self.meeting.leave() except Exception as e: logger.error(f"Error leaving meeting: {e}") await self.cleanup() def on_error(self, data): """ Handle room errors. This method is called when VideoSDK encounters an error and forwards it to the configured error callback if provided. Args: data: Error data from VideoSDK. """ if self.on_room_error: self.on_room_error(data) asyncio.create_task(self._end_session("error_in_meeting")) def on_meeting_joined(self, data): """ Handle meeting join event. Args: data: Meeting join event data from VideoSDK. """ logger.info(f"Agent joined the meeting") self._meeting_joined_data = data asyncio.create_task(self._collect_session_id()) asyncio.create_task(self._collect_meeting_attributes()) if self.recording: self.recorded_participants.add(self.meeting.local_participant.id) asyncio.create_task( self.start_participant_recording( self.meeting.local_participant.id) ) def on_meeting_left(self, data): """ Handle meeting leave event. Args: data: Meeting leave event data from VideoSDK. """ logger.info(f"Meeting Left: {data}") if hasattr(self, 'participants_data') and self.participants_data: self.participants_data.clear() asyncio.create_task(self._end_session("meeting_left")) def _is_agent_participant(self, participant: Participant) -> bool: """ Internal method: Check if a participant is an agent. """ # Consider participants with names containing 'agent' or matching our agent name as agents participant_name = participant.display_name.lower() return ( "agent" in participant_name or participant_name == self.name.lower() or participant.id == self.meeting.local_participant.id if self.meeting and self.meeting.local_participant else False ) def _update_non_agent_participant_count(self): """ Internal method: Update the count of non-agent participants. """ if not self.meeting: return count = 0 for participant in self.meeting.participants.values(): if not self._is_agent_participant(participant): count += 1 self._non_agent_participant_count = count logger.debug(f"Non-agent participant count: {count}") def _cancel_session_end_task(self): """ Internal method: Cancel the session end task if it exists. """ if self._session_end_task and not self._session_end_task.done(): self._session_end_task.cancel() self._session_end_task = None async def _end_session(self, reason: str = "session_ended"): """ Internal method: End the current session. """ if self._session_ended: return self._cancel_session_end_task() logger.info(f"Ending session: {reason}") if self.on_session_end: try: self.on_session_end(reason) except Exception as e: logger.error(f"Error in session end callback: {e}") # Leave the meeting FIRST, then mark session as ended await self.leave() # Mark session as ended AFTER leaving self._session_ended = True async def force_end_session(self, reason: str = "manual_hangup") -> None: """ Public helper: forcefully end the session, bypassing participant checks. Args: reason: Reason string to propagate to session end callbacks. """ await self._end_session(reason) async def call_transfer(self, token: str, transfer_to: str) -> None: """ Transfer the call to a provided Phone number or SIP endpoint. Args: token: VideoSDK auth token. transfer_to: Phone number or SIP endpoint to transfer the call to. """ try: session_id = self._session_id room_id = self.meeting_id if not session_id: raise ValueError("Session ID is not set.") if not room_id: raise ValueError("Room ID is not set.") logger.info(f"[CALL TRANSFER] Fetching SIP call info | roomId={room_id}, sessionId={session_id}") sip_call = self.fetch_call_info(token, room_id, session_id) if not sip_call: logger.error("[CALL TRANSFER] No active SIP call found for given session ID.") raise RuntimeError("Unable to perform transfer: No active SIP call found.") call_id = sip_call["callId"] logger.info(f"[CALL TRANSFER] Found SIP Call ID: {call_id}") result = self.transfer_call( token=token, call_id=call_id, transfer_to=transfer_to ) logger.info(f"[CALL TRANSFER] Transfer successful: {result}") except Exception as e: logger.error("[CALL TRANSFER] Error occurred during call transfer", exc_info=True) raise def fetch_call_info(self, token: str, room_id: str, session_id: str): """ Fetch SIP call information for the given room, then match by sessionId. """ try: headers = {"Authorization": token} params = {"roomId": room_id} logger.info(f"[FETCH CALL INFO] Requesting call info | roomId={room_id}") response = requests.get(FETCH_CALL_INFO_URL, headers=headers, params=params) response.raise_for_status() data = response.json() calls = data.get("data", []) for call in calls: if call.get("sessionId") == session_id: logger.info(f"[FETCH CALL INFO] Matching call found: {call.get('callId')}") return call logger.warning("[FETCH CALL INFO] No SIP call matched with sessionId") return None except requests.RequestException as e: logger.error("[FETCH CALL INFO] HTTP request failed", exc_info=True) raise except Exception as e: logger.error("[FETCH CALL INFO] Unexpected error", exc_info=True) raise def transfer_call(self, token: str, call_id: str, transfer_to: str): """ Transfer the call to a new number. """ try: logger.info(f"[TRANSFER CALL] Initiating transfer | callId={call_id}, transferTo={transfer_to}") headers = { "Authorization": token, "Content-Type": "application/json" } payload = { "callId": call_id, "transferTo": transfer_to } response = requests.post(TRANSFER_CALL_URL, json=payload, headers=headers) response.raise_for_status() return response.json() except requests.RequestException as e: logger.error("[TRANSFER CALL] HTTP request failed", exc_info=True) raise except Exception as e: logger.error("[TRANSFER CALL] Unexpected error", exc_info=True) raise def setup_session_end_callback(self, callback): """ Set up the session end callback. This chains callbacks - if there's already a callback set (e.g., from worker), both will be called. Args: callback: Function to call when session ends. """ existing_callback = self.on_session_end if existing_callback: def chained_callback(reason: str): try: existing_callback(reason) except Exception as e: logger.error(f"Error in existing session end callback: {e}") try: callback(reason) except Exception as e: logger.error(f"Error in new session end callback: {e}") self.on_session_end = chained_callback logger.debug("Session end callback chained with existing callback") else: self.on_session_end = callback logger.debug("Session end callback set up") def _schedule_session_end(self, timeout_seconds: int): """ Internal method: Schedule session end after timeout. """ if self._session_end_task and not self._session_end_task.done(): self._session_end_task.cancel() self._session_end_task = asyncio.create_task( self._delayed_session_end(timeout_seconds) ) logger.info(f"Session end scheduled in {timeout_seconds} seconds") async def _delayed_session_end(self, timeout_seconds: int): """ Internal method: Delayed session end after timeout. """ await asyncio.sleep(timeout_seconds) await self._end_session("no_participants") def on_participant_joined(self, participant: Participant): """ Handle participant join event. Args: participant (Participant): The participant that joined. """ peer_name = participant.display_name self.participants_data[participant.id] = {"name": peer_name} self.participants_data[participant.id]["sipUser"] = participant.meta_data.get("sipUser", False) if participant.meta_data else False self.participants_data[participant.id]["sipCallType"] = participant.meta_data.get("callType", False) if participant.meta_data else False logger.info(f"Participant joined: {peer_name}") if self.recording and len(self.participants_data) == 1: self.recorded_participants.add(participant.id) asyncio.create_task( self.start_participant_recording(participant.id)) if participant.id in self._participant_joined_events: self._participant_joined_events[participant.id].set() if not self._first_participant_event.is_set(): self._first_participant_event.set() # Update participant count and cancel session end if participants are present self._update_non_agent_participant_count() if self._non_agent_participant_count > 0: self._cancel_session_end_task() def on_stream_enabled(stream: Stream): """ Internal method: Handle stream enabled event. """ if stream.kind == "audio": global_event_emitter.emit("AUDIO_STREAM_ENABLED", { "stream": stream, "participant": participant}) logger.info( f"Audio stream enabled for participant: {peer_name}") try: task = asyncio.create_task(self.add_audio_listener(stream)) self.audio_listener_tasks[stream.id] = task except Exception as e: logger.error(f"Error creating audio listener task: {e}") if stream.kind == "video" and self.vision: self.video_listener_tasks[stream.id] = asyncio.create_task( self.add_video_listener(stream) ) def on_stream_disabled(stream: Stream): """ Internal method: Handle stream disabled event. """ if stream.kind == "audio": audio_task = self.audio_listener_tasks[stream.id] if audio_task is not None: audio_task.cancel() del self.audio_listener_tasks[stream.id] if stream.kind == "video": video_task = self.video_listener_tasks[stream.id] if video_task is not None: video_task.cancel() del self.video_listener_tasks[stream.id] if participant.id != self.meeting.local_participant.id: participant.add_event_listener( ParticipantHandler( participant_id=participant.id, on_stream_enabled=on_stream_enabled, on_stream_disabled=on_stream_disabled, ) ) def on_participant_left(self, participant: Participant): """ Handle participant leave event. Args: participant (Participant): The participant that left. """ logger.info(f"Participant left: {participant.display_name}") if participant.id in self.audio_listener_tasks: try: self.audio_listener_tasks[participant.id].cancel() del self.audio_listener_tasks[participant.id] except Exception as e: logger.error(f"Error cancelling audio listener task for participant {participant.id}: {e}") if participant.id in self.video_listener_tasks: try: self.video_listener_tasks[participant.id].cancel() del self.video_listener_tasks[participant.id] except Exception as e: logger.error(f"Error cancelling video listener task for participant {participant.id}: {e}") global_event_emitter.emit( "PARTICIPANT_LEFT", {"participant": participant}) # Update participant count and check if session should end self._update_non_agent_participant_count() if participant.id in self.recorded_participants: logger.info(f"Recorded participant {participant.display_name} left, ending session") asyncio.create_task(self._end_session("recorded_participant_left")) return if self._non_agent_participant_count == 0 and self.auto_end_session: if self.session_timeout_seconds is not None and self.session_timeout_seconds > 0: logger.info( f"All non-agent participants have left, scheduling session end in {self.session_timeout_seconds} seconds") self._schedule_session_end(self.session_timeout_seconds) else: logger.info("All non-agent participants have left, ending session immediately") asyncio.create_task(self._end_session("all_participants_left")) async def add_audio_listener(self, stream: Stream): """ Add audio listener for a participant stream. """ while True: try: await asyncio.sleep(0.01) frame = await stream.track.recv() global_event_emitter.emit("ON_SPEECH_IN", {"frame": frame, "stream": stream}) audio_data = frame.to_ndarray()[0] pcm_frame = audio_data.flatten().astype(np.int16).tobytes() if self.pipeline: await self.pipeline.on_audio_delta(pcm_frame) else: logger.warning( "No pipeline available for audio processing") except Exception as e: logger.error(f"Audio processing error: {e}") break async def add_video_listener(self, stream: Stream): """ Add video listener for a participant stream. """ while True: try: await asyncio.sleep(0.01) frame = await stream.track.recv() if self.pipeline: await self.pipeline.on_video_delta(frame) except Exception as e: logger.error("Video processing error:", e) break async def wait_for_participant(self, participant_id: str | None = None) -> str: """ Wait for a specific participant to join, or wait for the first participant if none specified. Args: participant_id (str | None, optional): Optional participant ID to wait for. If None, waits for first participant. Returns: str: The participant ID that joined. """ if participant_id: if participant_id in self.participants_data: return participant_id if participant_id not in self._participant_joined_events: self._participant_joined_events[participant_id] = asyncio.Event( ) await self._participant_joined_events[participant_id].wait() return participant_id else: if self.participants_data: return next(iter(self.participants_data.keys())) await self._first_participant_event.wait() return next(iter(self.participants_data.keys())) async def subscribe_to_pubsub(self, pubsub_config: PubSubSubscribeConfig): """ Subscribe to pubsub messages. Args: pubsub_config (PubSubSubscribeConfig): Configuration for pubsub subscription. Returns: List of existing messages from the subscription. """ old_messages = await self.meeting.pubsub.subscribe(pubsub_config) return old_messages async def publish_to_pubsub(self, pubsub_config: PubSubPublishConfig): """ Publish message to pubsub. Args: pubsub_config (PubSubPublishConfig): Configuration for pubsub publishing. """ if self.meeting: await self.meeting.pubsub.publish(pubsub_config) async def upload_file(self, base64_data, file_name): """ Upload a file to the temporary storage. Args: base64_data: Base64-encoded file data. file_name (str): Name of the file to upload. Returns: Upload response from VideoSDK API. """ return self.meeting.upload_base64(base64_data, self.auth_token, file_name) async def fetch_file(self, url): """ Fetch a file from a URL. Args: url (str): URL of the file to fetch. Returns: Base64-encoded file data. """ return self.meeting.fetch_base64(url, self.auth_token) async def cleanup(self): """ Clean up resources. """ logger.info("Starting room cleanup") self._cancel_session_end_task() self.participants_data.clear() for task_id, task in list(self.audio_listener_tasks.items()): try: if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error cancelling audio listener task {task_id}: {e}") self.audio_listener_tasks.clear() for task_id, task in list(self.video_listener_tasks.items()): try: if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error cancelling video listener task {task_id}: {e}") self.video_listener_tasks.clear() if hasattr(self, "audio_track") and self.audio_track: try: await self.audio_track.cleanup() except Exception as e: logger.error(f"Error cleaning up audio track: {e}") self.audio_track = None if hasattr(self, "agent_audio_track") and self.agent_audio_track: try: await self.agent_audio_track.cleanup() except Exception as e: logger.error(f"Error cleaning up agent audio track: {e}") self.agent_audio_track = None if hasattr(self, "traces_flow_manager") and self.traces_flow_manager: try: self.traces_flow_manager.agent_meeting_end() except Exception as e: logger.error(f"Error ending traces flow manager: {e}") self.traces_flow_manager = None cascading_metrics_collector.set_traces_flow_manager(None) self.participants_data.clear() self.recorded_participants.clear() self._participant_joined_events.clear() self.meeting = None self.pipeline = None self.custom_camera_video_track = None self.custom_microphone_audio_track = None self.audio_sinks = None self.on_room_error = None self.on_session_end = None self._session_ended = True self._session_id = None self._session_id_collected = False self._non_agent_participant_count = 0 logger.info("Room cleanup completed") async def _collect_session_id(self) -> None: """ Internal method: Collect session ID from room and set it in metrics. """ if self.meeting and not self._session_id_collected: try: session_id = getattr(self.meeting, "session_id", None) if session_id: self._session_id = session_id cascading_metrics_collector.set_session_id(session_id) realtime_metrics_collector.set_session_id(session_id) self._session_id_collected = True if self.traces_flow_manager: self.traces_flow_manager.set_session_id(session_id) except Exception as e: logger.error(f"Error collecting session ID: {e}") async def _collect_meeting_attributes(self) -> None: """ Internal method: Collect meeting attributes and initialize telemetry. """ if not self.meeting: logger.error("Meeting not initialized") return try: if hasattr(self.meeting, "get_attributes"): attributes = self.meeting.get_attributes() if attributes: peer_id = getattr(self.meeting, "participant_id", "agent") auto_initialize_telemetry_and_logs( room_id=self.meeting_id, peer_id=peer_id, room_attributes=attributes, session_id=self._session_id, sdk_metadata=self.sdk_metadata, ) else: logger.error("No meeting attributes found") else: logger.error( "Meeting object does not have 'get_attributes' method") if self._meeting_joined_data and self.traces_flow_manager: start_time = time.perf_counter() agent_joined_attributes = { "roomId": self.meeting_id, "agent_ParticipantId": self.agent_participant_id, "sessionId": self._session_id, "agent_name": self.name, "peerId": self.meeting.local_participant.id, "sdk_metadata": self.sdk_metadata, "start_time": start_time, } self.traces_flow_manager.start_agent_joined_meeting( agent_joined_attributes ) except Exception as e: logger.error( f"Error collecting meeting attributes and creating spans: {e}") async def stop_participants_recording(self): """ Stop recording for all participants. """ for participant_id in list(self.recorded_participants): logger.info(f"stopping participant recording for id {participant_id}") await self.stop_participant_recording(participant_id) async def start_participant_recording(self, id: str): """ Start recording for a specific participant. Args: id (str): Participant ID to start recording for. """ headers = {"Authorization": self.auth_token, "Content-Type": "application/json"} response = requests.request( "POST", START_RECORDING_URL, json={"roomId": self.meeting_id, "participantId": id}, headers=headers, ) logger.info(f"starting participant recording response completed for id {id} and response{response.text}") async def stop_participant_recording(self, id: str): """ Stop recording for a specific participant. Args: id (str): Participant ID to stop recording for. """ headers = {"Authorization": self.auth_token, "Content-Type": "application/json"} response = requests.request( "POST", STOP_RECORDING_URL, json={"roomId": self.meeting_id, "participantId": id}, headers=headers, ) logger.info(f"stop participant recording response for id {id} and response{response.text}") async def merge_participant_recordings(self): """ Merge recordings from all participants. """ remote_recorded_ids = [pid for pid in self.recorded_participants if pid != self.meeting.local_participant.id] headers = {"Authorization": self.auth_token, "Content-Type": "application/json"} response = requests.request( "POST", MERGE_RECORDINGS_URL, json={ "sessionId": self.meeting.session_id, "channel1": [{"participantId": self.meeting.local_participant.id}], "channel2": [ {"participantId": participant_id} for participant_id in remote_recorded_ids ], }, headers=headers, ) logger.info(f"merging participant recordings completed response:{response.text}" ) async def stop_and_merge_recordings(self): """ Stop all recordings and merge them. """ await self.stop_participants_recording() await self.merge_participant_recordings() logger.info("stopped and merged recordings")Handles VideoSDK meeting operations and participant management.
Initialize the VideoSDK handler.
Args
meeting_id:str- Unique identifier for the meeting.
auth_token:str | None, optional- Authentication token. Uses environment variable if not provided.
name:str- Display name of the agent in the meeting.
agent_participant_id:str- Participant ID of the agent in the meeting.
pipeline:Pipeline- Audio/video processing pipeline.
loop:AbstractEventLoop- Event loop for async operations.
vision:bool, optional- Whether video processing is enabled. Defaults to False.
recording:bool, optional- Whether recording is enabled. Defaults to False.
custom_camera_video_track- Custom video track for camera input.
custom_microphone_audio_track- Custom audio track for microphone input.
audio_sinks- List of audio sinks for processing.
background_audio:bool, optional- Whether to use background audio. Defaults to False.
on_room_error:Optional[Callable[[Any], None]], optional- Error callback function.
auto_end_session:bool, optional- Whether to automatically end sessions. Defaults to True.
session_timeout_seconds:Optional[int], optional- Timeout for session auto-end.
on_session_end:Optional[Callable[[str], None]], optional- Session end callback function.
signaling_base_url:Optional[str], optional- Custom signaling server URL.
Raises
ValueError- If VIDEOSDK_AUTH_TOKEN is not set in environment or parameters.
Ancestors
- BaseTransportHandler
- abc.ABC
Methods
async def add_audio_listener(self, stream: videosdk.stream.Stream)-
Expand source code
async def add_audio_listener(self, stream: Stream): """ Add audio listener for a participant stream. """ while True: try: await asyncio.sleep(0.01) frame = await stream.track.recv() global_event_emitter.emit("ON_SPEECH_IN", {"frame": frame, "stream": stream}) audio_data = frame.to_ndarray()[0] pcm_frame = audio_data.flatten().astype(np.int16).tobytes() if self.pipeline: await self.pipeline.on_audio_delta(pcm_frame) else: logger.warning( "No pipeline available for audio processing") except Exception as e: logger.error(f"Audio processing error: {e}") breakAdd audio listener for a participant stream.
async def add_video_listener(self, stream: videosdk.stream.Stream)-
Expand source code
async def add_video_listener(self, stream: Stream): """ Add video listener for a participant stream. """ while True: try: await asyncio.sleep(0.01) frame = await stream.track.recv() if self.pipeline: await self.pipeline.on_video_delta(frame) except Exception as e: logger.error("Video processing error:", e) breakAdd video listener for a participant stream.
async def call_transfer(self, token: str, transfer_to: str) ‑> None-
Expand source code
async def call_transfer(self, token: str, transfer_to: str) -> None: """ Transfer the call to a provided Phone number or SIP endpoint. Args: token: VideoSDK auth token. transfer_to: Phone number or SIP endpoint to transfer the call to. """ try: session_id = self._session_id room_id = self.meeting_id if not session_id: raise ValueError("Session ID is not set.") if not room_id: raise ValueError("Room ID is not set.") logger.info(f"[CALL TRANSFER] Fetching SIP call info | roomId={room_id}, sessionId={session_id}") sip_call = self.fetch_call_info(token, room_id, session_id) if not sip_call: logger.error("[CALL TRANSFER] No active SIP call found for given session ID.") raise RuntimeError("Unable to perform transfer: No active SIP call found.") call_id = sip_call["callId"] logger.info(f"[CALL TRANSFER] Found SIP Call ID: {call_id}") result = self.transfer_call( token=token, call_id=call_id, transfer_to=transfer_to ) logger.info(f"[CALL TRANSFER] Transfer successful: {result}") except Exception as e: logger.error("[CALL TRANSFER] Error occurred during call transfer", exc_info=True) raiseTransfer the call to a provided Phone number or SIP endpoint.
Args
token- VideoSDK auth token.
transfer_to- Phone number or SIP endpoint to transfer the call to.
async def cleanup(self)-
Expand source code
async def cleanup(self): """ Clean up resources. """ logger.info("Starting room cleanup") self._cancel_session_end_task() self.participants_data.clear() for task_id, task in list(self.audio_listener_tasks.items()): try: if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error cancelling audio listener task {task_id}: {e}") self.audio_listener_tasks.clear() for task_id, task in list(self.video_listener_tasks.items()): try: if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error cancelling video listener task {task_id}: {e}") self.video_listener_tasks.clear() if hasattr(self, "audio_track") and self.audio_track: try: await self.audio_track.cleanup() except Exception as e: logger.error(f"Error cleaning up audio track: {e}") self.audio_track = None if hasattr(self, "agent_audio_track") and self.agent_audio_track: try: await self.agent_audio_track.cleanup() except Exception as e: logger.error(f"Error cleaning up agent audio track: {e}") self.agent_audio_track = None if hasattr(self, "traces_flow_manager") and self.traces_flow_manager: try: self.traces_flow_manager.agent_meeting_end() except Exception as e: logger.error(f"Error ending traces flow manager: {e}") self.traces_flow_manager = None cascading_metrics_collector.set_traces_flow_manager(None) self.participants_data.clear() self.recorded_participants.clear() self._participant_joined_events.clear() self.meeting = None self.pipeline = None self.custom_camera_video_track = None self.custom_microphone_audio_track = None self.audio_sinks = None self.on_room_error = None self.on_session_end = None self._session_ended = True self._session_id = None self._session_id_collected = False self._non_agent_participant_count = 0 logger.info("Room cleanup completed")Clean up resources.
async def connect(self)-
Expand source code
async def connect(self): """ Connect to the VideoSDK meeting. """ self.init_meeting() await self.join()Connect to the VideoSDK meeting.
async def disconnect(self)-
Expand source code
async def disconnect(self): """ Disconnect from the VideoSDK meeting. """ await self.leave()Disconnect from the VideoSDK meeting.
def fetch_call_info(self, token: str, room_id: str, session_id: str)-
Expand source code
def fetch_call_info(self, token: str, room_id: str, session_id: str): """ Fetch SIP call information for the given room, then match by sessionId. """ try: headers = {"Authorization": token} params = {"roomId": room_id} logger.info(f"[FETCH CALL INFO] Requesting call info | roomId={room_id}") response = requests.get(FETCH_CALL_INFO_URL, headers=headers, params=params) response.raise_for_status() data = response.json() calls = data.get("data", []) for call in calls: if call.get("sessionId") == session_id: logger.info(f"[FETCH CALL INFO] Matching call found: {call.get('callId')}") return call logger.warning("[FETCH CALL INFO] No SIP call matched with sessionId") return None except requests.RequestException as e: logger.error("[FETCH CALL INFO] HTTP request failed", exc_info=True) raise except Exception as e: logger.error("[FETCH CALL INFO] Unexpected error", exc_info=True) raiseFetch SIP call information for the given room, then match by sessionId.
async def fetch_file(self, url)-
Expand source code
async def fetch_file(self, url): """ Fetch a file from a URL. Args: url (str): URL of the file to fetch. Returns: Base64-encoded file data. """ return self.meeting.fetch_base64(url, self.auth_token)Fetch a file from a URL.
Args
url:str- URL of the file to fetch.
Returns
Base64-encoded file data.
async def force_end_session(self, reason: str = 'manual_hangup') ‑> None-
Expand source code
async def force_end_session(self, reason: str = "manual_hangup") -> None: """ Public helper: forcefully end the session, bypassing participant checks. Args: reason: Reason string to propagate to session end callbacks. """ await self._end_session(reason)Public helper: forcefully end the session, bypassing participant checks.
Args
reason- Reason string to propagate to session end callbacks.
def init_meeting(self)-
Expand source code
def init_meeting(self): """ Initialize the VideoSDK meeting instance. """ self._left: bool = False self.sdk_metadata = { "sdk": "agents", "sdk_version": "0.0.65" } self.videosdk_meeting_meta_data= { "agent_id": self.agent_id, "agent_name": self.name, "is_videosdk_agent": True } self.meeting = VideoSDK.init_meeting( **self.meeting_config, sdk_metadata=self.sdk_metadata, meta_data=self.videosdk_meeting_meta_data) self.meeting.add_event_listener( MeetingHandler( on_meeting_joined=self.on_meeting_joined, on_meeting_left=self.on_meeting_left, on_participant_joined=self.on_participant_joined, on_participant_left=self.on_participant_left, on_error=self.on_error, ) )Initialize the VideoSDK meeting instance.
async def join(self)-
Expand source code
async def join(self): """ Join the meeting. """ await self.meeting.async_join()Join the meeting.
async def leave(self)-
Expand source code
async def leave(self): """ Leave the meeting and clean up resources. """ if self._left: logger.info("Meeting already left") return logger.info("Leaving meeting and cleaning up resources") self._left = True if self.recording: try: await self.stop_and_merge_recordings() except Exception as e: logger.error(f"Error stopping/merging recordings: {e}") try: if self.meeting: self.meeting.leave() except Exception as e: logger.error(f"Error leaving meeting: {e}") await self.cleanup()Leave the meeting and clean up resources.
async def merge_participant_recordings(self)-
Expand source code
async def merge_participant_recordings(self): """ Merge recordings from all participants. """ remote_recorded_ids = [pid for pid in self.recorded_participants if pid != self.meeting.local_participant.id] headers = {"Authorization": self.auth_token, "Content-Type": "application/json"} response = requests.request( "POST", MERGE_RECORDINGS_URL, json={ "sessionId": self.meeting.session_id, "channel1": [{"participantId": self.meeting.local_participant.id}], "channel2": [ {"participantId": participant_id} for participant_id in remote_recorded_ids ], }, headers=headers, ) logger.info(f"merging participant recordings completed response:{response.text}" )Merge recordings from all participants.
def on_error(self, data)-
Expand source code
def on_error(self, data): """ Handle room errors. This method is called when VideoSDK encounters an error and forwards it to the configured error callback if provided. Args: data: Error data from VideoSDK. """ if self.on_room_error: self.on_room_error(data) asyncio.create_task(self._end_session("error_in_meeting"))Handle room errors.
This method is called when VideoSDK encounters an error and forwards it to the configured error callback if provided.
Args
data- Error data from VideoSDK.
def on_meeting_joined(self, data)-
Expand source code
def on_meeting_joined(self, data): """ Handle meeting join event. Args: data: Meeting join event data from VideoSDK. """ logger.info(f"Agent joined the meeting") self._meeting_joined_data = data asyncio.create_task(self._collect_session_id()) asyncio.create_task(self._collect_meeting_attributes()) if self.recording: self.recorded_participants.add(self.meeting.local_participant.id) asyncio.create_task( self.start_participant_recording( self.meeting.local_participant.id) )Handle meeting join event.
Args
data- Meeting join event data from VideoSDK.
def on_meeting_left(self, data)-
Expand source code
def on_meeting_left(self, data): """ Handle meeting leave event. Args: data: Meeting leave event data from VideoSDK. """ logger.info(f"Meeting Left: {data}") if hasattr(self, 'participants_data') and self.participants_data: self.participants_data.clear() asyncio.create_task(self._end_session("meeting_left"))Handle meeting leave event.
Args
data- Meeting leave event data from VideoSDK.
def on_participant_joined(self, participant: videosdk.participant.Participant)-
Expand source code
def on_participant_joined(self, participant: Participant): """ Handle participant join event. Args: participant (Participant): The participant that joined. """ peer_name = participant.display_name self.participants_data[participant.id] = {"name": peer_name} self.participants_data[participant.id]["sipUser"] = participant.meta_data.get("sipUser", False) if participant.meta_data else False self.participants_data[participant.id]["sipCallType"] = participant.meta_data.get("callType", False) if participant.meta_data else False logger.info(f"Participant joined: {peer_name}") if self.recording and len(self.participants_data) == 1: self.recorded_participants.add(participant.id) asyncio.create_task( self.start_participant_recording(participant.id)) if participant.id in self._participant_joined_events: self._participant_joined_events[participant.id].set() if not self._first_participant_event.is_set(): self._first_participant_event.set() # Update participant count and cancel session end if participants are present self._update_non_agent_participant_count() if self._non_agent_participant_count > 0: self._cancel_session_end_task() def on_stream_enabled(stream: Stream): """ Internal method: Handle stream enabled event. """ if stream.kind == "audio": global_event_emitter.emit("AUDIO_STREAM_ENABLED", { "stream": stream, "participant": participant}) logger.info( f"Audio stream enabled for participant: {peer_name}") try: task = asyncio.create_task(self.add_audio_listener(stream)) self.audio_listener_tasks[stream.id] = task except Exception as e: logger.error(f"Error creating audio listener task: {e}") if stream.kind == "video" and self.vision: self.video_listener_tasks[stream.id] = asyncio.create_task( self.add_video_listener(stream) ) def on_stream_disabled(stream: Stream): """ Internal method: Handle stream disabled event. """ if stream.kind == "audio": audio_task = self.audio_listener_tasks[stream.id] if audio_task is not None: audio_task.cancel() del self.audio_listener_tasks[stream.id] if stream.kind == "video": video_task = self.video_listener_tasks[stream.id] if video_task is not None: video_task.cancel() del self.video_listener_tasks[stream.id] if participant.id != self.meeting.local_participant.id: participant.add_event_listener( ParticipantHandler( participant_id=participant.id, on_stream_enabled=on_stream_enabled, on_stream_disabled=on_stream_disabled, ) )Handle participant join event.
Args
participant:Participant- The participant that joined.
def on_participant_left(self, participant: videosdk.participant.Participant)-
Expand source code
def on_participant_left(self, participant: Participant): """ Handle participant leave event. Args: participant (Participant): The participant that left. """ logger.info(f"Participant left: {participant.display_name}") if participant.id in self.audio_listener_tasks: try: self.audio_listener_tasks[participant.id].cancel() del self.audio_listener_tasks[participant.id] except Exception as e: logger.error(f"Error cancelling audio listener task for participant {participant.id}: {e}") if participant.id in self.video_listener_tasks: try: self.video_listener_tasks[participant.id].cancel() del self.video_listener_tasks[participant.id] except Exception as e: logger.error(f"Error cancelling video listener task for participant {participant.id}: {e}") global_event_emitter.emit( "PARTICIPANT_LEFT", {"participant": participant}) # Update participant count and check if session should end self._update_non_agent_participant_count() if participant.id in self.recorded_participants: logger.info(f"Recorded participant {participant.display_name} left, ending session") asyncio.create_task(self._end_session("recorded_participant_left")) return if self._non_agent_participant_count == 0 and self.auto_end_session: if self.session_timeout_seconds is not None and self.session_timeout_seconds > 0: logger.info( f"All non-agent participants have left, scheduling session end in {self.session_timeout_seconds} seconds") self._schedule_session_end(self.session_timeout_seconds) else: logger.info("All non-agent participants have left, ending session immediately") asyncio.create_task(self._end_session("all_participants_left"))Handle participant leave event.
Args
participant:Participant- The participant that left.
async def publish_to_pubsub(self, pubsub_config: videosdk.utils.PubSubPublishConfig)-
Expand source code
async def publish_to_pubsub(self, pubsub_config: PubSubPublishConfig): """ Publish message to pubsub. Args: pubsub_config (PubSubPublishConfig): Configuration for pubsub publishing. """ if self.meeting: await self.meeting.pubsub.publish(pubsub_config)Publish message to pubsub.
Args
pubsub_config:PubSubPublishConfig- Configuration for pubsub publishing.
def setup_session_end_callback(self, callback)-
Expand source code
def setup_session_end_callback(self, callback): """ Set up the session end callback. This chains callbacks - if there's already a callback set (e.g., from worker), both will be called. Args: callback: Function to call when session ends. """ existing_callback = self.on_session_end if existing_callback: def chained_callback(reason: str): try: existing_callback(reason) except Exception as e: logger.error(f"Error in existing session end callback: {e}") try: callback(reason) except Exception as e: logger.error(f"Error in new session end callback: {e}") self.on_session_end = chained_callback logger.debug("Session end callback chained with existing callback") else: self.on_session_end = callback logger.debug("Session end callback set up")Set up the session end callback.
This chains callbacks - if there's already a callback set (e.g., from worker), both will be called.
Args
callback- Function to call when session ends.
async def start_participant_recording(self, id: str)-
Expand source code
async def start_participant_recording(self, id: str): """ Start recording for a specific participant. Args: id (str): Participant ID to start recording for. """ headers = {"Authorization": self.auth_token, "Content-Type": "application/json"} response = requests.request( "POST", START_RECORDING_URL, json={"roomId": self.meeting_id, "participantId": id}, headers=headers, ) logger.info(f"starting participant recording response completed for id {id} and response{response.text}")Start recording for a specific participant.
Args
id:str- Participant ID to start recording for.
async def stop_and_merge_recordings(self)-
Expand source code
async def stop_and_merge_recordings(self): """ Stop all recordings and merge them. """ await self.stop_participants_recording() await self.merge_participant_recordings() logger.info("stopped and merged recordings")Stop all recordings and merge them.
async def stop_participant_recording(self, id: str)-
Expand source code
async def stop_participant_recording(self, id: str): """ Stop recording for a specific participant. Args: id (str): Participant ID to stop recording for. """ headers = {"Authorization": self.auth_token, "Content-Type": "application/json"} response = requests.request( "POST", STOP_RECORDING_URL, json={"roomId": self.meeting_id, "participantId": id}, headers=headers, ) logger.info(f"stop participant recording response for id {id} and response{response.text}")Stop recording for a specific participant.
Args
id:str- Participant ID to stop recording for.
async def stop_participants_recording(self)-
Expand source code
async def stop_participants_recording(self): """ Stop recording for all participants. """ for participant_id in list(self.recorded_participants): logger.info(f"stopping participant recording for id {participant_id}") await self.stop_participant_recording(participant_id)Stop recording for all participants.
async def subscribe_to_pubsub(self, pubsub_config: videosdk.utils.PubSubSubscribeConfig)-
Expand source code
async def subscribe_to_pubsub(self, pubsub_config: PubSubSubscribeConfig): """ Subscribe to pubsub messages. Args: pubsub_config (PubSubSubscribeConfig): Configuration for pubsub subscription. Returns: List of existing messages from the subscription. """ old_messages = await self.meeting.pubsub.subscribe(pubsub_config) return old_messagesSubscribe to pubsub messages.
Args
pubsub_config:PubSubSubscribeConfig- Configuration for pubsub subscription.
Returns
List of existing messages from the subscription.
def transfer_call(self, token: str, call_id: str, transfer_to: str)-
Expand source code
def transfer_call(self, token: str, call_id: str, transfer_to: str): """ Transfer the call to a new number. """ try: logger.info(f"[TRANSFER CALL] Initiating transfer | callId={call_id}, transferTo={transfer_to}") headers = { "Authorization": token, "Content-Type": "application/json" } payload = { "callId": call_id, "transferTo": transfer_to } response = requests.post(TRANSFER_CALL_URL, json=payload, headers=headers) response.raise_for_status() return response.json() except requests.RequestException as e: logger.error("[TRANSFER CALL] HTTP request failed", exc_info=True) raise except Exception as e: logger.error("[TRANSFER CALL] Unexpected error", exc_info=True) raiseTransfer the call to a new number.
async def upload_file(self, base64_data, file_name)-
Expand source code
async def upload_file(self, base64_data, file_name): """ Upload a file to the temporary storage. Args: base64_data: Base64-encoded file data. file_name (str): Name of the file to upload. Returns: Upload response from VideoSDK API. """ return self.meeting.upload_base64(base64_data, self.auth_token, file_name)Upload a file to the temporary storage.
Args
base64_data- Base64-encoded file data.
file_name:str- Name of the file to upload.
Returns
Upload response from VideoSDK API.
async def wait_for_participant(self, participant_id: str | None = None) ‑> str-
Expand source code
async def wait_for_participant(self, participant_id: str | None = None) -> str: """ Wait for a specific participant to join, or wait for the first participant if none specified. Args: participant_id (str | None, optional): Optional participant ID to wait for. If None, waits for first participant. Returns: str: The participant ID that joined. """ if participant_id: if participant_id in self.participants_data: return participant_id if participant_id not in self._participant_joined_events: self._participant_joined_events[participant_id] = asyncio.Event( ) await self._participant_joined_events[participant_id].wait() return participant_id else: if self.participants_data: return next(iter(self.participants_data.keys())) await self._first_participant_event.wait() return next(iter(self.participants_data.keys()))Wait for a specific participant to join, or wait for the first participant if none specified.
Args
participant_id:str | None, optional- Optional participant ID to wait for. If None, waits for first participant.
Returns
str- The participant ID that joined.