Module agents.room.room
Classes
class VideoSDKHandler (*,
meeting_id: str,
auth_token: str | None = None,
name: str,
agent_participant_id: str,
agent_id: str,
pipeline: Pipeline,
loop: asyncio.events.AbstractEventLoop,
vision: bool = False,
recording: bool = False,
record_audio: bool | None = None,
record_screen_share: bool = True,
custom_camera_video_track=None,
custom_microphone_audio_track=None,
audio_sinks=None,
background_audio: bool = False,
on_room_error: Callable[[Any], None] | None = None,
auto_end_session: bool = True,
session_timeout_seconds: int | None = None,
no_participant_timeout_seconds: int | None = 90,
on_session_end: Callable[[str], None] | None = None,
signaling_base_url: str | None = None,
job_logger=None,
traces_options=None,
metrics_options=None,
logs_options=None,
avatar_participant_id: str | None = None)-
Expand source code
class VideoSDKHandler(BaseTransportHandler):
    """
    Handles VideoSDK meeting operations and participant management.
    """

    def __init__(
        self,
        *,
        meeting_id: str,
        auth_token: str | None = None,
        name: str,
        agent_participant_id: str,
        agent_id: str,
        pipeline: "Pipeline",
        loop: AbstractEventLoop,
        vision: bool = False,
        recording: bool = False,
        record_audio: Optional[bool] = None,
        record_screen_share: bool = True,
        custom_camera_video_track=None,
        custom_microphone_audio_track=None,
        audio_sinks=None,
        background_audio: bool = False,
        on_room_error: Optional[Callable[[Any], None]] = None,
        # Session management options
        auto_end_session: bool = True,
        session_timeout_seconds: Optional[int] = None,
        no_participant_timeout_seconds: Optional[int] = 90,
        on_session_end: Optional[Callable[[str], None]] = None,
        # VideoSDK connection options
        signaling_base_url: Optional[str] = None,
        job_logger=None,
        traces_options=None,
        metrics_options=None,
        logs_options=None,
        # Avatar options
        avatar_participant_id: Optional[str] = None,
    ):
        """
        Initialize the VideoSDK handler.

        Args:
            meeting_id (str): Unique identifier for the meeting.
            auth_token (str | None, optional): Authentication token. Falls back to the
                VIDEOSDK_AUTH_TOKEN environment variable if not provided.
            name (str): Display name of the agent in the meeting.
            agent_participant_id (str): Participant ID of the agent in the meeting.
            agent_id (str): Agent identifier; sent as "agent_id" in the meeting meta data.
            pipeline (Pipeline): Audio/video processing pipeline.
            loop (AbstractEventLoop): Event loop for async operations.
            vision (bool, optional): Whether video processing is enabled. Defaults to False.
            recording (bool, optional): Whether recording is enabled. Defaults to False.
            record_audio (Optional[bool], optional): Selects the recording mode. None uses the
                participant recording API; True records the audio track; False records the
                video track.
            record_screen_share (bool, optional): Whether screen-share track recordings may be
                started. Defaults to True.
            custom_camera_video_track: Custom video track for camera input.
            custom_microphone_audio_track: Custom audio track for microphone input.
            audio_sinks: List of audio sinks for processing.
            background_audio (bool, optional): Whether to use background audio. Defaults to False.
            on_room_error (Optional[Callable[[Any], None]], optional): Error callback function.
            auto_end_session (bool, optional): Whether to automatically end sessions. Defaults to True.
            session_timeout_seconds (Optional[int], optional): Timeout for session auto-end after participants leave.
            no_participant_timeout_seconds (Optional[int], optional): Timeout to end session if no participant joins after agent connects.
            on_session_end (Optional[Callable[[str], None]], optional): Session end callback function.
            signaling_base_url (Optional[str], optional): Custom signaling server URL.
            job_logger: Optional job logger; receives the session id and the log endpoint
                once they are known.
            traces_options: Optional overrides (enabled / export_url / export_headers) applied
                to the traces configuration read from the meeting attributes.
            metrics_options: Stored on the instance; not used elsewhere in this class.
            logs_options: Optional overrides (enabled / export_url / export_headers) applied
                to the logs configuration read from the meeting attributes.
            avatar_participant_id (Optional[str], optional): Participant ID that is always
                treated as an agent-side participant.

        Raises:
            ValueError: If no auth token is passed and VIDEOSDK_AUTH_TOKEN is not set.
        """
        self.meeting_id = meeting_id
        self.auth_token = auth_token or os.getenv("VIDEOSDK_AUTH_TOKEN")
        if not self.auth_token:
            raise ValueError("VIDEOSDK_AUTH_TOKEN is not set")
        self.name = name
        self.agent_id = agent_id
        self.agent_participant_id = agent_participant_id
        self.pipeline = pipeline
        self.loop = loop
        self.vision = vision
        self.custom_camera_video_track = custom_camera_video_track
        self.custom_microphone_audio_track = custom_microphone_audio_track
        self.audio_sinks = audio_sinks or []
        self.background_audio = background_audio
        self._avatar_participant_id = avatar_participant_id

        # Managers
        self.input_stream_manager = InputStreamManager(pipeline=pipeline)
        self.sip_manager = SIPManager(room_id=meeting_id, auth_token=self.auth_token)
        self.recording_manager = RecordingManager(room_id=meeting_id, auth_token=self.auth_token)
        self.transport_event_sender = None

        # Session management
        self.auto_end_session = auto_end_session
        self.session_timeout_seconds = session_timeout_seconds
        self.no_participant_timeout_seconds = no_participant_timeout_seconds
        self.on_session_end = on_session_end
        self._session_ended = False
        self._session_end_task = None
        self._no_participant_timeout_task = None

        # VideoSDK connection
        self.signaling_base_url = signaling_base_url
        self.traces_options = traces_options
        self.metrics_options = metrics_options
        self.logs_options = logs_options

        super().__init__(loop, pipeline)

        # Participant tracking
        self._non_agent_participant_count = 0
        self._first_participant_event = asyncio.Event()
        # participant_id -> event set when that participant joins (or the session ends)
        self._participant_joined_events: dict[str, asyncio.Event] = {}

        # Meeting and event handling
        self.meeting = None
        self.participants_data = {}
        # Shared with the input stream manager so both sides see the same task maps.
        self.audio_listener_tasks = self.input_stream_manager.audio_listener_tasks
        self.video_listener_tasks = self.input_stream_manager.video_listener_tasks
        self._meeting_joined_data = None
        self.agent_meeting = None
        self._session_id: Optional[str] = None
        self._session_id_collected = False
        self.recording = recording
        self.record_audio = record_audio
        self.record_screen_share = record_screen_share
        self._track_recordings_kinds_by_participant: dict[str, set[str]] = {}
        self._participant_recording_has_share_at_start: dict[str, bool] = {}
        self.recorded_participants: set = set()
        self.traces_flow_manager = TracesFlowManager(room_id=self.meeting_id)
        metrics_collector.set_traces_flow_manager(self.traces_flow_manager)

        if custom_microphone_audio_track:
            # The caller's track is published; a tee track is only built when sinks exist.
            self.audio_track = custom_microphone_audio_track
            if audio_sinks:
                if self.background_audio:
                    self.agent_audio_track = TeeMixingCustomAudioStreamTrack(
                        loop=self.loop, sinks=audio_sinks, pipeline=pipeline
                    )
                else:
                    self.agent_audio_track = TeeCustomAudioStreamTrack(
                        loop=self.loop, sinks=audio_sinks, pipeline=pipeline
                    )
            else:
                self.agent_audio_track = None
        else:
            if self.background_audio:
                self.audio_track = TeeMixingCustomAudioStreamTrack(
                    loop=self.loop, sinks=audio_sinks, pipeline=pipeline
                )
            else:
                self.audio_track = TeeCustomAudioStreamTrack(
                    loop=self.loop, sinks=audio_sinks, pipeline=pipeline
                )
            self.agent_audio_track = None

        # Create meeting config
        self.meeting_config = {
            "name": self.name,
            "participant_id": self.agent_participant_id,
            "meeting_id": self.meeting_id,
            "token": self.auth_token,
            "mic_enabled": True,
            "webcam_enabled": custom_camera_video_track is not None,
            "custom_microphone_audio_track": self.audio_track,
            "custom_camera_video_track": custom_camera_video_track,
            "peer_type": "agent",
        }
        if self.signaling_base_url is not None:
            self.meeting_config["signaling_base_url"] = self.signaling_base_url

        self.attributes = {}
        self.on_room_error = on_room_error
        self._left: bool = False
        self._job_logger = job_logger

    @staticmethod
    def _is_current_task(task) -> bool:
        """
        Internal method: Return True if ``task`` is the asyncio task currently executing.
        """
        try:
            return task is asyncio.current_task()
        except RuntimeError:
            # No running event loop in this thread, so nothing can be "current".
            return False

    async def connect(self):
        """
        Connect to the VideoSDK meeting.
        """
        self.init_meeting()
        await self.join()

    async def disconnect(self):
        """
        Disconnect from the VideoSDK meeting.
        """
        await self.leave()

    def init_meeting(self):
        """
        Initialize the VideoSDK meeting instance.
        """
        self._left: bool = False
        self.sdk_metadata = {
            "sdk": "agents",
            "sdk_version": "1.0.0"
        }
        self.videosdk_meeting_meta_data = {
            "agent_id": self.agent_id,
            "agent_name": self.name,
            "is_videosdk_agent": True,
        }
        self.meeting = VideoSDK.init_meeting(
            **self.meeting_config,
            sdk_metadata=self.sdk_metadata,
            meta_data=self.videosdk_meeting_meta_data,
        )
        self.meeting.add_event_listener(
            MeetingHandler(
                on_meeting_joined=self.on_meeting_joined,
                on_meeting_left=self.on_meeting_left,
                on_participant_joined=self.on_participant_joined,
                on_participant_left=self.on_participant_left,
                on_error=self.on_error,
                on_agent_joined=self.on_agent_joined,
                on_agent_left=self.on_agent_left,
            )
        )

    async def join(self):
        """
        Join the meeting.
        """
        await self.meeting.async_join()

    async def leave(self):
        """
        Leave the meeting and clean up resources.

        Safe to call more than once; later calls return immediately.
        """
        if self._left:
            logger.info("Meeting already left")
            return

        logger.info("Leaving meeting and cleaning up resources")
        self._left = True

        if self.recording:
            try:
                await self.stop_and_merge_recordings()
            except Exception as e:
                logger.error(f"Error stopping/merging recordings: {e}")

        try:
            if self.meeting:
                self.meeting.leave()
        except Exception as e:
            logger.error(f"Error leaving meeting: {e}")

        await self.cleanup()

    def on_error(self, data):
        """
        Handle room errors.

        This method is called when VideoSDK encounters an error and forwards it to
        the configured error callback if provided. The session is then ended.

        Args:
            data: Error data from VideoSDK.
        """
        if self.on_room_error:
            self.on_room_error(data)
        asyncio.create_task(self._end_session("error_in_meeting"))

    def on_meeting_joined(self, data):
        """
        Handle meeting join event.

        Args:
            data: Meeting join event data from VideoSDK.
        """
        logger.info("Agent joined the meeting")
        self._meeting_joined_data = data

        # Notify JobContext that meeting has been joined
        from ..job import get_current_job_context

        job_ctx = get_current_job_context()
        if job_ctx:
            job_ctx.notify_meeting_joined()

        asyncio.create_task(self._collect_session_id())
        asyncio.create_task(self._collect_meeting_attributes())

        if self.no_participant_timeout_seconds is not None and self.no_participant_timeout_seconds > 0:
            self._no_participant_timeout_task = asyncio.create_task(
                self._no_participant_timeout_handler()
            )
            logger.info(
                f"No-participant timeout started: {self.no_participant_timeout_seconds}s"
            )

        if self.recording:
            self.recorded_participants.add(self.meeting.local_participant.id)
            asyncio.create_task(
                self._start_base_recording_for_participant(
                    self.meeting.local_participant.id,
                    self.meeting.local_participant,
                )
            )

    def on_meeting_left(self, data):
        """
        Handle meeting leave event.

        Args:
            data: Meeting leave event data from VideoSDK.
        """
        logger.info(f"Meeting Left: {data}")

        if hasattr(self, "participants_data") and self.participants_data:
            self.participants_data.clear()

        asyncio.create_task(self._end_session("meeting_left"))

    def _is_agent_participant(self, participant: Participant) -> bool:
        """
        Internal method: Check if a participant is an agent (or Avatar Server).

        A participant counts as an agent when its id matches the registered avatar id,
        its display name contains "agent", its display name equals this agent's name
        (case-insensitive), or its id is the local participant's id.
        """
        # Avatar Server participant — identified by pre-registered participant_id
        if self._avatar_participant_id and participant.id == self._avatar_participant_id:
            return True

        # A missing display name must not crash the join/leave callbacks.
        participant_name = (participant.display_name or "").lower()
        if "agent" in participant_name or participant_name == self.name.lower():
            return True

        # Only the id comparison needs a live meeting; the name checks above must not be
        # discarded just because the local participant is unavailable.
        local_participant = self.meeting.local_participant if self.meeting else None
        return bool(local_participant) and participant.id == local_participant.id

    def _participant_has_share_stream(self, participant: Participant) -> bool:
        """True if participant already has a share stream (avoids duplicate screen track recording)."""
        streams = getattr(participant, "streams", None)
        if not streams:
            return False
        try:
            vals = streams.values() if isinstance(streams, dict) else streams
            return any(getattr(s, "kind", None) == "share" for s in vals)
        except Exception:
            return False

    async def _start_track_recording_kind(self, participant_id: str, kind: str) -> None:
        """Start a track recording of ``kind`` for the participant unless one was already started."""
        kinds = self._track_recordings_kinds_by_participant.setdefault(participant_id, set())
        if kind in kinds:
            return
        await self.recording_manager.start_track_recording(participant_id=participant_id, kind=kind)
        # Recorded only after the start call returned without raising.
        kinds.add(kind)

    async def _start_screen_track_recordings(self, participant_id: str) -> None:
        """Start every screen recording kind for the participant, if screen-share recording is enabled."""
        if not self.record_screen_share:
            return
        for kind in _SCREEN_RECORDING_KINDS:
            await self._start_track_recording_kind(participant_id, kind)

    async def _start_base_recording_for_participant(
        self, participant_id: str, participant_obj: Participant | None = None
    ) -> None:
        """Participant composite API (record_audio is None) or track API (audio/video) + optional screen at join."""
        if self.record_audio is None:
            has_share = (
                self._participant_has_share_stream(participant_obj)
                if participant_obj is not None
                else False
            )
            self._participant_recording_has_share_at_start[participant_id] = has_share
            await self.recording_manager.start_participant_recording(participant_id)
            return

        await self._start_track_recording_kind(
            participant_id, "audio" if self.record_audio else "video"
        )
        if participant_obj and self._participant_has_share_stream(participant_obj):
            await self._start_screen_track_recordings(participant_id)

    async def _maybe_start_screen_track_recording(self, participant: Participant) -> None:
        """When share starts later: skip if participant recording already included share at start."""
        pid = participant.id
        if self.record_audio is None and self._participant_recording_has_share_at_start.get(pid, False):
            return
        await self._start_screen_track_recordings(pid)

    def _update_non_agent_participant_count(self):
        """
        Internal method: Update the count of non-agent participants.
        """
        if not self.meeting:
            return
        count = sum(
            1
            for participant in self.meeting.participants.values()
            if not self._is_agent_participant(participant)
        )
        self._non_agent_participant_count = count
        logger.debug(f"Non-agent participant count: {count}")

    def _cancel_session_end_task(self):
        """
        Internal method: Cancel the session end task if it exists.

        The delayed session-end task itself reaches this method through
        ``_end_session``. Cancelling the running task would make its next ``await``
        raise ``CancelledError`` and abort leaving/cleanup, so in that case the task
        is only forgotten, not cancelled.
        """
        task = self._session_end_task
        if task and not task.done():
            if not self._is_current_task(task):
                task.cancel()
            self._session_end_task = None

    async def _end_session(self, reason: str = "session_ended"):
        """
        Internal method: End the current session.

        Invokes the session end callback once, leaves the meeting and releases every
        coroutine blocked in ``wait_for_participant``.

        Args:
            reason: Reason string passed to the session end callback.
        """
        if self._session_ended:
            return
        # Mark the session as ended before the first await: leaving the meeting can
        # schedule further _end_session calls (e.g. from on_meeting_left), and those
        # must not fire the callback a second time.
        self._session_ended = True

        self._cancel_session_end_task()
        logger.info(f"Ending session: {reason}")

        if self.on_session_end:
            try:
                self.on_session_end(reason)
            except Exception as e:
                logger.error(f"Error in session end callback: {e}")

        # cleanup() (run by leave()) clears the event dict, so take the waiters now.
        pending_events = list(self._participant_joined_events.values())
        try:
            await self.leave()
        finally:
            # Waiters observe _session_ended and return None.
            if not self._first_participant_event.is_set():
                self._first_participant_event.set()
            for evt in pending_events:
                if not evt.is_set():
                    evt.set()

    async def force_end_session(self, reason: str = "manual_hangup") -> None:
        """
        Public helper: forcefully end the session, bypassing participant checks.

        Args:
            reason: Reason string to propagate to session end callbacks.
        """
        await self._end_session(reason)

    async def call_transfer(self, transfer_to: str) -> None:
        """
        Transfer the call to a provided Phone number or SIP endpoint.
        """
        await self.sip_manager.call_transfer(session_id=self._session_id, transfer_to=transfer_to)

    def fetch_call_info(self, session_id: str):
        """
        Fetch SIP call information. Forward to sip_manager.
        """
        return self.sip_manager.fetch_call_info(session_id=session_id)

    def transfer_call(self, call_id: str, transfer_to: str):
        """
        Transfer the call. Forward to sip_manager.
        """
        return self.sip_manager.transfer_call(call_id=call_id, transfer_to=transfer_to)

    def setup_session_end_callback(self, callback):
        """
        Set up the session end callback.

        If a callback is already registered, both are kept: the existing one runs
        first, then ``callback``; an exception in either is logged and does not stop
        the other.
        """
        existing_callback = self.on_session_end

        if existing_callback:
            def chained_callback(reason: str):
                try:
                    existing_callback(reason)
                except Exception as e:
                    logger.error(f"Error in existing session end callback: {e}")
                try:
                    callback(reason)
                except Exception as e:
                    logger.error(f"Error in new session end callback: {e}")

            self.on_session_end = chained_callback
            logger.debug("Session end callback chained with existing callback")
        else:
            self.on_session_end = callback
            logger.debug("Session end callback set up")

    def _schedule_session_end(self, timeout_seconds: int):
        """
        Internal method: Schedule session end after timeout.
        """
        # Replace any previously scheduled end so only one timer is pending.
        if self._session_end_task and not self._session_end_task.done():
            self._session_end_task.cancel()

        self._session_end_task = asyncio.create_task(
            self._delayed_session_end(timeout_seconds)
        )
        logger.info(f"Session end scheduled in {timeout_seconds} seconds")

    async def _delayed_session_end(self, timeout_seconds: int):
        """
        Internal method: Delayed session end after timeout.
        """
        await asyncio.sleep(timeout_seconds)
        await self._end_session("no_participants")

    async def _no_participant_timeout_handler(self):
        """
        Internal method: End session if no participant joins within the configured timeout.
        """
        await asyncio.sleep(self.no_participant_timeout_seconds)
        if self._non_agent_participant_count == 0:
            logger.info(
                f"No participant joined within {self.no_participant_timeout_seconds}s, ending session"
            )
            await self._end_session("no_participant_joined")

    def on_participant_joined(self, participant: Participant):
        """
        Handle participant join event.

        Records the participant's metadata, wakes ``wait_for_participant`` callers,
        cancels pending no-participant / session-end timers and subscribes to the
        participant's stream events.

        Args:
            participant (Participant): The participant that joined.
        """
        peer_name = participant.display_name
        self.participants_data[participant.id] = {"name": peer_name}
        self.participants_data[participant.id]["sipUser"] = (
            participant.meta_data.get("sipUser", False)
            if participant.meta_data
            else False
        )
        self.participants_data[participant.id]["sipCallType"] = (
            participant.meta_data.get("callType", False)
            if participant.meta_data
            else False
        )
        self.participants_data[participant.id]["enable_agent_events"] = (
            participant.meta_data.get("enableAgentEvents", False)
            if participant.meta_data
            else False
        )

        if self.participants_data[participant.id]["enable_agent_events"] and self.transport_event_sender is None:
            logger.info("Initializing transport_event_sender because participant has enableAgentEvents=True")
            self.transport_event_sender = TransportEventSender(self)

        logger.info(f"Participant joined: {peer_name}")

        if self._is_agent_participant(participant):
            logger.info(f"Skipping agent/avatar participant in on_participant_joined: {participant.id}")
            return

        sip_user_flag = self.participants_data[participant.id]["sipUser"]
        metrics_collector.add_participant_metrics(
            participant_id=participant.id,
            kind="user",
            sip_user=sip_user_flag,
            join_time=time.time(),
            meta=self.participants_data[participant.id],
        )

        # Only the participant that is alone in participants_data is auto-recorded.
        if self.recording and len(self.participants_data) == 1:
            self.recorded_participants.add(participant.id)
            asyncio.create_task(
                self._start_base_recording_for_participant(participant.id, participant)
            )

        if participant.id in self._participant_joined_events:
            self._participant_joined_events[participant.id].set()

        if not self._first_participant_event.is_set():
            self._first_participant_event.set()

        if self._no_participant_timeout_task and not self._no_participant_timeout_task.done():
            self._no_participant_timeout_task.cancel()
            self._no_participant_timeout_task = None
            logger.info("No-participant timeout cancelled: participant joined")

        # Update participant count and cancel session end if participants are present
        self._update_non_agent_participant_count()
        if self._non_agent_participant_count > 0:
            self._cancel_session_end_task()

        def on_stream_enabled(stream: Stream):
            """
            Internal method: Handle stream enabled event.
            """
            if stream.kind == "audio":
                global_event_emitter.emit(
                    "AUDIO_STREAM_ENABLED",
                    {"stream": stream, "participant": participant},
                )
                logger.info(f"Audio stream enabled for participant: {peer_name}")
                try:
                    task = asyncio.create_task(self.add_audio_listener(stream))
                    self.audio_listener_tasks[stream.id] = task
                except Exception as e:
                    logger.error(f"Error creating audio listener task: {e}")

            if stream.kind == "share" and self.recording:
                asyncio.create_task(self._maybe_start_screen_track_recording(participant))

            if stream.kind in ("video", "share") and self.vision:
                logger.info(f"{stream.kind} stream enabled for participant: {peer_name}")
                self.video_listener_tasks[stream.id] = asyncio.create_task(
                    self.add_video_listener(stream)
                )

        def on_stream_disabled(stream: Stream):
            """
            Internal method: Handle stream disabled event.
            """
            if stream.kind == "audio":
                audio_task = self.audio_listener_tasks.get(stream.id)
                if audio_task is not None:
                    audio_task.cancel()
                    del self.audio_listener_tasks[stream.id]

            if stream.kind in ("video", "share"):
                video_task = self.video_listener_tasks.get(stream.id)
                if video_task is not None:
                    video_task.cancel()
                    del self.video_listener_tasks[stream.id]

        if participant.id != self.meeting.local_participant.id:
            participant.add_event_listener(
                ParticipantHandler(
                    participant_id=participant.id,
                    on_stream_enabled=on_stream_enabled,
                    on_stream_disabled=on_stream_disabled,
                )
            )

    def on_participant_left(self, participant: Participant):
        """
        Handle participant leave event.

        Args:
            participant (Participant): The participant that left.
        """
        logger.info(f"Participant left: {participant.display_name}")

        # NOTE(review): listener tasks are stored under stream.id in
        # on_participant_joined, but looked up by participant.id here; these lookups
        # only match if the two ids coincide — confirm against the SDK.
        if participant.id in self.audio_listener_tasks:
            try:
                self.audio_listener_tasks[participant.id].cancel()
                del self.audio_listener_tasks[participant.id]
            except Exception as e:
                logger.error(
                    f"Error cancelling audio listener task for participant {participant.id}: {e}"
                )

        if participant.id in self.video_listener_tasks:
            try:
                self.video_listener_tasks[participant.id].cancel()
                del self.video_listener_tasks[participant.id]
            except Exception as e:
                logger.error(
                    f"Error cancelling video listener task for participant {participant.id}: {e}"
                )

        global_event_emitter.emit("PARTICIPANT_LEFT", {"participant": participant})

        # Update participant count and check if session should end
        self._update_non_agent_participant_count()

        if participant.id in self.recorded_participants:
            logger.info(
                f"Recorded participant {participant.display_name} left, ending session"
            )
            asyncio.create_task(self._end_session("recorded_participant_left"))
            return

        if self._non_agent_participant_count == 0 and self.auto_end_session:
            if (
                self.session_timeout_seconds is not None
                and self.session_timeout_seconds > 0
            ):
                logger.info(
                    f"All non-agent participants have left, scheduling session end in {self.session_timeout_seconds} seconds"
                )
                self._schedule_session_end(self.session_timeout_seconds)
            else:
                logger.info(
                    "All non-agent participants have left, ending session immediately"
                )
                asyncio.create_task(self._end_session("all_participants_left"))

    def on_agent_joined(self, agent: TransportAgent):
        """
        Handle remote agent join event.

        Args:
            agent (TransportAgent): The agent that joined.
        """
        logger.info(f"Remote agent joined: {agent.display_name} ({agent.id})")

    def on_agent_left(self, agent: TransportAgent):
        """
        Handle remote agent leave event.

        Args:
            agent (TransportAgent): The agent that left.
        """
        logger.info(f"Remote agent left: {agent.display_name} ({agent.id})")

    async def add_audio_listener(self, stream: Stream):
        """
        Forward to input_stream_manager.
        """
        await self.input_stream_manager.add_audio_listener(stream)

    async def add_video_listener(self, stream: Stream):
        """
        Forward to input_stream_manager.
        """
        await self.input_stream_manager.add_video_listener(stream)

    async def wait_for_participant(self, participant_id: str | None = None) -> str | None:
        """
        Wait for a specific participant to join.

        Args:
            participant_id: ID to wait for. When omitted, waits for the first
                participant recorded in ``participants_data``.

        Returns:
            The participant ID, or None if the session ended before one was available.
        """
        if participant_id:
            if participant_id in self.participants_data:
                return participant_id

            if participant_id not in self._participant_joined_events:
                self._participant_joined_events[participant_id] = asyncio.Event()

            await self._participant_joined_events[participant_id].wait()
            # The event is also set when the session ends.
            if self._session_ended:
                return None
            return participant_id
        else:
            if self.participants_data:
                return next(iter(self.participants_data.keys()))

            await self._first_participant_event.wait()
            if self._session_ended or not self.participants_data:
                return None
            return next(iter(self.participants_data.keys()))

    async def subscribe_to_pubsub(self, pubsub_config: PubSubSubscribeConfig):
        """
        Subscribe to pubsub messages.

        Args:
            pubsub_config (PubSubSubscribeConfig): Configuration for pubsub subscription.

        Returns:
            List of existing messages from the subscription.
        """
        old_messages = await self.meeting.pubsub.subscribe(pubsub_config)
        return old_messages

    async def publish_to_pubsub(self, pubsub_config: PubSubPublishConfig):
        """
        Publish message to pubsub. Does nothing when no meeting is active.

        Args:
            pubsub_config (PubSubPublishConfig): Configuration for pubsub publishing.
        """
        if self.meeting:
            await self.meeting.pubsub.publish(pubsub_config)

    async def upload_file(self, base64_data, file_name):
        """
        Upload a file to the temporary storage.

        Args:
            base64_data: Base64-encoded file data.
            file_name (str): Name of the file to upload.

        Returns:
            Upload response from VideoSDK API.
        """
        # NOTE(review): the SDK call is returned without being awaited; if
        # upload_base64 is a coroutine function, callers get a coroutine — confirm.
        return self.meeting.upload_base64(base64_data, self.auth_token, file_name)

    async def fetch_file(self, url):
        """
        Fetch a file from a URL.

        Args:
            url (str): URL of the file to fetch.

        Returns:
            Base64-encoded file data.
        """
        # NOTE(review): the SDK call is returned without being awaited; if
        # fetch_base64 is a coroutine function, callers get a coroutine — confirm.
        return self.meeting.fetch_base64(url, self.auth_token)

    async def cleanup(self):
        """
        Clean up resources.

        Cancels pending timers and listener tasks, releases audio tracks and the
        traces flow manager, clears per-session state and drops references to the
        meeting, pipeline and callbacks.
        """
        logger.info("Starting room cleanup")

        self._cancel_session_end_task()

        timeout_task = self._no_participant_timeout_task
        if timeout_task and not timeout_task.done():
            # The no-participant timeout task reaches cleanup through
            # _end_session -> leave; cancelling the running task would abort the
            # awaits below with CancelledError and leave cleanup half done.
            if not self._is_current_task(timeout_task):
                timeout_task.cancel()
            self._no_participant_timeout_task = None

        self.input_stream_manager.cancel_tasks()

        if hasattr(self, "transport_event_sender") and self.transport_event_sender:
            self.transport_event_sender.cleanup()

        if hasattr(self, "audio_track") and self.audio_track:
            try:
                await self.audio_track.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up audio track: {e}")
            self.audio_track = None

        if hasattr(self, "agent_audio_track") and self.agent_audio_track:
            try:
                await self.agent_audio_track.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up agent audio track: {e}")
            self.agent_audio_track = None

        if hasattr(self, "traces_flow_manager") and self.traces_flow_manager:
            try:
                self.traces_flow_manager.agent_meeting_end()
            except Exception as e:
                logger.error(f"Error ending traces flow manager: {e}")
            self.traces_flow_manager = None
            metrics_collector.set_traces_flow_manager(None)

        self.participants_data.clear()
        self.recorded_participants.clear()
        self._track_recordings_kinds_by_participant.clear()
        self._participant_recording_has_share_at_start.clear()
        self._participant_joined_events.clear()

        self.meeting = None
        self.pipeline = None
        self.custom_camera_video_track = None
        self.custom_microphone_audio_track = None
        self.audio_sinks = None
        self.on_room_error = None
        self.on_session_end = None
        self._job_logger = None

        self._session_ended = True
        self._session_id = None
        self._session_id_collected = False
        self._non_agent_participant_count = 0

        logger.info("Room cleanup completed")

    async def _collect_session_id(self) -> None:
        """
        Internal method: Collect session ID from room and set it in metrics.
        """
        if self.meeting and not self._session_id_collected:
            try:
                session_id = getattr(self.meeting, "session_id", None)
                if session_id:
                    self._session_id = session_id
                    logger.info(f"Session ID collected: {session_id}")
                    metrics_collector.set_session_id(session_id)
                    metrics_collector.add_participant_metrics(
                        participant_id=self.meeting.local_participant.id,
                        kind="agent",
                        sip_user=False,
                        join_time=time.time(),
                        meta={"name": self.name},
                    )
                    self._session_id_collected = True
                    if self.traces_flow_manager:
                        self.traces_flow_manager.set_session_id(session_id)
                    if self._job_logger:
                        self._job_logger.update_context(sessionId=session_id)
            except Exception as e:
                logger.error(f"Error collecting session ID: {e}")

    async def _collect_meeting_attributes(self) -> None:
        """
        Internal method: Collect meeting attributes and initialize telemetry.

        Applies ``traces_options`` / ``logs_options`` on top of the configuration found
        in the meeting attributes, then starts the "agent joined" trace.
        """
        if not self.meeting:
            logger.error("Meeting not initialized")
            return

        try:
            if hasattr(self.meeting, "get_attributes"):
                attributes = self.meeting.get_attributes()
                if attributes:
                    peer_id = getattr(self.meeting, "participant_id", "agent")
                    traces_config = attributes.get("traces", {})

                    if self.traces_options:
                        if not self.traces_options.enabled:
                            traces_config["enabled"] = False
                        elif self.traces_options.enabled and self.traces_options.export_url:
                            traces_config["enabled"] = True
                            traces_config["pbEndPoint"] = self.traces_options.export_url
                            traces_config["export_headers"] = self.traces_options.export_headers

                    auto_initialize_telemetry_and_logs(
                        room_id=self.meeting_id,
                        peer_id=peer_id,
                        room_attributes=attributes,
                        session_id=self._session_id,
                        sdk_metadata=self.sdk_metadata,
                        custom_traces_config=traces_config,
                    )

                    if self._job_logger:
                        logs_config = attributes.get("logs", {})
                        observability_jwt = attributes.get("observability", "")
                        is_logs_enabled = logs_config.get("enabled", False)
                        log_endpoint = logs_config.get("endPoint", "")
                        custom_headers = None

                        if self.logs_options:
                            if not self.logs_options.enabled:
                                is_logs_enabled = False
                            elif self.logs_options.enabled and self.logs_options.export_url:
                                is_logs_enabled = True
                                log_endpoint = self.logs_options.export_url
                                custom_headers = self.logs_options.export_headers

                        if is_logs_enabled and log_endpoint:
                            self._job_logger.set_endpoint(log_endpoint, observability_jwt, custom_headers)
                            logger.debug(f"Log endpoint configured: {log_endpoint}")
                else:
                    logger.error("No meeting attributes found")
            else:
                logger.error("Meeting object does not have 'get_attributes' method")

            if self._meeting_joined_data and self.traces_flow_manager:
                start_time = time.perf_counter()
                agent_joined_attributes = {
                    "roomId": self.meeting_id,
                    "agent_ParticipantId": self.agent_participant_id,
                    "sessionId": self._session_id,
                    "agent_name": self.name,
                    "peerId": self.meeting.local_participant.id,
                    "sdk_metadata": self.sdk_metadata,
                    "start_time": start_time,
                }
                self.traces_flow_manager.start_agent_joined_meeting(
                    agent_joined_attributes
                )
        except Exception as e:
            logger.error(f"Error collecting meeting attributes and creating spans: {e}")

    async def stop_participants_recording(self):
        """
        Stop recording for all participants. Forward to recording_manager.
        """
        await self.recording_manager.stop_participant_recording(self.meeting.local_participant.id)
        # Iterate a snapshot: participants may join or leave while a stop call is
        # awaited, and mutating the dict during iteration raises RuntimeError.
        for participant_id in list(self.participants_data):
            await self.recording_manager.stop_participant_recording(participant_id)

    async def start_participant_recording(self, id: str):
        """
        Start recording. Forward to recording_manager.
        """
        await self.recording_manager.start_participant_recording(id)

    async def stop_participant_recording(self, id: str):
        """
        Stop recording. Forward to recording_manager.
        """
        await self.recording_manager.stop_participant_recording(id)

    async def merge_participant_recordings(self):
        """
        Merge recordings. Forward to recording_manager.
        """
        await self.recording_manager.merge_participant_recordings(
            session_id=self._session_id,
            local_participant_id=self.meeting.local_participant.id,
            participants_data=self.participants_data
        )

    async def stop_and_merge_recordings(self):
        """
        Stop and merge recordings. Forward to recording_manager.
        """
        await self.recording_manager.stop_and_merge_recordings(
            session_id=self._session_id,
            local_participant_id=self.meeting.local_participant.id,
            participants_data=self.participants_data,
            track_kinds_by_participant=self._track_recordings_kinds_by_participant,
            # Participant recordings exist only in composite mode (record_audio is None).
            stop_participants_recording=self.record_audio is None,
        )
