Module agents.agent_session
Classes
class AgentSession (agent: Agent,
pipeline: Pipeline,
conversation_flow: Optional[ConversationFlow] = None,
wake_up: Optional[int] = None,
background_audio: Optional[BackgroundAudioConfig] = None)
Expand source code
class AgentSession(EventEmitter[Literal["user_state_changed", "agent_state_changed", "on_speech_in", "on_speech_out"]]):
    """
    Manages an agent session with its associated conversation flow and pipeline.
    """

    def __init__(
        self,
        agent: Agent,
        pipeline: Pipeline,
        conversation_flow: Optional[ConversationFlow] = None,
        wake_up: Optional[int] = None,
        background_audio: Optional[BackgroundAudioConfig] = None,
    ) -> None:
        """
        Initialize an agent session.

        Args:
            agent: Instance of an Agent class that handles the core logic
            pipeline: Pipeline instance to process the agent's operations
            conversation_flow: ConversationFlow instance to manage conversation state
            wake_up: Time in seconds after which to trigger wake-up callback if no speech detected
            background_audio: Configuration for background audio (optional)
        """
        super().__init__()
        self.agent = agent
        self.pipeline = pipeline
        self.conversation_flow = conversation_flow
        # Back-reference so the agent can reach its owning session.
        self.agent.session = self
        self.wake_up = wake_up
        # Callback is assigned by the caller after construction; the timer only
        # starts/resets when both wake_up and on_wake_up are set.
        self.on_wake_up: Optional[Callable[[], None] | Callable[[], Any]] = None
        self._wake_up_task: Optional[asyncio.Task] = None
        self._wake_up_timer_active = False
        self._closed: bool = False
        self._reply_in_progress: bool = False
        self._user_state: UserState = UserState.IDLE
        self._agent_state: AgentState = AgentState.IDLE
        # Pipelines expose optional hooks; feature-detect each one via hasattr.
        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)
        if background_audio and hasattr(self.pipeline, 'background_audio'):
            self.pipeline.background_audio = background_audio
        if (
            hasattr(self.pipeline, "set_conversation_flow")
            and self.conversation_flow is not None
        ):
            self.pipeline.set_conversation_flow(self.conversation_flow)
        if hasattr(self.pipeline, 'set_wake_up_callback'):
            self.pipeline.set_wake_up_callback(self._reset_wake_up_timer)
        # Relay global speech events through this session's own emitter.
        global_event_emitter.on("ON_SPEECH_IN", self._on_speech_in)
        global_event_emitter.on("ON_SPEECH_OUT", self._on_speech_out)
        # Best-effort: if a job context exists, run close() at job shutdown.
        try:
            job_ctx = get_current_job_context()
            if job_ctx:
                job_ctx.add_shutdown_callback(self.close)
        except Exception:
            pass

    def _on_speech_in(self, data: dict) -> None:
        # Forward the global ON_SPEECH_IN event to this session's listeners.
        self.emit("on_speech_in", data)

    def _on_speech_out(self, data: dict) -> None:
        # Forward the global ON_SPEECH_OUT event to this session's listeners.
        self.emit("on_speech_out", data)

    def _start_wake_up_timer(self) -> None:
        # No-op unless both a timeout and a callback are configured.
        if self.wake_up is not None and self.on_wake_up is not None:
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _reset_wake_up_timer(self) -> None:
        # Cancel any pending timer and start a fresh countdown, unless a reply
        # is currently in progress (the timer resumes after the reply).
        if self.wake_up is not None and self.on_wake_up is not None:
            if self._reply_in_progress:
                return
            if self._wake_up_task and not self._wake_up_task.done():
                self._wake_up_task.cancel()
            self._wake_up_timer_active = True
            self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop())

    def _pause_wake_up_timer(self) -> None:
        # Cancel the pending countdown but leave _wake_up_timer_active as-is,
        # so a later reset can restart it.
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()

    def _cancel_wake_up_timer(self) -> None:
        # Cancel the countdown and mark the timer inactive (terminal stop).
        if self._wake_up_task and not self._wake_up_task.done():
            self._wake_up_task.cancel()
        self._wake_up_timer_active = False

    async def _wake_up_timer_loop(self) -> None:
        # Sleep for the configured timeout, then invoke on_wake_up (sync or
        # async) unless the timer was deactivated or a reply is in progress.
        try:
            await asyncio.sleep(self.wake_up)
            if self._wake_up_timer_active and self.on_wake_up and not self._reply_in_progress:
                if asyncio.iscoroutinefunction(self.on_wake_up):
                    await self.on_wake_up()
                else:
                    self.on_wake_up()
        except asyncio.CancelledError:
            # Cancellation is the normal path for reset/pause; swallow it.
            pass

    def _emit_user_state(self, state: UserState, data: dict | None = None) -> None:
        # Emit "user_state_changed" only on actual transitions.
        if state != self._user_state:
            self._user_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("user_state_changed", payload)

    def _emit_agent_state(self, state: AgentState, data: dict | None = None) -> None:
        # Emit "agent_state_changed" only on actual transitions.
        if state != self._agent_state:
            self._agent_state = state
            payload = {"state": state.value, **(data or {})}
            self.emit("agent_state_changed", payload)

    @property
    def user_state(self) -> UserState:
        """Current user state (read-only)."""
        return self._user_state

    @property
    def agent_state(self) -> AgentState:
        """Current agent state (read-only)."""
        return self._agent_state

    async def start(self, **kwargs: Any) -> None:
        """
        Start the agent session.

        This will:
        1. Initialize the agent (including MCP tools if configured)
        2. Call the agent's on_enter hook
        3. Start the pipeline processing
        4. Start wake-up timer if configured (but only if callback is set)

        Args:
            **kwargs: Additional arguments to pass to the pipeline start method
        """
        self._emit_agent_state(AgentState.STARTING)
        await self.agent.initialize_mcp()
        if isinstance(self.pipeline, RealTimePipeline):
            # Realtime pipelines manage their own metrics session.
            await realtime_metrics_collector.start_session(self.agent, self.pipeline)
        else:
            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                # Snapshot of the session configuration for tracing. MCP tools
                # are listed separately from plain function tools.
                config_attributes = {
                    "system_instructions": self.agent.instructions,
                    "function_tools": [
                        get_tool_info(tool).name
                        for tool in (
                            [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools]
                            if self.agent.mcp_manager
                            else self.agent.tools
                        )
                    ] if self.agent.tools else [],
                    "mcp_tools": [
                        get_tool_info(tool).name for tool in self.agent.mcp_manager.tools
                    ] if self.agent.mcp_manager else [],
                    "pipeline": self.pipeline.__class__.__name__,
                    # Component provider/model details only apply to cascading pipelines.
                    **({
                        "stt_provider": self.pipeline.stt.__class__.__name__ if self.pipeline.stt else None,
                        "tts_provider": self.pipeline.tts.__class__.__name__ if self.pipeline.tts else None,
                        "llm_provider": self.pipeline.llm.__class__.__name__ if self.pipeline.llm else None,
                        "vad_provider": self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                        "eou_provider": self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None,
                        "stt_model": self.pipeline.get_component_configs()['stt'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.stt else None,
                        "llm_model": self.pipeline.get_component_configs()['llm'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.llm else None,
                        "tts_model": self.pipeline.get_component_configs()['tts'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.tts else None,
                        "vad_model": self.pipeline.get_component_configs()['vad'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                        "eou_model": self.pipeline.get_component_configs()['eou'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None
                    } if self.pipeline.__class__.__name__ == "CascadingPipeline" else {}),
                }
                start_time = time.perf_counter()
                config_attributes["start_time"] = start_time
                await traces_flow_manager.start_agent_session_config(config_attributes)
                await traces_flow_manager.start_agent_session({"start_time": start_time})
            if self.pipeline.__class__.__name__ == "CascadingPipeline":
                configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {}
                cascading_metrics_collector.set_provider_info(
                    llm_provider=self.pipeline.llm.__class__.__name__ if self.pipeline.llm else "",
                    llm_model=configs.get('llm', {}).get('model', "") if self.pipeline.llm else "",
                    stt_provider=self.pipeline.stt.__class__.__name__ if self.pipeline.stt else "",
                    stt_model=configs.get('stt', {}).get('model', "") if self.pipeline.stt else "",
                    tts_provider=self.pipeline.tts.__class__.__name__ if self.pipeline.tts else "",
                    tts_model=configs.get('tts', {}).get('model', "") if self.pipeline.tts else "",
                    vad_provider=self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                    vad_model=configs.get('vad', {}).get('model', "") if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                    eou_provider=self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else "",
                    eou_model=configs.get('eou', {}).get('model', "") if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else ""
                )
        if hasattr(self.pipeline, 'set_agent'):
            self.pipeline.set_agent(self.agent)
        self.on("on_speech_in", self.agent.on_speech_in)
        self.on("on_speech_out", self.agent.on_speech_out)
        await self.pipeline.start()
        await self.agent.on_enter()
        global_event_emitter.emit("AGENT_STARTED", {"session": self})
        if self.on_wake_up is not None:
            self._start_wake_up_timer()
        self._emit_agent_state(AgentState.IDLE)

    async def say(self, message: str) -> None:
        """
        Send an initial message to the agent.
        """
        if not isinstance(self.pipeline, RealTimePipeline):
            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                traces_flow_manager.agent_say_called(message)
        # Record the message as an assistant turn, then hand it to the pipeline.
        self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)
        await self.pipeline.send_message(message)

    async def reply(self, instructions: str, wait_for_playback: bool = True) -> None:
        """
        Generate a response from agent using instructions and current chat context.
        Subsequent calls are discarded while the first one is still running.
        """
        if not instructions:
            return
        if self._reply_in_progress:
            return
        self._reply_in_progress = True
        self._pause_wake_up_timer()
        try:
            if not isinstance(self.pipeline, RealTimePipeline):
                traces_flow_manager = cascading_metrics_collector.traces_flow_manager
                if traces_flow_manager:
                    traces_flow_manager.agent_reply_called(instructions)
            # Use the pipeline to handle the reply with wait_for_playback logic
            if hasattr(self.pipeline, 'reply_with_context'):
                await self.pipeline.reply_with_context(instructions, wait_for_playback)
            else:
                # Fallback for other pipeline types (like RealTimePipeline)
                if hasattr(self.pipeline, 'send_text_message'):
                    await self.pipeline.send_text_message(instructions)
                else:
                    await self.pipeline.send_message(instructions)
        finally:
            self._reply_in_progress = False

    def interrupt(self) -> None:
        """
        Interrupt the agent.
        """
        self.pipeline.interrupt()

    async def close(self) -> None:
        """
        Close the agent session.
        """
        logger.info("Closing agent session")
        if self._closed:
            logger.info("Agent session already closed")
            return
        self._closed = True
        self._emit_agent_state(AgentState.CLOSING)
        if isinstance(self.pipeline, RealTimePipeline):
            realtime_metrics_collector.finalize_session()
            traces_flow_manager = realtime_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                start_time = time.perf_counter()
                await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
                traces_flow_manager.end_agent_session_closed()
        else:
            traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            if traces_flow_manager:
                start_time = time.perf_counter()
                await traces_flow_manager.start_agent_session_closed({"start_time": start_time})
                traces_flow_manager.end_agent_session_closed()
        self._cancel_wake_up_timer()
        # Detach all event handlers before tearing the components down.
        global_event_emitter.off("ON_SPEECH_IN", self._on_speech_in)
        global_event_emitter.off("ON_SPEECH_OUT", self._on_speech_out)
        self.off("on_speech_in", self.agent.on_speech_in)
        self.off("on_speech_out", self.agent.on_speech_out)
        logger.info("Cleaning up agent session")
        # Each cleanup step is isolated so one failure doesn't stop the rest.
        try:
            await self.agent.on_exit()
        except Exception as e:
            logger.error(f"Error in agent.on_exit(): {e}")
        try:
            await self.pipeline.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up pipeline: {e}")
        if self.conversation_flow:
            try:
                await self.conversation_flow.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up conversation flow: {e}")
            self.conversation_flow = None
        try:
            await self.agent.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up agent: {e}")
        # Drop references so the session cannot be used after close.
        self.agent = None
        self.pipeline = None
        self.on_wake_up = None
        self._wake_up_task = None
        logger.info("Agent session cleaned up")

    async def leave(self) -> None:
        """
        Leave the agent session.
        """
        self._emit_agent_state(AgentState.CLOSING)
        await self.pipeline.leave()
Initialize an agent session.
Args
agent: Instance of an Agent class that handles the core logic
pipeline: Pipeline instance to process the agent's operations
conversation_flow: ConversationFlow instance to manage conversation state
wake_up: Time in seconds after which to trigger the wake-up callback if no speech is detected
background_audio: Configuration for background audio (optional)
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop agent_state : AgentState
Expand source code
@property
def agent_state(self) -> AgentState:
    """Current agent state (read-only)."""
    return self._agent_state

@property
def user_state(self) -> UserState:
    """Current user state (read-only)."""
    return self._user_state
Methods
async def close(self) -> None
Expand source code
async def close(self) -> None: """ Close the agent session. """ logger.info("Closing agent session") if self._closed: logger.info("Agent session already closed") return self._closed = True self._emit_agent_state(AgentState.CLOSING) if isinstance(self.pipeline, RealTimePipeline): realtime_metrics_collector.finalize_session() traces_flow_manager = realtime_metrics_collector.traces_flow_manager if traces_flow_manager: start_time = time.perf_counter() await traces_flow_manager.start_agent_session_closed({"start_time": start_time}) traces_flow_manager.end_agent_session_closed() else: traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: start_time = time.perf_counter() await traces_flow_manager.start_agent_session_closed({"start_time": start_time}) traces_flow_manager.end_agent_session_closed() self._cancel_wake_up_timer() global_event_emitter.off("ON_SPEECH_IN", self._on_speech_in) global_event_emitter.off("ON_SPEECH_OUT", self._on_speech_out) self.off("on_speech_in", self.agent.on_speech_in) self.off("on_speech_out", self.agent.on_speech_out) logger.info("Cleaning up agent session") try: await self.agent.on_exit() except Exception as e: logger.error(f"Error in agent.on_exit(): {e}") try: await self.pipeline.cleanup() except Exception as e: logger.error(f"Error cleaning up pipeline: {e}") if self.conversation_flow: try: await self.conversation_flow.cleanup() except Exception as e: logger.error(f"Error cleaning up conversation flow: {e}") self.conversation_flow = None try: await self.agent.cleanup() except Exception as e: logger.error(f"Error cleaning up agent: {e}") self.agent = None self.pipeline = None self.on_wake_up = None self._wake_up_task = None logger.info("Agent session cleaned up")Close the agent session.
def interrupt(self) -> None
Expand source code
def interrupt(self) -> None: """ Interrupt the agent. """ self.pipeline.interrupt()Interrupt the agent.
async def leave(self) -> None
Expand source code
async def leave(self) -> None: """ Leave the agent session. """ self._emit_agent_state(AgentState.CLOSING) await self.pipeline.leave()Leave the agent session.
async def reply(self, instructions: str, wait_for_playback: bool = True) -> None
Expand source code
async def reply(self, instructions: str, wait_for_playback: bool = True) -> None: """ Generate a response from agent using instructions and current chat context. Subsequent calls are discarded while the first one is still running. """ if not instructions: return if self._reply_in_progress: return self._reply_in_progress = True self._pause_wake_up_timer() try: if not isinstance(self.pipeline, RealTimePipeline): traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: traces_flow_manager.agent_reply_called(instructions) # Use the pipeline to handle the reply with wait_for_playback logic if hasattr(self.pipeline, 'reply_with_context'): await self.pipeline.reply_with_context(instructions, wait_for_playback) else: # Fallback for other pipeline types (like RealTimePipeline) if hasattr(self.pipeline, 'send_text_message'): await self.pipeline.send_text_message(instructions) else: await self.pipeline.send_message(instructions) finally: self._reply_in_progress = FalseGenerate a response from agent using instructions and current chat context. Subsequent calls are discarded while the first one is still running.
async def say(self, message: str) -> None
Expand source code
async def say(self, message: str) -> None: """ Send an initial message to the agent. """ if not isinstance(self.pipeline, RealTimePipeline): traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: traces_flow_manager.agent_say_called(message) self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message) await self.pipeline.send_message(message)Send an initial message to the agent.
async def start(self, **kwargs: Any) -> None
Expand source code
async def start(self, **kwargs: Any) -> None:
    """
    Start the agent session.

    This will:
    1. Initialize the agent (including MCP tools if configured)
    2. Call the agent's on_enter hook
    3. Start the pipeline processing
    4. Start wake-up timer if configured (but only if callback is set)

    Args:
        **kwargs: Additional arguments to pass to the pipeline start method
    """
    self._emit_agent_state(AgentState.STARTING)
    await self.agent.initialize_mcp()
    if isinstance(self.pipeline, RealTimePipeline):
        # Realtime pipelines manage their own metrics session.
        await realtime_metrics_collector.start_session(self.agent, self.pipeline)
    else:
        traces_flow_manager = cascading_metrics_collector.traces_flow_manager
        if traces_flow_manager:
            # Snapshot of the session configuration for tracing. MCP tools
            # are listed separately from plain function tools.
            config_attributes = {
                "system_instructions": self.agent.instructions,
                "function_tools": [
                    get_tool_info(tool).name
                    for tool in (
                        [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools]
                        if self.agent.mcp_manager
                        else self.agent.tools
                    )
                ] if self.agent.tools else [],
                "mcp_tools": [
                    get_tool_info(tool).name for tool in self.agent.mcp_manager.tools
                ] if self.agent.mcp_manager else [],
                "pipeline": self.pipeline.__class__.__name__,
                # Component provider/model details only apply to cascading pipelines.
                **({
                    "stt_provider": self.pipeline.stt.__class__.__name__ if self.pipeline.stt else None,
                    "tts_provider": self.pipeline.tts.__class__.__name__ if self.pipeline.tts else None,
                    "llm_provider": self.pipeline.llm.__class__.__name__ if self.pipeline.llm else None,
                    "vad_provider": self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                    "eou_provider": self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None,
                    "stt_model": self.pipeline.get_component_configs()['stt'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.stt else None,
                    "llm_model": self.pipeline.get_component_configs()['llm'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.llm else None,
                    "tts_model": self.pipeline.get_component_configs()['tts'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.tts else None,
                    "vad_model": self.pipeline.get_component_configs()['vad'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'vad') and self.pipeline.vad else None,
                    "eou_model": self.pipeline.get_component_configs()['eou'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None
                } if self.pipeline.__class__.__name__ == "CascadingPipeline" else {}),
            }
            start_time = time.perf_counter()
            config_attributes["start_time"] = start_time
            await traces_flow_manager.start_agent_session_config(config_attributes)
            await traces_flow_manager.start_agent_session({"start_time": start_time})
        if self.pipeline.__class__.__name__ == "CascadingPipeline":
            configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {}
            cascading_metrics_collector.set_provider_info(
                llm_provider=self.pipeline.llm.__class__.__name__ if self.pipeline.llm else "",
                llm_model=configs.get('llm', {}).get('model', "") if self.pipeline.llm else "",
                stt_provider=self.pipeline.stt.__class__.__name__ if self.pipeline.stt else "",
                stt_model=configs.get('stt', {}).get('model', "") if self.pipeline.stt else "",
                tts_provider=self.pipeline.tts.__class__.__name__ if self.pipeline.tts else "",
                tts_model=configs.get('tts', {}).get('model', "") if self.pipeline.tts else "",
                vad_provider=self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                vad_model=configs.get('vad', {}).get('model', "") if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "",
                eou_provider=self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else "",
                eou_model=configs.get('eou', {}).get('model', "") if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else ""
            )
    if hasattr(self.pipeline, 'set_agent'):
        self.pipeline.set_agent(self.agent)
    self.on("on_speech_in", self.agent.on_speech_in)
    self.on("on_speech_out", self.agent.on_speech_out)
    await self.pipeline.start()
    await self.agent.on_enter()
    global_event_emitter.emit("AGENT_STARTED", {"session": self})
    if self.on_wake_up is not None:
        self._start_wake_up_timer()
    self._emit_agent_state(AgentState.IDLE)
Args
**kwargs- Additional arguments to pass to the pipeline start method