Module agents.agent_session

Classes

class AgentSession (agent: Agent,
pipeline: Pipeline,
wake_up: Optional[int] = None,
background_audio: Optional[BackgroundAudioHandlerConfig] = None,
dtmf_handler: Optional[DTMFHandler] = None,
voice_mail_detector: Optional[VoiceMailDetector] = None)
Expand source code
class AgentSession(EventEmitter[Literal["user_state_changed", "agent_state_changed", "warm_transfer"]]):
    """
    Manages an agent session with its associated conversation flow and pipeline.

    Events emitted on this object:

    * ``"user_state_changed"`` / ``"agent_state_changed"`` -- payload
      ``{"state": <state enum value>, **extra}``; emitted only when the state
      actually changes.
    * ``"warm_transfer"`` -- warm-transfer phase updates; subscribe with
      :meth:`on_warm_transfer`.
    """

    def __init__(
        self,
        agent: Agent,
        pipeline: Pipeline,
        wake_up: Optional[int] = None,
        background_audio: Optional[BackgroundAudioHandlerConfig] = None,
        dtmf_handler: Optional[DTMFHandler] = None,
        voice_mail_detector: Optional[VoiceMailDetector] = None,
    ) -> None:
        """
        Initialize an agent session.

        Args:
            agent: Instance of an Agent class that handles the core logic
            pipeline: Pipeline instance to process the agent's operations
            wake_up: Seconds of inactivity after which ``on_wake_up`` is invoked.
                The timer only runs once ``on_wake_up`` has been assigned.
            background_audio: Configuration for background audio (optional)
            dtmf_handler: DTMF handler for phone number input (optional)
            voice_mail_detector: Voicemail detector (optional)
        """
        super().__init__()
        self.agent = agent
        self.pipeline = pipeline
        self.agent.session = self
        self._accept_user_input: bool = False
        self.wake_up = wake_up
        self.on_wake_up: Optional[Callable[[], None] | Callable[[], Any]] = None
        self._wake_up_task: Optional[asyncio.Task] = None
        self._wake_up_timer_active = False
        self._closed: bool = False
        self._reply_in_progress: bool = False
        self._user_state: UserState = UserState.IDLE
        self._agent_state: AgentState = AgentState.IDLE
        self.current_utterance: Optional[UtteranceHandle] = None
        self._thinking_audio_player: Optional[BackgroundAudioHandler] = None
        self._background_audio_player: Optional[BackgroundAudioHandler] = None
        self._thinking_was_playing = False
        self._override_thinking: bool = False
        self.background_audio_config = background_audio
        self._is_executing_tool = False
        self._job_context = None
        self.dtmf_handler = dtmf_handler
        self.voice_mail_detector = voice_mail_detector
        # `is_voicemail_detected` reads this attribute, so it must exist before
        # any voicemail result arrives (otherwise the property raises).
        self._is_voicemail_detected: bool = False
        # Legacy (misspelled) attribute, kept in sync for backward compatibility.
        self._is_voice_mail_detected = False
        self._warm_transfer_in_progress: bool = False
        self._warm_transfer_task: Optional[asyncio.Task] = None
        # Strong references to fire-and-forget tasks; see `_spawn`.
        self._background_tasks: set[asyncio.Task] = set()

        # Set agent on pipeline (pipeline handles all internal wiring)
        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)

        # Setup voicemail detection
        if self.voice_mail_detector:
            if hasattr(self.pipeline, "set_voice_mail_detector"):
                self.pipeline.set_voice_mail_detector(self.voice_mail_detector)

            if hasattr(self.pipeline, "on"):
                self.pipeline.on("voicemail_result", self._handle_voicemail_result)

        # Let the pipeline restart the inactivity (wake-up) timer.
        if hasattr(self.pipeline, 'set_wake_up_callback'):
            self.pipeline.set_wake_up_callback(self._reset_wake_up_timer)

        # Register `close` as a shutdown callback on the current job context, if
        # any. Failure is non-fatal: the session can run without a job context.
        try:
            job_ctx = get_current_job_context()
            if job_ctx:
                self._job_context = job_ctx
                job_ctx.add_shutdown_callback(self.close)

        except Exception as e:
            logger.error(f"AgentSession: Error in session initialization: {e}")
            self._job_context = None

    def _spawn(self, coro: Any) -> asyncio.Task:
        """Schedule *coro* as a task and keep a strong reference until it finishes.

        The event loop only keeps weak references to tasks, so a task whose
        handle is discarded can be garbage-collected before it completes.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @property
    def is_voicemail_detected(self) -> bool:
        """Returns True if voicemail was detected in this session."""
        return self._is_voicemail_detected

    def _handle_voicemail_result(self, data: dict) -> None:
        """
        Handler for the ``voicemail_result`` event emitted by the pipeline.

        Records the ``is_voicemail`` flag from *data* and, when voicemail was
        detected and the detector has a callback, schedules that callback.
        """
        is_vm = data.get("is_voicemail", False)
        self._is_voicemail_detected = is_vm
        self._is_voice_mail_detected = is_vm

        if is_vm:
            logger.info("AgentSession: Voicemail confirmed. Executing callback.")
            if self.voice_mail_detector.callback:
                self._spawn(self._safe_execute_vmd_callback())

    async def _safe_execute_vmd_callback(self) -> None:
        """Run the voicemail detector callback, logging (never raising) errors.

        Both coroutine functions and plain callables are accepted.
        """
        import inspect

        try:
            detector = self.voice_mail_detector
            callback = detector.callback if detector else None
            if callback:
                result = callback()
                if inspect.isawaitable(result):
                    await result
        except Exception as e:
            logger.error(f"Error executing voicemail callback: {e}")

    def _start_wake_up_timer(self) -> None:
        """(Re)start the wake-up timer if both ``wake_up`` and ``on_wake_up`` are set."""
        if self.wake_up is not None and self.on_wake_up is not None:
            if self._wake_up_task and not self._wake_up_task.done():
                self._wake_up_task.cancel()
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _reset_wake_up_timer(self) -> None:
        """Restart the wake-up timer, unless a reply is currently being generated."""
        if self.wake_up is not None and self.on_wake_up is not None:
            if self._reply_in_progress:
                return
            if self._wake_up_task and not self._wake_up_task.done():
                self._wake_up_task.cancel()
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _pause_wake_up_timer(self) -> None:
        """Deactivate and cancel the pending wake-up timer (if any)."""
        self._wake_up_timer_active = False
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()

    def _cancel_wake_up_timer(self) -> None:
        """Cancel the pending wake-up timer (if any) and mark it inactive."""
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()
        self._wake_up_timer_active = False

    async def _wake_up_timer_loop(self) -> None:
        """Sleep ``wake_up`` seconds, then fire ``on_wake_up`` if still appropriate.

        The callback is skipped if the timer was deactivated, a reply is in
        progress, or the user is currently speaking.
        """
        try:
            await asyncio.sleep(self.wake_up)
            if (
                self._wake_up_timer_active
                and self.on_wake_up
                and not self._reply_in_progress
                and self._user_state != UserState.SPEAKING
            ):
                if asyncio.iscoroutinefunction(self.on_wake_up):
                    self._spawn(self.on_wake_up())
                else:
                    self.on_wake_up()
        except asyncio.CancelledError:
            pass

    def _emit_user_state(self, state: UserState, data: dict | None = None) -> None:
        """Update the user state and emit ``user_state_changed`` if it changed."""
        if state != self._user_state:
            self._user_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("user_state_changed", payload)

    def _emit_agent_state(self, state: AgentState, data: dict | None = None) -> None:
        """Update the agent state; on change emit locally and on the global emitter."""
        if state != self._agent_state:
            self._agent_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("agent_state_changed", payload)
            global_event_emitter.emit("AGENT_STATE_CHANGED", {"state": state.value})

    @property
    def user_state(self) -> UserState:
        """Current user state."""
        return self._user_state

    @property
    def agent_state(self) -> AgentState:
        """Current agent state."""
        return self._agent_state

    @property
    def is_background_audio_enabled(self) -> bool:
        """Check if background audio is enabled in the pipeline"""
        audio_track = self._get_audio_track()
        return hasattr(audio_track, 'add_background_bytes')

    async def start(
        self,
        wait_for_participant: bool = False,
        run_until_shutdown: bool = False,
        *,
        observability: Optional[ObservabilityOptions] = None,
        **kwargs: Any
    ) -> None:
        """
        Start the agent session.
        This will:
        1. Initialize the agent (including MCP tools if configured)
        2. Call the agent's on_enter hook
        3. Start the pipeline processing
        4. Start wake-up timer if configured (but only if callback is set)
        5. Optionally handle full lifecycle management (connect, wait, shutdown)

        Args:
            wait_for_participant: If True, wait for a participant to join before starting
            run_until_shutdown: If True, manage the full lifecycle including connection,
                               waiting for shutdown signals, and cleanup. This is a convenience
                               that internally calls ctx.run_until_shutdown() with this session.
            observability: Grouped recording/traces/metrics/logs config. When passed, it
                           is applied to ``ctx.room_options.observability`` before the
                           room connects, taking precedence over any value set on
                           ``RoomOptions``. Requires ``run_until_shutdown=True``; a
                           warning is logged otherwise.
            **kwargs: Additional arguments to pass to the pipeline start method

        Examples:
            Simple start (manual lifecycle management):
            ```python
            await session.start()
            ```

            Full lifecycle management (recommended):
            ```python
            await session.start(wait_for_participant=True, run_until_shutdown=True)
            ```

            Inline observability:
            ```python
            await session.start(
                wait_for_participant=True,
                run_until_shutdown=True,
                observability=ObservabilityOptions(
                    recording=RecordingOptions(video=True),
                ),
            )
            ```
        """
        if observability is not None:
            ctx = None
            try:
                ctx = get_current_job_context()
            except Exception as e:
                logger.warning(f"Could not resolve JobContext for observability overrides: {e}")
            if ctx is not None and ctx.room_options is not None:
                if not run_until_shutdown:
                    logger.warning(
                        "observability= passed to session.start() but run_until_shutdown=False; "
                        "overrides only apply if ctx.connect() has not yet run."
                    )
                ctx.room_options.observability = observability
            else:
                logger.warning(
                    "observability= passed to session.start() but no active JobContext "
                    "(or no RoomOptions) was found; ignoring."
                )

        if run_until_shutdown:
            try:
                ctx = get_current_job_context()
                if ctx:
                    logger.info("Starting session with full lifecycle management")
                    # The job context drives the rest of the lifecycle.
                    await ctx.run_until_shutdown(
                        session=self,
                        wait_for_participant=wait_for_participant
                    )
                    return
                else:
                    logger.warning(
                        "run_until_shutdown=True requires a JobContext, "
                        "falling back to normal start()"
                    )
            except Exception as e:
                logger.warning(
                    f"Failed to get JobContext for run_until_shutdown: {e}, "
                    "falling back to normal start()"
                )

        self._emit_agent_state(AgentState.STARTING)

        if self.agent._mcp_servers:
            await self.agent.initialize_mcp()

        if self.dtmf_handler:
            await self.dtmf_handler.start()

        # Configure metrics with session info
        metrics_collector.set_system_instructions(self.agent.instructions)

        # Set provider info based on pipeline components
        if not self.pipeline.config.is_realtime:
            if self.pipeline.stt:
                p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
                metrics_collector.set_provider_info("stt", p_class, p_model)
            if self.pipeline.llm:
                p_class, p_model = self._get_provider_info(self.pipeline.llm, 'llm')
                metrics_collector.set_provider_info("llm", p_class, p_model)
            if self.pipeline.tts:
                p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
                metrics_collector.set_provider_info("tts", p_class, p_model)
            if hasattr(self.pipeline, 'vad') and self.pipeline.vad:
                p_class, p_model = self._get_provider_info(self.pipeline.vad, 'vad')
                metrics_collector.set_provider_info("vad", p_class, p_model)
            if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector:
                p_class, p_model = self._get_provider_info(self.pipeline.turn_detector, 'eou')
                metrics_collector.set_provider_info("eou", p_class, p_model)
        else:
            if self.pipeline._realtime_model:
                metrics_collector.set_provider_info("realtime", format_provider_class(self.pipeline._realtime_model), getattr(self.pipeline._realtime_model, 'model', ''))
            if self.pipeline.stt:
                p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
                metrics_collector.set_provider_info("stt", p_class, p_model)
            if self.pipeline.tts:
                p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
                metrics_collector.set_provider_info("tts", p_class, p_model)

        # Traces flow manager setup
        traces_flow_manager = metrics_collector.traces_flow_manager
        if traces_flow_manager:
            config_attributes = {
                "system_instructions": self.agent.instructions,
                # Locally defined tools only (MCP tools are reported separately).
                "function_tools": [
                    get_tool_info(tool).name
                    for tool in (
                        [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools]
                        if self.agent.mcp_manager else self.agent.tools
                    )
                ] if self.agent.tools else [],
                "mcp_tools": [
                    tool._tool_info.name
                    for tool in self.agent.mcp_manager.tools
                ] if self.agent.mcp_manager else [],
                "pipeline": self.pipeline.__class__.__name__,
                "pipeline_mode": self.pipeline.config.pipeline_mode.value,
                "transport_mode": metrics_collector.transport_mode
            }
            start_time = time.perf_counter()
            config_attributes["start_time"] = start_time
            await traces_flow_manager.start_agent_session_config(config_attributes)
            await traces_flow_manager.start_agent_session({"start_time": start_time})

        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)

        await self.pipeline.start()

        if hasattr(self.agent, 'a2a'):
            self.agent.a2a._attach_deferred_listeners()

        if self._should_delay_for_sip_user():
            # Outbound SIP call: defer on_enter until the callee's audio stream is
            # enabled, so the greeting is not spoken before anyone can hear it.
            logger.info("SIP user detected, waiting for audio stream to be enabled before calling on_enter")
            audio_stream_enabled = asyncio.Event()

            def on_audio_stream_enabled(data):
                stream = data.get("stream")
                participant = data.get("participant")
                if stream and stream.kind == "audio" and participant and participant.meta_data.get("sipUser"):
                    logger.info(f"SIP user audio stream enabled for participant {participant.id}")
                    audio_stream_enabled.set()

            global_event_emitter.on("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

            async def wait_and_start():
                try:
                    await audio_stream_enabled.wait()
                    logger.info("SIP user audio stream enabled, proceeding with on_enter")
                    await self.agent.on_enter()
                    self._accept_user_input = True
                    global_event_emitter.emit("AGENT_STARTED", {"session": self})
                    if self.on_wake_up is not None:
                        self._start_wake_up_timer()
                    self._emit_agent_state(AgentState.IDLE)
                except Exception as e:
                    logger.error(f"Error in wait_and_start: {e}")
                finally:
                    global_event_emitter.off("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

            self._spawn(wait_and_start())
            return

        await self.agent.on_enter()
        self._accept_user_input = True

        global_event_emitter.emit("AGENT_STARTED", {"session": self})
        if self.on_wake_up is not None:
            self._start_wake_up_timer()
        self._emit_agent_state(AgentState.IDLE)

    def _get_provider_info(self, component, comp_name):
        """Return ``(provider_class, model)`` strings describing *component* for metrics.

        If the component exposes a non-None ``active_provider``, that provider is
        described instead. The model is the first attribute found among
        ``model``, ``model_id``, ``speech_model``, ``voice_id``, ``voice``,
        ``speaker``, falling back to the pipeline's configured model for
        *comp_name* (or ``""``).
        """
        configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {}
        if not component:
            return "", ""
        default_model = configs.get(comp_name, {}).get('model', '')
        if hasattr(component, 'active_provider') and component.active_provider is not None:
            target = component.active_provider
        else:
            target = component
        provider_class = format_provider_class(target)
        # Walk from lowest to highest priority so earlier names win, which
        # mirrors the evaluation order of a nested getattr(..., default) chain.
        model = default_model
        for attr_name in ('speaker', 'voice', 'voice_id', 'speech_model', 'model_id', 'model'):
            model = getattr(target, attr_name, model)
        return provider_class, str(model)

    async def say(
        self,
        message: str,
        interruptible: bool = True,
        audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None,
        add_to_chat_context: bool = True,
    ) -> UtteranceHandle:
        """
        Make the agent speak *message* and return a handle to track it.
        When called from inside a function tool (_is_executing_tool), the current
        turn's utterance is not interrupted or replaced, so the LLM stream can
        continue after the tool returns.

        Args:
            message: The text being spoken. Always used for transcript/chat
                context (unless ``add_to_chat_context=False``).
            interruptible: If False, ``handle.interrupt()`` will raise unless
                forced.
            audio_data: Optional pre-synthesized PCM bytes (int16, matching
                the agent audio track sample rate). When provided, the TTS
                provider is bypassed and the bytes are streamed straight to
                the audio track. Accepts a single ``bytes`` blob, an
                iterable of ``bytes`` chunks, or an async iterator of
                ``bytes`` chunks. Use ``load_audio_file`` for WAV files or
                :class:`TTSAudioCache` to cache TTS output by text.
            add_to_chat_context: When False, the message is spoken but NOT
                added to the agent's chat history (useful for transient
                hold messages inside function tools).

        Returns:
            UtteranceHandle: A handle to track the utterance lifecycle.
        """
        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
        self._accept_user_input = True
        if not self._is_executing_tool:
            if self.current_utterance and not self.current_utterance.done():
                self.current_utterance.interrupt()
            self.current_utterance = handle

        traces_flow_manager = metrics_collector.traces_flow_manager
        if traces_flow_manager:
            traces_flow_manager.agent_say_called(message)

        if add_to_chat_context:
            self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)

        if hasattr(self.pipeline, 'send_message'):
            await self.pipeline.send_message(message, handle=handle, audio_data=audio_data)

        return handle

    async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) -> None:
        """Play background audio on demand.

        override_thinking=True  -> thinking audio is allowed to layer over the background
                                   music during LLM generation (stops on first TTS byte).
        override_thinking=False -> background music is exclusive; thinking audio is
                                   suppressed for as long as background is playing.

        Any background audio already started by a previous call is stopped first,
        so at most one background player is active at a time.
        """
        self._override_thinking = override_thinking
        if not override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing:
            await self.stop_thinking_audio()
            # Remember to resume thinking audio once background audio stops.
            self._thinking_was_playing = True

        audio_track = self._get_audio_track()
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play background audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return

        if audio_track:
            if self._background_audio_player is not None:
                # Replacing the handle without stopping it would leave the old
                # player mixing audio with no way to stop it.
                await self._background_audio_player.stop()
                self._background_audio_player = None
                metrics_collector.on_background_audio_stop()

            self._background_audio_player = BackgroundAudioHandler(config, audio_track)

            await self._background_audio_player.start()
            # Track background audio start for metrics
            metrics_collector.on_background_audio_start(
                file_path=config.file_path,
                looping=config.looping
            )

    async def stop_background_audio(self) -> None:
        """Stop background audio on demand, resuming thinking audio if it was suppressed."""
        if self._background_audio_player:
            await self._background_audio_player.stop()
            self._background_audio_player = None
            # Track background audio stop for metrics
            metrics_collector.on_background_audio_stop()

        self._override_thinking = False

        if self._thinking_was_playing:
            await self.start_thinking_audio()
            self._thinking_was_playing = False

    def _get_audio_track(self):
        """Get the agent's output audio track from the pipeline, or None.

        Realtime pipelines use the realtime model's track when available;
        otherwise the TTS component's track is used.
        """
        if self.pipeline is None:
            return None
        if self.pipeline.config.is_realtime:
            model = getattr(self.pipeline, '_realtime_model', None)
            if model and hasattr(model, 'audio_track'):
                return model.audio_track
        if self.pipeline.tts and hasattr(self.pipeline.tts, 'audio_track'):
            return self.pipeline.tts.audio_track
        return None

    async def start_thinking_audio(self):
        """Start the agent's configured thinking audio, if allowed and not already playing."""
        # Exclusive background audio suppresses thinking audio.
        if (
            self._background_audio_player
            and self._background_audio_player.is_playing
            and not self._override_thinking
        ):
            return
        if self._thinking_audio_player and self._thinking_audio_player.is_playing:
            return

        audio_track = self._get_audio_track()
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play 'thinking' audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return

        if self.agent._thinking_background_config and audio_track:
            self._thinking_audio_player = BackgroundAudioHandler(self.agent._thinking_background_config, audio_track)
            await self._thinking_audio_player.start()
            # Track thinking audio start for metrics
            metrics_collector.on_thinking_audio_start(
                file_path=self.agent._thinking_background_config.file_path,
                looping=self.agent._thinking_background_config.looping
            )

    async def stop_thinking_audio(self):
        """Stop thinking audio, if a thinking player exists."""
        if self._thinking_audio_player:
            await self._thinking_audio_player.stop()
            self._thinking_audio_player = None
            # Track thinking audio stop for metrics
            metrics_collector.on_thinking_audio_stop()

    async def reply(self, instructions: str, wait_for_playback: bool = True, frames: list[av.VideoFrame] | None = None, interruptible: bool = True) -> UtteranceHandle:
        """
        Generate a response from agent using instructions and current chat context.

        This method is safe to call from function tools - it will automatically
        detect re-entrant calls and schedule them as background tasks.

        Args:
            instructions: Instructions to add to chat context
            wait_for_playback: If True, wait for playback to complete
            frames: Optional list of VideoFrame objects to include in the reply
            interruptible: If False, the returned handle cannot be interrupted
                unless forced

        Returns:
            UtteranceHandle: A handle to track the utterance lifecycle. If a reply
            is already in progress, the current utterance (or an already-done
            placeholder handle) is returned instead of starting a new reply.
        """
        self._accept_user_input = True

        if self._reply_in_progress:
            if self.current_utterance:
                return self.current_utterance
            handle = UtteranceHandle(utterance_id="placeholder", interruptible=interruptible)
            handle._mark_done()
            return handle

        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
        self.current_utterance = handle

        if self._is_executing_tool:
            # Awaiting here from inside a tool would block the tool call; run it
            # in the background instead.
            self._spawn(
                self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
            )
            return handle
        else:
            await self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
            return handle

    async def _internal_blocking_reply(self, instructions: str, wait_for_playback: bool, handle: UtteranceHandle, frames: list[av.VideoFrame] | None = None) -> None:
        """
        The original, blocking logic of the reply method.

        Pauses the wake-up timer for the duration of the reply and always marks
        *handle* done on exit, even on error.
        """
        if not instructions:
            handle._mark_done()
            return
        self._reply_in_progress = True
        self._pause_wake_up_timer()

        try:
            # Call pipeline's reply_with_context
            if hasattr(self.pipeline, 'reply_with_context'):
                await self.pipeline.reply_with_context(instructions, wait_for_playback, handle=handle, frames=frames)

            if wait_for_playback:
                await handle

        finally:
            self._reply_in_progress = False
            if not handle.done():
                handle._mark_done()

    def interrupt(self, *, force: bool = False) -> None:
        """
        Interrupt the agent's current speech.

        Args:
            force: Interrupt even if the current utterance is non-interruptible.
        """
        if self.current_utterance and not self.current_utterance.interrupted:
            try:
                self.current_utterance.interrupt(force=force)
            except RuntimeError as e:
                logger.warning(f"Could not interrupt utterance: {e}")
                return

        if hasattr(self.pipeline, 'interrupt'):
            self.pipeline.interrupt()

    def get_context_history(
        self,
        *,
        include_function_calls: bool = False,
        include_system_messages: bool = False,
    ) -> list[dict]:
        """
        Get the current chat context history (role/content items).

        Users can access the conversation history whenever they want, without needing
        any transcript hooks.

        Args:
            include_function_calls: Keep function-call items in the copy.
            include_system_messages: Keep system messages in the copy.

        Returns:
            The ``"items"`` list of a copy of the chat context, or ``[]`` when
            there is no agent or chat context.
        """
        if not self.agent or not self.agent.chat_context:
            return []

        context_copy = self.agent.chat_context.copy(
            tools=None if include_function_calls else [],
            exclude_system_messages=not include_system_messages,
        )
        return context_copy.to_dict()["items"]

    async def close(self) -> None:
        """
        Close the agent session.

        Idempotent: subsequent calls return immediately. Errors from the agent's
        ``on_exit``, pipeline cleanup and agent cleanup are logged, not raised.
        """
        logger.info("Closing agent session")
        if self._closed:
            logger.info("Agent session already closed")
            return
        self._closed = True
        self._emit_agent_state(AgentState.CLOSING)

        metrics_collector.finalize_session()
        traces_flow_manager = metrics_collector.traces_flow_manager
        if traces_flow_manager:
            start_time = time.perf_counter()
            await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
            traces_flow_manager.end_agent_session_closed()

        self._cancel_wake_up_timer()

        logger.info("Cleaning up agent session")
        try:
            await self.agent.on_exit()
        except Exception as e:
            logger.error(f"Error in agent.on_exit(): {e}")

        if self._thinking_audio_player:
            await self._thinking_audio_player.stop()

        if self._background_audio_player:
            await self._background_audio_player.stop()

        try:
            await self.pipeline.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up pipeline: {e}")

        try:
            await self.agent.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up agent: {e}")

        # Drop references so the agent/pipeline can be garbage-collected.
        self.agent = None
        self.pipeline = None
        self.on_wake_up = None
        self._wake_up_task = None
        logger.info("Agent session cleaned up")

    async def leave(self) -> None:
        """
        Leave the agent session.

        Safe to call after :meth:`close`, when the pipeline has been released.
        """
        self._emit_agent_state(AgentState.CLOSING)
        if self.pipeline is not None:
            await self.pipeline.leave()

    async def hangup(self, reason: str = "manual_hangup") -> None:
        """
        Hang up the session, leaving the room immediately if possible.

        Prefers ``room.force_end_session(reason)``; if that is unavailable or
        fails, falls back to :meth:`close`.
        """
        job_ctx = self._job_context
        if not job_ctx:
            try:
                job_ctx = get_current_job_context()
            except Exception:
                job_ctx = None

        room = getattr(job_ctx, "room", None) if job_ctx else None
        if room and hasattr(room, "force_end_session"):
            try:
                await room.force_end_session(reason)
                return
            except Exception as exc:
                logger.error(f"Error forcing room to end session: {exc}")

        await self.close()

    async def call_transfer(self,token: str, transfer_to: str) -> None:
        """ Transfer the call to the provided phone number or SIP endpoint.

        Errors from the room are logged, not raised. If no room supporting
        transfer is available, a warning is logged and nothing happens.

        Args:
            token: VideoSDK auth token.
            transfer_to: Phone number or SIP endpoint to transfer the call to.
        """
        job_ctx = self._job_context
        if not job_ctx:
            try:
                job_ctx = get_current_job_context()
            except Exception:
                job_ctx = None

        room = getattr(job_ctx, "room", None) if job_ctx else None
        if not (room and hasattr(room, "call_transfer")):
            logger.warning("call_transfer: no active room supports call transfer; transfer not performed")
            return
        try:
            await room.call_transfer(token, transfer_to)
        except Exception as exc:
            logger.error(f"Error calling call_transfer: {exc}")

    def _should_delay_for_sip_user(self) -> bool:
        """Check if there are SIP users in the room that need audio stream initialization"""
        job_ctx = self._job_context
        if not job_ctx:
            try:
                job_ctx = get_current_job_context()
            except Exception:
                job_ctx = None
        room = getattr(job_ctx, "room", None) if job_ctx else None
        if room and hasattr(room, "participants_data"):
            participants = room.participants_data
            for participant_info in participants.values():
                # SIP-specific on_enter logic is currently limited to outbound calls.
                if participant_info.get("sipUser") and participant_info.get("sipCallType") == "outbound":
                    return True
        return False

    def on_warm_transfer(self, phase: Optional[str] = None, callback: Optional[Callable[..., Any]] = None):
        """Subscribe to warm-transfer phase changes (decorator or imperative).

        ``@session.on_warm_transfer()`` sees every phase; pass a phase name to
        filter to one. Handlers are sync and receive the ``warm_transfer`` event
        payload: ``{"phase": WarmTransferPhase, "data": {...}, "timestamp": float,
        "consultation_room_id": Optional[str]}``.
        """
        def _wrap(handler: Callable[..., Any]) -> Callable[..., Any]:
            if phase is None:
                return self.on("warm_transfer", handler)

            def _filtered(payload):
                pv = payload.get("phase") if isinstance(payload, dict) else None
                # Accept both enum members (compare their .value) and raw strings.
                if getattr(pv, "value", pv) == phase:
                    handler(payload)

            # Not every callable (e.g. functools.partial) has __name__.
            handler_name = getattr(handler, "__name__", type(handler).__name__)
            _filtered.__name__ = f"{handler_name}__phase_{phase}"
            return self.on("warm_transfer", _filtered)

        return _wrap if callback is None else _wrap(callback)

    async def warm_transfer(self, config: "WarmTransferConfig") -> "WarmTransferResult":
        """Run a SIP-to-SIP warm transfer to a human supervisor.

        See :mod:`videosdk.agents.warm_transfer` for the config surface and the
        ``@session.on_warm_transfer(...)`` phase hook.

        The transfer runs in its own task and is shielded from the caller of this
        method being cancelled (this is usually invoked from a ``@function_tool``,
        and that task gets cancelled whenever the caller talks over the agent /
        triggers an interruption — which must not abort an in-flight transfer).
        If the awaiting tool is cancelled, the transfer keeps running to
        completion in the background; the result is simply not returned.

        Raises:
            WarmTransferError: If a warm transfer is already in progress.
        """
        from .warm_transfer import WarmTransferRunner, WarmTransferError

        if self._warm_transfer_in_progress:
            raise WarmTransferError("A warm transfer is already in progress for this session.")
        self._warm_transfer_in_progress = True
        try:
            task = asyncio.ensure_future(WarmTransferRunner(self, config).run())
        except BaseException:
            # The task never started, so its done-callback will never clear the
            # flag; reset it here or every later transfer would be refused.
            self._warm_transfer_in_progress = False
            raise
        self._warm_transfer_task = task

        def _clear(_task: "asyncio.Task") -> None:
            self._warm_transfer_in_progress = False
            self._warm_transfer_task = None

        task.add_done_callback(_clear)
        return await asyncio.shield(task)

