Module agents.agent_session

Classes

class AgentSession (agent: Agent,
pipeline: Pipeline,
conversation_flow: Optional[ConversationFlow] = None,
wake_up: Optional[int] = None,
background_audio: Optional[BackgroundAudioHandlerConfig] = None)
Expand source code
class AgentSession(EventEmitter[Literal["user_state_changed", "agent_state_changed", "on_speech_in", "on_speech_out"]]):
    """
    Manages an agent session with its associated conversation flow and pipeline.

    The session wires an Agent to a Pipeline (and optionally a
    ConversationFlow), relays global speech events to session-level
    listeners, tracks user/agent state, runs an optional wake-up timer
    that fires a callback after a period of user silence, and manages
    on-demand background / "thinking" audio playback.
    """

    def __init__(
        self,
        agent: Agent,
        pipeline: Pipeline,
        conversation_flow: Optional[ConversationFlow] = None,
        wake_up: Optional[int] = None,
        background_audio: Optional[BackgroundAudioHandlerConfig] = None,
    ) -> None:
        """
        Initialize an agent session.

        Args:
            agent: Instance of an Agent class that handles the core logic
            pipeline: Pipeline instance to process the agent's operations
            conversation_flow: ConversationFlow instance to manage conversation state
            wake_up: Time in seconds after which to trigger wake-up callback if no speech detected
            background_audio: Configuration for background audio (optional)
        """
        super().__init__()
        self.agent = agent
        self.pipeline = pipeline
        self.conversation_flow = conversation_flow
        # Back-reference so the agent can reach say()/reply()/etc. on its session.
        self.agent.session = self
        self.wake_up = wake_up
        # Callback fired when the wake-up timer elapses; may be sync or async.
        self.on_wake_up: Optional[Callable[[], None] | Callable[[], Any]] = None
        self._wake_up_task: Optional[asyncio.Task] = None
        self._wake_up_timer_active = False
        self._closed: bool = False
        self._reply_in_progress: bool = False
        self._user_state: UserState = UserState.IDLE
        self._agent_state: AgentState = AgentState.IDLE
        self.current_utterance: Optional[UtteranceHandle] = None
        self._thinking_audio_player: Optional[BackgroundAudioHandler] = None
        self._background_audio_player: Optional[BackgroundAudioHandler] = None
        # Remembers whether thinking audio was interrupted by on-demand
        # background audio, so stop_background_audio() can resume it.
        self._thinking_was_playing = False
        self.background_audio_config = background_audio

        # Pipeline hooks are optional across pipeline implementations,
        # hence the hasattr guards.
        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)

        if (
            hasattr(self.pipeline, "set_conversation_flow")
            and self.conversation_flow is not None
        ):
            self.pipeline.set_conversation_flow(self.conversation_flow)
        if hasattr(self.pipeline, 'set_wake_up_callback'):
            self.pipeline.set_wake_up_callback(self._reset_wake_up_timer)

        global_event_emitter.on("ON_SPEECH_IN", self._on_speech_in)
        global_event_emitter.on("ON_SPEECH_OUT", self._on_speech_out)

        # Best effort: register close() as a shutdown callback when running
        # inside a job context; outside one this is a deliberate no-op.
        try:
            job_ctx = get_current_job_context()
            if job_ctx:
                job_ctx.add_shutdown_callback(self.close)
        except Exception:
            pass

    def _on_speech_in(self, data: dict) -> None:
        # Re-emit the global speech-in event on this session's emitter.
        self.emit("on_speech_in", data)

    def _on_speech_out(self, data: dict) -> None:
        # Re-emit the global speech-out event on this session's emitter.
        self.emit("on_speech_out", data)

    def _start_wake_up_timer(self) -> None:
        """Start the wake-up timer (only when both interval and callback are set)."""
        if self.wake_up is not None and self.on_wake_up is not None:
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _reset_wake_up_timer(self) -> None:
        """Restart the wake-up countdown, e.g. after user speech is detected."""
        if self.wake_up is not None and self.on_wake_up is not None:
            # While a reply is being generated the timer stays paused.
            if self._reply_in_progress:
                return
            if self._wake_up_task and not self._wake_up_task.done():
                self._wake_up_task.cancel()
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _pause_wake_up_timer(self) -> None:
        """Cancel the pending countdown without deactivating the timer."""
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()

    def _cancel_wake_up_timer(self) -> None:
        """Cancel the countdown and deactivate the timer entirely."""
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()
        self._wake_up_timer_active = False

    async def _wake_up_timer_loop(self) -> None:
        """Sleep for the configured interval, then fire on_wake_up once."""
        try:
            await asyncio.sleep(self.wake_up)
            if self._wake_up_timer_active and self.on_wake_up and not self._reply_in_progress:
                # Callback may be either a coroutine function or a plain callable.
                if asyncio.iscoroutinefunction(self.on_wake_up):
                    await self.on_wake_up()
                else:
                    self.on_wake_up()
        except asyncio.CancelledError:
            # Cancellation is the normal reset/pause path, not an error.
            pass

    def _emit_user_state(self, state: UserState, data: dict | None = None) -> None:
        """Emit "user_state_changed" only when the state actually changes."""
        if state != self._user_state:
            self._user_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("user_state_changed", payload)

    def _emit_agent_state(self, state: AgentState, data: dict | None = None) -> None:
        """Emit "agent_state_changed" only when the state actually changes."""
        if state != self._agent_state:
            self._agent_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("agent_state_changed", payload)

    @property
    def user_state(self) -> UserState:
        """Current user state."""
        return self._user_state

    @property
    def agent_state(self) -> AgentState:
        """Current agent state."""
        return self._agent_state

    @property
    def is_background_audio_enabled(self) -> bool:
        """Check if background audio is enabled in the pipeline"""
        # Only the mixing audio track exposes add_background_bytes;
        # hasattr(None, ...) is False, so a missing track is handled too.
        audio_track = self._get_audio_track()
        return hasattr(audio_track, 'add_background_bytes')

    async def start(self, **kwargs: Any) -> None:
        """
        Start the agent session.
        This will:
        1. Initialize the agent (including MCP tools if configured)
        2. Call the agent's on_enter hook
        3. Start the pipeline processing
        4. Start wake-up timer if configured (but only if callback is set)

        Args:
            **kwargs: Additional arguments to pass to the pipeline start method
        """
        self._emit_agent_state(AgentState.STARTING)
        await self.agent.initialize_mcp()

        if isinstance(self.pipeline, RealTimePipeline):
            await realtime_metrics_collector.start_session(self.agent, self.pipeline)
        else:
            # Component configs are needed in several places below; fetch once
            # instead of re-calling get_component_configs() per attribute.
            configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {}
            is_cascading = self.pipeline.__class__.__name__ == "CascadingPipeline"

            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                # Tools managed by MCP are reported separately from plain
                # function tools; an empty list keeps the filter a no-op
                # when no MCP manager is configured.
                mcp_tools = self.agent.mcp_manager.tools if self.agent.mcp_manager else []
                config_attributes = {
                    "system_instructions": self.agent.instructions,
                    "function_tools": [
                        get_tool_info(tool).name
                        for tool in self.agent.tools
                        if tool not in mcp_tools
                    ] if self.agent.tools else [],

                    "mcp_tools": [get_tool_info(tool).name for tool in mcp_tools],

                    "pipeline": self.pipeline.__class__.__name__,
                    # Provider/model details only exist on cascading pipelines.
                    **({
                        "stt_provider": self.pipeline.stt.__class__.__name__ if self.pipeline.stt else None,
                        "tts_provider": self.pipeline.tts.__class__.__name__ if self.pipeline.tts else None,
                        "llm_provider": self.pipeline.llm.__class__.__name__ if self.pipeline.llm else None,
                        "vad_provider": self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                        "eou_provider": self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None,
                        "stt_model": configs.get('stt', {}).get('model') if self.pipeline.stt else None,
                        "llm_model": configs.get('llm', {}).get('model') if self.pipeline.llm else None,
                        "tts_model": configs.get('tts', {}).get('model') if self.pipeline.tts else None,
                        "vad_model": configs.get('vad', {}).get('model') if hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                        "eou_model": configs.get('eou', {}).get('model') if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None
                    } if is_cascading else {}),
                }
                start_time = time.perf_counter()
                config_attributes["start_time"] = start_time
                await traces_flow_manager.start_agent_session_config(config_attributes)
                await traces_flow_manager.start_agent_session({"start_time": start_time})

            if is_cascading:
                cascading_metrics_collector.set_provider_info(
                    llm_provider=self.pipeline.llm.__class__.__name__ if self.pipeline.llm else "",
                    llm_model=configs.get('llm', {}).get('model', "") if self.pipeline.llm else "",
                    stt_provider=self.pipeline.stt.__class__.__name__ if self.pipeline.stt else "",
                    stt_model=configs.get('stt', {}).get('model', "") if self.pipeline.stt else "",
                    tts_provider=self.pipeline.tts.__class__.__name__ if self.pipeline.tts else "",
                    tts_model=configs.get('tts', {}).get('model', "") if self.pipeline.tts else "",
                    vad_provider=self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                    vad_model=configs.get('vad', {}).get('model', "") if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                    eou_provider=self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else "",
                    eou_model=configs.get('eou', {}).get('model', "") if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else ""
                )

        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)

        # Forward session-level speech events to the agent's hooks.
        self.on("on_speech_in", self.agent.on_speech_in)
        self.on("on_speech_out", self.agent.on_speech_out)

        await self.pipeline.start()
        await self.agent.on_enter()
        global_event_emitter.emit("AGENT_STARTED", {"session": self})
        if self.on_wake_up is not None:
            self._start_wake_up_timer()
        self._emit_agent_state(AgentState.IDLE)

    async def say(self, message: str) -> UtteranceHandle:
        """
        Send an initial message to the agent and return a handle to track it.
        """
        # Only one utterance at a time: interrupt any in-flight speech first.
        if self.current_utterance and not self.current_utterance.done():
            self.current_utterance.interrupt()

        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
        self.current_utterance = handle

        if not isinstance(self.pipeline, RealTimePipeline):
            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                traces_flow_manager.agent_say_called(message)

        self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)

        if hasattr(self.pipeline, 'send_message'):
            await self.pipeline.send_message(message, handle=handle)
        else:
            # Nothing will ever complete the handle on this pipeline; resolve
            # it immediately so callers awaiting it don't hang forever.
            handle._mark_done()

        return handle

    async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool = False) -> None:
        """Play background audio on demand"""
        # Optionally pre-empt thinking audio; remember it so
        # stop_background_audio() can resume it.
        if override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing:
            await self.stop_thinking_audio()
            self._thinking_was_playing = True

        audio_track = self._get_audio_track()
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play background audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return

        if audio_track:
            self._background_audio_player = BackgroundAudioHandler(config, audio_track)

            await self._background_audio_player.start()


    async def stop_background_audio(self) -> None:
        """Stop background audio on demand"""
        if self._background_audio_player:
            await self._background_audio_player.stop()
            self._background_audio_player = None

        # Resume thinking audio if it was pre-empted by play_background_audio().
        if self._thinking_was_playing:
            await self.start_thinking_audio()
            self._thinking_was_playing = False

    def _get_audio_track(self):
        """Get audio track from pipeline"""
        if hasattr(self.pipeline, 'tts') and self.pipeline.tts and self.pipeline.tts.audio_track: # Cascading
            return self.pipeline.tts.audio_track
        elif hasattr(self.pipeline, 'model') and self.pipeline.model and self.pipeline.model.audio_track: # Realtime
            return self.pipeline.model.audio_track
        return None

    async def start_thinking_audio(self):
        """Start thinking audio"""
        # On-demand background audio takes priority over thinking audio.
        if self._background_audio_player and self._background_audio_player.is_playing:
            return

        audio_track = self._get_audio_track()
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play 'thinking' audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return

        if self.agent._thinking_background_config and audio_track:
            self._thinking_audio_player = BackgroundAudioHandler(self.agent._thinking_background_config, audio_track)
            await self._thinking_audio_player.start()

    async def stop_thinking_audio(self):
        """Stop thinking audio"""
        if self._thinking_audio_player:
            await self._thinking_audio_player.stop()
            self._thinking_audio_player = None

    async def reply(self, instructions: str, wait_for_playback: bool = True) -> UtteranceHandle:
        """
        Generate a response from agent using instructions and current chat context.
        Subsequent calls are discarded while the first one is still running.
        Returns a handle to track the utterance.
        """
        if not instructions:
            handle = UtteranceHandle(utterance_id="empty_reply")
            handle._mark_done()
            return handle

        if self._reply_in_progress:
            if self.current_utterance:
                return self.current_utterance
            # Return an already-completed placeholder so callers don't block
            # while the first reply is still running.
            handle = UtteranceHandle(utterance_id="placeholder")
            handle._mark_done()
            return handle

        if self.current_utterance and not self.current_utterance.done():
            self.current_utterance.interrupt()

        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
        self.current_utterance = handle
        self._reply_in_progress = True

        # Don't let the wake-up timer fire while we're generating a reply.
        self._pause_wake_up_timer()

        try:
            if not isinstance(self.pipeline, RealTimePipeline):
                traces_flow_manager = cascading_metrics_collector.traces_flow_manager
                if traces_flow_manager:
                    traces_flow_manager.agent_reply_called(instructions)

            if hasattr(self.pipeline, 'reply_with_context'):
                await self.pipeline.reply_with_context(instructions, wait_for_playback, handle=handle)
            else:
                if hasattr(self.pipeline, 'send_text_message'):
                    await self.pipeline.send_text_message(instructions)
                else:
                    await self.pipeline.send_message(instructions)
        except Exception:
            # The pipeline failed before completing the utterance; resolve the
            # handle so awaiters are released, then propagate the error.
            handle._mark_done()
            raise
        finally:
            self._reply_in_progress = False

        return handle

    def interrupt(self) -> None:
        """
        Interrupt the agent's current speech.
        """
        if self.current_utterance and not self.current_utterance.interrupted:
            self.current_utterance.interrupt()

        if hasattr(self.pipeline, 'interrupt'):
            self.pipeline.interrupt()

    async def close(self) -> None:
        """
        Close the agent session.
        """
        logger.info("Closing agent session")
        # Idempotent: a second close() (e.g. shutdown callback after an
        # explicit close) is a no-op.
        if self._closed:
            logger.info("Agent session already closed")
            return
        self._closed = True
        self._emit_agent_state(AgentState.CLOSING)
        if isinstance(self.pipeline, RealTimePipeline):
            realtime_metrics_collector.finalize_session()
            traces_flow_manager = realtime_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                start_time = time.perf_counter()
                await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
                traces_flow_manager.end_agent_session_closed()
        else:
            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                start_time = time.perf_counter()
                await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
                traces_flow_manager.end_agent_session_closed()

        self._cancel_wake_up_timer()

        # Unsubscribe everything registered in __init__/start().
        global_event_emitter.off("ON_SPEECH_IN", self._on_speech_in)
        global_event_emitter.off("ON_SPEECH_OUT", self._on_speech_out)

        self.off("on_speech_in", self.agent.on_speech_in)
        self.off("on_speech_out", self.agent.on_speech_out)

        logger.info("Cleaning up agent session")
        # Each cleanup step is isolated so one failure can't block the rest.
        try:
            await self.agent.on_exit()
        except Exception as e:
            logger.error(f"Error in agent.on_exit(): {e}")

        if self._thinking_audio_player:
            await self._thinking_audio_player.stop()

        if self._background_audio_player:
            await self._background_audio_player.stop()

        try:
            await self.pipeline.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up pipeline: {e}")

        if self.conversation_flow:
            try:
                await self.conversation_flow.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up conversation flow: {e}")
            self.conversation_flow = None

        try:
            await self.agent.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up agent: {e}")

        # Drop references so the session doesn't keep the graph alive.
        self.agent = None
        self.pipeline = None
        self.on_wake_up = None
        self._wake_up_task = None
        logger.info("Agent session cleaned up")

    async def leave(self) -> None:
        """
        Leave the agent session.
        """
        self._emit_agent_state(AgentState.CLOSING)
        await self.pipeline.leave()

