Module agents.agent_session
Classes
class AgentSession (agent: Agent,
pipeline: Pipeline,
wake_up: Optional[int] = None,
background_audio: Optional[BackgroundAudioHandlerConfig] = None,
dtmf_handler: Optional[DTMFHandler] = None,
voice_mail_detector: Optional[VoiceMailDetector] = None)-
Expand source code
class AgentSession(EventEmitter[Literal["user_state_changed", "agent_state_changed", "warm_transfer"]]):
    """
    Manages an agent session with its associated conversation flow and pipeline.

    Emits ``user_state_changed``, ``agent_state_changed`` and ``warm_transfer`` events.
    """

    def __init__(
        self,
        agent: Agent,
        pipeline: Pipeline,
        wake_up: Optional[int] = None,
        background_audio: Optional[BackgroundAudioHandlerConfig] = None,
        dtmf_handler: Optional[DTMFHandler] = None,
        voice_mail_detector: Optional[VoiceMailDetector] = None,
    ) -> None:
        """
        Initialize an agent session.

        Args:
            agent: Instance of an Agent class that handles the core logic
            pipeline: Pipeline instance to process the agent's operations
            wake_up: Time in seconds after which to trigger wake-up callback if no speech detected
            background_audio: Configuration for background audio (optional)
            dtmf_handler: DTMF handler for phone number input (optional)
            voice_mail_detector: Voicemail detector (optional)
        """
        super().__init__()
        self.agent = agent
        self.pipeline = pipeline
        self.agent.session = self
        self._accept_user_input: bool = False
        self.wake_up = wake_up
        self.on_wake_up: Optional[Callable[[], None] | Callable[[], Any]] = None
        self._wake_up_task: Optional[asyncio.Task] = None
        self._wake_up_timer_active = False
        self._closed: bool = False
        self._reply_in_progress: bool = False
        self._user_state: UserState = UserState.IDLE
        self._agent_state: AgentState = AgentState.IDLE
        self.current_utterance: Optional[UtteranceHandle] = None
        self._thinking_audio_player: Optional[BackgroundAudioHandler] = None
        self._background_audio_player: Optional[BackgroundAudioHandler] = None
        self._thinking_was_playing = False
        self._override_thinking: bool = False
        self.background_audio_config = background_audio
        self._is_executing_tool = False
        self._job_context = None
        self.dtmf_handler = dtmf_handler
        self.voice_mail_detector = voice_mail_detector
        # `is_voicemail_detected` reads `_is_voicemail_detected`, so it must exist before any
        # voicemail result arrives. The legacy `_is_voice_mail_detected` spelling is kept in
        # sync for any code that still reads it.
        self._is_voicemail_detected = False
        self._is_voice_mail_detected = False
        self._warm_transfer_in_progress: bool = False
        self._warm_transfer_task: Optional[asyncio.Task] = None
        # Strong references to fire-and-forget tasks; the event loop only keeps weak ones.
        self._background_tasks: set[asyncio.Task] = set()

        # Set agent on pipeline (pipeline handles all internal wiring)
        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)

        # Setup voicemail detection
        if self.voice_mail_detector:
            if hasattr(self.pipeline, "set_voice_mail_detector"):
                self.pipeline.set_voice_mail_detector(self.voice_mail_detector)
            if hasattr(self.pipeline, "on"):
                self.pipeline.on("voicemail_result", self._handle_voicemail_result)

        # Setup wake-up callback
        if hasattr(self.pipeline, 'set_wake_up_callback'):
            self.pipeline.set_wake_up_callback(self._reset_wake_up_timer)

        # Get job context
        try:
            job_ctx = get_current_job_context()
            if job_ctx:
                self._job_context = job_ctx
                job_ctx.add_shutdown_callback(self.close)
        except Exception as e:
            logger.error(f"AgentSession: Error in session initialization: {e}")
            self._job_context = None

    def _create_background_task(self, coro: Any) -> asyncio.Task:
        """Schedule ``coro`` as a task and hold a reference to it until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def _resolve_job_context(self):
        """Return the JobContext captured at init, else the current one, else None."""
        if self._job_context:
            return self._job_context
        try:
            return get_current_job_context()
        except Exception:
            return None

    @property
    def is_voicemail_detected(self) -> bool:
        """Returns True if voicemail was detected in this session."""
        return self._is_voicemail_detected

    def _handle_voicemail_result(self, data: dict) -> None:
        """
        Handler for the voicemail_result event from ConversationFlow.
        Updates session state and executes callback if needed.
        """
        is_vm = data.get("is_voicemail", False)
        self._is_voicemail_detected = is_vm
        self._is_voice_mail_detected = is_vm
        if is_vm:
            logger.info("AgentSession: Voicemail confirmed. Executing callback.")
            if self.voice_mail_detector and self.voice_mail_detector.callback:
                self._create_background_task(self._safe_execute_vmd_callback())

    async def _safe_execute_vmd_callback(self) -> None:
        """Await the voicemail detector's callback, logging (not raising) any error."""
        try:
            if self.voice_mail_detector.callback:
                await self.voice_mail_detector.callback()
        except Exception as e:
            logger.error(f"Error executing voicemail callback: {e}")

    def _start_wake_up_timer(self) -> None:
        """(Re)start the wake-up countdown when both `wake_up` and `on_wake_up` are configured."""
        if self.wake_up is not None and self.on_wake_up is not None:
            if self._wake_up_task and not self._wake_up_task.done():
                self._wake_up_task.cancel()
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _reset_wake_up_timer(self) -> None:
        """Restart the wake-up countdown unless a reply is currently being generated."""
        if self._reply_in_progress:
            return
        self._start_wake_up_timer()

    def _pause_wake_up_timer(self) -> None:
        """Deactivate the wake-up timer and cancel any pending countdown."""
        self._wake_up_timer_active = False
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()

    def _cancel_wake_up_timer(self) -> None:
        """Cancel any pending wake-up countdown (same effect as pausing)."""
        self._pause_wake_up_timer()

    async def _wake_up_timer_loop(self) -> None:
        """Sleep `wake_up` seconds, then fire `on_wake_up` if the session is still idle."""
        try:
            await asyncio.sleep(self.wake_up)
            if (
                self._wake_up_timer_active
                and self.on_wake_up
                and not self._reply_in_progress
                and self._user_state != UserState.SPEAKING
            ):
                if asyncio.iscoroutinefunction(self.on_wake_up):
                    self._create_background_task(self.on_wake_up())
                else:
                    self.on_wake_up()
        except asyncio.CancelledError:
            pass

    def _emit_user_state(self, state: UserState, data: dict | None = None) -> None:
        """Update the user state and emit `user_state_changed` only when it changes."""
        if state != self._user_state:
            self._user_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("user_state_changed", payload)

    def _emit_agent_state(self, state: AgentState, data: dict | None = None) -> None:
        """Update the agent state and emit local and global events only when it changes."""
        if state != self._agent_state:
            self._agent_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("agent_state_changed", payload)
            global_event_emitter.emit("AGENT_STATE_CHANGED", {"state": state.value})

    @property
    def user_state(self) -> UserState:
        return self._user_state

    @property
    def agent_state(self) -> AgentState:
        return self._agent_state

    @property
    def is_background_audio_enabled(self) -> bool:
        """Check if background audio is enabled in the pipeline"""
        audio_track = self._get_audio_track()
        return hasattr(audio_track, 'add_background_bytes')

    async def start(
        self,
        wait_for_participant: bool = False,
        run_until_shutdown: bool = False,
        *,
        observability: Optional[ObservabilityOptions] = None,
        **kwargs: Any
    ) -> None:
        """
        Start the agent session.

        This will:
        1. Initialize the agent (including MCP tools if configured)
        2. Call the agent's on_enter hook
        3. Start the pipeline processing
        4. Start wake-up timer if configured (but only if callback is set)
        5. Optionally handle full lifecycle management (connect, wait, shutdown)

        Args:
            wait_for_participant: If True, wait for a participant to join before starting
            run_until_shutdown: If True, manage the full lifecycle including connection,
                waiting for shutdown signals, and cleanup. This is a convenience that
                internally calls ctx.run_until_shutdown() with this session.
            observability: Grouped recording/traces/metrics/logs config. When passed, it is
                applied to ``ctx.room_options.observability`` before the room connects,
                taking precedence over any value set on ``RoomOptions``. Requires
                ``run_until_shutdown=True``; a warning is logged otherwise.
            **kwargs: Additional arguments to pass to the pipeline start method

        Only a missing JobContext makes ``run_until_shutdown=True`` fall back to a plain
        start; errors raised by ``ctx.run_until_shutdown()`` itself propagate, so a failed
        lifecycle never triggers a second start of this session.

        Examples:
            Simple start (manual lifecycle management):
            ```python
            await session.start()
            ```

            Full lifecycle management (recommended):
            ```python
            await session.start(wait_for_participant=True, run_until_shutdown=True)
            ```

            Inline observability:
            ```python
            await session.start(
                wait_for_participant=True,
                run_until_shutdown=True,
                observability=ObservabilityOptions(
                    recording=RecordingOptions(video=True),
                ),
            )
            ```
        """
        if observability is not None:
            self._apply_observability_override(observability, run_until_shutdown)

        if run_until_shutdown:
            ctx = None
            try:
                ctx = get_current_job_context()
            except Exception as e:
                logger.warning(
                    f"Failed to get JobContext for run_until_shutdown: {e}, "
                    "falling back to normal start()"
                )
            else:
                if ctx:
                    logger.info("Starting session with full lifecycle management")
                    # Deliberately outside the try: lifecycle errors must not be swallowed
                    # and must not fall through to a second start() below.
                    await ctx.run_until_shutdown(
                        session=self,
                        wait_for_participant=wait_for_participant
                    )
                    return
                logger.warning(
                    "run_until_shutdown=True requires a JobContext, "
                    "falling back to normal start()"
                )

        self._emit_agent_state(AgentState.STARTING)
        if self.agent._mcp_servers:
            await self.agent.initialize_mcp()

        if self.dtmf_handler:
            await self.dtmf_handler.start()

        # Configure metrics with session info
        metrics_collector.set_system_instructions(self.agent.instructions)
        self._configure_provider_metrics()
        await self._start_session_traces()

        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)

        await self.pipeline.start()
        if hasattr(self.agent, 'a2a'):
            self.agent.a2a._attach_deferred_listeners()

        if self._should_delay_for_sip_user():
            logger.info("SIP user detected, waiting for audio stream to be enabled before calling on_enter")
            audio_stream_enabled = asyncio.Event()

            def on_audio_stream_enabled(data):
                stream = data.get("stream")
                participant = data.get("participant")
                if stream and stream.kind == "audio" and participant and participant.meta_data.get("sipUser"):
                    logger.info(f"SIP user audio stream enabled for participant {participant.id}")
                    audio_stream_enabled.set()

            global_event_emitter.on("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

            async def wait_and_start():
                try:
                    await audio_stream_enabled.wait()
                    logger.info("SIP user audio stream enabled, proceeding with on_enter")
                    await self._enter_agent()
                except Exception as e:
                    logger.error(f"Error in wait_and_start: {e}")
                finally:
                    global_event_emitter.off("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

            self._create_background_task(wait_and_start())
            return

        await self._enter_agent()

    def _apply_observability_override(self, observability: ObservabilityOptions, run_until_shutdown: bool) -> None:
        """Write ``observability`` onto the current JobContext's RoomOptions, warning when it cannot apply."""
        ctx = None
        try:
            ctx = get_current_job_context()
        except Exception as e:
            logger.warning(f"Could not resolve JobContext for observability overrides: {e}")
        if ctx is not None and ctx.room_options is not None:
            if not run_until_shutdown:
                logger.warning(
                    "observability= passed to session.start() but run_until_shutdown=False; "
                    "overrides only apply if ctx.connect() has not yet run."
                )
            ctx.room_options.observability = observability
        else:
            logger.warning(
                "observability= passed to session.start() but no active JobContext "
                "(or no RoomOptions) was found; ignoring."
            )

    def _configure_provider_metrics(self) -> None:
        """Report provider class and model of each configured pipeline component to metrics."""
        pipeline = self.pipeline
        if pipeline.config.is_realtime:
            realtime_model = pipeline._realtime_model
            if realtime_model:
                metrics_collector.set_provider_info(
                    "realtime",
                    format_provider_class(realtime_model),
                    getattr(realtime_model, 'model', ''),
                )
            components = (("stt", pipeline.stt), ("tts", pipeline.tts))
        else:
            components = (
                ("stt", pipeline.stt),
                ("llm", pipeline.llm),
                ("tts", pipeline.tts),
                ("vad", getattr(pipeline, 'vad', None)),
                ("eou", getattr(pipeline, 'turn_detector', None)),
            )
        for comp_name, component in components:
            if component:
                p_class, p_model = self._get_provider_info(component, comp_name)
                metrics_collector.set_provider_info(comp_name, p_class, p_model)

    async def _start_session_traces(self) -> None:
        """Open the agent-session trace spans with the session configuration, if tracing is active."""
        traces_flow_manager = metrics_collector.traces_flow_manager
        if not traces_flow_manager:
            return
        agent = self.agent
        mcp_manager = agent.mcp_manager
        if agent.tools:
            # MCP tools are reported separately, so exclude them from the function tool list.
            local_tools = (
                [tool for tool in agent.tools if tool not in mcp_manager.tools]
                if mcp_manager else agent.tools
            )
            function_tools = [get_tool_info(tool).name for tool in local_tools]
        else:
            function_tools = []
        config_attributes = {
            "system_instructions": agent.instructions,
            "function_tools": function_tools,
            "mcp_tools": [tool._tool_info.name for tool in mcp_manager.tools] if mcp_manager else [],
            "pipeline": self.pipeline.__class__.__name__,
            "pipeline_mode": self.pipeline.config.pipeline_mode.value,
            "transport_mode": metrics_collector.transport_mode,
        }
        start_time = time.perf_counter()
        config_attributes["start_time"] = start_time
        await traces_flow_manager.start_agent_session_config(config_attributes)
        await traces_flow_manager.start_agent_session({"start_time": start_time})

    async def _enter_agent(self) -> None:
        """Run the agent's on_enter hook, accept user input, announce start, and go IDLE."""
        await self.agent.on_enter()
        self._accept_user_input = True
        global_event_emitter.emit("AGENT_STARTED", {"session": self})
        if self.on_wake_up is not None:
            self._start_wake_up_timer()
        self._emit_agent_state(AgentState.IDLE)

    def _get_provider_info(self, component, comp_name):
        """
        Return ``(provider_class, model)`` for a pipeline component.

        Uses the component's ``active_provider`` when it has a non-None one. The model is
        the first attribute present among model/model_id/speech_model/voice_id/voice/speaker,
        falling back to the pipeline's component config ``model`` (or "").
        """
        configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {}
        if not component:
            return "", ""
        default_model = configs.get(comp_name, {}).get('model', '')
        active_provider = getattr(component, 'active_provider', None)
        target = active_provider if active_provider is not None else component
        provider_class = format_provider_class(target)
        model = default_model
        for attr_name in ('model', 'model_id', 'speech_model', 'voice_id', 'voice', 'speaker'):
            if hasattr(target, attr_name):
                model = getattr(target, attr_name)
                break
        return provider_class, str(model)

    async def say(
        self,
        message: str,
        interruptible: bool = True,
        audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None,
        add_to_chat_context: bool = True,
    ) -> UtteranceHandle:
        """
        Make the agent speak ``message`` and return a handle to track the utterance.

        When called from inside a function tool (_is_executing_tool), the current turn's
        utterance is not interrupted or replaced, so the LLM stream can continue after
        the tool returns.

        Args:
            message: The text being spoken. Always used for transcript/chat context
                (unless ``add_to_chat_context=False``).
            interruptible: If False, ``handle.interrupt()`` will raise unless forced.
            audio_data: Optional pre-synthesized PCM bytes (int16, matching the agent
                audio track sample rate). When provided, the TTS provider is bypassed
                and the bytes are streamed straight to the audio track. Accepts a single
                ``bytes`` blob, an iterable of ``bytes`` chunks, or an async iterator of
                ``bytes`` chunks. Use ``load_audio_file`` for WAV files or
                :class:`TTSAudioCache` to cache TTS output by text.
            add_to_chat_context: When False, the message is spoken but NOT added to the
                agent's chat history (useful for transient hold messages inside function tools).
        """
        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
        self._accept_user_input = True
        if not self._is_executing_tool:
            if self.current_utterance and not self.current_utterance.done():
                self.current_utterance.interrupt()
            self.current_utterance = handle
        traces_flow_manager = metrics_collector.traces_flow_manager
        if traces_flow_manager:
            traces_flow_manager.agent_say_called(message)
        if add_to_chat_context:
            self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)
        if hasattr(self.pipeline, 'send_message'):
            await self.pipeline.send_message(message, handle=handle, audio_data=audio_data)
        return handle

    async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) -> None:
        """Play background audio on demand.

        override_thinking=True -> thinking audio is allowed to layer over the
            background music during LLM generation (stops on first TTS byte).
        override_thinking=False -> background music is exclusive; thinking audio
            is suppressed for as long as background is playing.

        A background track that is already playing is stopped before the new one starts,
        so repeated calls replace the music instead of layering it.
        """
        self._override_thinking = override_thinking
        if not override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing:
            await self.stop_thinking_audio()
            self._thinking_was_playing = True
        audio_track = self._get_audio_track()
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play background audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return
        if audio_track:
            if self._background_audio_player:
                await self._background_audio_player.stop()
                self._background_audio_player = None
                metrics_collector.on_background_audio_stop()
            self._background_audio_player = BackgroundAudioHandler(config, audio_track)
            await self._background_audio_player.start()
            # Track background audio start for metrics
            metrics_collector.on_background_audio_start(
                file_path=config.file_path,
                looping=config.looping
            )

    async def stop_background_audio(self) -> None:
        """Stop background audio on demand"""
        if self._background_audio_player:
            await self._background_audio_player.stop()
            self._background_audio_player = None
            # Track background audio stop for metrics
            metrics_collector.on_background_audio_stop()
        self._override_thinking = False
        # Resume thinking audio if play_background_audio() suppressed it.
        if self._thinking_was_playing:
            await self.start_thinking_audio()
            self._thinking_was_playing = False

    def _get_audio_track(self):
        """Get audio track from pipeline"""
        if self.pipeline is None:
            return None
        if self.pipeline.config.is_realtime:
            model = getattr(self.pipeline, '_realtime_model', None)
            if model and hasattr(model, 'audio_track'):
                return model.audio_track
        if self.pipeline.tts and hasattr(self.pipeline.tts, 'audio_track'):
            return self.pipeline.tts.audio_track
        return None

    async def start_thinking_audio(self):
        """Start thinking audio"""
        if (
            self._background_audio_player
            and self._background_audio_player.is_playing
            and not self._override_thinking
        ):
            return
        if self._thinking_audio_player and self._thinking_audio_player.is_playing:
            return
        audio_track = self._get_audio_track()
        if not hasattr(audio_track, 'add_background_bytes'):
            logger.warning(
                "Cannot play 'thinking' audio. This feature requires the mixing audio track. "
                "Enable it by setting `background_audio=True` in RoomOptions."
            )
            return
        if self.agent._thinking_background_config and audio_track:
            self._thinking_audio_player = BackgroundAudioHandler(self.agent._thinking_background_config, audio_track)
            await self._thinking_audio_player.start()
            # Track thinking audio start for metrics
            metrics_collector.on_thinking_audio_start(
                file_path=self.agent._thinking_background_config.file_path,
                looping=self.agent._thinking_background_config.looping
            )

    async def stop_thinking_audio(self):
        """Stop thinking audio"""
        if self._thinking_audio_player:
            await self._thinking_audio_player.stop()
            self._thinking_audio_player = None
            # Track thinking audio stop for metrics
            metrics_collector.on_thinking_audio_stop()

    async def reply(self, instructions: str, wait_for_playback: bool = True, frames: list[av.VideoFrame] | None = None, interruptible: bool = True) -> UtteranceHandle:
        """
        Generate a response from agent using instructions and current chat context.

        This method is safe to call from function tools - it will automatically
        detect re-entrant calls and schedule them as background tasks.

        Args:
            instructions: Instructions to add to chat context
            wait_for_playback: If True, wait for playback to complete
            frames: Optional list of VideoFrame objects to include in the reply
            interruptible: Whether the resulting utterance may be interrupted

        Returns:
            UtteranceHandle: A handle to track the utterance lifecycle
        """
        self._accept_user_input = True
        if self._reply_in_progress:
            # Re-entrant call: hand back the in-flight utterance (or an already-done placeholder).
            if self.current_utterance:
                return self.current_utterance
            handle = UtteranceHandle(utterance_id="placeholder", interruptible=interruptible)
            handle._mark_done()
            return handle

        handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
        self.current_utterance = handle

        if self._is_executing_tool:
            # Called from inside a tool: don't block the tool; run the reply in the background.
            self._create_background_task(
                self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
            )
            return handle
        await self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
        return handle

    async def _internal_blocking_reply(self, instructions: str, wait_for_playback: bool, handle: UtteranceHandle, frames: list[av.VideoFrame] | None = None) -> None:
        """
        The original, blocking logic of the reply method.
        """
        if not instructions:
            handle._mark_done()
            return

        self._reply_in_progress = True
        self._pause_wake_up_timer()

        try:
            # Call pipeline's reply_with_context
            if hasattr(self.pipeline, 'reply_with_context'):
                await self.pipeline.reply_with_context(instructions, wait_for_playback, handle=handle, frames=frames)

            if wait_for_playback:
                await handle
        finally:
            self._reply_in_progress = False
            if not handle.done():
                handle._mark_done()

    def interrupt(self, *, force: bool = False) -> None:
        """
        Interrupt the agent's current speech.
        """
        if self.current_utterance and not self.current_utterance.interrupted:
            try:
                self.current_utterance.interrupt(force=force)
            except RuntimeError as e:
                logger.warning(f"Could not interrupt utterance: {e}")
                return

        if hasattr(self.pipeline, 'interrupt'):
            self.pipeline.interrupt()

    def get_context_history(
        self,
        *,
        include_function_calls: bool = False,
        include_system_messages: bool = False,
    ) -> list[dict]:
        """
        Get the current chat context history (role/content items).

        Users can access the conversation history whenever they want, without needing any transcript hooks.
        """
        if not self.agent or not self.agent.chat_context:
            return []
        context_copy = self.agent.chat_context.copy(
            tools=None if include_function_calls else [],
            exclude_system_messages=not include_system_messages,
        )
        return context_copy.to_dict()["items"]

    async def close(self) -> None:
        """
        Close the agent session.

        Idempotent: later calls only log. Stopping the audio players is best-effort,
        so a failure there does not skip the pipeline and agent cleanup that follows.
        """
        logger.info("Closing agent session")
        if self._closed:
            logger.info("Agent session already closed")
            return
        self._closed = True
        self._emit_agent_state(AgentState.CLOSING)
        metrics_collector.finalize_session()
        traces_flow_manager = metrics_collector.traces_flow_manager
        if traces_flow_manager:
            start_time = time.perf_counter()
            await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
            traces_flow_manager.end_agent_session_closed()
        self._cancel_wake_up_timer()
        logger.info("Cleaning up agent session")
        try:
            await self.agent.on_exit()
        except Exception as e:
            logger.error(f"Error in agent.on_exit(): {e}")

        if self._thinking_audio_player:
            try:
                await self._thinking_audio_player.stop()
            except Exception as e:
                logger.error(f"Error stopping thinking audio: {e}")
            self._thinking_audio_player = None

        if self._background_audio_player:
            try:
                await self._background_audio_player.stop()
            except Exception as e:
                logger.error(f"Error stopping background audio: {e}")
            self._background_audio_player = None

        try:
            await self.pipeline.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up pipeline: {e}")

        try:
            await self.agent.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up agent: {e}")

        self.agent = None
        self.pipeline = None
        self.on_wake_up = None
        self._wake_up_task = None
        logger.info("Agent session cleaned up")

    async def leave(self) -> None:
        """
        Leave the agent session.

        No-op once ``close()`` has released the pipeline (``self.pipeline`` is None).
        """
        if self.pipeline is None:
            return
        self._emit_agent_state(AgentState.CLOSING)
        await self.pipeline.leave()

    async def hangup(self, reason: str = "manual_hangup") -> None:
        """
        Hang up the session, leaving the room immediately if possible.

        Falls back to ``close()`` when no room can force-end the session or when that fails.
        """
        job_ctx = self._resolve_job_context()
        room = getattr(job_ctx, "room", None) if job_ctx else None
        if room and hasattr(room, "force_end_session"):
            try:
                await room.force_end_session(reason)
                return
            except Exception as exc:
                logger.error(f"Error forcing room to end session: {exc}")
        await self.close()

    async def call_transfer(self, token: str, transfer_to: str) -> None:
        """
        Transfer the call to a provided Phone number or SIP endpoint.

        Errors from the room are logged, not raised. A warning is logged when no room
        supporting ``call_transfer`` is available.

        Args:
            token: VideoSDK auth token.
            transfer_to: Phone number or SIP endpoint to transfer the call to.
        """
        job_ctx = self._resolve_job_context()
        room = getattr(job_ctx, "room", None) if job_ctx else None
        if not (room and hasattr(room, "call_transfer")):
            logger.warning("Cannot transfer call: no active room supporting call_transfer was found.")
            return
        try:
            await room.call_transfer(token, transfer_to)
        except Exception as exc:
            logger.error(f"Error calling call_transfer: {exc}")

    def _should_delay_for_sip_user(self) -> bool:
        """Check if there are SIP users in the room that need audio stream initialization"""
        job_ctx = self._resolve_job_context()
        room = getattr(job_ctx, "room", None) if job_ctx else None
        if room and hasattr(room, "participants_data"):
            for participant_info in room.participants_data.values():
                # SIP-specific on_enter logic is currently limited to outbound calls.
                if participant_info.get("sipUser") and participant_info.get("sipCallType") == "outbound":
                    return True
        return False

    def on_warm_transfer(self, phase: Optional[str] = None, callback: Optional[Callable[..., Any]] = None):
        """Subscribe to warm-transfer phase changes (decorator or imperative).

        ``@session.on_warm_transfer()`` sees every phase; pass a phase name to filter to
        one. A phase enum member is also accepted and compared by its ``value``. Handlers
        are sync and receive the ``warm_transfer`` event payload:
        ``{"phase": WarmTransferPhase, "data": {...}, "timestamp": float,
        "consultation_room_id": Optional[str]}``.
        """
        target_phase = getattr(phase, "value", phase)

        def _wrap(handler: Callable[..., Any]) -> Callable[..., Any]:
            if phase is None:
                return self.on("warm_transfer", handler)

            def _filtered(payload):
                pv = payload.get("phase") if isinstance(payload, dict) else None
                if getattr(pv, "value", pv) == target_phase:
                    handler(payload)

            # Not every callable has __name__ (e.g. functools.partial).
            handler_name = getattr(handler, "__name__", type(handler).__name__)
            _filtered.__name__ = f"{handler_name}__phase_{target_phase}"
            return self.on("warm_transfer", _filtered)

        return _wrap if callback is None else _wrap(callback)

    async def warm_transfer(self, config: "WarmTransferConfig") -> "WarmTransferResult":
        """Run a SIP-to-SIP warm transfer to a human supervisor.

        See :mod:`videosdk.agents.warm_transfer` for the config surface and the
        ``@session.on_warm_transfer(...)`` phase hook.

        The transfer runs in its own task and is shielded from the caller of this
        method being cancelled (this is usually invoked from a ``@function_tool``,
        and that task gets cancelled whenever the caller talks over the agent /
        triggers an interruption — which must not abort an in-flight transfer).
        If the awaiting tool is cancelled, the transfer keeps running to completion
        in the background; the result is simply not returned.
        """
        from .warm_transfer import WarmTransferRunner, WarmTransferError
        if self._warm_transfer_in_progress:
            raise WarmTransferError("A warm transfer is already in progress for this session.")
        self._warm_transfer_in_progress = True
        try:
            task = asyncio.ensure_future(WarmTransferRunner(self, config).run())
        except BaseException:
            # No task means no done-callback to clear the flag; reset it here so the
            # session is not locked out of future transfers.
            self._warm_transfer_in_progress = False
            raise
        self._warm_transfer_task = task

        def _clear(_task: "asyncio.Task") -> None:
            self._warm_transfer_in_progress = False
            self._warm_transfer_task = None

        task.add_done_callback(_clear)
        return await asyncio.shield(task)