Manages an agent session with its associated conversation flow and pipeline.

Initialize an agent session.

Args

agent
Instance of an Agent class that handles the core logic
pipeline
Pipeline instance to process the agent's operations
wake_up
Time in seconds of inactivity after which the on_wake_up callback is triggered (the timer only runs if on_wake_up has been set)
background_audio
Configuration for background audio (optional)
dtmf_handler
DTMF handler for phone number input (optional)
voice_mail_detector
Voicemail detector (optional)

Ancestors

EventEmitter[Literal['user_state_changed', 'agent_state_changed', 'warm_transfer']]

Instance variables

prop agent_state : AgentState
Expand source code
@property
def agent_state(self) -> AgentState:
    """The agent's current lifecycle state (e.g. STARTING, IDLE, CLOSING).

    Read-only view of the session's ``_agent_state`` attribute.
    """
    current = self._agent_state
    return current
prop is_background_audio_enabled : bool
Expand source code
@property
def is_background_audio_enabled(self) -> bool:
    """Whether the agent's audio track can mix in background audio.

    True only when the current audio track exposes ``add_background_bytes``
    (the mixing track); False otherwise, e.g. when no track is available (None).
    """
    return hasattr(self._get_audio_track(), 'add_background_bytes')

Check if background audio is enabled in the pipeline

