Module agents.agent_session
Classes
class AgentSession (agent: Agent,
pipeline: Pipeline,
wake_up: Optional[int] = None,
background_audio: Optional[BackgroundAudioHandlerConfig] = None,
dtmf_handler: Optional[DTMFHandler] = None,
voice_mail_detector: Optional[VoiceMailDetector] = None)-
Expand source code
class AgentSession(EventEmitter[Literal["user_state_changed", "agent_state_changed"]]):
    """
    Manages an agent session with its associated conversation flow and pipeline.
    """

    def __init__(
        self,
        agent: Agent,
        pipeline: Pipeline,
        wake_up: Optional[int] = None,
        background_audio: Optional[BackgroundAudioHandlerConfig] = None,
        dtmf_handler: Optional[DTMFHandler] = None,
        voice_mail_detector: Optional[VoiceMailDetector] = None,
    ) -> None:
        """
        Initialize an agent session.

        Args:
            agent: Instance of an Agent class that handles the core logic
            pipeline: Pipeline instance to process the agent's operations
            wake_up: Time in seconds after which to trigger wake-up callback if no speech detected
            background_audio: Configuration for background audio (optional)
            dtmf_handler: DTMF handler for phone number input (optional)
            voice_mail_detector: Voicemail detector (optional)
        """
        super().__init__()
        self.agent = agent
        self.pipeline = pipeline
        self.agent.session = self
        self.wake_up = wake_up
        self.on_wake_up: Optional[Callable[[], None] | Callable[[], Any]] = None
        self._wake_up_task: Optional[asyncio.Task] = None
        self._wake_up_timer_active = False
        self._closed: bool = False
        self._reply_in_progress: bool = False
        self._user_state: UserState = UserState.IDLE
        self._agent_state: AgentState = AgentState.IDLE
        self.current_utterance: Optional[UtteranceHandle] = None
        self._thinking_audio_player: Optional[BackgroundAudioHandler] = None
        self._background_audio_player: Optional[BackgroundAudioHandler] = None
        self._thinking_was_playing = False
        self.background_audio_config = background_audio
        self._is_executing_tool = False
        self._job_context = None
        self.dtmf_handler = dtmf_handler
        self.voice_mail_detector = voice_mail_detector
        # BUGFIX: was `self._is_voice_mail_detected` (extra underscore), but the
        # `is_voicemail_detected` property and `_handle_voicemail_result` both
        # use `self._is_voicemail_detected`; reading the property before any
        # voicemail event raised AttributeError.
        self._is_voicemail_detected = False
        self._playground_manager = None
        self._playground = False
        self._send_analytics_to_pubsub = False

        # Set agent on pipeline (pipeline handles all internal wiring)
        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)

        # Setup voicemail detection
        if self.voice_mail_detector:
            if hasattr(self.pipeline, "set_voice_mail_detector"):
                self.pipeline.set_voice_mail_detector(self.voice_mail_detector)
            if hasattr(self.pipeline, "on"):
                self.pipeline.on("voicemail_result", self._handle_voicemail_result)

        # Setup wake-up callback
        if hasattr(self.pipeline, 'set_wake_up_callback'):
            self.pipeline.set_wake_up_callback(self._reset_wake_up_timer)

        # Get job context (best-effort: a session may run outside a job)
        try:
            job_ctx = get_current_job_context()
            if job_ctx:
                self._job_context = job_ctx
                job_ctx.add_shutdown_callback(self.close)
                self._playground = job_ctx.room_options.playground
                self._send_analytics_to_pubsub = job_ctx.room_options.send_analytics_to_pubsub
        except Exception as e:
            logger.error(f"AgentSession: Error in session initialization: {e}")
            self._job_context = None

    @property
    def is_voicemail_detected(self) -> bool:
        """Returns True if voicemail was detected in this session."""
        return self._is_voicemail_detected

    def _handle_voicemail_result(self, data: dict) -> None:
        """
        Handler for the voicemail_result event from ConversationFlow.
        Updates session state and executes callback if needed.
        """
        is_vm = data.get("is_voicemail", False)
        self._is_voicemail_detected = is_vm
        if is_vm:
            logger.info("AgentSession: Voicemail confirmed. Executing callback.")
            if self.voice_mail_detector.callback:
                asyncio.create_task(self._safe_execute_vmd_callback())

    async def _safe_execute_vmd_callback(self) -> None:
        # Run the user-supplied voicemail callback, shielding the event loop
        # from any exception it raises.
        try:
            if self.voice_mail_detector.callback:
                await self.voice_mail_detector.callback()
        except Exception as e:
            logger.error(f"Error executing voicemail callback: {e}")

    def _start_wake_up_timer(self) -> None:
        # Timer only runs when both a timeout and a callback are configured.
        if self.wake_up is not None and self.on_wake_up is not None:
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _reset_wake_up_timer(self) -> None:
        # Restart the countdown (e.g. on user speech); no-op while the agent
        # is generating a reply.
        if self.wake_up is not None and self.on_wake_up is not None:
            if self._reply_in_progress:
                return
            if self._wake_up_task and not self._wake_up_task.done():
                self._wake_up_task.cancel()
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _pause_wake_up_timer(self) -> None:
        # Cancel the pending countdown without clearing the active flag.
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()

    def _cancel_wake_up_timer(self) -> None:
        # Cancel the countdown and deactivate the timer entirely.
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()
        self._wake_up_timer_active = False

    async def _wake_up_timer_loop(self) -> None:
        # Sleep for the configured timeout, then fire the wake-up callback
        # (supports both sync and async callbacks). Cancellation is normal.
        try:
            await asyncio.sleep(self.wake_up)
            if self._wake_up_timer_active and self.on_wake_up and not self._reply_in_progress:
                if asyncio.iscoroutinefunction(self.on_wake_up):
                    asyncio.create_task(self.on_wake_up())
                else:
                    self.on_wake_up()
        except asyncio.CancelledError:
            pass

    def _emit_user_state(self, state: UserState, data: dict | None = None) -> None:
        # Emit only on transitions; payload merges state with extra data.
        if state != self._user_state:
            self._user_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("user_state_changed", payload)

    def _emit_agent_state(self, state: AgentState, data: dict | None = None) -> None:
        # Emit only on transitions, both on this session and globally.
        if state != self._agent_state:
            self._agent_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("agent_state_changed", payload)
            global_event_emitter.emit("AGENT_STATE_CHANGED", {"state": state.value})

    @property
    def user_state(self) -> UserState:
        return self._user_state

    @property
    def agent_state(self) -> AgentState:
        return self._agent_state

    @property
    def is_background_audio_enabled(self) -> bool:
        """Check if background audio is enabled in the pipeline"""
        audio_track = self._get_audio_track()
        return hasattr(audio_track, 'add_background_bytes')

    async def start(
        self,
        wait_for_participant: bool = False,
        run_until_shutdown: bool = False,
        **kwargs: Any
    ) -> None:
        """
        Start the agent session.

        This will:
        1. Initialize the agent (including MCP tools if configured)
        2. Call the agent's on_enter hook
        3. Start the pipeline processing
        4. Start wake-up timer if configured (but only if callback is set)
        5. Optionally handle full lifecycle management (connect, wait, shutdown)

        Args:
            wait_for_participant: If True, wait for a participant to join before starting
            run_until_shutdown: If True, manage the full lifecycle including connection,
                waiting for shutdown signals, and cleanup. This is a convenience
                that internally calls ctx.run_until_shutdown() with this session.
            **kwargs: Additional arguments to pass to the pipeline start method

        Examples:
            Simple start (manual lifecycle management):
            ```python
            await session.start()
            ```

            Full lifecycle management (recommended):
            ```python
            await session.start(wait_for_participant=True, run_until_shutdown=True)
            ```
        """
        if run_until_shutdown:
            try:
                ctx = get_current_job_context()
                if ctx:
                    logger.info("Starting session with full lifecycle management")
                    await ctx.run_until_shutdown(
                        session=self,
                        wait_for_participant=wait_for_participant
                    )
                    return
                else:
                    logger.warning(
                        "run_until_shutdown=True requires a JobContext, "
                        "falling back to normal start()"
                    )
            except Exception as e:
                logger.warning(
                    f"Failed to get JobContext for run_until_shutdown: {e}, "
                    "falling back to normal start()"
                )

        self._emit_agent_state(AgentState.STARTING)

        if self.agent._mcp_servers:
            await self.agent.initialize_mcp()

        if self.dtmf_handler:
            await self.dtmf_handler.start()

        if self._playground or self._send_analytics_to_pubsub:
            job_ctx = get_current_job_context()
            # BUGFIX: keep the private attribute declared in __init__ in sync;
            # previously only the public `playground_manager` was assigned,
            # leaving `_playground_manager` permanently None. The public alias
            # is preserved for backward compatibility.
            self._playground_manager = PlaygroundManager(job_ctx)
            self.playground_manager = self._playground_manager
            metrics_collector.set_playground_manager(self.playground_manager)

        # Configure metrics with session info
        metrics_collector.set_system_instructions(self.agent.instructions)

        # Set provider info based on pipeline components
        if not self.pipeline.config.is_realtime:
            if self.pipeline.stt:
                p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
                metrics_collector.set_provider_info("stt", p_class, p_model)
            if self.pipeline.llm:
                p_class, p_model = self._get_provider_info(self.pipeline.llm, 'llm')
                metrics_collector.set_provider_info("llm", p_class, p_model)
            if self.pipeline.tts:
                p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
                metrics_collector.set_provider_info("tts", p_class, p_model)
            if hasattr(self.pipeline, 'vad') and self.pipeline.vad:
                p_class, p_model = self._get_provider_info(self.pipeline.vad, 'vad')
                metrics_collector.set_provider_info("vad", p_class, p_model)
            if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector:
                p_class, p_model = self._get_provider_info(self.pipeline.turn_detector, 'eou')
                metrics_collector.set_provider_info("eou", p_class, p_model)
        else:
            if self.pipeline._realtime_model:
                metrics_collector.set_provider_info(
                    "realtime",
                    self.pipeline._realtime_model.__class__.__name__,
                    getattr(self.pipeline._realtime_model, 'model', '')
                )
            if self.pipeline.stt:
                p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
                metrics_collector.set_provider_info("stt", p_class, p_model)
            if self.pipeline.tts:
                p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
                metrics_collector.set_provider_info("tts", p_class, p_model)

        # Traces flow manager setup
        traces_flow_manager = metrics_collector.traces_flow_manager
        if traces_flow_manager:
            config_attributes = {
                "system_instructions": self.agent.instructions,
                "function_tools": [
                    get_tool_info(tool).name for tool in (
                        [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools]
                        if self.agent.mcp_manager else self.agent.tools
                    )
                ] if self.agent.tools else [],
                "mcp_tools": [
                    tool._tool_info.name for tool in self.agent.mcp_manager.tools
                ] if self.agent.mcp_manager else [],
                "pipeline": self.pipeline.__class__.__name__,
                "pipeline_mode": self.pipeline.config.pipeline_mode.value,
                "transport_mode": metrics_collector.transport_mode
            }
            start_time = time.perf_counter()
            config_attributes["start_time"] = start_time
            await traces_flow_manager.start_agent_session_config(config_attributes)
            await traces_flow_manager.start_agent_session({"start_time": start_time})

        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)
        await self.pipeline.start()
        if hasattr(self.agent, 'a2a'):
            self.agent.a2a._attach_deferred_listeners()

        if self._should_delay_for_sip_user():
            logger.info("SIP user detected, waiting for audio stream to be enabled before calling on_enter")
            audio_stream_enabled = asyncio.Event()

            def on_audio_stream_enabled(data):
                stream = data.get("stream")
                participant = data.get("participant")
                if stream and stream.kind == "audio" and participant and participant.meta_data.get("sipUser"):
                    logger.info(f"SIP user audio stream enabled for participant {participant.id}")
                    audio_stream_enabled.set()

            global_event_emitter.on("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

            async def wait_and_start():
                try:
                    await audio_stream_enabled.wait()
                    logger.info("SIP user audio stream enabled, proceeding with on_enter")
                    await self.agent.on_enter()
                    global_event_emitter.emit("AGENT_STARTED", {"session": self})
                    if self.on_wake_up is not None:
                        self._start_wake_up_timer()
                    self._emit_agent_state(AgentState.IDLE)
                except Exception as e:
                    logger.error(f"Error in wait_and_start: {e}")
                finally:
                    global_event_emitter.off("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

            asyncio.create_task(wait_and_start())
            return

        await self.agent.on_enter()
        global_event_emitter.emit("AGENT_STARTED", {"session": self})
        if self.on_wake_up is not None:
            self._start_wake_up_timer()
        self._emit_agent_state(AgentState.IDLE)

    def _get_provider_info(self, component, comp_name):
        # Resolve (provider class name, model identifier) for a pipeline
        # component, preferring its active_provider when present. The chained
        # getattr calls probe the common attribute names providers use for
        # their model identifier; note the nested defaults are evaluated
        # eagerly, which is harmless here.
        configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {}
        if not component:
            return "", ""
        default_model = configs.get(comp_name, {}).get('model', '')
        if hasattr(component, 'active_provider') and component.active_provider is not None:
            provider_class = component.active_provider.__class__.__name__
            model = getattr(component.active_provider, 'model',
                    getattr(component.active_provider, 'model_id',
                    getattr(component.active_provider, 'speech_model',
                    getattr(component.active_provider, 'voice_id',
                    getattr(component.active_provider, 'voice',
                    getattr(component.active_provider, 'speaker', default_model))))))
        else:
            provider_class = component.__class__.__name__
            model = getattr(component, 'model',
                    getattr(component, 'model_id',
                    getattr(component, 'speech_model',
                    getattr(component, 'voice_id',
                    getattr(component, 'voice',
                    getattr(component, 'speaker', default_model))))))
        return provider_class, str(model)

    async def say(self, message: str, interruptible: bool = True) -> UtteranceHandle:
        """
        Send an initial message to the agent and return a handle to track it.
        When called from inside a function tool (_is_executing_tool), the
        current turn's utterance is not interrupted or replaced, so the LLM
        stream can continue after the tool returns.
        """
        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
        if not self._is_executing_tool:
            if self.current_utterance and not self.current_utterance.done():
                self.current_utterance.interrupt()
            self.current_utterance = handle
        traces_flow_manager = metrics_collector.traces_flow_manager
        if traces_flow_manager:
            traces_flow_manager.agent_say_called(message)
        self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)
        if hasattr(self.pipeline, 'send_message'):
            await self.pipeline.send_message(message, handle=handle)
        return handle

    async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) -> None:
        """Play background audio on demand"""
        if override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing:
            await self.stop_thinking_audio()
            self._thinking_was_playing = True
        audio_track = self._get_audio_track()
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play background audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return
        if audio_track:
            self._background_audio_player = BackgroundAudioHandler(config, audio_track)
            await self._background_audio_player.start()
            # Track background audio start for metrics
            metrics_collector.on_background_audio_start(
                file_path=config.file_path,
                looping=config.looping
            )

    async def stop_background_audio(self) -> None:
        """Stop background audio on demand"""
        if self._background_audio_player:
            await self._background_audio_player.stop()
            self._background_audio_player = None
            # Track background audio stop for metrics
            metrics_collector.on_background_audio_stop()
        if self._thinking_was_playing:
            await self.start_thinking_audio()
            self._thinking_was_playing = False

    def _get_audio_track(self):
        """Get audio track from pipeline"""
        if self.pipeline is None:
            return None
        if self.pipeline.config.is_realtime:
            model = getattr(self.pipeline, '_realtime_model', None)
            if model and hasattr(model, 'audio_track'):
                return model.audio_track
        if self.pipeline.tts and hasattr(self.pipeline.tts, 'audio_track'):
            return self.pipeline.tts.audio_track
        return None

    async def start_thinking_audio(self):
        """Start thinking audio"""
        # Background audio takes precedence over thinking audio.
        if self._background_audio_player and self._background_audio_player.is_playing:
            return
        audio_track = self._get_audio_track()
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play 'thinking' audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return
        if self.agent._thinking_background_config and audio_track:
            self._thinking_audio_player = BackgroundAudioHandler(self.agent._thinking_background_config, audio_track)
            await self._thinking_audio_player.start()
            # Track thinking audio start for metrics
            metrics_collector.on_thinking_audio_start(
                file_path=self.agent._thinking_background_config.file_path,
                looping=self.agent._thinking_background_config.looping
            )

    async def stop_thinking_audio(self):
        """Stop thinking audio"""
        if self._thinking_audio_player:
            await self._thinking_audio_player.stop()
            self._thinking_audio_player = None
            # Track thinking audio stop for metrics
            metrics_collector.on_thinking_audio_stop()

    async def reply(self, instructions: str, wait_for_playback: bool = True, frames: list[av.VideoFrame] | None = None, interruptible: bool = True) -> UtteranceHandle:
        """
        Generate a response from agent using instructions and current chat context.
        This method is safe to call from function tools - it will automatically
        detect re-entrant calls and schedule them as background tasks.

        Args:
            instructions: Instructions to add to chat context
            wait_for_playback: If True, wait for playback to complete
            frames: Optional list of VideoFrame objects to include in the reply

        Returns:
            UtteranceHandle: A handle to track the utterance lifecycle
        """
        if self._reply_in_progress:
            # A reply is already running: hand back its handle (or a completed
            # placeholder) instead of starting a second one.
            if self.current_utterance:
                return self.current_utterance
            handle = UtteranceHandle(utterance_id="placeholder", interruptible=interruptible)
            handle._mark_done()
            return handle
        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
        self.current_utterance = handle
        if self._is_executing_tool:
            # Re-entrant call from a function tool: run in the background so
            # the tool can return while the reply streams.
            asyncio.create_task(
                self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
            )
            return handle
        else:
            await self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
            return handle

    async def _internal_blocking_reply(self, instructions: str, wait_for_playback: bool, handle: UtteranceHandle, frames: list[av.VideoFrame] | None = None) -> None:
        """
        The original, blocking logic of the reply method.
        """
        if not instructions:
            handle._mark_done()
            return
        self._reply_in_progress = True
        self._pause_wake_up_timer()
        try:
            # Call pipeline's reply_with_context
            if hasattr(self.pipeline, 'reply_with_context'):
                await self.pipeline.reply_with_context(instructions, wait_for_playback, handle=handle, frames=frames)
                if wait_for_playback:
                    await handle
        finally:
            self._reply_in_progress = False
            if not handle.done():
                handle._mark_done()

    def interrupt(self, *, force: bool = False) -> None:
        """
        Interrupt the agent's current speech.
        """
        if self.current_utterance and not self.current_utterance.interrupted:
            try:
                self.current_utterance.interrupt(force=force)
            except RuntimeError as e:
                logger.warning(f"Could not interrupt utterance: {e}")
            return
        if hasattr(self.pipeline, 'interrupt'):
            self.pipeline.interrupt()

    async def close(self) -> None:
        """
        Close the agent session.
        """
        logger.info("Closing agent session")
        if self._closed:
            logger.info("Agent session already closed")
            return
        self._closed = True
        self._emit_agent_state(AgentState.CLOSING)
        metrics_collector.finalize_session()
        traces_flow_manager = metrics_collector.traces_flow_manager
        if traces_flow_manager:
            start_time = time.perf_counter()
            await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
            traces_flow_manager.end_agent_session_closed()
        self._cancel_wake_up_timer()
        logger.info("Cleaning up agent session")
        # Each cleanup step is isolated so one failure doesn't skip the rest.
        try:
            await self.agent.on_exit()
        except Exception as e:
            logger.error(f"Error in agent.on_exit(): {e}")
        if self._thinking_audio_player:
            await self._thinking_audio_player.stop()
        if self._background_audio_player:
            await self._background_audio_player.stop()
        try:
            await self.pipeline.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up pipeline: {e}")
        try:
            await self.agent.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up agent: {e}")
        self.agent = None
        self.pipeline = None
        self.on_wake_up = None
        self._wake_up_task = None
        logger.info("Agent session cleaned up")

    async def leave(self) -> None:
        """
        Leave the agent session.
        """
        self._emit_agent_state(AgentState.CLOSING)
        await self.pipeline.leave()

    async def hangup(self, reason: str = "manual_hangup") -> None:
        """
        Hang up the session, leaving the room immediately if possible.
        """
        job_ctx = self._job_context
        if not job_ctx:
            try:
                job_ctx = get_current_job_context()
            except Exception:
                job_ctx = None
        room = getattr(job_ctx, "room", None) if job_ctx else None
        if room and hasattr(room, "force_end_session"):
            try:
                await room.force_end_session(reason)
                return
            except Exception as exc:
                logger.error(f"Error forcing room to end session: {exc}")
        # Fall back to a normal close when the room cannot force-end.
        await self.close()

    async def call_transfer(self, token: str, transfer_to: str) -> None:
        """
        Transfer the call to a provided Phone number or SIP endpoint.

        Args:
            token: VideoSDK auth token.
            transfer_to: Phone number or SIP endpoint to transfer the call to.
        """
        job_ctx = self._job_context
        if not job_ctx:
            try:
                job_ctx = get_current_job_context()
            except Exception:
                job_ctx = None
        room = getattr(job_ctx, "room", None) if job_ctx else None
        if room and hasattr(room, "call_transfer"):
            try:
                await room.call_transfer(token, transfer_to)
                return
            except Exception as exc:
                logger.error(f"Error calling call_transfer: {exc}")

    def _should_delay_for_sip_user(self) -> bool:
        """Check if there are SIP users in the room that need audio stream initialization"""
        job_ctx = self._job_context
        if not job_ctx:
            try:
                job_ctx = get_current_job_context()
            except Exception:
                job_ctx = None
        room = getattr(job_ctx, "room", None) if job_ctx else None
        if room and hasattr(room, "participants_data"):
            participants = room.participants_data
            for participant_info in participants.values():
                # SIP-specific on_enter logic is currently limited to outbound calls.
                if participant_info.get("sipUser") and participant_info.get("sipCallType") == "outbound":
                    return True
        return False