Manages an agent session with its associated conversation flow and pipeline.

Initialize an agent session.

Args

agent
Instance of an Agent class that handles the core logic
pipeline
Pipeline instance to process the agent's operations
conversation_flow
ConversationFlow instance to manage conversation state
wake_up
Time in seconds after which to trigger wake-up callback if no speech detected
background_audio
Configuration for background audio (optional)

Ancestors

Instance variables

prop agent_state : AgentState
Expand source code
@property
def agent_state(self) -> AgentState:
    return self._agent_state
prop is_background_audio_enabled : bool
Expand source code
@property
def is_background_audio_enabled(self) -> bool:
    """Check if background audio is enabled in the pipeline"""
    audio_track = self._get_audio_track()
    return hasattr(audio_track, 'add_background_bytes')

Check if background audio is enabled in the pipeline

prop user_state : UserState
Expand source code
@property
def user_state(self) -> UserState:
    return self._user_state

Methods

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """
    Close the agent session.
    """
    logger.info("Closing agent session")
    if self._closed:
        logger.info("Agent session already closed")
        return
    self._closed = True
    self._emit_agent_state(AgentState.CLOSING)
    if isinstance(self.pipeline, RealTimePipeline):
        realtime_metrics_collector.finalize_session()
        traces_flow_manager = realtime_metrics_collector.traces_flow_manager
        if traces_flow_manager:
            start_time = time.perf_counter()
            await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
            traces_flow_manager.end_agent_session_closed()
    else:
        traces_flow_manager = cascading_metrics_collector.traces_flow_manager
        if traces_flow_manager:
            start_time = time.perf_counter()
            await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
            traces_flow_manager.end_agent_session_closed()

    self._cancel_wake_up_timer()
    
    global_event_emitter.off("ON_SPEECH_IN", self._on_speech_in)
    global_event_emitter.off("ON_SPEECH_OUT", self._on_speech_out)

    self.off("on_speech_in", self.agent.on_speech_in)
    self.off("on_speech_out", self.agent.on_speech_out)

    logger.info("Cleaning up agent session")
    try:
        await self.agent.on_exit()
    except Exception as e:
        logger.error(f"Error in agent.on_exit(): {e}")
    
    if self._thinking_audio_player:
        await self._thinking_audio_player.stop()

    if self._background_audio_player:
        await self._background_audio_player.stop()

    try:
        await self.pipeline.cleanup()
    except Exception as e:
        logger.error(f"Error cleaning up pipeline: {e}")
    
    if self.conversation_flow:
        try:
            await self.conversation_flow.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up conversation flow: {e}")
        self.conversation_flow = None
    
    try:
        await self.agent.cleanup()
    except Exception as e:
        logger.error(f"Error cleaning up agent: {e}")
    
    self.agent = None
    self.pipeline = None
    self.on_wake_up = None
    self._wake_up_task = None
    logger.info("Agent session cleaned up")