prop is_voicemail_detected : bool
Expand source code
@property
def is_voicemail_detected(self) -> bool:
    """Whether voicemail has been detected during this session.

    Read-only view of the session's ``_is_voicemail_detected`` flag.
    """
    detected = self._is_voicemail_detected
    return detected

Returns True if voicemail was detected in this session.

prop user_state : UserState
Expand source code
@property
def user_state(self) -> UserState:
    """The user's current state as tracked by this session.

    Read-only view of ``_user_state``; a new session starts at ``UserState.IDLE``.
    """
    current = self._user_state
    return current

Methods

async def call_transfer(self, token: str, transfer_to: str) ‑> None
Expand source code
async def call_transfer(self, token: str, transfer_to: str) -> None:
    """Transfer the call to the given phone number or SIP endpoint.

    The transfer is delegated to the room of the session's job context (or, if
    none is attached, the current job context). Failures are logged, not raised.

    Args:
        token: VideoSDK auth token.
        transfer_to: Phone number or SIP endpoint to transfer the call to.
    """
    job_ctx = self._job_context
    if not job_ctx:
        try:
            job_ctx = get_current_job_context()
        except Exception:
            job_ctx = None

    room = getattr(job_ctx, "room", None) if job_ctx else None
    if not (room and hasattr(room, "call_transfer")):
        # Previously this case was silent, making a dropped transfer hard to diagnose.
        logger.warning(
            "Cannot transfer call: no active room supporting call_transfer was found."
        )
        return

    try:
        await room.call_transfer(token, transfer_to)
    except Exception as exc:
        logger.error(f"Error calling call_transfer: {exc}")