Initialize the VideoSDK handler.
Args
meeting_id:str- Unique identifier for the meeting.
auth_token:str | None, optional- Authentication token. Falls back to the VIDEOSDK_AUTH_TOKEN environment variable if not provided.
name:str- Display name of the agent in the meeting.
agent_participant_id:str- Participant ID of the agent in the meeting.
pipeline:Pipeline- Audio/video processing pipeline.
loop:AbstractEventLoop- Event loop for async operations.
vision:bool, optional- Whether video processing is enabled. Defaults to False.
recording:bool, optional- Whether recording is enabled. Defaults to False.
custom_camera_video_track- Custom video track for camera input.
custom_microphone_audio_track- Custom audio track for microphone input.
audio_sinks- List of audio sinks for processing.
background_audio:bool, optional- Whether to use background audio. Defaults to False.
on_room_error:Optional[Callable[[Any], None]], optional- Error callback function.
auto_end_session:bool, optional- Whether to automatically end sessions. Defaults to True.
session_timeout_seconds:Optional[int], optional- Timeout for session auto-end after participants leave.
no_participant_timeout_seconds:Optional[int], optional- Timeout to end session if no participant joins after agent connects.
on_session_end:Optional[Callable[[str], None]], optional- Session end callback function.
signaling_base_url:Optional[str], optional- Custom signaling server URL.
Raises
ValueError- If no auth_token is passed and the VIDEOSDK_AUTH_TOKEN environment variable is not set.
Ancestors
- BaseTransportHandler
- abc.ABC
Methods
async def add_audio_listener(self, stream: videosdk.stream.Stream)-
Expand source code
async def add_audio_listener(self, stream: Stream):
    """
    Forward to input_stream_manager.

    Args:
        stream (Stream): Stream passed on unchanged to
            ``input_stream_manager.add_audio_listener``.
    """
    manager = self.input_stream_manager
    await manager.add_audio_listener(stream)