# Manages an agent session with its associated conversation flow and pipeline.
Initialize an agent session.
Args
agent- Instance of an Agent class that handles the core logic
pipeline- Pipeline instance to process the agent's operations
wake_up- Time in seconds after which to trigger wake-up callback if no speech detected
background_audio- Configuration for background audio (optional)
dtmf_handler- DTMF handler for phone number input (optional)
voice_mail_detector- Voicemail detector (optional)
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop agent_state : AgentState-
Expand source code
@property
def agent_state(self) -> AgentState:
    """Read-only access to the session's current ``AgentState``."""
    current = self._agent_state
    return current
# prop is_background_audio_enabled : bool-
Expand source code
@property
def is_background_audio_enabled(self) -> bool:
    """Check if background audio is enabled in the pipeline

    True only when the pipeline's audio track exposes ``add_background_bytes``.
    """
    track = self._get_audio_track()
    supports_mixing = hasattr(track, 'add_background_bytes')
    return supports_mixing
# Check if background audio is enabled in the pipeline
prop is_voicemail_detected : bool-
Expand source code
@property
def is_voicemail_detected(self) -> bool:
    """Returns True if voicemail was detected in this session.

    ``_is_voicemail_detected`` is only assigned once a voicemail result is handled,
    so default to False instead of raising AttributeError before that.
    """
    return getattr(self, "_is_voicemail_detected", False)