Transfer the call to the given phone number or SIP endpoint.

Args

token
VideoSDK auth token.
transfer_to
Phone number or SIP endpoint to transfer the call to.
async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """
    Close the agent session.

    Safe to call more than once: later calls log and return immediately. Each
    teardown step is best-effort; a failing step is logged and the remaining
    steps still run. At the end the agent, pipeline, wake-up callback/task and
    audio player references are released.
    """
    logger.info("Closing agent session")
    if self._closed:
        logger.info("Agent session already closed")
        return
    self._closed = True
    self._emit_agent_state(AgentState.CLOSING)

    # Metrics/trace finalization must not prevent resource cleanup below.
    try:
        metrics_collector.finalize_session()
        traces_flow_manager = metrics_collector.traces_flow_manager
        if traces_flow_manager:
            start_time = time.perf_counter()
            await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
            traces_flow_manager.end_agent_session_closed()
    except Exception as e:
        logger.error(f"Error finalizing session metrics/traces: {e}")

    self._cancel_wake_up_timer()

    logger.info("Cleaning up agent session")
    try:
        await self.agent.on_exit()
    except Exception as e:
        logger.error(f"Error in agent.on_exit(): {e}")

    # Stop each audio player independently so a failure here cannot skip the
    # pipeline/agent cleanup that follows.
    if self._thinking_audio_player:
        try:
            await self._thinking_audio_player.stop()
        except Exception as e:
            logger.error(f"Error stopping thinking audio: {e}")
        self._thinking_audio_player = None

    if self._background_audio_player:
        try:
            await self._background_audio_player.stop()
        except Exception as e:
            logger.error(f"Error stopping background audio: {e}")
        self._background_audio_player = None

    try:
        await self.pipeline.cleanup()
    except Exception as e:
        logger.error(f"Error cleaning up pipeline: {e}")

    try:
        await self.agent.cleanup()
    except Exception as e:
        logger.error(f"Error cleaning up agent: {e}")

    self.agent = None
    self.pipeline = None
    self.on_wake_up = None
    self._wake_up_task = None
    logger.info("Agent session cleaned up")