async def add_video_listener(self, stream: videosdk.stream.Stream)-
Expand source code
async def add_video_listener(self, stream: Stream):
    """
    Forward to input_stream_manager.

    Args:
        stream (Stream): Stream passed on unchanged to
            ``input_stream_manager.add_video_listener``.
    """
    manager = self.input_stream_manager
    await manager.add_video_listener(stream)
async def call_transfer(self, transfer_to: str) ‑> None-
Expand source code
async def call_transfer(self, transfer_to: str) -> None:
    """
    Transfer the call to a provided Phone number or SIP endpoint.

    Delegates to ``sip_manager.call_transfer`` together with the handler's
    current session id.

    Args:
        transfer_to (str): Destination handed to the SIP manager.
    """
    sip = self.sip_manager
    current_session = self._session_id
    await sip.call_transfer(session_id=current_session, transfer_to=transfer_to)
async def cleanup(self)-
Expand source code
async def cleanup(self):
    """
    Clean up resources.

    Cancels the pending session-end and no-participant timeout tasks, asks
    the input stream manager to cancel its tasks, releases the transport
    event sender, the audio tracks and the traces flow manager, empties the
    participant/recording bookkeeping containers and resets the handler's
    session state.

    A failure while releasing one resource is logged and does not stop the
    remaining teardown steps.
    """
    logger.info("Starting room cleanup")

    self._cancel_session_end_task()

    timeout_task = self._no_participant_timeout_task
    if timeout_task and not timeout_task.done():
        timeout_task.cancel()
    # Drop the reference even when the task had already finished.
    self._no_participant_timeout_task = None

    self.input_stream_manager.cancel_tasks()

    sender = getattr(self, "transport_event_sender", None)
    if sender:
        # Guarded like the other release steps: a failing sender must not
        # leave tracks, callbacks and meeting references un-released.
        try:
            sender.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up transport event sender: {e}")

    audio_track = getattr(self, "audio_track", None)
    if audio_track:
        try:
            await audio_track.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up audio track: {e}")
        self.audio_track = None

    agent_audio_track = getattr(self, "agent_audio_track", None)
    if agent_audio_track:
        try:
            await agent_audio_track.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up agent audio track: {e}")
        self.agent_audio_track = None

    traces_flow_manager = getattr(self, "traces_flow_manager", None)
    if traces_flow_manager:
        try:
            traces_flow_manager.agent_meeting_end()
        except Exception as e:
            logger.error(f"Error ending traces flow manager: {e}")
        self.traces_flow_manager = None
        metrics_collector.set_traces_flow_manager(None)

    # Empty the per-participant bookkeeping containers in place.
    self.participants_data.clear()
    self.recorded_participants.clear()
    self._track_recordings_kinds_by_participant.clear()
    self._participant_recording_has_share_at_start.clear()
    self._participant_joined_events.clear()

    # Release references to external objects and user callbacks.
    self.meeting = None
    self.pipeline = None
    self.custom_camera_video_track = None
    self.custom_microphone_audio_track = None
    self.audio_sinks = None
    self.on_room_error = None
    self.on_session_end = None
    self._job_logger = None

    # Reset session state.
    self._session_ended = True
    self._session_id = None
    self._session_id_collected = False
    self._non_agent_participant_count = 0

    logger.info("Room cleanup completed")