# Returns True if voicemail was detected in this session.
prop user_state : UserState-
Expand source code
@property
def user_state(self) -> UserState:
    """Read-only access to the session's current ``UserState``."""
    current = self._user_state
    return current
Methods
async def call_transfer(self, token: str, transfer_to: str) ‑> None-
Expand source code
async def call_transfer(self, token: str, transfer_to: str) -> None:
    """
    Transfer the call to a provided Phone number or SIP endpoint.

    Errors from the room are logged, not raised. A warning is logged when no room
    supporting ``call_transfer`` is available, instead of silently doing nothing.

    Args:
        token: VideoSDK auth token.
        transfer_to: Phone number or SIP endpoint to transfer the call to.
    """
    job_ctx = self._job_context
    if not job_ctx:
        try:
            job_ctx = get_current_job_context()
        except Exception:
            job_ctx = None
    room = getattr(job_ctx, "room", None) if job_ctx else None
    if not (room and hasattr(room, "call_transfer")):
        logger.warning("Cannot transfer call: no active room supporting call_transfer was found.")
        return
    try:
        await room.call_transfer(token, transfer_to)
    except Exception as exc:
        logger.error(f"Error calling call_transfer: {exc}")
# Transfer the call to a provided Phone number or SIP endpoint.
Args
token- VideoSDK auth token.
transfer_to- Phone number or SIP endpoint to transfer the call to.
async def close(self) ‑> None-
Expand source code
async def close(self) -> None:
    """
    Close the agent session.

    Idempotent: later calls only log. Stopping the audio players is best-effort,
    so a failure there does not skip the pipeline and agent cleanup that follows.
    """
    logger.info("Closing agent session")
    if self._closed:
        logger.info("Agent session already closed")
        return
    self._closed = True
    self._emit_agent_state(AgentState.CLOSING)
    metrics_collector.finalize_session()
    traces_flow_manager = metrics_collector.traces_flow_manager
    if traces_flow_manager:
        start_time = time.perf_counter()
        await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
        traces_flow_manager.end_agent_session_closed()
    self._cancel_wake_up_timer()
    logger.info("Cleaning up agent session")
    try:
        await self.agent.on_exit()
    except Exception as e:
        logger.error(f"Error in agent.on_exit(): {e}")

    if self._thinking_audio_player:
        try:
            await self._thinking_audio_player.stop()
        except Exception as e:
            logger.error(f"Error stopping thinking audio: {e}")
        self._thinking_audio_player = None

    if self._background_audio_player:
        try:
            await self._background_audio_player.stop()
        except Exception as e:
            logger.error(f"Error stopping background audio: {e}")
        self._background_audio_player = None

    try:
        await self.pipeline.cleanup()
    except Exception as e:
        logger.error(f"Error cleaning up pipeline: {e}")

    try:
        await self.agent.cleanup()
    except Exception as e:
        logger.error(f"Error cleaning up agent: {e}")

    self.agent = None
    self.pipeline = None
    self.on_wake_up = None
    self._wake_up_task = None
    logger.info("Agent session cleaned up")