Close the agent session.

def get_context_history(self,
*,
include_function_calls: bool = False,
include_system_messages: bool = False) ‑> list[dict]
Expand source code
def get_context_history(
    self,
    *,
    include_function_calls: bool = False,
    include_system_messages: bool = False,
) -> list[dict]:
    """
    Return a snapshot of the agent's chat history as a list of item dicts.

    No transcript hooks are required; call this at any time. Returns an empty
    list when there is no agent or no chat context.

    Args:
        include_function_calls: When False, the copy is made with ``tools=[]``;
            when True, ``tools=None`` is passed to ``chat_context.copy``.
        include_system_messages: When False, system messages are excluded.
    """
    agent = self.agent
    if not agent or not agent.chat_context:
        return []

    snapshot = agent.chat_context.copy(
        tools=[] if not include_function_calls else None,
        exclude_system_messages=not include_system_messages,
    )
    serialized = snapshot.to_dict()
    return serialized["items"]

Get the current chat context history (role/content items).

Users can access the conversation history whenever they want, without needing any transcript hooks.

async def hangup(self, reason: str = 'manual_hangup') ‑> None
Expand source code
async def hangup(self, reason: str = "manual_hangup") -> None:
    """
    Hang up the session, leaving the room immediately if possible.

    Asks the job context's room to force-end the session with ``reason``. If
    no such room is reachable, or the room raises, falls back to ``close()``.
    """
    ctx = self._job_context
    if not ctx:
        try:
            ctx = get_current_job_context()
        except Exception:
            ctx = None

    room = getattr(ctx, "room", None) if ctx else None
    can_force_end = bool(room) and hasattr(room, "force_end_session")

    if can_force_end:
        try:
            await room.force_end_session(reason)
        except Exception as exc:
            logger.error(f"Error forcing room to end session: {exc}")
        else:
            return

    await self.close()

Hang up the session, leaving the room immediately if possible.

def interrupt(self, *, force: bool = False) ‑> None
Expand source code
def interrupt(self, *, force: bool = False) -> None:
    """
    Interrupt the agent's current speech.

    Interrupts the current utterance (if any and not already interrupted),
    passing ``force`` through, then interrupts the pipeline if it supports it.
    If the utterance refuses the interrupt (RuntimeError), a warning is logged
    and the pipeline is left untouched.
    """
    utterance = self.current_utterance
    if utterance and not utterance.interrupted:
        try:
            utterance.interrupt(force=force)
        except RuntimeError as e:
            logger.warning(f"Could not interrupt utterance: {e}")
            return

    if not hasattr(self.pipeline, 'interrupt'):
        return
    self.pipeline.interrupt()

Interrupt the agent's current speech.