async def connect(self)-
Expand source code
async def connect(self):
    """
    Connect to the VideoSDK meeting.

    Creates the meeting instance via ``init_meeting()`` and then awaits
    ``join()``.
    """
    self.init_meeting()
    join_coro = self.join()
    await join_coro
async def disconnect(self)-
Expand source code
async def disconnect(self):
    """
    Disconnect from the VideoSDK meeting.

    Thin alias that awaits ``leave()``.
    """
    leave_coro = self.leave()
    await leave_coro
def fetch_call_info(self, session_id: str)-
Expand source code
def fetch_call_info(self, session_id: str):
    """
    Fetch SIP call information. Forward to sip_manager.

    Args:
        session_id (str): Session id passed to ``sip_manager.fetch_call_info``.

    Returns:
        Whatever ``sip_manager.fetch_call_info`` returns.
    """
    sip = self.sip_manager
    call_info = sip.fetch_call_info(session_id=session_id)
    return call_info
async def fetch_file(self, url)-
Expand source code
async def fetch_file(self, url):
    """
    Fetch a file from a URL.

    Calls ``meeting.fetch_base64`` with the handler's auth token. If that
    call hands back an awaitable it is awaited here, so callers receive the
    file data itself rather than a pending coroutine.

    Args:
        url (str): URL of the file to fetch.

    Returns:
        Base64-encoded file data.
    """
    import inspect

    result = self.meeting.fetch_base64(url, self.auth_token)
    # Resolve an async SDK call; a plain (already computed) value passes through.
    if inspect.isawaitable(result):
        result = await result
    return result