Close the agent session.

def interrupt(self) ‑> None
Expand source code
def interrupt(self) -> None:
    """
    Interrupt the agent's current speech.
    """
    if self.current_utterance and not self.current_utterance.interrupted:
        self.current_utterance.interrupt()
    
    if hasattr(self.pipeline, 'interrupt'):
        self.pipeline.interrupt()

Interrupt the agent's current speech.

async def leave(self) ‑> None
Expand source code
async def leave(self) -> None:
    """
    Leave the agent session.
    """
    self._emit_agent_state(AgentState.CLOSING)
    await self.pipeline.leave()

Leave the agent session.

async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) ‑> None
Expand source code
async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) -> None:
    """Play background audio on demand"""
    if override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing:
        await self.stop_thinking_audio()
        self._thinking_was_playing = True

    audio_track = self._get_audio_track()
    if not hasattr(audio_track, 'add_background_bytes'):
        logger.warning(
            "Cannot play background audio. This feature requires the mixing audio track. "
            "Enable it by setting `background_audio=True` in RoomOptions."
        )
        return

    if audio_track:
        self._background_audio_player = BackgroundAudioHandler(config, audio_track)
        
        await self._background_audio_player.start()

Play background audio on demand

async def reply(self, instructions: str, wait_for_playback: bool = True) ‑> UtteranceHandle
Expand source code
async def reply(self, instructions: str, wait_for_playback: bool = True) -> UtteranceHandle:
    """
    Generate a response from agent using instructions and current chat context.
    Subsequent calls are discarded while the first one is still running.
    Returns a handle to track the utterance.
    """
    if not instructions:
        handle = UtteranceHandle(utterance_id="empty_reply")
        handle._mark_done()
        return handle
    
    if self._reply_in_progress:
        if self.current_utterance:
            return self.current_utterance
        # Create a placeholder that will never resolve to avoid blocking
        handle = UtteranceHandle(utterance_id="placeholder")
        handle._mark_done()
        return handle

    if self.current_utterance and not self.current_utterance.done():
        self.current_utterance.interrupt()

    handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
    self.current_utterance = handle
    self._reply_in_progress = True
    
    self._pause_wake_up_timer()
    
    try:
        if not isinstance(self.pipeline, RealTimePipeline):
            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                traces_flow_manager.agent_reply_called(instructions)

        if hasattr(self.pipeline, 'reply_with_context'):
            await self.pipeline.reply_with_context(instructions, wait_for_playback, handle=handle)
        else:
            if hasattr(self.pipeline, 'send_text_message'):
                await self.pipeline.send_text_message(instructions)
            else:
                await self.pipeline.send_message(instructions)
    finally:
        self._reply_in_progress = False
        
    return handle