async def leave(self) ‑> None
Expand source code
async def leave(self) -> None:
    """
    Leave the agent session.

    Emits the CLOSING agent state and asks the pipeline to leave. If the
    pipeline has already been released (``close()`` sets it to None), this
    logs and returns instead of failing with AttributeError.
    """
    if self.pipeline is None:
        logger.info("Agent session already closed; nothing to leave")
        return
    self._emit_agent_state(AgentState.CLOSING)
    await self.pipeline.leave()

Leave the agent session.

def on_warm_transfer(self,
phase: Optional[str] = None,
callback: Optional[Callable[..., Any]] = None)
Expand source code
def on_warm_transfer(self, phase: Optional[str] = None, callback: Optional[Callable[..., Any]] = None):
    """Subscribe to warm-transfer phase changes (decorator or imperative).

    ``@session.on_warm_transfer()`` sees every phase; pass a phase name (or a
    ``WarmTransferPhase`` member) to filter to one. Handlers are sync and receive the
    ``warm_transfer`` event payload: ``{"phase": WarmTransferPhase, "data": {...},
    "timestamp": float, "consultation_room_id": Optional[str]}``.

    Args:
        phase: Phase to filter on, or None for all phases. Enum members are
            compared by their ``value``.
        callback: Handler to register immediately. When omitted, a decorator
            is returned instead.

    Returns:
        The decorator (when ``callback`` is None), otherwise whatever
        ``self.on(...)`` returns for the registered handler.
    """
    # Normalize both sides of the comparison so an enum member and its string
    # value select the same phase.
    wanted = getattr(phase, "value", phase)

    def _wrap(handler: Callable[..., Any]) -> Callable[..., Any]:
        if wanted is None:
            return self.on("warm_transfer", handler)

        def _filtered(payload):
            pv = payload.get("phase") if isinstance(payload, dict) else None
            if getattr(pv, "value", pv) == wanted:
                handler(payload)

        # functools.partial objects and other callables have no __name__.
        handler_name = getattr(handler, "__name__", type(handler).__name__)
        _filtered.__name__ = f"{handler_name}__phase_{wanted}"
        return self.on("warm_transfer", _filtered)

    return _wrap if callback is None else _wrap(callback)

Subscribe to warm-transfer phase changes (decorator or imperative).

@session.on_warm_transfer() sees every phase; pass a phase name to filter to one. Handlers are sync and receive the warm_transfer event payload: {"phase": WarmTransferPhase, "data": {...}, "timestamp": float, "consultation_room_id": Optional[str]}.

async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) ‑> None
Expand source code
async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) -> None:
    """Play background audio on demand.

    override_thinking=True  -> thinking audio is allowed to layer over the background
                               music during LLM generation (stops on first TTS byte).
    override_thinking=False -> background music is exclusive; thinking audio is
                               suppressed for as long as background is playing.

    Requires the mixing audio track. Without it, a warning is logged and no
    session state is changed. Any background audio already playing is stopped
    and replaced by ``config``.
    """
    # Check capability first so an unsupported track leaves thinking audio and
    # the override flag untouched.
    audio_track = self._get_audio_track()
    if not hasattr(audio_track, 'add_background_bytes'):
        logger.warning(
            "Cannot play background audio. This feature requires the mixing audio track. "
            "Enable it by setting `background_audio=True` in RoomOptions."
        )
        return

    self._override_thinking = override_thinking
    if not override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing:
        await self.stop_thinking_audio()
        # Remembered so stop_background_audio() can resume thinking audio.
        self._thinking_was_playing = True

    # Replace rather than stack: overwriting the reference would orphan the
    # running player, which stop_background_audio()/close() could then never stop.
    if self._background_audio_player:
        await self._background_audio_player.stop()
        self._background_audio_player = None
        metrics_collector.on_background_audio_stop()

    self._background_audio_player = BackgroundAudioHandler(config, audio_track)
    await self._background_audio_player.start()
    # Track background audio start for metrics
    metrics_collector.on_background_audio_start(
        file_path=config.file_path,
        looping=config.looping
    )

Play background audio on demand.

With override_thinking=True, thinking audio may layer over the background music during LLM generation (it stops on the first TTS byte). With override_thinking=False, background music is exclusive: thinking audio is suppressed for as long as background audio is playing.

async def reply(self,
instructions: str,
wait_for_playback: bool = True,
frames: list[av.VideoFrame] | None = None,
interruptible: bool = True) ‑> UtteranceHandle
Expand source code
async def reply(self, instructions: str, wait_for_playback: bool = True, frames: list[av.VideoFrame] | None = None, interruptible: bool = True) -> UtteranceHandle:
    """
    Generate a response from agent using instructions and current chat context.

    This method is safe to call from function tools - it will automatically
    detect re-entrant calls and schedule them as background tasks.

    Args:
        instructions: Instructions to add to chat context
        wait_for_playback: If True, wait for playback to complete
        frames: Optional list of VideoFrame objects to include in the reply
        interruptible: If False, the returned handle's ``interrupt()`` raises
            unless forced.

    Returns:
        UtteranceHandle: A handle to track the utterance lifecycle. If a reply
        is already in progress, no new reply is started: the current utterance
        is returned, or an already-completed placeholder handle if there is none.
    """
    self._accept_user_input = True

    if self._reply_in_progress:
        if self.current_utterance:
            return self.current_utterance
        handle = UtteranceHandle(utterance_id="placeholder", interruptible=interruptible)
        handle._mark_done()
        return handle

    handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
    self.current_utterance = handle

    if self._is_executing_tool:
        # Inside a function tool: run the reply in the background so the tool can return.
        task = asyncio.create_task(
            self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
        )
        # The event loop holds only weak references to tasks; keep a strong one
        # until the task finishes so it cannot be garbage-collected mid-flight.
        pending = getattr(self, "_background_tasks", None)
        if pending is None:
            pending = set()
            self._background_tasks = pending
        pending.add(task)
        task.add_done_callback(pending.discard)
        return handle

    await self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
    return handle

Generate a response from agent using instructions and current chat context.

This method is safe to call from function tools - it will automatically detect re-entrant calls and schedule them as background tasks.

Args

instructions
Instructions to add to chat context
wait_for_playback
If True, wait for playback to complete
frames
Optional list of VideoFrame objects to include in the reply
interruptible
If False, the returned handle's interrupt() raises unless forced.

Returns

UtteranceHandle
A handle to track the utterance lifecycle
async def say(self,
message: str,
interruptible: bool = True,
audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None,
add_to_chat_context: bool = True) ‑> UtteranceHandle
Expand source code
async def say(
    self,
    message: str,
    interruptible: bool = True,
    audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None,
    add_to_chat_context: bool = True,
) -> UtteranceHandle:
    """
    Speak ``message`` as the agent and return a handle to track it.

    Outside a function tool, any unfinished current utterance is interrupted
    and replaced by the new one. If that utterance refuses the interrupt
    (non-interruptible), a warning is logged and the new message is still sent.
    When called from inside a function tool (_is_executing_tool), the current
    turn's utterance is not interrupted or replaced, so the LLM stream can
    continue after the tool returns.

    Args:
        message: The text being spoken. Always used for transcript/chat
            context (unless ``add_to_chat_context=False``).
        interruptible: If False, ``handle.interrupt()`` will raise unless
            forced.
        audio_data: Optional pre-synthesized PCM bytes (int16, matching
            the agent audio track sample rate). When provided, the TTS
            provider is bypassed and the bytes are streamed straight to
            the audio track. Accepts a single ``bytes`` blob, an
            iterable of ``bytes`` chunks, or an async iterator of
            ``bytes`` chunks. Use ``load_audio_file`` for WAV files or
            :class:`TTSAudioCache` to cache TTS output by text.
        add_to_chat_context: When False, the message is spoken but NOT
            added to the agent's chat history (useful for transient
            hold messages inside function tools).

    Returns:
        UtteranceHandle: A handle to track the new utterance.
    """
    handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
    self._accept_user_input = True
    if not self._is_executing_tool:
        previous = self.current_utterance
        if previous and not previous.done():
            try:
                previous.interrupt()
            except RuntimeError as e:
                # A non-interruptible utterance rejects unforced interrupts; handle it
                # the same way interrupt() does instead of aborting this call.
                logger.warning(f"Could not interrupt utterance: {e}")
        self.current_utterance = handle

    traces_flow_manager = metrics_collector.traces_flow_manager
    if traces_flow_manager:
        traces_flow_manager.agent_say_called(message)

    if add_to_chat_context:
        self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)

    if hasattr(self.pipeline, 'send_message'):
        await self.pipeline.send_message(message, handle=handle, audio_data=audio_data)

    return handle