Args
url:str- URL of the file to fetch.
Returns
Base64-encoded file data.
async def force_end_session(self, reason: str = 'manual_hangup') ‑> None-
Expand source code
async def force_end_session(self, reason: str = "manual_hangup") -> None:
    """
    Public helper: forcefully end the session, bypassing participant checks.

    Awaits the private ``_end_session`` with the given reason.

    Args:
        reason: Reason string to propagate to session end callbacks.
    """
    end_coro = self._end_session(reason)
    await end_coro
Args
reason- Reason string to propagate to session end callbacks.
def init_meeting(self)-
Expand source code
def init_meeting(self):
    """
    Initialize the VideoSDK meeting instance.

    Resets the "already left" flag, builds the SDK and agent metadata
    dictionaries, creates the meeting from ``self.meeting_config`` and
    registers this handler's ``on_*`` methods as meeting event callbacks.
    """
    self._left: bool = False

    self.sdk_metadata = {"sdk": "agents", "sdk_version": "1.0.0"}
    self.videosdk_meeting_meta_data = {
        "agent_id": self.agent_id,
        "agent_name": self.name,
        "is_videosdk_agent": True,
    }

    self.meeting = VideoSDK.init_meeting(
        **self.meeting_config,
        sdk_metadata=self.sdk_metadata,
        meta_data=self.videosdk_meeting_meta_data,
    )

    meeting_listener = MeetingHandler(
        on_meeting_joined=self.on_meeting_joined,
        on_meeting_left=self.on_meeting_left,
        on_participant_joined=self.on_participant_joined,
        on_participant_left=self.on_participant_left,
        on_error=self.on_error,
        on_agent_joined=self.on_agent_joined,
        on_agent_left=self.on_agent_left,
    )
    self.meeting.add_event_listener(meeting_listener)