# Close the agent session.
def get_context_history(self,
*,
include_function_calls: bool = False,
include_system_messages: bool = False) ‑> list[dict]-
Expand source code
def get_context_history(
    self,
    *,
    include_function_calls: bool = False,
    include_system_messages: bool = False,
) -> list[dict]:
    """
    Return the chat history as a list of item dicts.

    Function-call items are dropped unless ``include_function_calls`` is set, and
    system messages are dropped unless ``include_system_messages`` is set. Returns
    an empty list when there is no agent or no chat context.
    """
    agent = self.agent
    if not agent or not agent.chat_context:
        return []
    tools_filter = None if include_function_calls else []
    snapshot = agent.chat_context.copy(
        tools=tools_filter,
        exclude_system_messages=not include_system_messages,
    )
    serialized = snapshot.to_dict()
    return serialized["items"]
# Get the current chat context history (role/content items).
Users can access the conversation history whenever they want, without needing any transcript hooks.
async def hangup(self, reason: str = 'manual_hangup') ‑> None-
Expand source code
async def hangup(self, reason: str = "manual_hangup") -> None:
    """
    End the session, preferring an immediate room-level shutdown.

    Asks the room to force-end the session with ``reason``. If no room supports that,
    or the attempt raises, the session is closed locally via ``close()`` instead.
    """
    ctx = self._job_context
    if not ctx:
        try:
            ctx = get_current_job_context()
        except Exception:
            ctx = None
    room = getattr(ctx, "room", None) if ctx else None
    can_force_end = bool(room) and hasattr(room, "force_end_session")
    if can_force_end:
        try:
            await room.force_end_session(reason)
        except Exception as exc:
            logger.error(f"Error forcing room to end session: {exc}")
        else:
            return
    await self.close()