Speak a message as the agent and return a handle to track it. When called from inside a function tool (while _is_executing_tool is set), the current turn's utterance is neither interrupted nor replaced, so the LLM stream can continue after the tool returns.

Args

message
The text being spoken. Always used for transcript/chat context (unless add_to_chat_context=False).
interruptible
If False, handle.interrupt() will raise unless forced.
audio_data
Optional pre-synthesized PCM bytes (int16, matching the agent audio track sample rate). When provided, the TTS provider is bypassed and the bytes are streamed straight to the audio track. Accepts a single bytes blob, an iterable of bytes chunks, or an async iterator of bytes chunks. Use load_audio_file for WAV files, or the TTSAudioCache class to cache TTS output by text.
add_to_chat_context
When False, the message is spoken but NOT added to the agent's chat history (useful for transient hold messages inside function tools).
async def start(self,
wait_for_participant: bool = False,
run_until_shutdown: bool = False,
*,
observability: Optional[ObservabilityOptions] = None,
**kwargs: Any) ‑> None
Expand source code
async def start(
    self,
    wait_for_participant: bool = False,
    run_until_shutdown: bool = False,
    *,
    observability: Optional[ObservabilityOptions] = None,
    **kwargs: Any
) -> None:
    """
    Start the agent session.
    This will:
    1. Optionally hand the full lifecycle (connect, wait, shutdown) to the
       job context when ``run_until_shutdown=True``
    2. Initialize the agent (including MCP tools if configured)
    3. Start the pipeline processing
    4. Call the agent's on_enter hook (for SIP users, only once their audio
       stream is enabled)
    5. Start wake-up timer if configured (but only if callback is set)

    Args:
        wait_for_participant: If True, wait for a participant to join before starting
        run_until_shutdown: If True, manage the full lifecycle including connection,
                           waiting for shutdown signals, and cleanup. This is a convenience
                           that internally calls ctx.run_until_shutdown() with this session.
                           Errors raised while the session runs propagate to the caller.
        observability: Grouped recording/traces/metrics/logs config. When passed, it
                       is applied to ``ctx.room_options.observability`` before the
                       room connects, taking precedence over any value set on
                       ``RoomOptions``. Requires ``run_until_shutdown=True``; a
                       warning is logged otherwise.
        **kwargs: Additional keyword arguments. Currently not forwarded to
                  ``pipeline.start()``.

    Examples:
        Simple start (manual lifecycle management):
        ```python
        await session.start()
        ```

        Full lifecycle management (recommended):
        ```python
        await session.start(wait_for_participant=True, run_until_shutdown=True)
        ```

        Inline observability:
        ```python
        await session.start(
            wait_for_participant=True,
            run_until_shutdown=True,
            observability=ObservabilityOptions(
                recording=RecordingOptions(video=True),
            ),
        )
        ```
    """
    if observability is not None:
        ctx = None
        try:
            ctx = get_current_job_context()
        except Exception as e:
            logger.warning(f"Could not resolve JobContext for observability overrides: {e}")
        if ctx is not None and ctx.room_options is not None:
            if not run_until_shutdown:
                logger.warning(
                    "observability= passed to session.start() but run_until_shutdown=False; "
                    "overrides only apply if ctx.connect() has not yet run."
                )
            ctx.room_options.observability = observability
        else:
            logger.warning(
                "observability= passed to session.start() but no active JobContext "
                "(or no RoomOptions) was found; ignoring."
            )

    if run_until_shutdown:
        ctx = None
        try:
            ctx = get_current_job_context()
        except Exception as e:
            logger.warning(
                f"Failed to get JobContext for run_until_shutdown: {e}, "
                "falling back to normal start()"
            )
        else:
            if not ctx:
                logger.warning(
                    "run_until_shutdown=True requires a JobContext, "
                    "falling back to normal start()"
                )
        if ctx:
            logger.info("Starting session with full lifecycle management")
            # Deliberately outside the try above: errors raised while the session is
            # running must propagate, not be misreported as a JobContext lookup
            # failure followed by a second "fallback" start of this session.
            await ctx.run_until_shutdown(
                session=self,
                wait_for_participant=wait_for_participant
            )
            return

    self._emit_agent_state(AgentState.STARTING)

    if self.agent._mcp_servers:
        await self.agent.initialize_mcp()

    if self.dtmf_handler:
        await self.dtmf_handler.start()

    # Configure metrics with session info
    metrics_collector.set_system_instructions(self.agent.instructions)

    # Set provider info based on pipeline components

    if not self.pipeline.config.is_realtime:
        if self.pipeline.stt:
            p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
            metrics_collector.set_provider_info("stt", p_class, p_model)
        if self.pipeline.llm:
            p_class, p_model = self._get_provider_info(self.pipeline.llm, 'llm')
            metrics_collector.set_provider_info("llm", p_class, p_model)
        if self.pipeline.tts:
            p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
            metrics_collector.set_provider_info("tts", p_class, p_model)
        if hasattr(self.pipeline, 'vad') and self.pipeline.vad:
            p_class, p_model = self._get_provider_info(self.pipeline.vad, 'vad')
            metrics_collector.set_provider_info("vad", p_class, p_model)
        if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector:
            p_class, p_model = self._get_provider_info(self.pipeline.turn_detector, 'eou')
            metrics_collector.set_provider_info("eou", p_class, p_model)
    else:
        if self.pipeline._realtime_model:
            metrics_collector.set_provider_info("realtime", format_provider_class(self.pipeline._realtime_model), getattr(self.pipeline._realtime_model, 'model', ''))
        if self.pipeline.stt:
            p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
            metrics_collector.set_provider_info("stt", p_class, p_model)
        if self.pipeline.tts:
            p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
            metrics_collector.set_provider_info("tts", p_class, p_model)

    # Traces flow manager setup
    traces_flow_manager = metrics_collector.traces_flow_manager
    if traces_flow_manager:
        config_attributes = {
            "system_instructions": self.agent.instructions,
            "function_tools": [
                get_tool_info(tool).name
                for tool in (
                    [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools]
                    if self.agent.mcp_manager else self.agent.tools
                )
            ] if self.agent.tools else [],
            "mcp_tools": [
                tool._tool_info.name
                for tool in self.agent.mcp_manager.tools
            ] if self.agent.mcp_manager else [],
            "pipeline": self.pipeline.__class__.__name__,
            "pipeline_mode": self.pipeline.config.pipeline_mode.value,
            "transport_mode": metrics_collector.transport_mode
        }
        start_time = time.perf_counter()
        config_attributes["start_time"] = start_time
        await traces_flow_manager.start_agent_session_config(config_attributes)
        await traces_flow_manager.start_agent_session({"start_time": start_time})

    if hasattr(self.pipeline, 'set_agent'):
        self.pipeline.set_agent(self.agent)

    # NOTE(review): **kwargs are accepted but not passed here; confirm the
    # Pipeline.start signature before forwarding them.
    await self.pipeline.start()

    if hasattr(self.agent, 'a2a'):
        self.agent.a2a._attach_deferred_listeners()

    if self._should_delay_for_sip_user():
        logger.info("SIP user detected, waiting for audio stream to be enabled before calling on_enter")
        audio_stream_enabled = asyncio.Event()

        def on_audio_stream_enabled(data):
            stream = data.get("stream")
            participant = data.get("participant")
            if stream and stream.kind == "audio" and participant and participant.meta_data.get("sipUser"):
                logger.info(f"SIP user audio stream enabled for participant {participant.id}")
                audio_stream_enabled.set()

        global_event_emitter.on("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

        async def wait_and_start():
            try:
                await audio_stream_enabled.wait()
                logger.info("SIP user audio stream enabled, proceeding with on_enter")
                await self.agent.on_enter()
                self._accept_user_input = True
                global_event_emitter.emit("AGENT_STARTED", {"session": self})
                if self.on_wake_up is not None:
                    self._start_wake_up_timer()
                self._emit_agent_state(AgentState.IDLE)
            except Exception as e:
                logger.error(f"Error in wait_and_start: {e}")
            finally:
                global_event_emitter.off("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

        task = asyncio.create_task(wait_and_start())
        # The event loop keeps only weak references to tasks; hold a strong one
        # until the deferred start completes.
        pending = getattr(self, "_background_tasks", None)
        if pending is None:
            pending = set()
            self._background_tasks = pending
        pending.add(task)
        task.add_done_callback(pending.discard)
        return

    await self.agent.on_enter()
    self._accept_user_input = True

    global_event_emitter.emit("AGENT_STARTED", {"session": self})
    if self.on_wake_up is not None:
        self._start_wake_up_timer()
    self._emit_agent_state(AgentState.IDLE)

Start the agent session. This will:

1. Optionally hand the full lifecycle (connect, wait, shutdown) to the job context when run_until_shutdown=True.
2. Initialize the agent (including MCP tools if configured).
3. Start the pipeline processing.
4. Call the agent's on_enter hook.
5. Start the wake-up timer if configured (but only if a wake-up callback is set).

Args

wait_for_participant
If True, wait for a participant to join before starting
run_until_shutdown
If True, manage the full lifecycle including connection, waiting for shutdown signals, and cleanup. This is a convenience that internally calls ctx.run_until_shutdown() with this session.
observability
Grouped recording/traces/metrics/logs config. When passed, it is applied to ctx.room_options.observability before the room connects, taking precedence over any value set on RoomOptions. Requires run_until_shutdown=True; a warning is logged otherwise.
**kwargs
Additional arguments to pass to the pipeline start method

Examples

Simple start (manual lifecycle management):

await session.start()

Full lifecycle management (recommended):

await session.start(wait_for_participant=True, run_until_shutdown=True)

Inline observability:

await session.start(
    wait_for_participant=True,
    run_until_shutdown=True,
    observability=ObservabilityOptions(
        recording=RecordingOptions(video=True),
    ),
)
async def start_thinking_audio(self)
Expand source code
async def start_thinking_audio(self):
    """Start thinking audio.

    Does nothing when the agent has no thinking-audio config (or there is no
    agent), when thinking audio is already playing, or when background audio
    is playing without ``override_thinking``. Logs a warning only if thinking
    audio is configured but the audio track cannot mix background bytes.
    """
    # Check the config first: sessions without thinking audio should not get a
    # "cannot play" warning every time this is invoked.
    thinking_config = getattr(self.agent, "_thinking_background_config", None)
    if not thinking_config:
        return
    if (
        self._background_audio_player
        and self._background_audio_player.is_playing
        and not self._override_thinking
    ):
        return
    if self._thinking_audio_player and self._thinking_audio_player.is_playing:
        return

    audio_track = self._get_audio_track()
    if not hasattr(audio_track, 'add_background_bytes'):
        logger.warning(
            "Cannot play 'thinking' audio. This feature requires the mixing audio track. "
            "Enable it by setting `background_audio=True` in RoomOptions."
        )
        return

    self._thinking_audio_player = BackgroundAudioHandler(thinking_config, audio_track)
    await self._thinking_audio_player.start()
    # Track thinking audio start for metrics
    metrics_collector.on_thinking_audio_start(
        file_path=thinking_config.file_path,
        looping=thinking_config.looping
    )

Start thinking audio

async def stop_background_audio(self) ‑> None
Expand source code
async def stop_background_audio(self) -> None:
    """Stop on-demand background audio.

    Stops and releases the background player (recording the stop in metrics),
    clears the override-thinking flag, and restarts thinking audio if it was
    suppressed when background audio began.
    """
    player = self._background_audio_player
    if player:
        await player.stop()
        self._background_audio_player = None
        # Track background audio stop for metrics
        metrics_collector.on_background_audio_stop()

    self._override_thinking = False

    if not self._thinking_was_playing:
        return
    await self.start_thinking_audio()
    self._thinking_was_playing = False

Stop background audio on demand

async def stop_thinking_audio(self)
Expand source code
async def stop_thinking_audio(self):
    """Stop thinking audio, if a player exists, and record the stop in metrics."""
    player = self._thinking_audio_player
    if not player:
        return
    await player.stop()
    self._thinking_audio_player = None
    # Track thinking audio stop for metrics
    metrics_collector.on_thinking_audio_stop()

Stop thinking audio

async def warm_transfer(self, config: "'WarmTransferConfig'") ‑> 'WarmTransferResult'
Expand source code
async def warm_transfer(self, config: "WarmTransferConfig") -> "WarmTransferResult":
    """Run a SIP-to-SIP warm transfer to a human supervisor.

    See :mod:`videosdk.agents.warm_transfer` for the config surface and the
    ``@session.on_warm_transfer(...)`` phase hook.

    The transfer runs in its own task and is shielded from the caller of this
    method being cancelled (this is usually invoked from a ``@function_tool``,
    and that task gets cancelled whenever the caller talks over the agent /
    triggers an interruption — which must not abort an in-flight transfer).
    If the awaiting tool is cancelled, the transfer keeps running to
    completion in the background; the result is simply not returned.

    Raises:
        WarmTransferError: if a warm transfer is already running for this session.
    """
    from .warm_transfer import WarmTransferRunner, WarmTransferError

    if self._warm_transfer_in_progress:
        raise WarmTransferError("A warm transfer is already in progress for this session.")

    # Create the runner and its task BEFORE claiming the in-progress flag: if
    # either step raises, no done-callback would ever run to clear the flag,
    # and every later transfer would be rejected as "already in progress".
    task = asyncio.ensure_future(WarmTransferRunner(self, config).run())
    self._warm_transfer_in_progress = True
    self._warm_transfer_task = task

    def _clear(_task: "asyncio.Task") -> None:
        self._warm_transfer_in_progress = False
        self._warm_transfer_task = None

    # No await since ensure_future, so the task cannot have finished unobserved.
    task.add_done_callback(_clear)
    return await asyncio.shield(task)

Run a SIP-to-SIP warm transfer to a human supervisor.

See the videosdk.agents.warm_transfer module for the config surface and the @session.on_warm_transfer(...) phase hook.

The transfer runs in its own task and is shielded from the caller of this method being cancelled (this is usually invoked from a @function_tool, and that task gets cancelled whenever the caller talks over the agent / triggers an interruption — which must not abort an in-flight transfer). If the awaiting tool is cancelled, the transfer keeps running to completion in the background; the result is simply not returned.

Inherited members