Module agents.agent_session
Classes
class AgentSession (agent: Agent,
pipeline: Pipeline,
conversation_flow: Optional[ConversationFlow] = None,
wake_up: Optional[int] = None,
background_audio: Optional[BackgroundAudioHandlerConfig] = None)-
Expand source code
class AgentSession(EventEmitter[Literal["user_state_changed", "agent_state_changed", "on_speech_in", "on_speech_out"]]):
    """
    Manages an agent session with its associated conversation flow and pipeline.

    Bridges a single Agent with a Pipeline (cascading STT/LLM/TTS or realtime),
    relays global speech events to session-level listeners, manages an optional
    inactivity wake-up timer, and owns optional background/"thinking" audio playback.
    """

    def __init__(
        self,
        agent: Agent,
        pipeline: Pipeline,
        conversation_flow: Optional[ConversationFlow] = None,
        wake_up: Optional[int] = None,
        background_audio: Optional[BackgroundAudioHandlerConfig] = None,
    ) -> None:
        """
        Initialize an agent session.

        Args:
            agent: Instance of an Agent class that handles the core logic
            pipeline: Pipeline instance to process the agent's operations
            conversation_flow: ConversationFlow instance to manage conversation state
            wake_up: Time in seconds after which to trigger wake-up callback if no speech detected
            background_audio: Configuration for background audio (optional)
        """
        super().__init__()
        self.agent = agent
        self.pipeline = pipeline
        self.conversation_flow = conversation_flow
        # Back-reference so the agent can reach its owning session.
        self.agent.session = self
        self.wake_up = wake_up
        # Callback fired when the wake-up timer expires; may be sync or async.
        # Must be assigned by the caller before start() for the timer to arm.
        self.on_wake_up: Optional[Callable[[], None] | Callable[[], Any]] = None
        self._wake_up_task: Optional[asyncio.Task] = None
        self._wake_up_timer_active = False
        self._closed: bool = False
        self._reply_in_progress: bool = False
        self._user_state: UserState = UserState.IDLE
        self._agent_state: AgentState = AgentState.IDLE
        # Handle for the utterance currently being spoken (if any).
        self.current_utterance: Optional[UtteranceHandle] = None
        self._thinking_audio_player: Optional[BackgroundAudioHandler] = None
        self._background_audio_player: Optional[BackgroundAudioHandler] = None
        # Remembers whether thinking audio was interrupted by play_background_audio()
        # so stop_background_audio() can resume it.
        self._thinking_was_playing = False
        self.background_audio_config = background_audio
        # hasattr-guarded wiring: not every Pipeline implementation exposes these hooks.
        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)
        if (
            hasattr(self.pipeline, "set_conversation_flow")
            and self.conversation_flow is not None
        ):
            self.pipeline.set_conversation_flow(self.conversation_flow)
        if hasattr(self.pipeline, 'set_wake_up_callback'):
            self.pipeline.set_wake_up_callback(self._reset_wake_up_timer)
        # Relay global speech events through this session's own emitter.
        global_event_emitter.on("ON_SPEECH_IN", self._on_speech_in)
        global_event_emitter.on("ON_SPEECH_OUT", self._on_speech_out)
        # Best-effort: when running inside a job context, register close() as a
        # shutdown callback; outside a job context this is silently skipped.
        try:
            job_ctx = get_current_job_context()
            if job_ctx:
                job_ctx.add_shutdown_callback(self.close)
        except Exception:
            pass

    def _on_speech_in(self, data: dict) -> None:
        # Forward the global ON_SPEECH_IN event to this session's listeners.
        self.emit("on_speech_in", data)

    def _on_speech_out(self, data: dict) -> None:
        # Forward the global ON_SPEECH_OUT event to this session's listeners.
        self.emit("on_speech_out", data)

    def _start_wake_up_timer(self) -> None:
        # Arm the timer only when both a timeout and a callback are configured.
        if self.wake_up is not None and self.on_wake_up is not None:
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _reset_wake_up_timer(self) -> None:
        # Restart the countdown (e.g. on detected speech activity).
        if self.wake_up is not None and self.on_wake_up is not None:
            # Don't re-arm while a reply is being generated.
            if self._reply_in_progress:
                return
            if self._wake_up_task and not self._wake_up_task.done():
                self._wake_up_task.cancel()
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _pause_wake_up_timer(self) -> None:
        # Cancel the pending countdown without clearing the active flag.
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()

    def _cancel_wake_up_timer(self) -> None:
        # Cancel the pending countdown and fully deactivate the timer.
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()
        self._wake_up_timer_active = False

    async def _wake_up_timer_loop(self) -> None:
        # Sleep for the configured timeout, then invoke the callback unless the
        # timer was deactivated or a reply started in the meantime.
        try:
            await asyncio.sleep(self.wake_up)
            if self._wake_up_timer_active and self.on_wake_up and not self._reply_in_progress:
                # Callback may be either a coroutine function or a plain callable.
                if asyncio.iscoroutinefunction(self.on_wake_up):
                    await self.on_wake_up()
                else:
                    self.on_wake_up()
        except asyncio.CancelledError:
            # Cancellation is the normal way the timer is reset/paused.
            pass

    def _emit_user_state(self, state: UserState, data: dict | None = None) -> None:
        # Emit "user_state_changed" only on actual transitions.
        if state != self._user_state:
            self._user_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("user_state_changed", payload)

    def _emit_agent_state(self, state: AgentState, data: dict | None = None) -> None:
        # Emit "agent_state_changed" only on actual transitions.
        if state != self._agent_state:
            self._agent_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("agent_state_changed", payload)

    @property
    def user_state(self) -> UserState:
        return self._user_state

    @property
    def agent_state(self) -> AgentState:
        return self._agent_state

    @property
    def is_background_audio_enabled(self) -> bool:
        """Check if background audio is enabled in the pipeline"""
        # True only when the pipeline's audio track supports mixing
        # (exposes add_background_bytes); hasattr(None, ...) is safely False.
        audio_track = self._get_audio_track()
        return hasattr(audio_track, 'add_background_bytes')

    async def start(self, **kwargs: Any) -> None:
        """
        Start the agent session.

        This will:
        1. Initialize the agent (including MCP tools if configured)
        2. Call the agent's on_enter hook
        3. Start the pipeline processing
        4. Start wake-up timer if configured (but only if callback is set)

        Args:
            **kwargs: Additional arguments to pass to the pipeline start method
        """
        self._emit_agent_state(AgentState.STARTING)
        await self.agent.initialize_mcp()
        if isinstance(self.pipeline, RealTimePipeline):
            await realtime_metrics_collector.start_session(self.agent, self.pipeline)
        else:
            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                # Snapshot the session configuration for tracing. Provider/model
                # attributes are only meaningful for CascadingPipeline.
                config_attributes = {
                    "system_instructions": self.agent.instructions,
                    # Exclude MCP-provided tools from the function-tools list.
                    "function_tools": [
                        get_tool_info(tool).name
                        for tool in (
                            [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools]
                            if self.agent.mcp_manager
                            else self.agent.tools
                        )
                    ] if self.agent.tools else [],
                    "mcp_tools": [
                        get_tool_info(tool).name for tool in self.agent.mcp_manager.tools
                    ] if self.agent.mcp_manager else [],
                    "pipeline": self.pipeline.__class__.__name__,
                    **({
                        "stt_provider": self.pipeline.stt.__class__.__name__ if self.pipeline.stt else None,
                        "tts_provider": self.pipeline.tts.__class__.__name__ if self.pipeline.tts else None,
                        "llm_provider": self.pipeline.llm.__class__.__name__ if self.pipeline.llm else None,
                        "vad_provider": self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                        "eou_provider": self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None,
                        "stt_model": self.pipeline.get_component_configs()['stt'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.stt else None,
                        "llm_model": self.pipeline.get_component_configs()['llm'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.llm else None,
                        "tts_model": self.pipeline.get_component_configs()['tts'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.tts else None,
                        "vad_model": self.pipeline.get_component_configs()['vad'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                        "eou_model": self.pipeline.get_component_configs()['eou'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None
                    } if self.pipeline.__class__.__name__ == "CascadingPipeline" else {}),
                }
                start_time = time.perf_counter()
                config_attributes["start_time"] = start_time
                await traces_flow_manager.start_agent_session_config(config_attributes)
                await traces_flow_manager.start_agent_session({"start_time": start_time})
            if self.pipeline.__class__.__name__ == "CascadingPipeline":
                configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {}
                cascading_metrics_collector.set_provider_info(
                    llm_provider=self.pipeline.llm.__class__.__name__ if self.pipeline.llm else "",
                    llm_model=configs.get('llm', {}).get('model', "") if self.pipeline.llm else "",
                    stt_provider=self.pipeline.stt.__class__.__name__ if self.pipeline.stt else "",
                    stt_model=configs.get('stt', {}).get('model', "") if self.pipeline.stt else "",
                    tts_provider=self.pipeline.tts.__class__.__name__ if self.pipeline.tts else "",
                    tts_model=configs.get('tts', {}).get('model', "") if self.pipeline.tts else "",
                    vad_provider=self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                    vad_model=configs.get('vad', {}).get('model', "") if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                    eou_provider=self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else "",
                    eou_model=configs.get('eou', {}).get('model', "") if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else ""
                )
        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)
        # Route session-level speech events to the agent's hooks.
        self.on("on_speech_in", self.agent.on_speech_in)
        self.on("on_speech_out", self.agent.on_speech_out)
        await self.pipeline.start()
        await self.agent.on_enter()
        global_event_emitter.emit("AGENT_STARTED", {"session": self})
        # The timer only arms when a wake-up callback has been assigned.
        if self.on_wake_up is not None:
            self._start_wake_up_timer()
        self._emit_agent_state(AgentState.IDLE)

    async def say(self, message: str) -> UtteranceHandle:
        """
        Send an initial message to the agent and return a handle to track it.
        """
        # A new utterance preempts any unfinished one.
        if self.current_utterance and not self.current_utterance.done():
            self.current_utterance.interrupt()
        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
        self.current_utterance = handle
        if not isinstance(self.pipeline, RealTimePipeline):
            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                traces_flow_manager.agent_say_called(message)
            # Record the spoken message in the chat history as an assistant turn.
            self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)
        if hasattr(self.pipeline, 'send_message'):
            await self.pipeline.send_message(message, handle=handle)
        return handle

    async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) -> None:
        """Play background audio on demand"""
        # Optionally preempt thinking audio, remembering to resume it later.
        if override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing:
            await self.stop_thinking_audio()
            self._thinking_was_playing = True
        audio_track = self._get_audio_track()
        # Requires a mixing-capable track; hasattr(None, ...) is safely False.
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play background audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return
        if audio_track:
            self._background_audio_player = BackgroundAudioHandler(config, audio_track)
            await self._background_audio_player.start()

    async def stop_background_audio(self) -> None:
        """Stop background audio on demand"""
        if self._background_audio_player:
            await self._background_audio_player.stop()
            self._background_audio_player = None
        # Resume thinking audio if it was preempted by play_background_audio().
        if self._thinking_was_playing:
            await self.start_thinking_audio()
            self._thinking_was_playing = False

    def _get_audio_track(self):
        """Get audio track from pipeline"""
        # Cascading pipelines expose the track via TTS; realtime via the model.
        if hasattr(self.pipeline, 'tts') and self.pipeline.tts and self.pipeline.tts.audio_track:
            # Cascading
            return self.pipeline.tts.audio_track
        elif hasattr(self.pipeline, 'model') and self.pipeline.model and self.pipeline.model.audio_track:
            # Realtime
            return self.pipeline.model.audio_track
        return None

    async def start_thinking_audio(self):
        """Start thinking audio"""
        # On-demand background audio takes precedence over thinking audio.
        if self._background_audio_player and self._background_audio_player.is_playing:
            return
        audio_track = self._get_audio_track()
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play 'thinking' audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return
        if self.agent._thinking_background_config and audio_track:
            self._thinking_audio_player = BackgroundAudioHandler(self.agent._thinking_background_config, audio_track)
            await self._thinking_audio_player.start()

    async def stop_thinking_audio(self):
        """Stop thinking audio"""
        if self._thinking_audio_player:
            await self._thinking_audio_player.stop()
            self._thinking_audio_player = None

    async def reply(self, instructions: str, wait_for_playback: bool = True) -> UtteranceHandle:
        """
        Generate a response from agent using instructions and current chat context.
        Subsequent calls are discarded while the first one is still running.
        Returns a handle to track the utterance.
        """
        if not instructions:
            handle = UtteranceHandle(utterance_id="empty_reply")
            handle._mark_done()
            return handle
        if self._reply_in_progress:
            # Discard this call: hand back the in-flight utterance if we have one.
            if self.current_utterance:
                return self.current_utterance
            # Otherwise return an already-completed placeholder so callers
            # awaiting the handle are not blocked.
            handle = UtteranceHandle(utterance_id="placeholder")
            handle._mark_done()
            return handle
        if self.current_utterance and not self.current_utterance.done():
            self.current_utterance.interrupt()
        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
        self.current_utterance = handle
        self._reply_in_progress = True
        # Suspend the wake-up countdown while generating the reply.
        self._pause_wake_up_timer()
        try:
            if not isinstance(self.pipeline, RealTimePipeline):
                traces_flow_manager = cascading_metrics_collector.traces_flow_manager
                if traces_flow_manager:
                    traces_flow_manager.agent_reply_called(instructions)
            # Prefer the context-aware reply API; fall back to plain messaging.
            if hasattr(self.pipeline, 'reply_with_context'):
                await self.pipeline.reply_with_context(instructions, wait_for_playback, handle=handle)
            else:
                if hasattr(self.pipeline, 'send_text_message'):
                    await self.pipeline.send_text_message(instructions)
                else:
                    await self.pipeline.send_message(instructions)
        finally:
            self._reply_in_progress = False
        return handle

    def interrupt(self) -> None:
        """
        Interrupt the agent's current speech.
        """
        if self.current_utterance and not self.current_utterance.interrupted:
            self.current_utterance.interrupt()
        if hasattr(self.pipeline, 'interrupt'):
            self.pipeline.interrupt()

    async def close(self) -> None:
        """
        Close the agent session.
        """
        logger.info("Closing agent session")
        # Idempotent: subsequent calls are no-ops.
        if self._closed:
            logger.info("Agent session already closed")
            return
        self._closed = True
        self._emit_agent_state(AgentState.CLOSING)
        if isinstance(self.pipeline, RealTimePipeline):
            realtime_metrics_collector.finalize_session()
            traces_flow_manager = realtime_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                start_time = time.perf_counter()
                await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
                traces_flow_manager.end_agent_session_closed()
        else:
            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                start_time = time.perf_counter()
                await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
                traces_flow_manager.end_agent_session_closed()
        self._cancel_wake_up_timer()
        # Detach all event listeners registered in __init__/start().
        global_event_emitter.off("ON_SPEECH_IN", self._on_speech_in)
        global_event_emitter.off("ON_SPEECH_OUT", self._on_speech_out)
        self.off("on_speech_in", self.agent.on_speech_in)
        self.off("on_speech_out", self.agent.on_speech_out)
        logger.info("Cleaning up agent session")
        # Each cleanup step is isolated so one failure doesn't skip the rest.
        try:
            await self.agent.on_exit()
        except Exception as e:
            logger.error(f"Error in agent.on_exit(): {e}")
        if self._thinking_audio_player:
            await self._thinking_audio_player.stop()
        if self._background_audio_player:
            await self._background_audio_player.stop()
        try:
            await self.pipeline.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up pipeline: {e}")
        if self.conversation_flow:
            try:
                await self.conversation_flow.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up conversation flow: {e}")
            self.conversation_flow = None
        try:
            await self.agent.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up agent: {e}")
        # Drop references to help garbage collection after close.
        self.agent = None
        self.pipeline = None
        self.on_wake_up = None
        self._wake_up_task = None
        logger.info("Agent session cleaned up")

    async def leave(self) -> None:
        """
        Leave the agent session.
        """
        self._emit_agent_state(AgentState.CLOSING)
        await self.pipeline.leave()
Manages an agent session with its associated conversation flow and pipeline.
Initialize an agent session.
Args
agent: Instance of an Agent class that handles the core logic
pipeline: Pipeline instance to process the agent's operations
conversation_flow: ConversationFlow instance to manage conversation state
wake_up: Time in seconds after which to trigger the wake-up callback if no speech is detected
background_audio: Configuration for background audio (optional)
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop agent_state : AgentState-
Expand source code
@property def agent_state(self) -> AgentState: return self._agent_state prop is_background_audio_enabled : bool-
Expand source code
@property def is_background_audio_enabled(self) -> bool: """Check if background audio is enabled in the pipeline""" audio_track = self._get_audio_track() return hasattr(audio_track, 'add_background_bytes')Check if background audio is enabled in the pipeline
prop user_state : UserState-
Expand source code
@property def user_state(self) -> UserState: return self._user_state
Methods
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """ Close the agent session. """ logger.info("Closing agent session") if self._closed: logger.info("Agent session already closed") return self._closed = True self._emit_agent_state(AgentState.CLOSING) if isinstance(self.pipeline, RealTimePipeline): realtime_metrics_collector.finalize_session() traces_flow_manager = realtime_metrics_collector.traces_flow_manager if traces_flow_manager: start_time = time.perf_counter() await traces_flow_manager.start_agent_session_closed({"start_time": start_time}) traces_flow_manager.end_agent_session_closed() else: traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: start_time = time.perf_counter() await traces_flow_manager.start_agent_session_closed({"start_time": start_time}) traces_flow_manager.end_agent_session_closed() self._cancel_wake_up_timer() global_event_emitter.off("ON_SPEECH_IN", self._on_speech_in) global_event_emitter.off("ON_SPEECH_OUT", self._on_speech_out) self.off("on_speech_in", self.agent.on_speech_in) self.off("on_speech_out", self.agent.on_speech_out) logger.info("Cleaning up agent session") try: await self.agent.on_exit() except Exception as e: logger.error(f"Error in agent.on_exit(): {e}") if self._thinking_audio_player: await self._thinking_audio_player.stop() if self._background_audio_player: await self._background_audio_player.stop() try: await self.pipeline.cleanup() except Exception as e: logger.error(f"Error cleaning up pipeline: {e}") if self.conversation_flow: try: await self.conversation_flow.cleanup() except Exception as e: logger.error(f"Error cleaning up conversation flow: {e}") self.conversation_flow = None try: await self.agent.cleanup() except Exception as e: logger.error(f"Error cleaning up agent: {e}") self.agent = None self.pipeline = None self.on_wake_up = None self._wake_up_task = None logger.info("Agent session cleaned up")Close the agent session.
def interrupt(self) ‑> None-
Expand source code
def interrupt(self) -> None: """ Interrupt the agent's current speech. """ if self.current_utterance and not self.current_utterance.interrupted: self.current_utterance.interrupt() if hasattr(self.pipeline, 'interrupt'): self.pipeline.interrupt()Interrupt the agent's current speech.
async def leave(self) ‑> None-
Expand source code
async def leave(self) -> None: """ Leave the agent session. """ self._emit_agent_state(AgentState.CLOSING) await self.pipeline.leave()Leave the agent session.
async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) ‑> None-
Expand source code
async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) -> None: """Play background audio on demand""" if override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing: await self.stop_thinking_audio() self._thinking_was_playing = True audio_track = self._get_audio_track() if not hasattr(audio_track, 'add_background_bytes'): logger.warning( "Cannot play background audio. This feature requires the mixing audio track. " "Enable it by setting `background_audio=True` in RoomOptions." ) return if audio_track: self._background_audio_player = BackgroundAudioHandler(config, audio_track) await self._background_audio_player.start()Play background audio on demand
async def reply(self, instructions: str, wait_for_playback: bool = True) ‑> UtteranceHandle-
Expand source code
async def reply(self, instructions: str, wait_for_playback: bool = True) -> UtteranceHandle: """ Generate a response from agent using instructions and current chat context. Subsequent calls are discarded while the first one is still running. Returns a handle to track the utterance. """ if not instructions: handle = UtteranceHandle(utterance_id="empty_reply") handle._mark_done() return handle if self._reply_in_progress: if self.current_utterance: return self.current_utterance # Create a placeholder that will never resolve to avoid blocking handle = UtteranceHandle(utterance_id="placeholder") handle._mark_done() return handle if self.current_utterance and not self.current_utterance.done(): self.current_utterance.interrupt() handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}") self.current_utterance = handle self._reply_in_progress = True self._pause_wake_up_timer() try: if not isinstance(self.pipeline, RealTimePipeline): traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: traces_flow_manager.agent_reply_called(instructions) if hasattr(self.pipeline, 'reply_with_context'): await self.pipeline.reply_with_context(instructions, wait_for_playback, handle=handle) else: if hasattr(self.pipeline, 'send_text_message'): await self.pipeline.send_text_message(instructions) else: await self.pipeline.send_message(instructions) finally: self._reply_in_progress = False return handleGenerate a response from agent using instructions and current chat context. Subsequent calls are discarded while the first one is still running. Returns a handle to track the utterance.
async def say(self, message: str) ‑> UtteranceHandle-
Expand source code
async def say(self, message: str) -> UtteranceHandle: """ Send an initial message to the agent and return a handle to track it. """ if self.current_utterance and not self.current_utterance.done(): self.current_utterance.interrupt() handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}") self.current_utterance = handle if not isinstance(self.pipeline, RealTimePipeline): traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: traces_flow_manager.agent_say_called(message) self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message) if hasattr(self.pipeline, 'send_message'): await self.pipeline.send_message(message, handle=handle) return handleSend an initial message to the agent and return a handle to track it.
async def start(self, **kwargs: Any) ‑> None-
Expand source code
async def start(self, **kwargs: Any) -> None: """ Start the agent session. This will: 1. Initialize the agent (including MCP tools if configured) 2. Call the agent's on_enter hook 3. Start the pipeline processing 4. Start wake-up timer if configured (but only if callback is set) Args: **kwargs: Additional arguments to pass to the pipeline start method """ self._emit_agent_state(AgentState.STARTING) await self.agent.initialize_mcp() if isinstance(self.pipeline, RealTimePipeline): await realtime_metrics_collector.start_session(self.agent, self.pipeline) else: traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: config_attributes = { "system_instructions": self.agent.instructions, "function_tools": [ get_tool_info(tool).name for tool in ( [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools] if self.agent.mcp_manager else self.agent.tools ) ] if self.agent.tools else [], "mcp_tools": [ get_tool_info(tool).name for tool in self.agent.mcp_manager.tools ] if self.agent.mcp_manager else [], "pipeline": self.pipeline.__class__.__name__, **({ "stt_provider": self.pipeline.stt.__class__.__name__ if self.pipeline.stt else None, "tts_provider": self.pipeline.tts.__class__.__name__ if self.pipeline.tts else None, "llm_provider": self.pipeline.llm.__class__.__name__ if self.pipeline.llm else None, "vad_provider": self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else None, "eou_provider": self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None, "stt_model": self.pipeline.get_component_configs()['stt'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.stt else None, "llm_model": self.pipeline.get_component_configs()['llm'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.llm else None, "tts_model": 
self.pipeline.get_component_configs()['tts'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.tts else None, "vad_model": self.pipeline.get_component_configs()['vad'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'vad') and self.pipeline.vad else None, "eou_model": self.pipeline.get_component_configs()['eou'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None } if self.pipeline.__class__.__name__ == "CascadingPipeline" else {}), } start_time = time.perf_counter() config_attributes["start_time"] = start_time await traces_flow_manager.start_agent_session_config(config_attributes) await traces_flow_manager.start_agent_session({"start_time": start_time}) if self.pipeline.__class__.__name__ == "CascadingPipeline": configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {} cascading_metrics_collector.set_provider_info( llm_provider=self.pipeline.llm.__class__.__name__ if self.pipeline.llm else "", llm_model=configs.get('llm', {}).get('model', "") if self.pipeline.llm else "", stt_provider=self.pipeline.stt.__class__.__name__ if self.pipeline.stt else "", stt_model=configs.get('stt', {}).get('model', "") if self.pipeline.stt else "", tts_provider=self.pipeline.tts.__class__.__name__ if self.pipeline.tts else "", tts_model=configs.get('tts', {}).get('model', "") if self.pipeline.tts else "", vad_provider=self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "", vad_model=configs.get('vad', {}).get('model', "") if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "", eou_provider=self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else "", eou_model=configs.get('eou', {}).get('model', "") if hasattr(self.pipeline, 'turn_detector') and 
self.pipeline.turn_detector else "" ) if hasattr(self.pipeline, 'set_agent'): self.pipeline.set_agent(self.agent) self.on("on_speech_in", self.agent.on_speech_in) self.on("on_speech_out", self.agent.on_speech_out) await self.pipeline.start() await self.agent.on_enter() global_event_emitter.emit("AGENT_STARTED", {"session": self}) if self.on_wake_up is not None: self._start_wake_up_timer() self._emit_agent_state(AgentState.IDLE)Start the agent session. This will: 1. Initialize the agent (including MCP tools if configured) 2. Call the agent's on_enter hook 3. Start the pipeline processing 4. Start wake-up timer if configured (but only if callback is set)
Args
**kwargs: Additional arguments to pass to the pipeline start method
async def start_thinking_audio(self)-
Expand source code
async def start_thinking_audio(self): """Start thinking audio""" if self._background_audio_player and self._background_audio_player.is_playing: return audio_track = self._get_audio_track() if not hasattr(audio_track, 'add_background_bytes'): logger.warning( "Cannot play 'thinking' audio. This feature requires the mixing audio track. " "Enable it by setting `background_audio=True` in RoomOptions." ) return if self.agent._thinking_background_config and audio_track: self._thinking_audio_player = BackgroundAudioHandler(self.agent._thinking_background_config, audio_track) await self._thinking_audio_player.start()Start thinking audio
async def stop_background_audio(self) ‑> None-
Expand source code
async def stop_background_audio(self) -> None: """Stop background audio on demand""" if self._background_audio_player: await self._background_audio_player.stop() self._background_audio_player = None if self._thinking_was_playing: await self.start_thinking_audio() self._thinking_was_playing = FalseStop background audio on demand
async def stop_thinking_audio(self)-
Expand source code
async def stop_thinking_audio(self): """Stop thinking audio""" if self._thinking_audio_player: await self._thinking_audio_player.stop() self._thinking_audio_player = NoneStop thinking audio