Initialize an agent session.
Args
agent: Instance of an Agent class that handles the core logic
pipeline: Pipeline instance to process the agent's operations
wake_up: Time in seconds after which to trigger wake-up callback if no speech detected
background_audio: Configuration for background audio (optional)
dtmf_handler: DTMF handler for phone number input (optional)
voice_mail_detector: Voicemail detector (optional)
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop agent_state : AgentState-
Expand source code
@property def agent_state(self) -> AgentState: return self._agent_state prop is_background_audio_enabled : bool-
Expand source code
@property def is_background_audio_enabled(self) -> bool: """Check if background audio is enabled in the pipeline""" audio_track = self._get_audio_track() return hasattr(audio_track, 'add_background_bytes')Check if background audio is enabled in the pipeline
prop is_voicemail_detected : bool-
Expand source code
@property def is_voicemail_detected(self) -> bool: """Returns True if voicemail was detected in this session.""" return self._is_voicemail_detectedReturns True if voicemail was detected in this session.
prop user_state : UserState-
Expand source code
@property
def user_state(self) -> UserState:
    # Current user state; updated internally by _emit_user_state on transitions.
    return self._user_state
Methods
async def call_transfer(self, token: str, transfer_to: str) ‑> None-
Expand source code
async def call_transfer(self,token: str, transfer_to: str) -> None: """ Transfer the call to a provided Phone number or SIP endpoint. Args: token: VideoSDK auth token. transfer_to: Phone number or SIP endpoint to transfer the call to. """ job_ctx = self._job_context if not job_ctx: try: job_ctx = get_current_job_context() except Exception: job_ctx = None room = getattr(job_ctx, "room", None) if job_ctx else None if room and hasattr(room, "call_transfer"): try: await room.call_transfer(token, transfer_to) return except Exception as exc: logger.error(f"Error calling call_transfer: {exc}")Transfer the call to a provided Phone number or SIP endpoint.
Args
token: VideoSDK auth token.
transfer_to: Phone number or SIP endpoint to transfer the call to.
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """ Close the agent session. """ logger.info("Closing agent session") if self._closed: logger.info("Agent session already closed") return self._closed = True self._emit_agent_state(AgentState.CLOSING) metrics_collector.finalize_session() traces_flow_manager = metrics_collector.traces_flow_manager if traces_flow_manager: start_time = time.perf_counter() await traces_flow_manager.start_agent_session_closed({"start_time": start_time}) traces_flow_manager.end_agent_session_closed() self._cancel_wake_up_timer() logger.info("Cleaning up agent session") try: await self.agent.on_exit() except Exception as e: logger.error(f"Error in agent.on_exit(): {e}") if self._thinking_audio_player: await self._thinking_audio_player.stop() if self._background_audio_player: await self._background_audio_player.stop() try: await self.pipeline.cleanup() except Exception as e: logger.error(f"Error cleaning up pipeline: {e}") try: await self.agent.cleanup() except Exception as e: logger.error(f"Error cleaning up agent: {e}") self.agent = None self.pipeline = None self.on_wake_up = None self._wake_up_task = None logger.info("Agent session cleaned up")Close the agent session.
async def hangup(self, reason: str = 'manual_hangup') ‑> None-
Expand source code
async def hangup(self, reason: str = "manual_hangup") -> None: """ Hang up the session, leaving the room immediately if possible. """ job_ctx = self._job_context if not job_ctx: try: job_ctx = get_current_job_context() except Exception: job_ctx = None room = getattr(job_ctx, "room", None) if job_ctx else None if room and hasattr(room, "force_end_session"): try: await room.force_end_session(reason) return except Exception as exc: logger.error(f"Error forcing room to end session: {exc}") await self.close()Hang up the session, leaving the room immediately if possible.
def interrupt(self, *, force: bool = False) ‑> None-
Expand source code
def interrupt(self, *, force: bool = False) -> None: """ Interrupt the agent's current speech. """ if self.current_utterance and not self.current_utterance.interrupted: try: self.current_utterance.interrupt(force=force) except RuntimeError as e: logger.warning(f"Could not interrupt utterance: {e}") return if hasattr(self.pipeline, 'interrupt'): self.pipeline.interrupt()Interrupt the agent's current speech.
async def leave(self) ‑> None-
Expand source code
async def leave(self) -> None: """ Leave the agent session. """ self._emit_agent_state(AgentState.CLOSING) await self.pipeline.leave()Leave the agent session.
async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) ‑> None-
Expand source code
async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) -> None: """Play background audio on demand""" if override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing: await self.stop_thinking_audio() self._thinking_was_playing = True audio_track = self._get_audio_track() if not hasattr(audio_track, 'add_background_bytes'): logger.warning( "Cannot play background audio. This feature requires the mixing audio track. " "Enable it by setting `background_audio=True` in RoomOptions." ) return if audio_track: self._background_audio_player = BackgroundAudioHandler(config, audio_track) await self._background_audio_player.start() # Track background audio start for metrics metrics_collector.on_background_audio_start( file_path=config.file_path, looping=config.looping )Play background audio on demand
async def reply(self,
instructions: str,
wait_for_playback: bool = True,
frames: list[av.VideoFrame] | None = None,
interruptible: bool = True) ‑> UtteranceHandle-
Expand source code
async def reply(self, instructions: str, wait_for_playback: bool = True, frames: list[av.VideoFrame] | None = None, interruptible: bool = True) -> UtteranceHandle: """ Generate a response from agent using instructions and current chat context. This method is safe to call from function tools - it will automatically detect re-entrant calls and schedule them as background tasks. Args: instructions: Instructions to add to chat context wait_for_playback: If True, wait for playback to complete frames: Optional list of VideoFrame objects to include in the reply Returns: UtteranceHandle: A handle to track the utterance lifecycle """ if self._reply_in_progress: if self.current_utterance: return self.current_utterance handle = UtteranceHandle(utterance_id="placeholder", interruptible=interruptible) handle._mark_done() return handle handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible) self.current_utterance = handle if self._is_executing_tool: asyncio.create_task( self._internal_blocking_reply(instructions, wait_for_playback, handle, frames) ) return handle else: await self._internal_blocking_reply(instructions, wait_for_playback, handle, frames) return handleGenerate a response from agent using instructions and current chat context.
This method is safe to call from function tools - it will automatically detect re-entrant calls and schedule them as background tasks.
Args
instructions: Instructions to add to chat context
wait_for_playback: If True, wait for playback to complete
frames: Optional list of VideoFrame objects to include in the reply
Returns
UtteranceHandle: A handle to track the utterance lifecycle
async def say(self, message: str, interruptible: bool = True) ‑> UtteranceHandle-
Expand source code
async def say(self, message: str, interruptible: bool = True) -> UtteranceHandle: """ Send an initial message to the agent and return a handle to track it. When called from inside a function tool (_is_executing_tool), the current turn's utterance is not interrupted or replaced, so the LLM stream can continue after the tool returns. """ handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible) if not self._is_executing_tool: if self.current_utterance and not self.current_utterance.done(): self.current_utterance.interrupt() self.current_utterance = handle traces_flow_manager = metrics_collector.traces_flow_manager if traces_flow_manager: traces_flow_manager.agent_say_called(message) self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message) if hasattr(self.pipeline, 'send_message'): await self.pipeline.send_message(message, handle=handle) return handleSend an initial message to the agent and return a handle to track it. When called from inside a function tool (_is_executing_tool), the current turn's utterance is not interrupted or replaced, so the LLM stream can continue after the tool returns.
async def start(self,
wait_for_participant: bool = False,
run_until_shutdown: bool = False,
**kwargs: Any) ‑> None-
Expand source code
async def start(
    self,
    wait_for_participant: bool = False,
    run_until_shutdown: bool = False,
    **kwargs: Any
) -> None:
    """
    Start the agent session.

    This will:
    1. Initialize the agent (including MCP tools if configured)
    2. Call the agent's on_enter hook
    3. Start the pipeline processing
    4. Start wake-up timer if configured (but only if callback is set)
    5. Optionally handle full lifecycle management (connect, wait, shutdown)

    Args:
        wait_for_participant: If True, wait for a participant to join before starting.
        run_until_shutdown: If True, manage the full lifecycle including connection,
            waiting for shutdown signals, and cleanup. This is a convenience that
            internally calls ctx.run_until_shutdown() with this session.
        **kwargs: Additional arguments to pass to the pipeline start method.
            NOTE(review): kwargs is not forwarded anywhere in this body
            (pipeline.start() is called without it) - confirm intended.

    Examples:
        Simple start (manual lifecycle management):
        ```python
        await session.start()
        ```
        Full lifecycle management (recommended):
        ```python
        await session.start(wait_for_participant=True, run_until_shutdown=True)
        ```
    """
    if run_until_shutdown:
        # Delegate the whole lifecycle to the job context when one exists;
        # on any failure fall through to the normal start path below.
        try:
            ctx = get_current_job_context()
            if ctx:
                logger.info("Starting session with full lifecycle management")
                await ctx.run_until_shutdown(
                    session=self,
                    wait_for_participant=wait_for_participant
                )
                return
            else:
                logger.warning(
                    "run_until_shutdown=True requires a JobContext, "
                    "falling back to normal start()"
                )
        except Exception as e:
            logger.warning(
                f"Failed to get JobContext for run_until_shutdown: {e}, "
                "falling back to normal start()"
            )

    self._emit_agent_state(AgentState.STARTING)

    # Initialize optional subsystems before the pipeline comes up.
    if self.agent._mcp_servers:
        await self.agent.initialize_mcp()
    if self.dtmf_handler:
        await self.dtmf_handler.start()
    if self._playground or self._send_analytics_to_pubsub:
        job_ctx = get_current_job_context()
        # NOTE(review): __init__ defines `_playground_manager` (underscored);
        # this assigns `playground_manager` - confirm the naming is intended.
        self.playground_manager = PlaygroundManager(job_ctx)
        metrics_collector.set_playground_manager(self.playground_manager)

    # Configure metrics with session info
    metrics_collector.set_system_instructions(self.agent.instructions)

    # Set provider info based on pipeline components
    if not self.pipeline.config.is_realtime:
        if self.pipeline.stt:
            p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
            metrics_collector.set_provider_info("stt", p_class, p_model)
        if self.pipeline.llm:
            p_class, p_model = self._get_provider_info(self.pipeline.llm, 'llm')
            metrics_collector.set_provider_info("llm", p_class, p_model)
        if self.pipeline.tts:
            p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
            metrics_collector.set_provider_info("tts", p_class, p_model)
        if hasattr(self.pipeline, 'vad') and self.pipeline.vad:
            p_class, p_model = self._get_provider_info(self.pipeline.vad, 'vad')
            metrics_collector.set_provider_info("vad", p_class, p_model)
        if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector:
            p_class, p_model = self._get_provider_info(self.pipeline.turn_detector, 'eou')
            metrics_collector.set_provider_info("eou", p_class, p_model)
    else:
        # Realtime pipelines report the realtime model plus optional STT/TTS.
        if self.pipeline._realtime_model:
            metrics_collector.set_provider_info("realtime", self.pipeline._realtime_model.__class__.__name__, getattr(self.pipeline._realtime_model, 'model', ''))
        if self.pipeline.stt:
            p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
            metrics_collector.set_provider_info("stt", p_class, p_model)
        if self.pipeline.tts:
            p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
            metrics_collector.set_provider_info("tts", p_class, p_model)

    # Traces flow manager setup
    traces_flow_manager = metrics_collector.traces_flow_manager
    if traces_flow_manager:
        config_attributes = {
            "system_instructions": self.agent.instructions,
            # Function tools exclude MCP-provided tools when an MCP manager exists.
            "function_tools": [
                get_tool_info(tool).name for tool in (
                    [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools]
                    if self.agent.mcp_manager else self.agent.tools
                )
            ] if self.agent.tools else [],
            "mcp_tools": [
                tool._tool_info.name for tool in self.agent.mcp_manager.tools
            ] if self.agent.mcp_manager else [],
            "pipeline": self.pipeline.__class__.__name__,
            "pipeline_mode": self.pipeline.config.pipeline_mode.value,
            "transport_mode": metrics_collector.transport_mode
        }
        start_time = time.perf_counter()
        config_attributes["start_time"] = start_time
        await traces_flow_manager.start_agent_session_config(config_attributes)
        await traces_flow_manager.start_agent_session({"start_time": start_time})

    if hasattr(self.pipeline, 'set_agent'):
        self.pipeline.set_agent(self.agent)
    await self.pipeline.start()
    if hasattr(self.agent, 'a2a'):
        self.agent.a2a._attach_deferred_listeners()

    if self._should_delay_for_sip_user():
        # For SIP callers, defer on_enter until the caller's audio stream is
        # actually enabled, otherwise the greeting may play into silence.
        logger.info("SIP user detected, waiting for audio stream to be enabled before calling on_enter")
        audio_stream_enabled = asyncio.Event()

        def on_audio_stream_enabled(data):
            # Only trip the event for an audio stream owned by a SIP participant.
            stream = data.get("stream")
            participant = data.get("participant")
            if stream and stream.kind == "audio" and participant and participant.meta_data.get("sipUser"):
                logger.info(f"SIP user audio stream enabled for participant {participant.id}")
                audio_stream_enabled.set()

        global_event_emitter.on("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

        async def wait_and_start():
            # Completes the normal startup sequence once audio is flowing.
            try:
                await audio_stream_enabled.wait()
                logger.info("SIP user audio stream enabled, proceeding with on_enter")
                await self.agent.on_enter()
                global_event_emitter.emit("AGENT_STARTED", {"session": self})
                if self.on_wake_up is not None:
                    self._start_wake_up_timer()
                self._emit_agent_state(AgentState.IDLE)
            except Exception as e:
                logger.error(f"Error in wait_and_start: {e}")
            finally:
                # Always detach the listener so it cannot fire twice or leak.
                global_event_emitter.off("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

        asyncio.create_task(wait_and_start())
        return

    # Non-SIP path: enter the agent immediately.
    await self.agent.on_enter()
    global_event_emitter.emit("AGENT_STARTED", {"session": self})
    if self.on_wake_up is not None:
        self._start_wake_up_timer()
    self._emit_agent_state(AgentState.IDLE)
Optionally handle full lifecycle management (connect, wait, shutdown)
Args:
wait_for_participant: If True, wait for a participant to join before starting
run_until_shutdown: If True, manage the full lifecycle including connection, waiting for shutdown signals, and cleanup. This is a convenience that internally calls ctx.run_until_shutdown() with this session.
**kwargs: Additional arguments to pass to the pipeline start method
Examples
Simple start (manual lifecycle management):
await session.start()Full lifecycle management (recommended):
await session.start(wait_for_participant=True, run_until_shutdown=True) async def start_thinking_audio(self)-
Expand source code
async def start_thinking_audio(self):
    """Begin playing the agent's configured 'thinking' background audio."""
    bg_player = self._background_audio_player
    if bg_player and bg_player.is_playing:
        # Regular background audio already owns the track; nothing to do.
        return

    track = self._get_audio_track()
    if not hasattr(track, 'add_background_bytes'):
        logger.warning(
            "Cannot play 'thinking' audio. This feature requires the mixing audio track. "
            "Enable it by setting `background_audio=True` in RoomOptions."
        )
        return

    config = self.agent._thinking_background_config
    if config and track:
        player = BackgroundAudioHandler(config, track)
        self._thinking_audio_player = player
        await player.start()
        # Record the start for metrics.
        metrics_collector.on_thinking_audio_start(
            file_path=config.file_path,
            looping=config.looping
        )
async def stop_background_audio(self) -> None
Expand source code
async def stop_background_audio(self) -> None:
    """Stop active background audio, resuming thinking audio if it was pre-empted."""
    player = self._background_audio_player
    if player:
        await player.stop()
        self._background_audio_player = None
        metrics_collector.on_background_audio_stop()  # metrics: background audio stopped
    if self._thinking_was_playing:
        # Background audio had displaced thinking audio; bring it back.
        await self.start_thinking_audio()
        self._thinking_was_playing = False
async def stop_thinking_audio(self)
Expand source code
async def stop_thinking_audio(self):
    """Stop the thinking-audio player if one is currently active."""
    player = self._thinking_audio_player
    if not player:
        return
    await player.stop()
    self._thinking_audio_player = None
    metrics_collector.on_thinking_audio_stop()  # metrics: thinking audio stopped
Inherited members