# Hang up the session, leaving the room immediately if possible.
def interrupt(self, *, force: bool = False) ‑> None-
Expand source code
def interrupt(self, *, force: bool = False) -> None:
    """
    Stop the agent's current speech.

    Interrupts the active utterance (passing ``force`` through) and then the pipeline.
    If the utterance refuses with RuntimeError, a warning is logged and the pipeline
    is left untouched.
    """
    utterance = self.current_utterance
    should_interrupt_utterance = bool(utterance) and not utterance.interrupted
    if should_interrupt_utterance:
        try:
            utterance.interrupt(force=force)
        except RuntimeError as e:
            logger.warning(f"Could not interrupt utterance: {e}")
            return

    pipeline_supports_interrupt = hasattr(self.pipeline, 'interrupt')
    if pipeline_supports_interrupt:
        self.pipeline.interrupt()
# Interrupt the agent's current speech.
async def leave(self) ‑> None-
Expand source code
async def leave(self) -> None:
    """
    Leave the agent session.

    Emits the CLOSING agent state and awaits ``pipeline.leave()``. If the
    pipeline reference is already gone (``close()`` sets it to None), there is
    nothing to leave and the call is a no-op instead of raising
    ``AttributeError``.
    """
    if self.pipeline is None:
        return
    self._emit_agent_state(AgentState.CLOSING)
    await self.pipeline.leave()