async def join(self)-
Expand source code
async def join(self):
    """
    Join the meeting.

    Awaits ``async_join()`` on the meeting created by ``init_meeting``.
    """
    meeting = self.meeting
    await meeting.async_join()
async def leave(self)-
Expand source code
async def leave(self):
    """
    Leave the meeting and clean up resources.

    Runs at most once: later calls log and return. When recording is
    enabled the recordings are stopped and merged first. Errors from that
    step or from leaving the meeting are logged, and ``cleanup()`` is
    awaited afterwards regardless.
    """
    already_left = self._left
    if already_left:
        logger.info("Meeting already left")
        return

    logger.info("Leaving meeting and cleaning up resources")
    self._left = True

    if self.recording:
        try:
            await self.stop_and_merge_recordings()
        except Exception as err:
            logger.error(f"Error stopping/merging recordings: {err}")

    try:
        if self.meeting:
            self.meeting.leave()
    except Exception as err:
        logger.error(f"Error leaving meeting: {err}")

    await self.cleanup()
async def merge_participant_recordings(self)-
Expand source code
async def merge_participant_recordings(self):
    """
    Merge recordings. Forward to recording_manager.

    Passes the current session id, the local participant's id and the
    participants bookkeeping dict to
    ``recording_manager.merge_participant_recordings``.
    """
    session_id = self._session_id
    local_id = self.meeting.local_participant.id
    await self.recording_manager.merge_participant_recordings(
        session_id=session_id,
        local_participant_id=local_id,
        participants_data=self.participants_data,
    )
def on_agent_joined(self, agent: videosdk.agent.Agent)-
Expand source code
def on_agent_joined(self, agent: TransportAgent):
    """
    Handle remote agent join event.

    Only logs the agent's display name and id.

    Args:
        agent (TransportAgent): The agent that joined.
    """
    message = f"Remote agent joined: {agent.display_name} ({agent.id})"
    logger.info(message)
Args
agent:TransportAgent- The agent that joined.
def on_agent_left(self, agent: videosdk.agent.Agent)-
Expand source code
def on_agent_left(self, agent: TransportAgent):
    """
    Handle remote agent leave event.

    Only logs the agent's display name and id.

    Args:
        agent (TransportAgent): The agent that left.
    """
    message = f"Remote agent left: {agent.display_name} ({agent.id})"
    logger.info(message)
Args
agent:TransportAgent- The agent that left.
def on_error(self, data)-
Expand source code
def on_error(self, data):
    """
    Handle room errors.

    Called when VideoSDK reports an error. The error data is forwarded to
    the configured ``on_room_error`` callback, if any, and the session is
    then scheduled to end with reason ``"error_in_meeting"``.

    An exception raised by the user callback is logged and does not prevent
    the session end from being scheduled.

    Args:
        data: Error data from VideoSDK.
    """
    callback = self.on_room_error
    if callback:
        # User code must not be able to block the session teardown below.
        try:
            callback(data)
        except Exception as e:
            logger.error(f"Error in room error callback: {e}")
    asyncio.create_task(self._end_session("error_in_meeting"))
