Module agents.conversation_flow
Classes
class ConversationFlow (agent: Agent | None = None,
stt: STT | None = None,
llm: LLM | None = None,
tts: TTS | None = None,
vad: VAD | None = None,
turn_detector: EOU | None = None,
denoise: Denoise | None = None,
avatar: Any | None = None)
Source code:

class ConversationFlow(EventEmitter[Literal["transcription"]], ABC):
    """
    Manages the conversation flow by listening to transcription events.
    """

    def __init__(self, agent: Agent | None = None, stt: STT | None = None,
                 llm: LLM | None = None, tts: TTS | None = None,
                 vad: VAD | None = None, turn_detector: EOU | None = None,
                 denoise: Denoise | None = None, avatar: Any | None = None) -> None:
        """Initialize conversation flow with event emitter capabilities"""
        super().__init__()
        self.transcription_callback: Callable[[STTResponse], Awaitable[None]] | None = None
        self.stt = stt
        self.llm = llm
        self.tts = tts
        self.audio_track = None
        self.vad = vad
        self.turn_detector = turn_detector
        self.agent = agent
        self.denoise = denoise
        self.avatar = avatar
        self._stt_started = False
        self.stt_lock = asyncio.Lock()
        self.llm_lock = asyncio.Lock()
        self.tts_lock = asyncio.Lock()
        self.user_speech_callback: Callable[[], None] | None = None
        if self.stt:
            self.stt.on_stt_transcript(self.on_stt_transcript)
        if self.vad:
            self.vad.on_vad_event(self.on_vad_event)
        self._current_tts_task: asyncio.Task | None = None
        self._current_llm_task: asyncio.Task | None = None
        self._partial_response = ""
        self._is_interrupted = False
        self._accumulated_transcript = ""
        self._waiting_for_more_speech = False
        self._wait_timer: asyncio.TimerHandle | None = None
        self._transcript_processing_lock = asyncio.Lock()
        self.min_speech_wait_timeout = 0.5
        self.max_speech_wait_timeout = 0.8
        self.mode: Literal["ADAPTIVE", "DEFAULT"] = "DEFAULT"
        self.eou_certainty_threshold = 0.85
        self.interrupt_mode: Literal["VAD_ONLY", "STT_ONLY", "HYBRID"] = "HYBRID"
        self.interrupt_min_duration = 0.5
        self.interrupt_min_words = 1
        self.false_interrupt_pause_duration = 2.0
        self.resume_on_false_interrupt = False
        self._is_in_false_interrupt_pause = False
        self._false_interrupt_timer: asyncio.TimerHandle | None = None
        self._false_interrupt_paused_speech = False
        self._is_user_speaking = False

        # Preemptive generation state
        self._preemptive_transcript: str | None = None
        self._preemptive_lock = asyncio.Lock()
        self._preemptive_generation_task: asyncio.Task | None = None
        self._preemptive_authorized = asyncio.Event()  # Authorization gate
        self._preemptive_cancelled = False

        # Voice Mail detection state
        self.voice_mail_detector: VoiceMailDetector | None = None
        self.voice_mail_detection_done = False
        self._vmd_buffer = ""
        self._vmd_check_task: asyncio.Task | None = None

        # Conversational Graph
        self.conversational_graph = None

        # Context truncation
        self.max_context_items: int | None = None

    def apply_flow_config(self, eou_config: "EOUConfig", interrupt_config: "InterruptConfig") -> None:
        """Override default timing/interaction parameters using pipeline config."""
        self.mode = eou_config.mode
        self.min_speech_wait_timeout = eou_config.min_max_speech_wait_timeout[0]
        self.max_speech_wait_timeout = eou_config.min_max_speech_wait_timeout[1]
        self.interrupt_mode = interrupt_config.mode
        self.interrupt_min_duration = interrupt_config.interrupt_min_duration
        self.interrupt_min_words = interrupt_config.interrupt_min_words
        self.false_interrupt_pause_duration = interrupt_config.false_interrupt_pause_duration
        self.resume_on_false_interrupt = interrupt_config.resume_on_false_interrupt

    def _update_preemptive_generation_flag(self) -> None:
        """Update the preemptive generation flag based on current STT instance"""
        self._enable_preemptive_generation = getattr(self.stt, 'enable_preemptive_generation', False) if self.stt else False
        cascading_metrics_collector.set_preemptive_generation_enabled()

    async def start(self) -> None:
        global_event_emitter.on("speech_started", self.on_speech_started_stt)
        global_event_emitter.on("speech_stopped", self.on_speech_stopped_stt)
        if self.agent and self.agent.instructions:
            cascading_metrics_collector.set_system_instructions(self.agent.instructions)

    def set_voice_mail_detector(self, detector: VoiceMailDetector | None) -> None:
        """Configures voicemail detection. Called by AgentSession."""
        self.voice_mail_detector = detector
        self.voice_mail_detection_done = False
        self._vmd_buffer = ""

    def on_transcription(self, callback: Callable[[str], None]) -> None:
        """
        Set the callback for transcription events.

        Args:
            callback: Function to call when transcription occurs, takes transcribed text as argument
        """
        self.on("transcription_event", lambda data: callback(data["text"]))

    async def send_audio_delta(self, audio_data: bytes) -> None:
        """
        Send audio delta to the STT
        """
        asyncio.create_task(self._process_audio_delta(audio_data))

    async def _process_audio_delta(self, audio_data: bytes) -> None:
        """Background processing of audio delta"""
        try:
            if self.denoise:
                audio_data = await self.denoise.denoise(audio_data)
            if self.stt:
                async with self.stt_lock:
                    await self.stt.process_audio(audio_data)
            if self.vad:
                await self.vad.process_audio(audio_data)
        except Exception as e:
            self.emit("error", f"Audio processing failed: {str(e)}")

    async def on_vad_event(self, vad_response: VADResponse) -> None:
        """Handle VAD events with interruption logic"""
        if (self.agent and self.agent.session
                and self.agent.session.agent_state == AgentState.SPEAKING):
            if vad_response.event_type == VADEventType.START_OF_SPEECH:
                if not hasattr(self, '_interruption_check_task') or self._interruption_check_task.done():
                    logger.info("User started speaking during agent response, initiating interruption monitoring")
                    self._interruption_check_task = asyncio.create_task(
                        self._monitor_interruption_duration()
                    )
                return
            elif vad_response.event_type == VADEventType.END_OF_SPEECH:
                if hasattr(self, '_interruption_check_task') and not self._interruption_check_task.done():
                    logger.info("User stopped speaking, cancelling interruption check")
                    self._interruption_check_task.cancel()
                return
        if vad_response.event_type == VADEventType.START_OF_SPEECH:
            self._is_user_speaking = True
            if self._waiting_for_more_speech:
                logger.debug("User continued speaking, cancelling wait timer")
                await self._handle_continued_speech()
            await self.on_speech_started()
        elif vad_response.event_type == VADEventType.END_OF_SPEECH:
            self._is_user_speaking = False
            self.on_speech_stopped()

    async def _monitor_interruption_duration(self) -> None:
        """
        Monitor user speech duration during agent response.
        Triggers interruption if speech exceeds the configured threshold.
        """
        logger.debug(f"Interruption monitoring started (mode={self.interrupt_mode}, threshold={self.interrupt_min_duration}s)")
        if self.interrupt_mode not in ("VAD_ONLY", "HYBRID"):
            logger.debug(f"Interruption mode is {self.interrupt_mode}, VAD monitoring not active")
            return
        try:
            await asyncio.sleep(self.interrupt_min_duration)
            if (self.agent.session and self.agent.session.current_utterance
                    and self.agent.session.current_utterance.is_interruptible):
                logger.info(f"User speech duration exceeded {self.interrupt_min_duration}s threshold, triggering interruption")
                await self._trigger_interruption()
            else:
                logger.debug("Interruption threshold reached but utterance is not interruptible")
        except asyncio.CancelledError:
            logger.debug("Interruption monitoring cancelled (user stopped speaking)")

    async def _handle_continued_speech(self) -> None:
        """Handle when user continues speaking while we're waiting"""
        if self._wait_timer:
            self._wait_timer.cancel()
            self._wait_timer = None
        self._waiting_for_more_speech = False

    async def on_stt_transcript(self, stt_response: STTResponse) -> None:
        """Handle STT transcript events with enhanced EOU logic"""
        utterance = self.agent.session.current_utterance if self.agent and self.agent.session else None
        if utterance and not utterance.is_interruptible and self.agent.session.agent_state == AgentState.SPEAKING:
            logger.info("Agent is playing non-interruptible message. Ignoring user speech until message completes.")
            return
        if self._waiting_for_more_speech:
            await self._handle_continued_speech()
        text = stt_response.data.text if stt_response.data else ""
        if self.voice_mail_detector and not self.voice_mail_detection_done and text.strip():
            self._vmd_buffer += f" {text}"
            if not self._vmd_check_task:
                logger.info("Starting Voice Mail Detection Timer")
                self._vmd_check_task = asyncio.create_task(self._run_vmd_check())
        if self.agent.session:
            state = self.agent.session.agent_state
            if state == AgentState.SPEAKING:
                logger.info("Agent is speaking, handling STT event")
                await self.handle_stt_event(text)
            elif state == AgentState.THINKING:
                if not self._enable_preemptive_generation:
                    await self.handle_stt_event(text)
        if self.agent.session:
            self.agent.session._emit_user_state(UserState.SPEAKING)
        # Handle different event types
        if stt_response.event_type == SpeechEventType.PREFLIGHT:
            if cascading_metrics_collector.data.current_turn:
                cascading_metrics_collector.on_stt_preflight_end()
            await self._handle_preflight_transcript(text)
        elif stt_response.event_type == SpeechEventType.FINAL:
            if cascading_metrics_collector.data.current_turn:
                cascading_metrics_collector.data.current_turn.stt_preemptive_generation_occurred = False
            user_text = stt_response.data.text
            if self._enable_preemptive_generation:
                if cascading_metrics_collector.data.current_turn:
                    cascading_metrics_collector.on_stt_complete()
                    cascading_metrics_collector.data.current_turn.stt_preemptive_generation_occurred = True
                await self._authorize_or_process_final_transcript(user_text)
            else:
                await self._process_transcript_with_eou(user_text)
        elif stt_response.event_type == SpeechEventType.INTERIM:
            if cascading_metrics_collector.data.current_turn and self._enable_preemptive_generation:
                cascading_metrics_collector.on_stt_interim_end()
            if stt_response.metadata and stt_response.metadata.get("turn_resumed"):
                await self._handle_turn_resumed(text)

    async def _run_vmd_check(self) -> None:
        """Internal task to wait and check LLM, then emit result."""
        try:
            if not self.voice_mail_detector:
                return
            await asyncio.sleep(self.voice_mail_detector.duration)
            is_voicemail = await self.voice_mail_detector.detect(self._vmd_buffer.strip())
            self.voice_mail_detection_done = True
            if is_voicemail:
                await self._interrupt_tts()
                await self._cancel_llm()
            self.emit("voicemail_result", {"is_voicemail": is_voicemail})
        except Exception as e:
            logger.error(f"Error in VMD check: {e}")
            self.voice_mail_detection_done = True
            self.emit("voicemail_result", {"is_voicemail": False})
        finally:
            self._vmd_check_task = None
            self._vmd_buffer = ""

    async def _handle_preflight_transcript(self, preflight_text: str) -> None:
        """
        Handle preflight transcript - start generation but wait for authorization.
        """
        async with self._preemptive_lock:
            self._preemptive_transcript = preflight_text.strip()
            self._preemptive_authorized.clear()  # Not authorized yet
            self._preemptive_cancelled = False
            user_text = preflight_text.strip()
            if self.agent.knowledge_base:
                kb_context = await self.agent.knowledge_base.process_query(user_text)
                if kb_context:
                    user_text = f"{kb_context}\n\nUser: {user_text}"
            # Add preflight transcript to temporary context
            self.agent.chat_context.add_message(
                role=ChatRole.USER,
                content=user_text
            )
            if self.agent and self.agent.session:
                if self.agent.session.current_utterance and not self.agent.session.current_utterance.done():
                    self.agent.session.current_utterance.interrupt()
                handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
                self.agent.session.current_utterance = handle
            else:
                handle = UtteranceHandle(utterance_id="utt_fallback")
                handle._mark_done()
            self._preemptive_generation_task = asyncio.create_task(
                self._generate_and_synthesize_response(
                    user_text, handle, wait_for_authorization=True
                )
            )

    async def _process_transcript_with_eou(self, new_transcript: str) -> None:
        """Enhanced transcript processing with EOU-based decision making"""
        async with self._transcript_processing_lock:
            if self.agent.session:
                self.agent.session._emit_agent_state(AgentState.LISTENING)
            if self._accumulated_transcript:
                self._accumulated_transcript += " " + new_transcript
            else:
                self._accumulated_transcript = new_transcript
            if self.mode == 'DEFAULT':
                logger.info(f"DEFAULT Mode, using min speech wait timeout seconds {self.min_speech_wait_timeout} and max speech wait timeout seconds {self.max_speech_wait_timeout}")
                delay = self.min_speech_wait_timeout
                if self.turn_detector:
                    logger.info("Turn detector is available, getting EOU probability")
                    cascading_metrics_collector.on_eou_start()
                    eou_probability = self.turn_detector.get_eou_probability(self.agent.chat_context)
                    cascading_metrics_collector.on_eou_complete()
                    logger.info(f"EOU probability: {eou_probability}")
                    if eou_probability < self.eou_certainty_threshold:
                        logger.info("EOU probability is less than the threshold, using max speech wait timeout")
                        delay = self.max_speech_wait_timeout
                logger.info(f"Using delay: {delay} seconds")
                await self._wait_for_additional_speech(delay)
            elif self.mode == 'ADAPTIVE':
                logger.info(f"ADAPTIVE Mode, using min speech wait timeout seconds {self.min_speech_wait_timeout} and max speech wait timeout seconds {self.max_speech_wait_timeout}")
                delay = self.min_speech_wait_timeout
                if self.turn_detector:
                    logger.info("Turn detector is available, getting EOU probability")
                    cascading_metrics_collector.on_eou_start()
                    eou_probability = self.turn_detector.get_eou_probability(self.agent.chat_context)
                    cascading_metrics_collector.on_eou_complete()
                    logger.info(f"EOU probability: {eou_probability}")
                    logger.info(f"Calculating delay using sliding scale {self.min_speech_wait_timeout} to {self.max_speech_wait_timeout}")
                    delay_range = self.max_speech_wait_timeout - self.min_speech_wait_timeout
                    wait_factor = 1.0 - eou_probability
                    logger.info(f"Wait factor: {wait_factor}")
                    delay = self.min_speech_wait_timeout + (delay_range * wait_factor)
                    logger.info(f"Calculated delay: {delay}")
                await self._wait_for_additional_speech(delay)

    async def _check_end_of_utterance(self, transcript: str) -> bool:
        """Check if the current transcript represents end of utterance"""
        if not self.turn_detector:
            return True
        temp_context = self.agent.chat_context.copy()
        temp_context.add_message(role=ChatRole.USER, content=transcript)
        cascading_metrics_collector.on_eou_start()
        is_eou = self.turn_detector.detect_end_of_utterance(temp_context)
        cascading_metrics_collector.on_eou_complete()
        return is_eou

    async def _wait_for_additional_speech(self, delay: float) -> None:
        """Wait for additional speech within the timeout period"""
        logger.info(f"Called _wait_for_additional_speech method, waiting for additional speech for {delay} seconds")
        if self._waiting_for_more_speech:
            if self._wait_timer:
                self._wait_timer.cancel()
        self._waiting_for_more_speech = True
        loop = asyncio.get_event_loop()
        self._wait_timer = loop.call_later(
            delay, lambda: asyncio.create_task(self._on_speech_timeout())
        )

    async def _on_speech_timeout(self) -> None:
        """Handle timeout when no additional speech is detected"""
        async with self._transcript_processing_lock:
            if not self._waiting_for_more_speech:
                return
            self._waiting_for_more_speech = False
            self._wait_timer = None
            await self._finalize_transcript_and_respond()

    async def _finalize_transcript_and_respond(self) -> None:
        """Finalize the accumulated transcript and generate response"""
        if not self._accumulated_transcript.strip():
            return
        final_transcript = self._accumulated_transcript.strip()
        logger.info(f"Finalizing transcript: '{final_transcript}'")
        self._accumulated_transcript = ""
        await self._process_final_transcript(final_transcript)

    async def _process_final_transcript(self, user_text: str) -> None:
        """Process final transcript with EOU detection and response generation"""
        if not cascading_metrics_collector.data.current_turn:
            cascading_metrics_collector.start_new_interaction()
        cascading_metrics_collector.set_user_transcript(user_text)
        cascading_metrics_collector.on_stt_complete()
        if self.vad and cascading_metrics_collector.data.is_user_speaking:
            cascading_metrics_collector.on_user_speech_end()
        elif not self.vad:
            cascading_metrics_collector.on_user_speech_end()
        final_user_text = user_text
        if self.agent.knowledge_base:
            kb_context = await self.agent.knowledge_base.process_query(user_text)
            if kb_context:
                final_user_text = f"{kb_context}\n\nUser: {user_text}"
        if self.conversational_graph:
            final_user_text = self.conversational_graph.handle_input(user_text)
        self.agent.chat_context.add_message(
            role=ChatRole.USER,
            content=final_user_text
        )
        if self.agent and self.agent.session:
            if self.agent.session.current_utterance and not self.agent.session.current_utterance.done():
                if self.agent.session.current_utterance.is_interruptible:
                    self.agent.session.current_utterance.interrupt()
                else:
                    logger.info("Current utterance is not interruptible. Skipping interruption in cascading pipeline.")
            handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
            self.agent.session.current_utterance = handle
        else:
            handle = UtteranceHandle(utterance_id="utt_fallback")
            handle._mark_done()
        await self.on_turn_start(final_user_text)
        # Generate response
        asyncio.create_task(self._generate_and_synthesize_response(final_user_text, handle))
        await self.on_turn_end()

    async def _process_reply_instructions(self, instructions: str, wait_for_playback: bool,
                                          handle: UtteranceHandle,
                                          frames: list[av.VideoFrame] | None = None) -> None:
        """Process reply instructions and generate response using existing flow"""
        original_vad_handler = None
        original_stt_handler = None
        if wait_for_playback:
            if self.vad:
                original_vad_handler = self.on_vad_event
                self.on_vad_event = lambda x: None
            if self.stt:
                original_stt_handler = self.on_stt_transcript
                self.on_stt_transcript = lambda x: None
        try:
            final_instructions = instructions
            if self.agent.knowledge_base:
                kb_context = await self.agent.knowledge_base.process_query(instructions)
                if kb_context:
                    final_instructions = f"{kb_context}\n\nUser: {instructions}"
            content_parts = [final_instructions]
            if frames:
                for frame in frames:
                    image_part = ImageContent(image=frame, inference_detail="auto")
                    content_parts.append(image_part)
            self.agent.chat_context.add_message(
                role=ChatRole.USER,
                content=content_parts if len(content_parts) > 1 else final_instructions
            )
            await self.on_turn_start(final_instructions)
            await self._generate_and_synthesize_response(final_instructions, handle)
            await self.on_turn_end()
            if wait_for_playback:
                while (hasattr(cascading_metrics_collector.data, 'is_agent_speaking')
                        and cascading_metrics_collector.data.is_agent_speaking):
                    await asyncio.sleep(0.1)
        finally:
            if wait_for_playback:
                if original_vad_handler is not None:
                    self.on_vad_event = original_vad_handler
                if original_stt_handler is not None:
                    self.on_stt_transcript = original_stt_handler
            if not handle.done():
                handle._mark_done()

    async def _authorize_or_process_final_transcript(self, final_text: str) -> None:
        """
        Handle final transcript - authorize preemptive generation or start new.
        """
        async with self._preemptive_lock:
            final_text_normalized = final_text.strip()
            if self._preemptive_transcript:
                preflight_normalized = self._preemptive_transcript.strip()
                # Compare transcripts
                if final_text_normalized == preflight_normalized:
                    logger.info("MATCH! Authorizing preemptive generation")
                    # Authorize the waiting TTS to play audio
                    self._preemptive_authorized.set()
                    # Wait for preemptive task to complete
                    if self._preemptive_generation_task:
                        try:
                            await asyncio.wait_for(
                                self._preemptive_generation_task,
                                timeout=30.0  # Generous timeout for playback
                            )
                            logger.info("Preemptive generation completed successfully")
                        except asyncio.TimeoutError:
                            logger.error("Preemptive playback timeout")
                        except Exception as e:
                            logger.error(f"Error in preemptive playback: {e}")
                else:
                    logger.info("MISMATCH! Cancelling preemptive generation")
                    # Cancel preemptive generation
                    await self._cancel_preemptive_generation()
                    # Remove the wrong user message from context
                    if self.agent.chat_context.messages and \
                            self.agent.chat_context.messages[-1].role == ChatRole.USER:
                        self.agent.chat_context.messages.pop()
                    # Follow normal flow with correct transcript
                    await self._process_transcript_with_eou(final_text_normalized)
            else:
                # No preflight, normal flow
                logger.info(f"No preflight, processing normally: '{final_text_normalized}'")
                await self._process_transcript_with_eou(final_text_normalized)
            # Cleanup
            self._preemptive_transcript = None
            self._preemptive_generation_task = None

    async def _cancel_preemptive_generation(self) -> None:
        """Cancel preemptive generation"""
        logger.info("Cancelling preemptive generation...")
        if self._enable_preemptive_generation:
            self._preemptive_cancelled = True
            self._preemptive_authorized.set()  # Unblock to allow cancellation
            # Cancel the task
            if self._preemptive_generation_task and not self._preemptive_generation_task.done():
                self._preemptive_generation_task.cancel()
                try:
                    await self._preemptive_generation_task
                except asyncio.CancelledError:
                    logger.info("Preemptive task cancelled successfully")
                self._preemptive_generation_task = None
            # Cancel LLM/TTS
            if self.llm:
                try:
                    await self.llm.cancel_current_generation()
                except Exception as e:
                    logger.debug(f"LLM cancellation: {e}")
            if self.tts:
                await self.tts.interrupt()
            self._preemptive_transcript = None
            logger.info("Preemptive generation cancelled and cleaned up")

    async def _handle_turn_resumed(self, resumed_text: str) -> None:
        """
        Handle TurnResumed event (user continued speaking).
        Edge case: Cancel preemptive generation immediately.
        """
        await self._cancel_preemptive_generation()
        # Update accumulated transcript
        if self._accumulated_transcript:
            self._accumulated_transcript += " " + resumed_text
        else:
            self._accumulated_transcript = resumed_text

    async def _generate_and_synthesize_response(self, user_text: str, handle: UtteranceHandle,
                                                wait_for_authorization: bool = False) -> None:
        """Generate agent response and manage handle lifecycle"""
        self._is_interrupted = False
        full_response = ""
        self._partial_response = ""
        try:
            if self.agent.session and self.agent.session.is_background_audio_enabled:
                await self.agent.session.start_thinking_audio()
            llm_stream = self.run(user_text)
            q = asyncio.Queue(maxsize=50)

            async def collector():
                response_parts = []
                metadata = None
                try:
                    async for chunk in llm_stream:
                        if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled):
                            logger.info("LLM collection interrupted")
                            await q.put(None)
                            return "".join(response_parts)
                        content = chunk
                        chunk_metadata = None
                        if hasattr(chunk, "content"):
                            content = chunk.content
                        if hasattr(chunk, "metadata"):
                            chunk_metadata = chunk.metadata
                        if content:
                            response_parts.append(content)
                            await q.put(content)
                        if chunk_metadata:
                            metadata = chunk_metadata
                        self._partial_response = "".join(response_parts)
                    if not handle.interrupted:
                        await q.put(None)
                    # Guard: metadata stays None when no chunk carried metadata.
                    if self.conversational_graph and metadata and metadata.get("graph_response"):
                        _ = await self.conversational_graph.handle_decision(self.agent, metadata.get("graph_response"))
                    return "".join(response_parts)
                except asyncio.CancelledError:
                    logger.info("LLM collection cancelled")
                    await q.put(None)
                    return "".join(response_parts)

            async def tts_consumer():
                """Consumes LLM chunks and sends to TTS with authorization gate"""
                # Wait for authorization if this is preemptive generation
                if wait_for_authorization:
                    try:
                        # Wait for authorization or cancellation
                        await asyncio.wait_for(
                            self._preemptive_authorized.wait(),
                            timeout=10.0  # Safety timeout
                        )
                        if self._preemptive_cancelled:
                            logger.info("Preemptive generation cancelled during authorization wait")
                            return
                    except asyncio.TimeoutError:
                        logger.error("Authorization timeout - cancelling preemptive generation")
                        self._preemptive_cancelled = True
                        return

                async def tts_stream_gen():
                    while True:
                        if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled):
                            break
                        try:
                            chunk = await asyncio.wait_for(q.get(), timeout=0.1)
                            if chunk is None:
                                break
                            yield chunk
                        except asyncio.TimeoutError:
                            if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled):
                                break
                            continue

                if self.tts:
                    try:
                        await self._synthesize_with_tts(tts_stream_gen())
                    except asyncio.CancelledError:
                        await self.tts.interrupt()

            collector_task = asyncio.create_task(collector())
            tts_task = asyncio.create_task(tts_consumer())
            self._current_llm_task = collector_task
            self._current_tts_task = tts_task
            try:
                await asyncio.gather(collector_task, tts_task, return_exceptions=True)
            except asyncio.CancelledError:
                if not collector_task.done():
                    collector_task.cancel()
                if not tts_task.done():
                    tts_task.cancel()
            if not collector_task.cancelled() and not self._is_interrupted:
                full_response = collector_task.result()
            else:
                full_response = self._partial_response
            if (
                full_response
                and self.agent
                and getattr(self.agent, "chat_context", None)
            ):
                cascading_metrics_collector.set_agent_response(full_response)
                self.agent.chat_context.add_message(
                    role=ChatRole.ASSISTANT,
                    content=full_response
                )
        finally:
            self._current_tts_task = None
            self._current_llm_task = None
            if not handle.done():
                handle._mark_done()

    async def process_with_llm(self) -> AsyncIterator[str]:
        """
        Process the current chat context with LLM and yield response chunks.
        This method can be called by user implementations to get LLM responses.
        """
        async with self.llm_lock:
            if not self.llm:
                return
            if not self.agent or not getattr(self.agent, "chat_context", None):
                logger.info("Agent not available for LLM processing, exiting")
                return
            if self.max_context_items:
                current_items = len(self.agent.chat_context.items)
                if current_items > self.max_context_items:
                    try:
                        logger.info(f"Chat Context Truncating from {current_items} to {self.max_context_items} items (max_context_items={self.max_context_items})")
                        self.agent.chat_context.truncate(self.max_context_items)
                        logger.info(f"Chat Context Truncation complete. Final size: {len(self.agent.chat_context.items)} items")
                    except Exception as e:
                        logger.error(f"Chat Context Error during truncation: {e}", exc_info=True)
                else:
                    logger.debug(f"Context size {current_items} is within limit (max_context_items={self.max_context_items})")
            cascading_metrics_collector.on_llm_start()
            first_chunk_received = False
            agent_session = getattr(self.agent, "session", None) if self.agent else None
            if agent_session:
                agent_session._emit_user_state(UserState.IDLE)
                agent_session._emit_agent_state(AgentState.THINKING)
            async for llm_chunk_resp in self.llm.chat(
                self.agent.chat_context,
                tools=self.agent._tools,
                conversational_graph=self.conversational_graph if self.conversational_graph else None
            ):
                if llm_chunk_resp.metadata and "usage" in llm_chunk_resp.metadata:
                    cascading_metrics_collector.set_llm_usage(llm_chunk_resp.metadata["usage"])
                if self._is_interrupted:
                    logger.info("LLM processing interrupted")
                    break
                if not self.agent or not getattr(self.agent, "chat_context", None):
                    logger.info("Agent context unavailable, stopping LLM processing")
                    break
                if not first_chunk_received:
                    first_chunk_received = True
                    cascading_metrics_collector.on_llm_first_token()
                if llm_chunk_resp.metadata and "function_call" in llm_chunk_resp.metadata:
                    func_call = llm_chunk_resp.metadata["function_call"]
                    cascading_metrics_collector.add_function_tool_call(func_call["name"])
                    chat_context = getattr(self.agent, "chat_context", None)
                    if not chat_context:
                        logger.info("Chat context missing while handling function call, aborting")
                        return
                    chat_context.add_function_call(
                        name=func_call["name"],
                        arguments=json.dumps(func_call["arguments"]),
                        call_id=func_call.get("call_id", f"call_{int(time.time())}")
                    )
                    try:
                        if not self.agent:
                            logger.info("Agent cleaned up before selecting tool, aborting")
                            return
                        tool = next(
                            (t for t in self.agent.tools
                             if is_function_tool(t) and get_tool_info(t).name == func_call["name"]),
                            None
                        )
                    except Exception as e:
                        logger.error(f"Error while selecting tool: {e}")
                        continue
                    if tool:
                        agent_session = getattr(self.agent, "session", None) if self.agent else None
                        if agent_session:
                            agent_session._is_executing_tool = True
                        try:
                            result = await tool(**func_call["arguments"])
                            if isinstance(result, Agent):
                                new_agent = result
                                current_session = self.agent.session
                                logger.info(f"Switching from agent {type(self.agent).__name__} to {type(new_agent).__name__}")
                                if getattr(new_agent, 'inherit_context', True):
                                    logger.info(f"Inheriting context from {type(self.agent).__name__} to {type(new_agent).__name__}")
                                    logger.info(f"Chat context: {self.agent.chat_context.items}")
                                    new_agent.chat_context = self.agent.chat_context
                                    new_agent.chat_context.add_message(
                                        role=ChatRole.SYSTEM,
                                        content=new_agent.instructions,
                                        replace=True
                                    )
                                if hasattr(self.agent, 'on_speech_in'):
                                    current_session.off("on_speech_in", self.agent.on_speech_in)
                                if hasattr(self.agent, 'on_speech_out'):
                                    current_session.off("on_speech_out", self.agent.on_speech_out)
                                new_agent.session = current_session
                                self.agent = new_agent
                                current_session.agent = new_agent
                                if hasattr(current_session.pipeline, 'set_agent'):
                                    current_session.pipeline.set_agent(new_agent)
                                if hasattr(current_session.pipeline, 'set_conversation_flow'):
                                    current_session.pipeline.set_conversation_flow(self)
                                if hasattr(new_agent, 'on_speech_in'):
                                    current_session.on("on_speech_in", new_agent.on_speech_in)
                                if hasattr(new_agent, 'on_speech_out'):
                                    current_session.on("on_speech_out", new_agent.on_speech_out)
                                if hasattr(new_agent, 'on_enter') and asyncio.iscoroutinefunction(new_agent.on_enter):
                                    await new_agent.on_enter()
                                return
                            chat_context = getattr(self.agent, "chat_context", None)
                            if not chat_context:
                                logger.info("Agent chat context missing after tool execution, stopping LLM processing")
                                return
                            chat_context.add_function_output(
                                name=func_call["name"],
                                output=json.dumps(result),
                                call_id=func_call.get("call_id", f"call_{int(time.time())}")
                            )
                            async for new_resp in self.llm.chat(
                                chat_context,
                                tools=self.agent.tools,
                                conversational_graph=self.conversational_graph if self.conversational_graph else None
                            ):
                                if self._is_interrupted:
                                    break
                                if new_resp:
                                    yield ResponseChunk(new_resp.content, new_resp.metadata, new_resp.role)
                        except Exception as e:
                            logger.error(f"Error executing function {func_call['name']}: {e}")
                            continue
                        finally:
                            agent_session = getattr(self.agent, "session", None) if self.agent else None
                            if agent_session:
                                agent_session._is_executing_tool = False
                else:
                    if llm_chunk_resp:
                        yield ResponseChunk(llm_chunk_resp.content, llm_chunk_resp.metadata, llm_chunk_resp.role)
            if not self._is_interrupted:
                cascading_metrics_collector.on_llm_complete()

    async def say(self, message: str, handle: UtteranceHandle) -> None:
        """
        Direct TTS synthesis (used for initial messages) and manage handle lifecycle.
        """
        if self.tts:
            cascading_metrics_collector.start_new_interaction("")
            cascading_metrics_collector.set_agent_response(message)
            try:
                await self._synthesize_with_tts(message)
            finally:
                handle._mark_done()

    async def process_text_input(self, text: str) -> None:
        """
        Process text input directly (for A2A communication).
        This bypasses STT and directly processes the text through the LLM.
        """
        cascading_metrics_collector.start_new_interaction(text)
        self.agent.chat_context.add_message(
            role=ChatRole.USER,
            content=text
        )
        full_response = ""
        async for response_chunk in self.process_with_llm():
            if response_chunk.content:
                full_response += response_chunk.content
        if full_response:
            cascading_metrics_collector.set_agent_response(full_response)
            cascading_metrics_collector.complete_current_turn()
            global_event_emitter.emit("text_response", {"text": full_response})

    async def run(self, transcript: str) -> AsyncIterator[str]:
        """
        Main conversation loop: handle a user turn.
        Users should implement this method to preprocess transcripts and yield response chunks.
        """
        if not cascading_metrics_collector.data.current_turn:
            cascading_metrics_collector.start_new_interaction(transcript)
        async for response in self.process_with_llm():
            yield response

    async def on_turn_start(self, transcript: str) -> None:
        """Called at the start of a user turn."""
        pass

    async def on_turn_end(self) -> None:
        """Called at the end of a user turn."""
        pass

    def on_speech_started_stt(self, event_data: Any) -> None:
        if self.user_speech_callback:
            self.user_speech_callback()
        if self.agent and self.agent.session:
            self.agent.session._emit_user_state(UserState.SPEAKING)

    def on_speech_stopped_stt(self, event_data: Any) -> None:
        pass

    async def handle_stt_event(self, text: str) -> None:
        """Handle STT event"""
        if not text or not text.strip():
            return
        word_count = len(text.strip().split())
        logger.info(f"handle_stt_event: Word count: {word_count}")
        if self.resume_on_false_interrupt and self._is_in_false_interrupt_pause and word_count >= self.interrupt_min_words:
            logger.info(f"[FALSE_INTERRUPT] STT transcript received while in paused state: '{text}' ({word_count} words). Confirming real interruption.")
            self._cancel_false_interrupt_timer()
            self._is_in_false_interrupt_pause = False
            self._false_interrupt_paused_speech = False
            logger.info("[FALSE_INTERRUPT] Clearing audio buffers and finalizing interruption.")
            await self._interrupt_tts()
            return
        if self.interrupt_mode in ("STT_ONLY", "HYBRID"):
            if word_count >= self.interrupt_min_words:
                if self.agent.session and self.agent.session.current_utterance and self.agent.session.current_utterance.is_interruptible:
                    await self._trigger_interruption()
                else:
                    logger.info("Interruption not allowed for the current utterance.")

    async def _trigger_interruption(self) -> None:
        """Trigger interruption once, respecting the utterance's interruptible flag."""
        logger.info("Interruption triggered")
        if self._is_interrupted:
            logger.info("Already interrupted, skipping")
            return
        utterance = self.agent.session.current_utterance if self.agent and self.agent.session else None
        if utterance and not utterance.is_interruptible:
            logger.info("Interruption is disabled for the current utterance. Ignoring.")
            return
        self._is_interrupted = True
        can_resume = self.resume_on_false_interrupt and self.tts and self.tts.can_pause
        if can_resume:
            logger.info(f"[FALSE_INTERRUPT] Pausing TTS for potential resume. (resume_on_false_interrupt={self.resume_on_false_interrupt}, can_pause={self.tts.can_pause if self.tts else False})")
            self._false_interrupt_paused_speech = True
            self._is_in_false_interrupt_pause = True
            await self.tts.pause()
            self._start_false_interrupt_timer()
        else:
            logger.info("Performing full interruption.")
            await self._interrupt_tts()

    def _start_false_interrupt_timer(self):
        """Starts a timer to detect if an interruption was just a brief false interrupt."""
        if self._false_interrupt_timer:
            logger.info("[FALSE_INTERRUPT] Cancelling existing timer before starting a new one.")
            self._false_interrupt_timer.cancel()
        if self.false_interrupt_pause_duration is None:
            logger.info("[FALSE_INTERRUPT] Timeout is None, skipping timer.")
            return
        logger.info(f"[FALSE_INTERRUPT] Starting timer for {self.false_interrupt_pause_duration}s. If no STT transcript is received within this time, speech will resume.")
        loop = asyncio.get_event_loop()
        self._false_interrupt_timer = loop.call_later(
            self.false_interrupt_pause_duration,
            lambda: asyncio.create_task(self._on_false_interrupt_timeout())
        )

    def _cancel_false_interrupt_timer(self):
        """Cancels the false-interrupt timer if it's running."""
        if self._false_interrupt_timer:
            logger.info("[FALSE_INTERRUPT] Cancelling false-interrupt timer - real interruption confirmed.")
            self._false_interrupt_timer.cancel()
            self._false_interrupt_timer = None

    async def _on_false_interrupt_timeout(self):
        """Called when the user remains silent after an interruption."""
        logger.info(f"[FALSE_INTERRUPT] Timeout reached after {self.false_interrupt_pause_duration}s. User did not follow up with speech.")
        self._false_interrupt_timer = None
        if self._is_user_speaking:
            logger.info("[FALSE_INTERRUPT] User is still speaking at timeout. Confirming as a real interruption.")
            self._is_in_false_interrupt_pause = False
            self._false_interrupt_paused_speech = False
            await self._interrupt_tts()
            return
        if self._is_in_false_interrupt_pause and self.tts and self.tts.can_pause:
            logger.info("[FALSE_INTERRUPT] Resuming agent speech from paused position - false interruption detected.")
            self._is_interrupted = False
            self._is_in_false_interrupt_pause = False
            self._false_interrupt_paused_speech = False
            await self.tts.resume()
        else:
            if self._is_interrupted or self._false_interrupt_paused_speech:
                logger.info(f"[FALSE_INTERRUPT] Cannot resume (is_in_false_interrupt_pause={self._is_in_false_interrupt_pause}, can_pause={self.tts.can_pause if self.tts else False}). Finalizing interruption.")
                await self._interrupt_tts()
            else:
                logger.info("[FALSE_INTERRUPT] Timeout reached but no paused state found. No action needed.")

    async def on_speech_started(self) -> None:
        cascading_metrics_collector.on_user_speech_start()
        if self.user_speech_callback:
            self.user_speech_callback()
        if self._stt_started:
            self._stt_started = False
        utterance = self.agent.session.current_utterance if self.agent and self.agent.session else None
        if utterance and not utterance.is_interruptible:
            logger.info("Interruption is disabled for the current utterance. Not interrupting.")
            if self.agent and self.agent.session:
                self.agent.session._emit_user_state(UserState.SPEAKING)
            return
        self._cancel_false_interrupt_timer()
        can_resume = self.resume_on_false_interrupt and self.tts and self.tts.can_pause
        if self._false_interrupt_paused_speech:
            logger.info("User continued speaking, confirming interruption of paused speech.")
            self._false_interrupt_paused_speech = False
            await self._interrupt_tts()
        elif self.agent and self.agent.session and self.agent.session.agent_state == AgentState.SPEAKING:
            if can_resume:
                logger.info("Pausing agent speech for potential hesitation (will resume if user stops quickly).")
                await self._trigger_interruption()
            else:
                await self._interrupt_tts()
        if self.agent and self.agent.session:
            self.agent.session._emit_user_state(UserState.SPEAKING)

    async def _interrupt_tts(self) -> None:
        self._is_interrupted = True
        self._cancel_false_interrupt_timer()
        self._is_in_false_interrupt_pause = False
        self._false_interrupt_paused_speech = False
        if self.agent and self.agent.session and self.agent.session.current_utterance:
            if self.agent.session.current_utterance.is_interruptible:
                self.agent.session.current_utterance.interrupt()
            else:
                logger.info("Cannot interrupt non-interruptible utterance in _interrupt_tts")
        if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
            await self.agent.session.stop_thinking_audio()
        if self._wait_timer:
            self._wait_timer.cancel()
            self._wait_timer = None
        self._waiting_for_more_speech = False
        if self.tts:
            await self.tts.interrupt()
        if self.avatar and hasattr(self.avatar, 'interrupt'):
            await self.avatar.interrupt()
        if self.llm:
            await self._cancel_llm()
        tasks_to_cancel = []
        if self._current_tts_task and not self._current_tts_task.done():
            tasks_to_cancel.append(self._current_tts_task)
        if self._current_llm_task and not self._current_llm_task.done():
            tasks_to_cancel.append(self._current_llm_task)
        if tasks_to_cancel:
            for task in tasks_to_cancel:
                task.cancel()
            await asyncio.gather(*tasks_to_cancel, return_exceptions=True)
        self._partial_response = ""
        self._is_interrupted = False
        cascading_metrics_collector.on_interrupted()

    async def _cancel_llm(self) -> None:
        """Cancel LLM generation"""
        try:
            await self.llm.cancel_current_generation()
            cascading_metrics_collector.on_llm_complete()
        except Exception as e:
            logger.error(f"LLM cancellation failed: {e}")

    def on_speech_stopped(self) -> None:
        if not self._stt_started:
            cascading_metrics_collector.on_stt_start()
            self._stt_started = True
        cascading_metrics_collector.on_user_speech_end()
        if self.agent and self.agent.session:
            self.agent.session._emit_user_state(UserState.IDLE)

    async def _synthesize_with_tts(self, response_gen: AsyncIterator[str] | str) -> None:
        """
        Stream LLM response directly to TTS.
        """
        if not self.tts:
            return
        if self.agent and self.agent.session:
            self.agent.session._pause_wake_up_timer()
        if not self.audio_track:
            if self.agent and self.agent.session and hasattr(self.agent.session, "pipeline") and hasattr(self.agent.session.pipeline, "audio_track"):
                self.audio_track = self.agent.session.pipeline.audio_track
            else:
                logger.warning("[ConversationFlow] Audio track not found in pipeline — last audio callback will be skipped.")
        if self.audio_track and hasattr(self.audio_track, "enable_audio_input"):
            # Require manual re-enable so old audio never bleeds into the next utterance.
            self.audio_track.enable_audio_input(manual_control=True)

        async def on_first_audio_byte():
            if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
                await self.agent.session.stop_thinking_audio()
            cascading_metrics_collector.on_tts_first_byte()
            cascading_metrics_collector.on_agent_speech_start()
            if self.agent and self.agent.session:
                self.agent.session._emit_agent_state(AgentState.SPEAKING)
                self.agent.session._emit_user_state(UserState.LISTENING)

        async def on_last_audio_byte():
            if self.agent and self.agent.session:
                self.agent.session._emit_agent_state(AgentState.IDLE)
                self.agent.session._emit_user_state(UserState.IDLE)
                logger.info("[TTS] Last audio byte processed — Agent and User set to IDLE")
            cascading_metrics_collector.on_agent_speech_end()
            cascading_metrics_collector.complete_current_turn()

        self.tts.on_first_audio_byte(on_first_audio_byte)
        if self.audio_track:
            if hasattr(self.audio_track, "on_last_audio_byte"):
                self.audio_track.on_last_audio_byte(on_last_audio_byte)
            else:
                logger.warning(f"[ConversationFlow] Audio track '{type(self.audio_track).__name__}' does not have 'on_last_audio_byte' method — skipping callback registration.")
        else:
            logger.warning("[ConversationFlow] Audio track not initialized — skipping last audio callback registration.")
        self.tts.reset_first_audio_tracking()
        cascading_metrics_collector.on_tts_start()
        try:
            response_iterator: AsyncIterator[str]
            if isinstance(response_gen, str):
                async def string_to_iterator(text: str):
                    yield text
                response_iterator = string_to_iterator(response_gen)
            else:
                response_iterator = response_gen

            async def counting_wrapper(iterator: AsyncIterator[str]):
                async for chunk in iterator:
                    if chunk:
                        # Count characters and update metrics
                        cascading_metrics_collector.add_tts_characters(len(chunk))
                    yield chunk

            await self.tts.synthesize(counting_wrapper(response_iterator))
        finally:
            if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
                await self.agent.session.stop_thinking_audio()
            if self.agent and self.agent.session:
                self.agent.session._reply_in_progress = False
                self.agent.session._reset_wake_up_timer()

    async def cleanup(self) -> None:
        """Cleanup conversation flow resources"""
        logger.info("Cleaning up conversation flow")
        if self._current_tts_task and not self._current_tts_task.done():
            self._current_tts_task.cancel()
            try:
                await self._current_tts_task
            except asyncio.CancelledError:
                pass
            self._current_tts_task = None
        if self._current_llm_task and not self._current_llm_task.done():
            self._current_llm_task.cancel()
            try:
                await self._current_llm_task
            except asyncio.CancelledError:
                pass
            self._current_llm_task = None
        if self._vmd_check_task and not self._vmd_check_task.done():
            self._vmd_check_task.cancel()
        self.voice_mail_detector = None
        await self._cancel_preemptive_generation()
        if hasattr(self, 'agent') and self.agent and hasattr(self.agent, 'chat_context') and self.agent.chat_context:
            try:
                self.agent.chat_context.cleanup()
                logger.info("Agent chat context cleaned up")
            except Exception as e:
                logger.error(f"Error cleaning up agent chat context: {e}")
        self.transcription_callback = None
        self.user_speech_callback = None
        self.stt = None
        self.llm = None
        self.tts = None
        self.vad = None
        self.turn_detector = None
        self.agent = None
        self.denoise = None
        self._stt_started = False
        self._partial_response = ""
        self._is_interrupted = False
        logger.info("Conversation flow cleaned up")

Manages the conversation flow by listening to transcription events.
Initialize conversation flow with event emitter capabilities
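For orientation, a minimal construction and startup sketch. The import path is an assumption based on the module name; every component is optional, so this runs with nothing wired in, though in practice you would pass real STT/LLM/TTS/VAD instances (or subclass the flow, see run below).

    import asyncio

    from agents.conversation_flow import ConversationFlow  # assumed import path

    async def main() -> None:
        # All components default to None; passing None disables the
        # corresponding pipeline stage.
        flow = ConversationFlow(agent=None, stt=None, llm=None, tts=None, vad=None)
        await flow.start()  # subscribes to global speech_started/speech_stopped events

    asyncio.run(main())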
Ancestors
- EventEmitter
- typing.Generic
- abc.ABC
Methods
def apply_flow_config(self, eou_config: "EOUConfig", interrupt_config: "InterruptConfig") -> None
Override default timing/interaction parameters using pipeline config.
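A configuration sketch. Only the attribute names read by apply_flow_config are confirmed by the source; the keyword-argument constructors below are assumptions about how EOUConfig and InterruptConfig are built.

    # Assumed keyword-argument construction; field names come from the
    # body of apply_flow_config itself.
    eou_config = EOUConfig(
        mode="ADAPTIVE",                         # or "DEFAULT"
        min_max_speech_wait_timeout=(0.5, 0.8),  # (min, max) seconds
    )
    interrupt_config = InterruptConfig(
        mode="HYBRID",                # "VAD_ONLY", "STT_ONLY", or "HYBRID"
        interrupt_min_duration=0.5,   # seconds of user speech before interrupting
        interrupt_min_words=2,        # transcript words before interrupting
        false_interrupt_pause_duration=2.0,
        resume_on_false_interrupt=True,
    )
    flow.apply_flow_config(eou_config, interrupt_config)  # flow: a ConversationFlow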
async def cleanup(self) -> None
Cleanup conversation flow resources.
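A common lifecycle pattern (a sketch, assuming a flow that has been started): pair start() with cleanup() so in-flight tasks are torn down even when the session errors out.

    async def run_session(flow: ConversationFlow) -> None:
        await flow.start()
        try:
            ...  # feed audio, handle events
        finally:
            # Cancels pending LLM/TTS tasks, stops voicemail detection,
            # releases the chat context, and drops component references.
            await flow.cleanup()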
async def handle_stt_event(self, text: str) -> None
Handle STT event.
async def on_speech_started(self) -> None
Handles a VAD speech-start event: records metrics, fires the user-speech callback, cancels any pending false-interrupt timer, and interrupts (or pauses) agent speech according to the interrupt configuration.

def on_speech_started_stt(self, event_data: Any) -> None
Handles a global STT speech-start event: fires the user-speech callback and emits the SPEAKING user state.

def on_speech_stopped(self) -> None
Handles a VAD speech-stop event: starts STT timing metrics if needed, records the end of user speech, and emits the IDLE user state.

def on_speech_stopped_stt(self, event_data: Any) -> None
No-op hook for global STT speech-stop events.

async def on_stt_transcript(self, stt_response: STTResponse) -> None
Handle STT transcript events with enhanced EOU logic.
def on_transcription(self, callback: Callable[[str], None]) -> None
Set the callback for transcription events.
Args
callback: Function to call when transcription occurs; takes the transcribed text as its argument.
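A minimal registration sketch, assuming flow is a ConversationFlow instance:

    def print_transcript(text: str) -> None:
        print(f"user said: {text}")

    # Internally subscribes to the flow's "transcription_event" and
    # forwards only the transcribed text to the callback.
    flow.on_transcription(print_transcript)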
async def on_turn_end(self) -> None
Called at the end of a user turn.
async def on_turn_start(self, transcript: str) -> None
Called at the start of a user turn.
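Both turn hooks are no-ops by default and are intended as subclass extension points; a sketch:

    from agents.conversation_flow import ConversationFlow  # assumed import path

    class AuditedFlow(ConversationFlow):
        async def on_turn_start(self, transcript: str) -> None:
            # Runs after the user message is committed to the chat context,
            # just before the response task is scheduled.
            print(f"turn started: {transcript!r}")

        async def on_turn_end(self) -> None:
            # Runs once the response task has been kicked off.
            print("turn ended")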
async def on_vad_event(self, vad_response: VADResponse) -> None
Handle VAD events with interruption logic.
async def process_text_input(self, text: str) -> None
Process text input directly (for A2A communication). This bypasses STT and directly processes the text through the LLM.
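A sketch of the agent-to-agent path. Note the reply is not synthesized to audio; per the method body, the final text is published on the global "text_response" event, so an interested listener subscribes there.

    async def handle_peer_message(flow: ConversationFlow, message: str) -> None:
        # Skips STT/VAD/turn detection entirely; the text goes straight
        # into the chat context and through process_with_llm().
        await flow.process_text_input(message)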
async def process_with_llm(self) -> AsyncIterator[str]
Process the current chat context with LLM and yield response chunks. This method can be called by user implementations to get LLM responses.
async def run(self, transcript: str) -> AsyncIterator[str]
Main conversation loop: handle a user turn. Users should implement this method to preprocess transcripts and yield response chunks.
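run() is the main override point for custom flows. process_with_llm() yields ResponseChunk objects, and the downstream collector accepts either those or plain strings, so a subclass can post-process text like this (a sketch):

    from collections.abc import AsyncIterator

    from agents.conversation_flow import ConversationFlow  # assumed import path

    class TidyFlow(ConversationFlow):
        async def run(self, transcript: str) -> AsyncIterator[str]:
            # Delegate generation to the built-in LLM loop, normalizing
            # whitespace in each chunk before it reaches TTS.
            async for chunk in self.process_with_llm():
                text = getattr(chunk, "content", chunk)  # ResponseChunk or str
                if text:
                    yield text.replace("\t", " ")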
async def say(self, message: str, handle: UtteranceHandle) -> None
Direct TTS synthesis (used for initial messages) and manage handle lifecycle.
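A greeting sketch that mirrors how the flow itself constructs utterance handles (see _handle_preflight_transcript in the class source); flow is assumed to be a ConversationFlow with a TTS configured.

    import uuid

    handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
    await flow.say("Hello! How can I help you today?", handle)
    # The handle is marked done once synthesis completes.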
async def send_audio_delta(self, audio_data: bytes) -> None
Send audio delta to the STT.
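A capture-loop sketch. send_audio_delta returns immediately and schedules denoise, STT, and VAD processing on a background task, so the capture loop is never blocked by inference:

    from collections.abc import Iterable

    async def pump_microphone(flow: ConversationFlow, frames: Iterable[bytes]) -> None:
        # frames: raw PCM byte chunks from the capture device.
        for frame in frames:
            await flow.send_audio_delta(frame)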
def set_voice_mail_detector(self, detector: VoiceMailDetector | None) -> None
Configures voicemail detection. Called by AgentSession.
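Once a detector is set, the flow buffers early transcripts for detector.duration seconds, passes them to detector.detect(), and emits a "voicemail_result" event (interrupting TTS/LLM on a positive result). A listener sketch, with my_detector as a placeholder VoiceMailDetector:

    def on_voicemail_result(payload: dict) -> None:
        if payload["is_voicemail"]:
            print("Voicemail detected; agent output was interrupted.")

    flow.on("voicemail_result", on_voicemail_result)  # EventEmitter subscription
    flow.set_voice_mail_detector(my_detector)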
async def start(self) -> None
Subscribes to the global speech_started/speech_stopped events and records the agent's system instructions for metrics.