def on_warm_transfer(self,
phase: Optional[str] = None,
callback: Optional[Callable[..., Any]] = None)-
Expand source code
def on_warm_transfer(self, phase: Optional[str] = None, callback: Optional[Callable[..., Any]] = None):
    """Subscribe to warm-transfer phase changes (decorator or imperative).

    ``@session.on_warm_transfer()`` sees every phase; pass a phase to filter to
    one. ``phase`` may be the phase's string value or an enum member exposing
    ``.value`` (e.g. a ``WarmTransferPhase``); both sides are compared by value.
    Handlers are sync and receive the ``warm_transfer`` event payload:
    ``{"phase": WarmTransferPhase, "data": {...}, "timestamp": float,
    "consultation_room_id": Optional[str]}``.

    Args:
        phase: Phase to filter on; ``None`` subscribes to every phase.
        callback: If given, it is registered immediately (imperative form);
            otherwise a decorator is returned.

    Returns:
        The decorator when ``callback`` is None; otherwise the result of
        ``self.on("warm_transfer", ...)`` for the registered handler.
    """
    # Normalize once so "connected" and WarmTransferPhase.CONNECTED both match.
    target = getattr(phase, "value", phase)

    def _wrap(handler: Callable[..., Any]) -> Callable[..., Any]:
        if phase is None:
            return self.on("warm_transfer", handler)

        def _filtered(payload):
            pv = payload.get("phase") if isinstance(payload, dict) else None
            if getattr(pv, "value", pv) == target:
                handler(payload)

        # functools.partial and other callable objects may have no __name__.
        handler_name = getattr(handler, "__name__", type(handler).__name__)
        _filtered.__name__ = f"{handler_name}__phase_{target}"
        return self.on("warm_transfer", _filtered)

    return _wrap if callback is None else _wrap(callback)
`@session.on_warm_transfer()` sees every phase; pass a phase name to filter to one. Handlers are sync and receive the `warm_transfer` event payload: `{"phase": WarmTransferPhase, "data": {...}, "timestamp": float, "consultation_room_id": Optional[str]}`.
async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) ‑> None-
Expand source code
async def play_background_audio(self, config: BackgroundAudioHandlerConfig, override_thinking: bool) -> None:
    """Play background audio on demand.

    override_thinking=True -> thinking audio is allowed to layer over the
    background music during LLM generation (stops on first TTS byte).
    override_thinking=False -> background music is exclusive; thinking audio
    is suppressed for as long as background is playing. Any thinking audio
    playing right now is stopped and remembered (``_thinking_was_playing``)
    so ``stop_background_audio()`` can restart it.

    Requires the mixing audio track (one exposing ``add_background_bytes``);
    otherwise a warning is logged and nothing is played. If a background
    player already exists, it is stopped before the new one starts, so the
    previous track is not left playing unreachably.
    """
    self._override_thinking = override_thinking
    if not override_thinking and self._thinking_audio_player and self._thinking_audio_player.is_playing:
        await self.stop_thinking_audio()
        self._thinking_was_playing = True

    audio_track = self._get_audio_track()
    if not hasattr(audio_track, 'add_background_bytes'):
        logger.warning(
            "Cannot play background audio. This feature requires the mixing audio track. "
            "Enable it by setting `background_audio=True` in RoomOptions."
        )
        return

    if audio_track:
        # Replace rather than orphan: an overwritten player could never be stopped.
        if self._background_audio_player:
            await self._background_audio_player.stop()
            self._background_audio_player = None
            metrics_collector.on_background_audio_stop()

        self._background_audio_player = BackgroundAudioHandler(config, audio_track)
        await self._background_audio_player.start()
        # Track background audio start for metrics
        metrics_collector.on_background_audio_start(
            file_path=config.file_path,
            looping=config.looping
        )
override_thinking=True -> thinking audio is allowed to layer over the background music during LLM generation (stops on first TTS byte). override_thinking=False -> background music is exclusive; thinking audio is suppressed for as long as background is playing.
async def reply(self,
instructions: str,
wait_for_playback: bool = True,
frames: list[av.VideoFrame] | None = None,
interruptible: bool = True) ‑> UtteranceHandle-
Expand source code
async def reply(self, instructions: str, wait_for_playback: bool = True, frames: list[av.VideoFrame] | None = None, interruptible: bool = True) -> UtteranceHandle:
    """
    Generate a response from agent using instructions and current chat context.

    This method is safe to call from function tools - it will automatically
    detect re-entrant calls and schedule them as background tasks.

    If a reply is already in progress, no new reply is started: the current
    utterance is returned, or a placeholder handle that is already marked
    done when there is no current utterance.

    Args:
        instructions: Instructions to add to chat context
        wait_for_playback: If True, wait for playback to complete
        frames: Optional list of VideoFrame objects to include in the reply
        interruptible: Whether the returned utterance may be interrupted

    Returns:
        UtteranceHandle: A handle to track the utterance lifecycle
    """
    self._accept_user_input = True
    if self._reply_in_progress:
        if self.current_utterance:
            return self.current_utterance
        handle = UtteranceHandle(utterance_id="placeholder", interruptible=interruptible)
        handle._mark_done()
        return handle

    handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
    self.current_utterance = handle

    if self._is_executing_tool:
        task = asyncio.create_task(
            self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
        )
        # The event loop keeps only weak references to tasks; hold a strong
        # one until completion so the reply cannot be garbage-collected mid-run.
        pending = getattr(self, "_background_reply_tasks", None)
        if pending is None:
            pending = set()
            self._background_reply_tasks = pending
        pending.add(task)
        task.add_done_callback(pending.discard)
        return handle

    await self._internal_blocking_reply(instructions, wait_for_playback, handle, frames)
    return handle
This method is safe to call from function tools - it will automatically detect re-entrant calls and schedule them as background tasks.
Args
instructions- Instructions to add to chat context
wait_for_playback- If True, wait for playback to complete
frames- Optional list of VideoFrame objects to include in the reply
Returns
UtteranceHandle- A handle to track the utterance lifecycle
async def say(self,
message: str,
interruptible: bool = True,
audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None,
add_to_chat_context: bool = True) ‑> UtteranceHandle-
Expand source code
async def say(
    self,
    message: str,
    interruptible: bool = True,
    audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None,
    add_to_chat_context: bool = True,
) -> UtteranceHandle:
    """
    Have the agent speak ``message`` and return a handle to track it.

    Outside a function tool, an unfinished current utterance is interrupted
    and the new handle becomes ``current_utterance``. Inside a function tool
    (``_is_executing_tool``), the current turn's utterance is left alone so
    the LLM stream can continue after the tool returns.

    Args:
        message: The text being spoken. Always used for transcript/chat
            context (unless ``add_to_chat_context=False``).
        interruptible: If False, ``handle.interrupt()`` will raise unless forced.
        audio_data: Optional pre-synthesized PCM bytes (int16, matching the
            agent audio track sample rate). When provided, the TTS provider is
            bypassed and the bytes are streamed straight to the audio track.
            Accepts a single ``bytes`` blob, an iterable of ``bytes`` chunks,
            or an async iterator of ``bytes`` chunks. Use ``load_audio_file``
            for WAV files or :class:`TTSAudioCache` to cache TTS output by text.
        add_to_chat_context: When False, the message is spoken but NOT added
            to the agent's chat history (useful for transient hold messages
            inside function tools).

    Returns:
        The new UtteranceHandle, passed to ``pipeline.send_message`` when the
        pipeline provides it.
    """
    new_handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}", interruptible=interruptible)
    self._accept_user_input = True

    # Keep the in-flight turn alive while a tool runs; otherwise replace it.
    if not self._is_executing_tool:
        previous = self.current_utterance
        if previous and not previous.done():
            previous.interrupt()
        self.current_utterance = new_handle

    tracer = metrics_collector.traces_flow_manager
    if tracer:
        tracer.agent_say_called(message)

    if add_to_chat_context:
        self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)

    if hasattr(self.pipeline, 'send_message'):
        await self.pipeline.send_message(message, handle=new_handle, audio_data=audio_data)

    return new_handle