This method is called when VideoSDK encounters an error. It forwards the error data to the configured error callback, if one was provided, and then schedules the session to end with the reason "error_in_meeting".
Args
data- Error data from VideoSDK.
def on_meeting_joined(self, data)-
Expand source code
def on_meeting_joined(self, data):
    """
    Handle meeting join event.

    Stores the join payload, notifies the current job context (if any),
    schedules collection of the session id and meeting attributes, starts
    the no-participant timeout when one is configured and, when recording
    is enabled, schedules base recording for the local participant.

    Args:
        data: Meeting join event data from VideoSDK.
    """
    logger.info("Agent joined the meeting")
    self._meeting_joined_data = data

    # Notify JobContext that meeting has been joined
    from ..job import get_current_job_context

    job_ctx = get_current_job_context()
    if job_ctx:
        job_ctx.notify_meeting_joined()

    asyncio.create_task(self._collect_session_id())
    asyncio.create_task(self._collect_meeting_attributes())

    timeout_seconds = self.no_participant_timeout_seconds
    if timeout_seconds is not None and timeout_seconds > 0:
        self._no_participant_timeout_task = asyncio.create_task(
            self._no_participant_timeout_handler()
        )
        logger.info(
            f"No-participant timeout started: {self.no_participant_timeout_seconds}s"
        )

    if not self.recording:
        return

    # The local participant is tracked as a recorded participant.
    local_participant = self.meeting.local_participant
    self.recorded_participants.add(local_participant.id)
    asyncio.create_task(
        self._start_base_recording_for_participant(
            local_participant.id,
            local_participant,
        )
    )
Args
data- Meeting join event data from VideoSDK.
def on_meeting_left(self, data)-
Expand source code
def on_meeting_left(self, data):
    """
    Handle meeting leave event.

    Logs the event, empties ``participants_data`` when it exists and is
    non-empty, and schedules the session to end with reason
    ``"meeting_left"``.

    Args:
        data: Meeting leave event data from VideoSDK.
    """
    logger.info(f"Meeting Left: {data}")

    known_participants = getattr(self, "participants_data", None)
    if known_participants:
        known_participants.clear()

    asyncio.create_task(self._end_session("meeting_left"))
Args
data- Meeting leave event data from VideoSDK.
def on_participant_joined(self, participant: videosdk.participant.Participant)-
Expand source code
def on_participant_joined(self, participant: Participant):
    """
    Handle participant join event.

    Records the participant's name and metadata flags, lazily creates the
    transport event sender, and, for non-agent participants, registers
    metrics, optionally starts recording, wakes any ``wait_for_participant``
    waiters, cancels the no-participant timeout and any pending session end,
    and attaches per-stream listeners.

    Args:
        participant (Participant): The participant that joined.
    """
    peer_name = participant.display_name
    # The entry is written before the agent check below, so agent/avatar
    # participants also appear in participants_data.
    self.participants_data[participant.id] = {"name": peer_name}
    # Each flag falls back to False when the participant has no meta_data.
    self.participants_data[participant.id]["sipUser"] = (
        participant.meta_data.get("sipUser", False)
        if participant.meta_data
        else False
    )
    self.participants_data[participant.id]["sipCallType"] = (
        participant.meta_data.get("callType", False)
        if participant.meta_data
        else False
    )
    self.participants_data[participant.id]["enable_agent_events"] = (
        participant.meta_data.get("enableAgentEvents", False)
        if participant.meta_data
        else False
    )

    # Create the event sender once, the first time a participant asks for it.
    if self.participants_data[participant.id]["enable_agent_events"] and self.transport_event_sender is None:
        logger.info("Initializing transport_event_sender because participant has enableAgentEvents=True")
        self.transport_event_sender = TransportEventSender(self)

    logger.info(f"Participant joined: {peer_name}")

    # Agent/avatar participants get no metrics, recording, waiter wake-up
    # or stream listeners.
    if self._is_agent_participant(participant):
        logger.info(f"Skipping agent/avatar participant in on_participant_joined: {participant.id}")
        return

    sip_user_flag = self.participants_data[participant.id]["sipUser"]

    metrics_collector.add_participant_metrics(
        participant_id=participant.id,
        kind="user",
        sip_user=sip_user_flag,
        join_time=time.time(),
        meta=self.participants_data[participant.id],
    )

    # Recording starts only while this is the sole entry in participants_data.
    if self.recording and len(self.participants_data) == 1:
        self.recorded_participants.add(participant.id)
        asyncio.create_task(
            self._start_base_recording_for_participant(participant.id, participant)
        )

    # Wake waiters registered for this specific participant id ...
    if participant.id in self._participant_joined_events:
        self._participant_joined_events[participant.id].set()

    # ... and waiters for "any first participant".
    if not self._first_participant_event.is_set():
        self._first_participant_event.set()

    if self._no_participant_timeout_task and not self._no_participant_timeout_task.done():
        self._no_participant_timeout_task.cancel()
        self._no_participant_timeout_task = None
        logger.info("No-participant timeout cancelled: participant joined")

    # Update participant count and cancel session end if participants are present
    self._update_non_agent_participant_count()
    if self._non_agent_participant_count > 0:
        self._cancel_session_end_task()

    def on_stream_enabled(stream: Stream):
        """
        Internal method: Handle stream enabled event.
        """
        if stream.kind == "audio":
            global_event_emitter.emit(
                "AUDIO_STREAM_ENABLED",
                {"stream": stream, "participant": participant},
            )
            logger.info(f"Audio stream enabled for participant: {peer_name}")
            try:
                task = asyncio.create_task(self.add_audio_listener(stream))
                # Listener tasks are keyed by stream id, not participant id.
                self.audio_listener_tasks[stream.id] = task
            except Exception as e:
                logger.error(f"Error creating audio listener task: {e}")
        if stream.kind == "share" and self.recording:
            asyncio.create_task(self._maybe_start_screen_track_recording(participant))
        if stream.kind in ("video", "share") and self.vision:
            logger.info(f"{stream.kind} stream enabled for participant: {peer_name}")
            self.video_listener_tasks[stream.id] = asyncio.create_task(
                self.add_video_listener(stream)
            )

    def on_stream_disabled(stream: Stream):
        """
        Internal method: Handle stream disabled event.
        """
        # Cancel and forget the listener task registered for this stream id.
        if stream.kind == "audio":
            audio_task = self.audio_listener_tasks.get(stream.id)
            if audio_task is not None:
                audio_task.cancel()
                del self.audio_listener_tasks[stream.id]
        if stream.kind in ("video", "share"):
            video_task = self.video_listener_tasks.get(stream.id)
            if video_task is not None:
                video_task.cancel()
                del self.video_listener_tasks[stream.id]

    # Stream listeners are attached to every participant except the local one.
    if participant.id != self.meeting.local_participant.id:
        participant.add_event_listener(
            ParticipantHandler(
                participant_id=participant.id,
                on_stream_enabled=on_stream_enabled,
                on_stream_disabled=on_stream_disabled,
            )
        )