Generate a response from agent using instructions and current chat context. Subsequent calls are discarded while the first one is still running. Returns a handle to track the utterance.

async def say(self, message: str) ‑> UtteranceHandle
Expand source code
async def say(self, message: str) -> UtteranceHandle:
    """
    Send an initial message to the agent and return a handle to track it.
    """
    if self.current_utterance and not self.current_utterance.done():
        self.current_utterance.interrupt() 

    handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
    self.current_utterance = handle

    if not isinstance(self.pipeline, RealTimePipeline):
        traces_flow_manager = cascading_metrics_collector.traces_flow_manager
        if traces_flow_manager:
            traces_flow_manager.agent_say_called(message)
    
    self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)
    
    if hasattr(self.pipeline, 'send_message'):
        await self.pipeline.send_message(message, handle=handle)
    
    return handle

Send an initial message to the agent and return a handle to track it.

async def start(self, **kwargs: Any) ‑> None
Expand source code
async def start(self, **kwargs: Any) -> None:
    """
    Start the agent session.
    This will:
    1. Initialize the agent (including MCP tools if configured)
    2. Call the agent's on_enter hook
    3. Start the pipeline processing
    4. Start wake-up timer if configured (but only if callback is set)
    
    Args:
        **kwargs: Additional arguments to pass to the pipeline start method
    """       
    self._emit_agent_state(AgentState.STARTING)
    await self.agent.initialize_mcp()

    if isinstance(self.pipeline, RealTimePipeline):
        await realtime_metrics_collector.start_session(self.agent, self.pipeline)
    else:
        traces_flow_manager = cascading_metrics_collector.traces_flow_manager
        if traces_flow_manager:
            config_attributes = {
                "system_instructions": self.agent.instructions,
                "function_tools": [
                    get_tool_info(tool).name
                    for tool in (
                        [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools]
                        if self.agent.mcp_manager else self.agent.tools
                    )
                ] if self.agent.tools else [],

                "mcp_tools": [
                    get_tool_info(tool).name
                    for tool in self.agent.mcp_manager.tools
                ] if self.agent.mcp_manager else [],

                "pipeline": self.pipeline.__class__.__name__,
                **({
                    "stt_provider": self.pipeline.stt.__class__.__name__ if self.pipeline.stt else None,
                    "tts_provider": self.pipeline.tts.__class__.__name__ if self.pipeline.tts else None, 
                    "llm_provider": self.pipeline.llm.__class__.__name__ if self.pipeline.llm else None,
                    "vad_provider": self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                    "eou_provider": self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None,
                    "stt_model": self.pipeline.get_component_configs()['stt'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.stt else None,
                    "llm_model": self.pipeline.get_component_configs()['llm'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.llm else None,
                    "tts_model": self.pipeline.get_component_configs()['tts'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.tts else None,
                    "vad_model": self.pipeline.get_component_configs()['vad'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                    "eou_model": self.pipeline.get_component_configs()['eou'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None
                } if self.pipeline.__class__.__name__ == "CascadingPipeline" else {}),
            }
            start_time = time.perf_counter()
            config_attributes["start_time"] = start_time
            await traces_flow_manager.start_agent_session_config(config_attributes)
            await traces_flow_manager.start_agent_session({"start_time": start_time})

        if self.pipeline.__class__.__name__ == "CascadingPipeline":
            configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {}
            cascading_metrics_collector.set_provider_info(
                llm_provider=self.pipeline.llm.__class__.__name__ if self.pipeline.llm else "",
                llm_model=configs.get('llm', {}).get('model', "") if self.pipeline.llm else "",
                stt_provider=self.pipeline.stt.__class__.__name__ if self.pipeline.stt else "",
                stt_model=configs.get('stt', {}).get('model', "") if self.pipeline.stt else "",
                tts_provider=self.pipeline.tts.__class__.__name__ if self.pipeline.tts else "",
                tts_model=configs.get('tts', {}).get('model', "") if self.pipeline.tts else "",
                vad_provider=self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                vad_model=configs.get('vad', {}).get('model', "") if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                eou_provider=self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else "",
                eou_model=configs.get('eou', {}).get('model', "") if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else ""
            )
    
    if hasattr(self.pipeline, 'set_agent'):
        self.pipeline.set_agent(self.agent)

    self.on("on_speech_in", self.agent.on_speech_in)
    self.on("on_speech_out", self.agent.on_speech_out)

    await self.pipeline.start()
    await self.agent.on_enter()
    global_event_emitter.emit("AGENT_STARTED", {"session": self})
    if self.on_wake_up is not None:
        self._start_wake_up_timer()
    self._emit_agent_state(AgentState.IDLE)

Start the agent session. This will: 1. Initialize the agent (including MCP tools if configured) 2. Call the agent's on_enter hook 3. Start the pipeline processing 4. Start wake-up timer if configured (but only if callback is set)

Args

**kwargs
Additional arguments to pass to the pipeline start method
async def start_thinking_audio(self)
Expand source code
async def start_thinking_audio(self):
    """Start thinking audio"""
    if self._background_audio_player and self._background_audio_player.is_playing:
        return

    audio_track = self._get_audio_track()
    if not hasattr(audio_track, 'add_background_bytes'):
        logger.warning(
            "Cannot play 'thinking' audio. This feature requires the mixing audio track. "
            "Enable it by setting `background_audio=True` in RoomOptions."
        )
        return

    if self.agent._thinking_background_config and audio_track:
        self._thinking_audio_player = BackgroundAudioHandler(self.agent._thinking_background_config, audio_track)
        await self._thinking_audio_player.start()

Start thinking audio

async def stop_background_audio(self) ‑> None
Expand source code
async def stop_background_audio(self) -> None:
    """Stop background audio on demand"""
    if self._background_audio_player:
        await self._background_audio_player.stop()
        self._background_audio_player = None

    if self._thinking_was_playing:
        await self.start_thinking_audio()
        self._thinking_was_playing = False

Stop background audio on demand

async def stop_thinking_audio(self)
Expand source code
async def stop_thinking_audio(self):
    """Stop thinking audio"""
    if self._thinking_audio_player:
        await self._thinking_audio_player.stop()
        self._thinking_audio_player = None

Stop thinking audio