When called from inside a function tool (_is_executing_tool), the current turn's utterance is not interrupted or replaced, so the LLM stream can continue after the tool returns.
Args
message- The text being spoken. Always used for transcript/chat context (unless `add_to_chat_context=False`).
interruptible- If False, `handle.interrupt()` will raise unless forced.
audio_data- Optional pre-synthesized PCM bytes (int16, matching the agent audio track sample rate). When provided, the TTS provider is bypassed and the bytes are streamed straight to the audio track. Accepts a single `bytes` blob, an iterable of `bytes` chunks, or an async iterator of `bytes` chunks. Use `load_audio_file` for WAV files or `TTSAudioCache` to cache TTS output by text.
add_to_chat_context- When False, the message is spoken but NOT added to the agent's chat history (useful for transient hold messages inside function tools).
async def start(self,
wait_for_participant: bool = False,
run_until_shutdown: bool = False,
*,
observability: Optional[ObservabilityOptions] = None,
**kwargs: Any) ‑> None-
Expand source code
async def start(
    self,
    wait_for_participant: bool = False,
    run_until_shutdown: bool = False,
    *,
    observability: Optional[ObservabilityOptions] = None,
    **kwargs: Any
) -> None:
    """
    Start the agent session.

    This will:
    1. Initialize MCP tools (if configured) and start the DTMF handler (if set)
    2. Record provider/metrics info and open session traces
    3. Start the pipeline processing
    4. Call the agent's on_enter hook (for SIP users, deferred until their
       audio stream is enabled)
    5. Start wake-up timer if configured (but only if callback is set)
    6. Optionally handle full lifecycle management (connect, wait, shutdown)

    Args:
        wait_for_participant: If True, wait for a participant to join before starting
        run_until_shutdown: If True, manage the full lifecycle including connection,
            waiting for shutdown signals, and cleanup. This is a convenience that
            internally calls ctx.run_until_shutdown() with this session. If no
            JobContext can be obtained, a warning is logged and a normal start
            is performed instead.
        observability: Grouped recording/traces/metrics/logs config. When passed, it
            is applied to ``ctx.room_options.observability`` before the room
            connects, taking precedence over any value set on ``RoomOptions``.
            Intended for ``run_until_shutdown=True``; otherwise a warning is logged
            and the override only takes effect if ``ctx.connect()`` has not run yet.
        **kwargs: Additional keyword arguments forwarded to ``pipeline.start()``

    Examples:
        Simple start (manual lifecycle management):
        ```python
        await session.start()
        ```

        Full lifecycle management (recommended):
        ```python
        await session.start(wait_for_participant=True, run_until_shutdown=True)
        ```

        Inline observability:
        ```python
        await session.start(
            wait_for_participant=True,
            run_until_shutdown=True,
            observability=ObservabilityOptions(
                recording=RecordingOptions(video=True),
            ),
        )
        ```
    """
    if observability is not None:
        ctx = None
        try:
            ctx = get_current_job_context()
        except Exception as e:
            logger.warning(f"Could not resolve JobContext for observability overrides: {e}")
        if ctx is not None and ctx.room_options is not None:
            if not run_until_shutdown:
                logger.warning(
                    "observability= passed to session.start() but run_until_shutdown=False; "
                    "overrides only apply if ctx.connect() has not yet run."
                )
            ctx.room_options.observability = observability
        else:
            logger.warning(
                "observability= passed to session.start() but no active JobContext "
                "(or no RoomOptions) was found; ignoring."
            )

    if run_until_shutdown:
        # Only the context lookup falls back to a plain start. Errors raised while
        # running the full lifecycle must propagate, not silently restart the session.
        try:
            ctx = get_current_job_context()
        except Exception as e:
            ctx = None
            logger.warning(
                f"Failed to get JobContext for run_until_shutdown: {e}, "
                "falling back to normal start()"
            )
        else:
            if not ctx:
                logger.warning(
                    "run_until_shutdown=True requires a JobContext, "
                    "falling back to normal start()"
                )
        if ctx:
            logger.info("Starting session with full lifecycle management")
            await ctx.run_until_shutdown(
                session=self,
                wait_for_participant=wait_for_participant
            )
            return

    self._emit_agent_state(AgentState.STARTING)
    if self.agent._mcp_servers:
        await self.agent.initialize_mcp()

    if self.dtmf_handler:
        await self.dtmf_handler.start()

    # Configure metrics with session info
    metrics_collector.set_system_instructions(self.agent.instructions)

    # Set provider info based on pipeline components
    if not self.pipeline.config.is_realtime:
        if self.pipeline.stt:
            p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
            metrics_collector.set_provider_info("stt", p_class, p_model)
        if self.pipeline.llm:
            p_class, p_model = self._get_provider_info(self.pipeline.llm, 'llm')
            metrics_collector.set_provider_info("llm", p_class, p_model)
        if self.pipeline.tts:
            p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
            metrics_collector.set_provider_info("tts", p_class, p_model)
        if hasattr(self.pipeline, 'vad') and self.pipeline.vad:
            p_class, p_model = self._get_provider_info(self.pipeline.vad, 'vad')
            metrics_collector.set_provider_info("vad", p_class, p_model)
        if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector:
            p_class, p_model = self._get_provider_info(self.pipeline.turn_detector, 'eou')
            metrics_collector.set_provider_info("eou", p_class, p_model)
    else:
        if self.pipeline._realtime_model:
            metrics_collector.set_provider_info(
                "realtime",
                format_provider_class(self.pipeline._realtime_model),
                getattr(self.pipeline._realtime_model, 'model', '')
            )
        if self.pipeline.stt:
            p_class, p_model = self._get_provider_info(self.pipeline.stt, 'stt')
            metrics_collector.set_provider_info("stt", p_class, p_model)
        if self.pipeline.tts:
            p_class, p_model = self._get_provider_info(self.pipeline.tts, 'tts')
            metrics_collector.set_provider_info("tts", p_class, p_model)

    # Traces flow manager setup
    traces_flow_manager = metrics_collector.traces_flow_manager
    if traces_flow_manager:
        config_attributes = {
            "system_instructions": self.agent.instructions,
            "function_tools": [
                get_tool_info(tool).name for tool in (
                    [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools]
                    if self.agent.mcp_manager else self.agent.tools
                )
            ] if self.agent.tools else [],
            "mcp_tools": [
                tool._tool_info.name for tool in self.agent.mcp_manager.tools
            ] if self.agent.mcp_manager else [],
            "pipeline": self.pipeline.__class__.__name__,
            "pipeline_mode": self.pipeline.config.pipeline_mode.value,
            "transport_mode": metrics_collector.transport_mode
        }
        start_time = time.perf_counter()
        config_attributes["start_time"] = start_time
        await traces_flow_manager.start_agent_session_config(config_attributes)
        await traces_flow_manager.start_agent_session({"start_time": start_time})

    if hasattr(self.pipeline, 'set_agent'):
        self.pipeline.set_agent(self.agent)

    # Forward extra keyword arguments as documented (none -> same call as before).
    await self.pipeline.start(**kwargs)

    if hasattr(self.agent, 'a2a'):
        self.agent.a2a._attach_deferred_listeners()

    if self._should_delay_for_sip_user():
        logger.info("SIP user detected, waiting for audio stream to be enabled before calling on_enter")
        audio_stream_enabled = asyncio.Event()

        def on_audio_stream_enabled(data):
            stream = data.get("stream")
            participant = data.get("participant")
            if stream and stream.kind == "audio" and participant and participant.meta_data.get("sipUser"):
                logger.info(f"SIP user audio stream enabled for participant {participant.id}")
                audio_stream_enabled.set()

        global_event_emitter.on("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

        async def wait_and_start():
            try:
                await audio_stream_enabled.wait()
                logger.info("SIP user audio stream enabled, proceeding with on_enter")
                await self.agent.on_enter()
                self._accept_user_input = True
                global_event_emitter.emit("AGENT_STARTED", {"session": self})
                if self.on_wake_up is not None:
                    self._start_wake_up_timer()
                self._emit_agent_state(AgentState.IDLE)
            except Exception as e:
                logger.error(f"Error in wait_and_start: {e}")
            finally:
                global_event_emitter.off("AUDIO_STREAM_ENABLED", on_audio_stream_enabled)

        # Keep a strong reference: the event loop holds tasks only weakly.
        self._sip_on_enter_task = asyncio.create_task(wait_and_start())
        return

    await self.agent.on_enter()
    self._accept_user_input = True
    global_event_emitter.emit("AGENT_STARTED", {"session": self})
    if self.on_wake_up is not None:
        self._start_wake_up_timer()
    self._emit_agent_state(AgentState.IDLE)
Args
wait_for_participant- If True, wait for a participant to join before starting
run_until_shutdown- If True, manage the full lifecycle including connection, waiting for shutdown signals, and cleanup. This is a convenience that internally calls ctx.run_until_shutdown() with this session.
observability- Grouped recording/traces/metrics/logs config. When passed, it is applied to `ctx.room_options.observability` before the room connects, taking precedence over any value set on `RoomOptions`. Intended for use with `run_until_shutdown=True`; otherwise a warning is logged and the override only takes effect if `ctx.connect()` has not run yet.
**kwargs- Additional arguments to pass to the pipeline start method
Examples
Simple start (manual lifecycle management):
await session.start()
Full lifecycle management (recommended):
await session.start(wait_for_participant=True, run_until_shutdown=True)
Inline observability:
await session.start(wait_for_participant=True, run_until_shutdown=True, observability=ObservabilityOptions(recording=RecordingOptions(video=True)))
async def start_thinking_audio(self)-
Expand source code
async def start_thinking_audio(self):
    """Start thinking audio.

    Does nothing when exclusive background music is playing (a background
    player is playing and ``_override_thinking`` is False) or when thinking
    audio is already playing. Requires the mixing audio track (one exposing
    ``add_background_bytes``); otherwise a warning is logged. Playback starts
    only if the agent has a thinking background config.
    """
    background = self._background_audio_player
    if background and background.is_playing and not self._override_thinking:
        return

    thinking = self._thinking_audio_player
    if thinking and thinking.is_playing:
        return

    audio_track = self._get_audio_track()
    if not hasattr(audio_track, 'add_background_bytes'):
        logger.warning(
            "Cannot play 'thinking' audio. This feature requires the mixing audio track. "
            "Enable it by setting `background_audio=True` in RoomOptions."
        )
        return

    thinking_config = self.agent._thinking_background_config
    if not (thinking_config and audio_track):
        return

    self._thinking_audio_player = BackgroundAudioHandler(thinking_config, audio_track)
    await self._thinking_audio_player.start()
    # Report the start to the metrics collector.
    metrics_collector.on_thinking_audio_start(
        file_path=thinking_config.file_path,
        looping=thinking_config.looping,
    )
async def stop_background_audio(self) ‑> None-
Expand source code
async def stop_background_audio(self) -> None:
    """Stop background audio on demand.

    Stops and drops the background player (reporting the stop to metrics) if
    one exists, then clears ``_override_thinking``. If thinking audio was
    suppressed when the background started, it is restarted and the flag reset.
    """
    player = self._background_audio_player
    if player:
        await player.stop()
        self._background_audio_player = None
        metrics_collector.on_background_audio_stop()

    # Must be cleared before restarting thinking audio, which consults it.
    self._override_thinking = False
    if not self._thinking_was_playing:
        return
    await self.start_thinking_audio()
    self._thinking_was_playing = False
async def stop_thinking_audio(self)-
Expand source code
async def stop_thinking_audio(self):
    """Stop thinking audio.

    When a thinking player exists, it is stopped, dropped, and the stop is
    reported to metrics; otherwise this is a no-op.
    """
    player = self._thinking_audio_player
    if not player:
        return
    await player.stop()
    self._thinking_audio_player = None
    metrics_collector.on_thinking_audio_stop()
async def warm_transfer(self, config: "'WarmTransferConfig'") ‑> 'WarmTransferResult'-
Expand source code
async def warm_transfer(self, config: "WarmTransferConfig") -> "WarmTransferResult":
    """Run a SIP-to-SIP warm transfer to a human supervisor.

    See :mod:`videosdk.agents.warm_transfer` for the config surface and the
    ``@session.on_warm_transfer(...)`` phase hook.

    The transfer runs in its own task and is shielded from the caller of this
    method being cancelled (this is usually invoked from a ``@function_tool``,
    and that task gets cancelled whenever the caller talks over the agent /
    triggers an interruption — which must not abort an in-flight transfer).
    If the awaiting tool is cancelled, the transfer keeps running to
    completion in the background; the result is simply not returned.

    Raises:
        WarmTransferError: If a warm transfer is already in progress.
    """
    from .warm_transfer import WarmTransferRunner, WarmTransferError

    if self._warm_transfer_in_progress:
        raise WarmTransferError("A warm transfer is already in progress for this session.")

    self._warm_transfer_in_progress = True
    try:
        task = asyncio.ensure_future(WarmTransferRunner(self, config).run())
    except BaseException:
        # No task exists yet, so _clear below will never run; release the
        # flag here or every later transfer would be rejected as "in progress".
        self._warm_transfer_in_progress = False
        raise
    self._warm_transfer_task = task

    def _clear(_task: "asyncio.Task") -> None:
        self._warm_transfer_in_progress = False
        self._warm_transfer_task = None

    task.add_done_callback(_clear)
    return await asyncio.shield(task)
See `videosdk.agents.warm_transfer` for the config surface and the `@session.on_warm_transfer(...)` phase hook.
The transfer runs in its own task and is shielded from cancellation of the caller of this method (this is usually invoked from a `@function_tool`, and that task gets cancelled whenever the caller talks over the agent / triggers an interruption — which must not abort an in-flight transfer). If the awaiting tool is cancelled, the transfer keeps running to completion in the background; the result is simply not returned.
Inherited members