Args
participant:Participant- The participant that joined.
def on_participant_left(self, participant: videosdk.participant.Participant)-
Expand source code
def on_participant_left(self, participant: Participant):
    """
    Handle participant leave event.

    Cancels the participant's audio/video listener tasks, emits
    ``PARTICIPANT_LEFT``, refreshes the non-agent participant count and
    decides whether the session should end: immediately when a recorded
    participant leaves, or (when ``auto_end_session`` is on) once no
    non-agent participants remain, either after ``session_timeout_seconds``
    or right away.

    Args:
        participant (Participant): The participant that left.
    """
    logger.info(f"Participant left: {participant.display_name}")

    # Listener tasks are registered under stream ids when a stream is
    # enabled, so look them up by the participant's stream ids as well as by
    # the participant id used previously.
    # NOTE(review): assumes `participant.streams` iterates stream ids
    # (dict keyed by id); when absent only the participant id is checked.
    stream_ids = list(getattr(participant, "streams", None) or ())
    task_keys = [participant.id, *stream_ids]
    registries = (
        ("audio", self.audio_listener_tasks),
        ("video", self.video_listener_tasks),
    )
    for kind, registry in registries:
        for key in task_keys:
            try:
                task = registry.pop(key, None)
                if task is not None:
                    task.cancel()
            except Exception as e:
                logger.error(
                    f"Error cancelling {kind} listener task for participant {participant.id}: {e}"
                )

    global_event_emitter.emit("PARTICIPANT_LEFT", {"participant": participant})

    # Update participant count and check if session should end
    self._update_non_agent_participant_count()

    if participant.id in self.recorded_participants:
        logger.info(
            f"Recorded participant {participant.display_name} left, ending session"
        )
        asyncio.create_task(self._end_session("recorded_participant_left"))
        return

    if self._non_agent_participant_count == 0 and self.auto_end_session:
        if (
            self.session_timeout_seconds is not None
            and self.session_timeout_seconds > 0
        ):
            logger.info(
                f"All non-agent participants have left, scheduling session end in {self.session_timeout_seconds} seconds"
            )
            self._schedule_session_end(self.session_timeout_seconds)
        else:
            logger.info(
                "All non-agent participants have left, ending session immediately"
            )
            asyncio.create_task(self._end_session("all_participants_left"))
Args
participant:Participant- The participant that left.
async def publish_to_pubsub(self, pubsub_config: videosdk.utils.PubSubPublishConfig)-
Expand source code
async def publish_to_pubsub(self, pubsub_config: PubSubPublishConfig):
    """
    Publish message to pubsub.

    Does nothing when no meeting is set on the handler.

    Args:
        pubsub_config (PubSubPublishConfig): Configuration for pubsub publishing.
    """
    if not self.meeting:
        return
    await self.meeting.pubsub.publish(pubsub_config)
Args
pubsub_config:PubSubPublishConfig- Configuration for pubsub publishing.
def setup_session_end_callback(self, callback)-
Expand source code
def setup_session_end_callback(self, callback):
    """
    Set up the session end callback.

    When a callback is already registered, the new one is chained after it:
    both are invoked with the reason, each inside its own try/except so a
    failure in one is logged and does not stop the other.

    Args:
        callback: Callable invoked with the session end reason string.
    """
    previous_callback = self.on_session_end

    if not previous_callback:
        self.on_session_end = callback
        logger.debug("Session end callback set up")
        return

    def chained_callback(reason: str):
        # Run the earlier callback first, then the newly registered one.
        try:
            previous_callback(reason)
        except Exception as e:
            logger.error(f"Error in existing session end callback: {e}")
        try:
            callback(reason)
        except Exception as e:
            logger.error(f"Error in new session end callback: {e}")

    self.on_session_end = chained_callback
    logger.debug("Session end callback chained with existing callback")
async def start_participant_recording(self, id: str)-
Expand source code
async def start_participant_recording(self, id: str):
    """
    Start recording. Forward to recording_manager.

    Args:
        id (str): Participant id passed to
            ``recording_manager.start_participant_recording``.
    """
    recorder = self.recording_manager
    await recorder.start_participant_recording(id)
async def stop_and_merge_recordings(self)-
Expand source code
async def stop_and_merge_recordings(self):
    """
    Stop and merge recordings. Forward to recording_manager.

    Passes the session id, the local participant's id, the participants
    bookkeeping dict and the per-participant track kinds.
    ``stop_participants_recording`` is True only when ``record_audio`` is
    ``None``.
    """
    local_id = self.meeting.local_participant.id
    stop_participants = self.record_audio is None
    await self.recording_manager.stop_and_merge_recordings(
        session_id=self._session_id,
        local_participant_id=local_id,
        participants_data=self.participants_data,
        track_kinds_by_participant=self._track_recordings_kinds_by_participant,
        stop_participants_recording=stop_participants,
    )
async def stop_participant_recording(self, id: str)-
Expand source code
async def stop_participant_recording(self, id: str):
    """
    Stop recording. Forward to recording_manager.

    Args:
        id (str): Participant id passed to
            ``recording_manager.stop_participant_recording``.
    """
    recorder = self.recording_manager
    await recorder.stop_participant_recording(id)
async def stop_participants_recording(self)-
Expand source code
async def stop_participants_recording(self):
    """
    Stop recording for all participants. Forward to recording_manager.

    Stops the local participant's recording first, then the recording of
    every participant id present in ``participants_data`` at that moment.
    """
    recorder = self.recording_manager
    await recorder.stop_participant_recording(self.meeting.local_participant.id)

    # Snapshot the ids: participants_data is mutated by the join/leave event
    # handlers, which may run while this coroutine is suspended at an await;
    # iterating the live dict would then raise RuntimeError.
    for participant_id in list(self.participants_data):
        await recorder.stop_participant_recording(participant_id)
async def subscribe_to_pubsub(self, pubsub_config: videosdk.utils.PubSubSubscribeConfig)-
Expand source code
async def subscribe_to_pubsub(self, pubsub_config: PubSubSubscribeConfig):
    """
    Subscribe to pubsub messages.

    Args:
        pubsub_config (PubSubSubscribeConfig): Configuration for pubsub subscription.

    Returns:
        List of existing messages from the subscription.
    """
    pubsub = self.meeting.pubsub
    return await pubsub.subscribe(pubsub_config)
Args
pubsub_config:PubSubSubscribeConfig- Configuration for pubsub subscription.
Returns
List of existing messages from the subscription.
def transfer_call(self, call_id: str, transfer_to: str)-
Expand source code
def transfer_call(self, call_id: str, transfer_to: str):
    """
    Transfer the call. Forward to sip_manager.

    Args:
        call_id (str): Call id passed to ``sip_manager.transfer_call``.
        transfer_to (str): Destination passed to ``sip_manager.transfer_call``.

    Returns:
        Whatever ``sip_manager.transfer_call`` returns.
    """
    sip = self.sip_manager
    outcome = sip.transfer_call(call_id=call_id, transfer_to=transfer_to)
    return outcome
async def upload_file(self, base64_data, file_name)-
Expand source code
async def upload_file(self, base64_data, file_name):
    """
    Upload a file to the temporary storage.

    Calls ``meeting.upload_base64`` with the handler's auth token. If that
    call hands back an awaitable it is awaited here, so callers receive the
    upload response itself rather than a pending coroutine.

    Args:
        base64_data: Base64-encoded file data.
        file_name (str): Name of the file to upload.

    Returns:
        Upload response from VideoSDK API.
    """
    import inspect

    response = self.meeting.upload_base64(base64_data, self.auth_token, file_name)
    # Resolve an async SDK call; a plain (already computed) value passes through.
    if inspect.isawaitable(response):
        response = await response
    return response
Args
base64_data- Base64-encoded file data.
file_name:str- Name of the file to upload.
Returns
Upload response from VideoSDK API.
async def wait_for_participant(self, participant_id: str | None = None) ‑> str-
Expand source code
async def wait_for_participant(self, participant_id: str | None = None) -> str | None:
    """
    Wait for a specific participant to join.

    Args:
        participant_id (str | None, optional): Id of the participant to wait
            for. When omitted, waits until the first-participant event is
            set.

    Returns:
        The requested ``participant_id`` once it is known to have joined,
        or, when no id was given, the first key of ``participants_data``.
        ``None`` when the session has ended by the time the wait finishes,
        or (no id given) when ``participants_data`` is empty at that point.
    """
    if participant_id:
        if participant_id in self.participants_data:
            return participant_id

        # Register an event for this id if none exists yet; it is set by the
        # participant-joined handler.
        joined_event = self._participant_joined_events.get(participant_id)
        if joined_event is None:
            joined_event = asyncio.Event()
            self._participant_joined_events[participant_id] = joined_event
        await joined_event.wait()

        return None if self._session_ended else participant_id

    if self.participants_data:
        return next(iter(self.participants_data))

    await self._first_participant_event.wait()
    if self._session_ended or not self.participants_data:
        return None
    return next(iter(self.participants_data))