Module agents.pipeline_orchestrator
Classes
class PipelineOrchestrator (agent: Agent | None = None,
stt: STT | None = None,
llm: LLM | None = None,
tts: TTS | None = None,
vad: VAD | None = None,
turn_detector: EOU | None = None,
denoise: Denoise | None = None,
avatar: Any | None = None,
mode: "Literal['ADAPTIVE', 'DEFAULT']" = 'DEFAULT',
min_speech_wait_timeout: tuple[float, float] = (0.5, 0.8),
interrupt_mode: "Literal['VAD_ONLY', 'STT_ONLY', 'HYBRID']" = 'HYBRID',
interrupt_min_duration: float = 0.5,
interrupt_min_words: int = 2,
interrupt_min_confidence: float = 0.0,
false_interrupt_pause_duration: float = 2.0,
resume_on_false_interrupt: bool = False,
interrupt_fade_duration: float = 0.4,
graph_adapter: Any | None = None,
context_window: Any | None = None,
voice_mail_detector: VoiceMailDetector | None = None,
hooks: "'PipelineHooks | None'" = None,
chunker: "'SentenceChunker | None'" = None,
text_filter: "'TextFilter | None'" = None,
chunking_language: str = 'auto')-
Expand source code
class PipelineOrchestrator(EventEmitter[Literal[ "transcript_ready", "content_generated", "synthesis_complete", "voicemail_result", "error" ]]): """ Orchestrates the execution of speech understanding, content generation, and speech generation components. Supports various component chains: 1. Full Chain: VAD → STT → TurnD → LLM → TTS 2. No TTS: VAD → STT → TurnD → LLM (text output) 3. No STT: LLM → TTS (text input) 4. Hybrid: VAD → STT → [User Processing] → TTS Events: - transcript_ready: Transcript is ready for processing - content_generated: LLM has generated content - synthesis_complete: TTS synthesis is complete - voicemail_result: Voicemail detection result - error: Error occurred in pipeline """ def __init__( self, agent: Agent | None = None, stt: STT | None = None, llm: LLM | None = None, tts: TTS | None = None, vad: VAD | None = None, turn_detector: EOU | None = None, denoise: Denoise | None = None, avatar: Any | None = None, mode: Literal["ADAPTIVE", "DEFAULT"] = "DEFAULT", min_speech_wait_timeout: tuple[float, float] = (0.5, 0.8), interrupt_mode: Literal["VAD_ONLY", "STT_ONLY", "HYBRID"] = "HYBRID", interrupt_min_duration: float = 0.5, interrupt_min_words: int = 2, interrupt_min_confidence: float = 0.0, false_interrupt_pause_duration: float = 2.0, resume_on_false_interrupt: bool = False, interrupt_fade_duration: float = 0.4, graph_adapter: Any | None = None, context_window: Any | None = None, voice_mail_detector: VoiceMailDetector | None = None, hooks: "PipelineHooks | None" = None, chunker: "SentenceChunker | None" = None, text_filter: "TextFilter | None" = None, chunking_language: str = "auto", ) -> None: super().__init__() self.agent = agent self.avatar = avatar self.graph_adapter = graph_adapter self.voice_mail_detector = voice_mail_detector self.voice_mail_detection_done = False self._vmd_buffer = "" self._vmd_check_task: asyncio.Task | None = None self.hooks = hooks # Text chunking / filtering (cascade-mode only). 
self._chunker = chunker self._text_filter = text_filter self._chunking_language = chunking_language # Interruption configuration self.interrupt_mode = interrupt_mode self.interrupt_min_duration = interrupt_min_duration self.interrupt_min_words = interrupt_min_words self.interrupt_min_confidence = interrupt_min_confidence self.false_interrupt_pause_duration = false_interrupt_pause_duration self.resume_on_false_interrupt = resume_on_false_interrupt self.interrupt_fade_duration = interrupt_fade_duration # Interruption state self._generation_id = 0 self._is_interrupted = False self._is_in_false_interrupt_pause = False self._false_interrupt_paused_speech = False self._false_interrupt_timer: asyncio.TimerHandle | None = None self._is_user_speaking = False self._interruption_check_task: asyncio.Task | None = None self._speech_start_energy: float = 0.0 # Component modules self.speech_understanding: SpeechUnderstanding | None = None self.content_generation: ContentGeneration | None = None self.speech_generation: SpeechGeneration | None = None # Initialize components based on what's available if stt or vad or turn_detector: self.speech_understanding = SpeechUnderstanding( agent=agent, stt=stt, vad=vad, turn_detector=turn_detector, denoise=denoise, mode=mode, min_speech_wait_timeout=min_speech_wait_timeout[0], max_speech_wait_timeout=min_speech_wait_timeout[1], hooks=hooks, ) # Setup event listeners with sync wrappers self.speech_understanding.on("transcript_final", self._wrap_async(self._on_transcript_final)) self.speech_understanding.on("transcript_preflight", self._wrap_async(self._on_transcript_preflight)) self.speech_understanding.on("transcript_interim", self._wrap_async(self._on_transcript_interim)) self.speech_understanding.on("speech_started", self._wrap_async(self._on_speech_started)) self.speech_understanding.on("speech_stopped", self._wrap_async(self._on_speech_stopped)) self.speech_understanding.on("turn_resumed", self._wrap_async(self._on_turn_resumed)) if llm: 
self.content_generation = ContentGeneration( agent=agent, llm=llm, graph_adapter=graph_adapter, context_window=context_window, ) # Setup event listeners self.content_generation.on("generation_started", lambda data: logger.info("Content generation started")) self.content_generation.on("generation_chunk", lambda data: None) self.content_generation.on("generation_complete", lambda data: logger.info("Content generation complete")) if tts: self.speech_generation = SpeechGeneration( agent=agent, tts=tts, avatar=avatar, hooks=hooks, ) # Setup event listeners self.speech_generation.on("synthesis_started", lambda data: logger.info("Speech synthesis started")) self.speech_generation.on("first_audio_byte", lambda data: logger.info("First audio byte ready")) self.speech_generation.on("last_audio_byte", lambda data: logger.info("Synthesis complete")) self.speech_generation.on("synthesis_interrupted", lambda data: logger.info("Synthesis interrupted")) # Generation tasks self._current_generation_task: asyncio.Task | None = None self._partial_response = "" # Preemptive generation self._preemptive_generation_task: asyncio.Task | None = None self._preemptive_authorized = asyncio.Event() self._preemptive_cancelled = False def _wrap_async(self, async_func): """ Wrap an async function to be compatible with EventEmitter's sync-only handlers. 
Args: async_func: The async function to wrap Returns: A sync function that schedules the async function as a task """ def sync_wrapper(*args, **kwargs): asyncio.create_task(async_func(*args, **kwargs)) return sync_wrapper def set_audio_track(self, audio_track: Any) -> None: """Set audio track for TTS output""" if self.speech_generation: self.speech_generation.set_audio_track(audio_track) if audio_track is not None and hasattr(audio_track, "interrupt_fade_duration"): audio_track.interrupt_fade_duration = self.interrupt_fade_duration def set_voice_mail_detector(self, detector: VoiceMailDetector | None) -> None: """Configure voicemail detection""" self.voice_mail_detector = detector self.voice_mail_detection_done = False self._vmd_buffer = "" async def start(self) -> None: """Start all components""" if self.speech_understanding: await self.speech_understanding.start() if self.content_generation: await self.content_generation.start() if self.speech_generation: await self.speech_generation.start() logger.info("PipelineOrchestrator started") async def process_audio(self, audio_data: bytes) -> None: """ Process incoming audio through the pipeline. Args: audio_data: Raw audio bytes """ if self.speech_understanding: await self.speech_understanding.process_audio(audio_data) async def process_text(self, text: str) -> None: """ Process text input directly (bypasses STT). 
Args: text: User text input """ if not self.agent: logger.warning("No agent available for text processing") return metrics_collector.start_turn() metrics_collector.set_user_transcript(text) if self.graph_adapter: self.graph_adapter.set_agent(self.agent) graph_prompt, skip_llm = await self.graph_adapter.handle_input(text) if skip_llm: return if graph_prompt is not None: text = graph_prompt self.agent.chat_context.add_message( role=ChatRole.USER, content=text ) if not self.speech_understanding: metrics_collector.on_user_speech_start() metrics_collector.on_user_speech_end() if not self.speech_generation: metrics_collector.on_agent_speech_start() if self.content_generation: self.agent.session._emit_agent_state(AgentState.THINKING) full_response = "" graph_response_text = None async for response_chunk in self.content_generation.generate(text): if response_chunk.content: full_response += response_chunk.content if self.graph_adapter and response_chunk.metadata: if response_chunk.metadata.get("graph_response"): graph_response_text = response_chunk.metadata.get("graph_response") if self.graph_adapter and graph_response_text: new_response, skip = await self.graph_adapter.handle_decision( self.agent, graph_response_text ) if skip: return if new_response is not None: full_response = new_response if full_response: self.agent.chat_context.add_message( role=ChatRole.ASSISTANT, content=full_response ) self.emit("content_generated", {"text": full_response}) if self.hooks and self.hooks.has_llm_hooks(): try: await self.hooks.trigger_llm({"text": full_response}) except Exception as e: logger.error(f"Error in legacy LLM hook: {e}", exc_info=True) tts_text = full_response if self.hooks and self.hooks.has_llm_stream_hook(): async def _single_chunk(): yield full_response parts = [] async for chunk in self.hooks.process_llm_stream(_single_chunk()): parts.append(chunk) if parts: tts_text = "".join(parts) if self.speech_generation: await self.speech_generation.synthesize(tts_text) else: # 
No TTS - complete the turn after LLM self.agent.session._emit_agent_state(AgentState.IDLE) self.agent.session._emit_user_state(UserState.IDLE) metrics_collector.on_agent_speech_end() metrics_collector.set_agent_response(full_response) metrics_collector.complete_turn() self.emit("synthesis_complete", {}) def get_latest_transcript(self) -> str: """Get the latest accumulated transcript (for hybrid scenarios)""" if self.speech_understanding: return self.speech_understanding._accumulated_transcript return "" async def _on_transcript_final(self, data: dict) -> None: """Handle final transcript from speech understanding""" text = data["text"] is_preemptive = data.get("is_preemptive", False) agent_state = self.agent.session.agent_state if self.agent and self.agent.session else None cu = self.agent.session.current_utterance if self.agent and self.agent.session else None is_playing_audio = bool(self.speech_generation and self.speech_generation.is_speaking) playback_counts_for_turn = is_playing_audio and not (cu is not None and cu.done()) is_agent_active = (agent_state in (AgentState.SPEAKING, AgentState.THINKING)) or playback_counts_for_turn if is_agent_active: word_count = len(text.strip().split()) if text.strip() else 0 logger.info(f"[orchestrator] Final transcript while agent is {agent_state.value} | word_count={word_count}, min_words={self.interrupt_min_words}") if cu and not cu.is_interruptible and (not cu.done() or is_playing_audio): logger.info( f"[orchestrator] Transcript '{text}' received during non-interruptible playback, dropping" ) return if word_count < self.interrupt_min_words: logger.info(f"[orchestrator] Transcript '{text}' below min_words threshold ({word_count} < {self.interrupt_min_words}), dropping") return # Word count threshold met — check if utterance is interruptible if self.agent.session.current_utterance and not self.agent.session.current_utterance.is_interruptible: logger.info(f"[orchestrator] Transcript '{text}' meets min_words but utterance is not 
interruptible, dropping") return has_active_preemptive = ( is_preemptive and self._preemptive_generation_task is not None and not self._preemptive_generation_task.done() ) if has_active_preemptive: logger.info( f"[orchestrator] Word count {word_count} >= min_words " f"{self.interrupt_min_words}, deferring to preemptive handler" ) else: logger.info(f"[orchestrator] Word count {word_count} >= min_words {self.interrupt_min_words}, interrupting pipeline") await self._interrupt_pipeline() metrics_collector.start_turn() metrics_collector.set_user_transcript(text) if self.voice_mail_detector and not self.voice_mail_detection_done and text.strip(): self._vmd_buffer += f" {text}" if not self._vmd_check_task: logger.info("Starting Voice Mail Detection Timer") self._vmd_check_task = asyncio.create_task(self._run_vmd_check()) self.emit("transcript_ready", { "text": text, "is_final": True, "is_preemptive": is_preemptive, "metadata": data.get("metadata", {}) }) if is_preemptive: await self._handle_preemptive_final(text) else: await self._process_final_transcript(text) async def _on_transcript_preflight(self, data: dict) -> None: """Handle preflight transcript for preemptive generation""" preflight_text = data["text"] if self.agent and self.content_generation: context_prefix = None if self.agent.knowledge_base: kb_context = await self.agent.knowledge_base.process_query(preflight_text) if kb_context: context_prefix = kb_context self.agent.chat_context.add_message( role=ChatRole.USER, content=preflight_text ) if self.agent.session: if self.agent.session.current_utterance and not self.agent.session.current_utterance.done(): self.agent.session.current_utterance.interrupt() handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}") self.agent.session.current_utterance = handle else: handle = UtteranceHandle(utterance_id="utt_fallback") handle._mark_done() self._preemptive_authorized.clear() self._preemptive_cancelled = False if metrics_collector: 
metrics_collector.on_stt_preflight_end() self._preemptive_generation_task = asyncio.create_task( self._generate_and_synthesize(preflight_text, handle, wait_for_authorization=True, context_prefix=context_prefix) ) async def _handle_preemptive_final(self, final_text: str) -> None: """Handle final transcript when preemptive generation is active""" if not self.speech_understanding: return if self.speech_understanding.check_preemptive_match(final_text): logger.info("Preemptive generation MATCH - authorizing playback") self._preemptive_authorized.set() if self._preemptive_generation_task: try: await asyncio.wait_for(self._preemptive_generation_task, timeout=30.0) logger.info("Preemptive generation completed successfully") except asyncio.TimeoutError: logger.error("Preemptive playback timeout") except Exception as e: logger.error(f"Error in preemptive playback: {e}") else: logger.info("Preemptive generation MISMATCH - cancelling") await self._cancel_preemptive_generation() if self.agent: msgs = self.agent.chat_context.messages() if msgs and msgs[-1].role == ChatRole.USER: self.agent.chat_context._items.remove(msgs[-1]) await self._process_final_transcript(final_text) self.speech_understanding.clear_preemptive_state() self._preemptive_generation_task = None async def _on_transcript_interim(self, data: dict) -> None: """Handle interim transcript — check for STT-based interruption""" text = data.get("text", "") confidence = data.get("confidence", 1.0) if text and text.strip(): await self.handle_stt_event(text, confidence=confidence) async def _on_speech_started(self, data: dict) -> None: """Handle speech started event with VAD metadata.""" vad_confidence = data.get("confidence", 0.0) if isinstance(data, dict) else 0.0 vad_energy = data.get("energy", 0.0) if isinstance(data, dict) else 0.0 vad_speech_dur = data.get("speech_duration", 0.0) if isinstance(data, dict) else 0.0 logger.info( f"[orchestrator] _on_speech_started fired " f"| confidence={vad_confidence:.3f} 
energy={vad_energy:.4f} " f"speech_dur={vad_speech_dur:.3f}s" ) self._is_user_speaking = True self._speech_start_energy = vad_energy agent_state = self.agent.session.agent_state if self.agent and self.agent.session else None cu = self.agent.session.current_utterance if self.agent and self.agent.session else None is_synthesizing = bool(self.speech_generation and self.speech_generation.tts_lock.locked()) is_agent_active = ( agent_state in (AgentState.SPEAKING, AgentState.THINKING) or (cu is not None and not cu.done()) or is_synthesizing ) if is_agent_active: if self._interruption_check_task is None or self._interruption_check_task.done(): logger.info("User started speaking during agent response, initiating interruption monitoring") self._interruption_check_task = asyncio.create_task( self._monitor_interruption_duration() ) async def _on_speech_stopped(self, data: dict) -> None: """Handle speech stopped event with VAD metadata.""" speech_dur = data.get("speech_duration", 0.0) if isinstance(data, dict) else 0.0 has_audio = data.get("has_audio", False) if isinstance(data, dict) else False logger.info( f"[orchestrator] _on_speech_stopped fired " f"| speech_dur={speech_dur:.3f}s has_audio={has_audio}" ) self._is_user_speaking = False if self._interruption_check_task is not None and not self._interruption_check_task.done(): logger.info("User stopped speaking, cancelling interruption check") self._interruption_check_task.cancel() async def _on_turn_resumed(self, data: dict) -> None: """Handle turn resumed event""" await self._cancel_preemptive_generation() async def _process_final_transcript(self, user_text: str) -> None: """Process final transcript through the full pipeline""" if not self.agent: logger.warning("No agent available") return if self.hooks and self.hooks.has_user_turn_start_hooks(): await self.hooks.trigger_user_turn_start(user_text) context_prefix = None if self.agent.knowledge_base: kb_context = await self.agent.knowledge_base.process_query(user_text) if 
kb_context: context_prefix = kb_context final_user_text = user_text if not metrics_collector.current_turn: metrics_collector.start_turn() metrics_collector.set_user_transcript(user_text) self.agent.chat_context.add_message( role=ChatRole.USER, content=final_user_text ) if self.agent.session: if self.agent.session.current_utterance and not self.agent.session.current_utterance.done(): if self.agent.session.current_utterance.is_interruptible: self.agent.session.current_utterance.interrupt() else: logger.info("Current utterance is not interruptible") handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}") self.agent.session.current_utterance = handle else: handle = UtteranceHandle(utterance_id="utt_fallback") handle._mark_done() if self._current_generation_task and not self._current_generation_task.done(): self._current_generation_task.cancel() self._current_generation_task = asyncio.create_task( self._generate_and_synthesize(final_user_text, handle, context_prefix=context_prefix) ) async def _generate_and_synthesize( self, user_text: str, handle: UtteranceHandle, wait_for_authorization: bool = False, context_prefix: str | None = None ) -> None: """Generate LLM response and synthesize with TTS""" self._generation_id += 1 my_generation_id = self._generation_id self._is_interrupted = False full_response = "" self._partial_response = "" if self.speech_generation: self.speech_generation.spoken_transcript = "" try: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.start_thinking_audio() if self.speech_generation: self.speech_generation.reset_interrupt() if not self.content_generation and not self.graph_adapter: logger.warning("No content generation available") return if not self.speech_generation: logger.warning("No speech generation available") metrics_collector.on_agent_speech_start() if self.graph_adapter: self.graph_adapter.set_agent(self.agent) graph_prompt, skip_llm = await 
self.graph_adapter.handle_input(user_text) if skip_llm: return if graph_prompt is not None: user_text = graph_prompt msgs = self.agent.chat_context.messages() for msg in reversed(msgs): if msg.role == ChatRole.USER: msg.content = user_text break self.agent.session._emit_agent_state(AgentState.THINKING) llm_stream = self.content_generation.generate(user_text, context_prefix=context_prefix) q = asyncio.Queue(maxsize=50) graph_response_text = None async def collector(): """Collect LLM chunks and feed queue for TTS""" nonlocal graph_response_text response_parts = [] def _safe_set_agent_response(text: str) -> None: """Only set agent response if this collector still owns the current generation.""" if self.hooks and self.hooks.has_tts_stream_hook(): return if text and self._generation_id == my_generation_id and metrics_collector.current_turn: metrics_collector.set_agent_response(text) try: async for chunk in llm_stream: if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled): logger.info("LLM collection interrupted") await q.put(None) full = "".join(response_parts) _safe_set_agent_response(full) return full content = chunk.content if hasattr(chunk, "content") else chunk metadata = chunk.metadata if hasattr(chunk, "metadata") else chunk if content: response_parts.append(content) await q.put(content) logger.debug("[chunking] LLM delta → queue: %r", content) if self.graph_adapter and metadata and isinstance(metadata, dict): if metadata.get("graph_response"): graph_response_text = metadata.get("graph_response") self._partial_response = "".join(response_parts) if not handle.interrupted: await q.put(None) full = "".join(response_parts) _safe_set_agent_response(full) return full except asyncio.CancelledError: logger.info("LLM collection cancelled") await q.put(None) full = "".join(response_parts) _safe_set_agent_response(full) return full async def tts_consumer(): """Consume LLM chunks and send to TTS""" if wait_for_authorization: try: await 
asyncio.wait_for( self._preemptive_authorized.wait(), timeout=10.0 ) if self._preemptive_cancelled: logger.info("Preemptive generation cancelled during authorization wait") return except asyncio.TimeoutError: logger.error("Authorization timeout - cancelling preemptive generation") self._preemptive_cancelled = True return async def tts_stream_gen(): """Generate TTS stream from queue""" while True: if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled): break try: chunk = await asyncio.wait_for(q.get(), timeout=0.1) if chunk is None: break yield chunk except asyncio.TimeoutError: if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled): break continue if self.speech_generation: playback_done = asyncio.Event() def _on_playback_done(_data: Any = None) -> None: playback_done.set() self.speech_generation.on("last_audio_byte", _on_playback_done) self.speech_generation.on("synthesis_interrupted", _on_playback_done) try: text_source = tts_stream_gen() if self._text_filter is not None: text_source = self._text_filter.filter(text_source) if self._chunker is not None: text_source = _pipe_through_chunk_stream( text_source, self._chunker, language=self._chunking_language, ) if self.hooks: text_source = self.hooks.process_llm_stream(text_source) await self.speech_generation.synthesize(text_source) await playback_done.wait() except asyncio.CancelledError: if self.speech_generation: await self.speech_generation.interrupt() finally: self.speech_generation.off("last_audio_byte", _on_playback_done) self.speech_generation.off("synthesis_interrupted", _on_playback_done) collector_task = asyncio.create_task(collector()) tts_task = asyncio.create_task(tts_consumer()) try: await asyncio.gather(collector_task, tts_task, return_exceptions=True) except asyncio.CancelledError: if not collector_task.done(): collector_task.cancel() if not tts_task.done(): tts_task.cancel() if not collector_task.cancelled() and not self._is_interrupted: 
full_response = collector_task.result() else: full_response = self._partial_response if self.graph_adapter and graph_response_text and not self._is_interrupted: new_response, skip = await self.graph_adapter.handle_decision( self.agent, graph_response_text ) if skip: return if new_response is not None: full_response = new_response if self._is_interrupted or self._generation_id != my_generation_id: logger.info( "[ctx-add] SKIP post-gather add (gen_id=%s my_gen_id=%s is_interrupted=%s partial_len=%d full_len=%d)", self._generation_id, my_generation_id, self._is_interrupted, len(self._partial_response or ""), len(full_response or ""), ) elif full_response and self.agent: logger.info( "[ctx-add] SITE-A post-gather add (gen_id=%s is_interrupted=%s len=%d preview=%r)", my_generation_id, self._is_interrupted, len(full_response), full_response, ) self.agent.chat_context.add_message( role=ChatRole.ASSISTANT, content=full_response ) self.emit("content_generated", {"text": full_response}) if self.hooks and self.hooks.has_llm_hooks(): try: await self.hooks.trigger_llm({"text": full_response}) except Exception as e: logger.error(f"Error in legacy LLM hook: {e}", exc_info=True) # For no-TTS modes, complete the turn here since there's no speech_generation if not self.speech_generation: self.agent.session._emit_user_state(UserState.IDLE) self.agent.session._emit_agent_state(AgentState.IDLE) metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() finally: if self.hooks and self.hooks.has_user_turn_end_hooks(): await self.hooks.trigger_user_turn_end() if not handle.done(): handle._mark_done() if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() async def say( self, message: str, handle: UtteranceHandle, audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None, ) -> None: """ Direct TTS synthesis (for initial messages). 
Args: message: Message to synthesize handle: Utterance handle to track audio_data: Optional pre-synthesized PCM bytes. When provided, bypasses TTS synthesis entirely and streams these bytes to the agent audio track. Chunker and TTS text filter are also skipped on this path. """ if not self.speech_generation: handle._mark_done() return playback_done = asyncio.Event() def _on_playback_done(_data: Any = None) -> None: playback_done.set() self.speech_generation.on("last_audio_byte", _on_playback_done) self.speech_generation.on("synthesis_interrupted", _on_playback_done) try: if audio_data is not None: metrics_collector.set_agent_response(message, emit_transport=True) await self.speech_generation.synthesize_audio_bytes( audio_data, message, handle=handle ) await playback_done.wait() return if not (self.hooks and self.hooks.has_tts_stream_hook()): tts = self.speech_generation.tts emit = not bool(getattr(tts, "supports_word_timestamps", False)) metrics_collector.set_agent_response(message, emit_transport=emit) if self._chunker is not None or self._text_filter is not None: async def _string_iter() -> AsyncIterator[str]: yield message text_source: AsyncIterator[Any] = _string_iter() if self._text_filter is not None: text_source = self._text_filter.filter(text_source) if self._chunker is not None: text_source = _pipe_through_chunk_stream( text_source, self._chunker, language=self._chunking_language, ) await self.speech_generation.synthesize(text_source) else: await self.speech_generation.synthesize(message) await playback_done.wait() finally: self.speech_generation.off("last_audio_byte", _on_playback_done) self.speech_generation.off("synthesis_interrupted", _on_playback_done) handle._mark_done() async def reply_with_context( self, instructions: str, wait_for_playback: bool, handle: UtteranceHandle, frames: list[av.VideoFrame] | None = None ) -> None: """ Generate a reply using instructions and current chat context. 
Args: instructions: Instructions to add to chat context wait_for_playback: If True, disable VAD/STT during response handle: Utterance handle frames: Optional video frames for vision """ if not self.agent: handle._mark_done() return original_handlers = {} if wait_for_playback and self.speech_understanding: if self.speech_understanding.vad: original_handlers['vad'] = self.speech_understanding._on_vad_event self.speech_understanding._on_vad_event = lambda x: None if self.speech_understanding.stt: original_handlers['stt'] = self.speech_understanding._on_stt_transcript self.speech_understanding._on_stt_transcript = lambda x: None try: context_prefix = None if self.agent.knowledge_base: kb_context = await self.agent.knowledge_base.process_query(instructions) if kb_context: context_prefix = kb_context content_parts = [instructions] if frames: for frame in frames: image_part = ImageContent(image=frame, inference_detail="auto") content_parts.append(image_part) self.agent.chat_context.add_message( role=ChatRole.USER, content=content_parts if len(content_parts) > 1 else instructions ) await self._generate_and_synthesize(instructions, handle, context_prefix=context_prefix) finally: if wait_for_playback and self.speech_understanding: if 'vad' in original_handlers: self.speech_understanding._on_vad_event = original_handlers['vad'] if 'stt' in original_handlers: self.speech_understanding._on_stt_transcript = original_handlers['stt'] if not handle.done(): handle._mark_done() async def _monitor_interruption_duration(self) -> None: """Monitor user speech duration during agent response. Polls the real-time VAD probability from SpeechUnderstanding every 50 ms instead of a single blind sleep. This lets the pipeline react the instant sustained speech crosses the configured duration threshold, and avoids false-triggering if the user stops speaking or the probability drops midway. 
""" if self.interrupt_mode not in ("VAD_ONLY", "HYBRID"): return try: elapsed = 0.0 poll_interval = 0.05 while elapsed < self.interrupt_min_duration: await asyncio.sleep(poll_interval) elapsed += poll_interval if not self._is_user_speaking: logger.debug( f"User stopped speaking at {elapsed:.2f}s " f"(< {self.interrupt_min_duration}s), aborting interruption check" ) return if self.speech_understanding: prob = self.speech_understanding.current_vad_probability if prob < 0.15: logger.debug( f"VAD probability dropped to {prob:.3f} during " f"interruption monitoring, resetting elapsed timer" ) elapsed = 0.0 if self.agent and self.agent.session and self.agent.session.current_utterance: if self.agent.session.current_utterance.is_interruptible: prob = 0.0 if self.speech_understanding: prob = self.speech_understanding.current_vad_probability if self.interrupt_min_confidence > 0.0 and prob < self.interrupt_min_confidence: logger.info( f"[orchestrator] VAD probability {prob:.3f} below " f"interrupt_min_confidence {self.interrupt_min_confidence:.3f} " f"at trigger time, aborting interruption" ) return logger.info( f"User speech exceeded {self.interrupt_min_duration}s " f"threshold (vad_prob={prob:.3f}), triggering interruption" ) await self._trigger_interruption() except asyncio.CancelledError: logger.debug("Interruption monitoring cancelled") async def handle_stt_event(self, text: str, confidence: float = 1.0) -> bool: """Handle STT event for interruption (word-based). Only processes interruptions when the agent is actively speaking or generating. Respects interrupt_min_words, interrupt_min_confidence, and interrupt_mode configuration. Returns: True if interruption was triggered, False otherwise. 
""" if not text or not text.strip(): return False # Only consider interruption when agent is actively speaking or thinking agent_state = self.agent.session.agent_state if self.agent and self.agent.session else None if agent_state not in (AgentState.SPEAKING, AgentState.THINKING): return False if self.interrupt_min_confidence > 0.0 and confidence < self.interrupt_min_confidence: logger.info( f"[orchestrator] Ignoring low-confidence transcript " f"(confidence={confidence:.3f} < {self.interrupt_min_confidence:.3f}): {text!r}" ) return False word_count = len(text.strip().split()) # Debug: log exact state of current_utterance if self.agent and self.agent.session and self.agent.session.current_utterance: cu = self.agent.session.current_utterance logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={self.interrupt_min_words}, mode={self.interrupt_mode}, utterance_id={cu.id}, interruptible={cu.is_interruptible}, done={cu.done()}") else: logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={self.interrupt_min_words}, mode={self.interrupt_mode}, no current_utterance") if self.resume_on_false_interrupt and self._is_in_false_interrupt_pause and word_count >= self.interrupt_min_words: logger.info(f"STT transcript received while in paused state, confirming real interruption") self._cancel_false_interrupt_timer() self._is_in_false_interrupt_pause = False self._false_interrupt_paused_speech = False await self._interrupt_pipeline() return True if self.interrupt_mode in ("STT_ONLY", "HYBRID"): if word_count >= self.interrupt_min_words: if self.agent and self.agent.session and self.agent.session.current_utterance: if self.agent.session.current_utterance.is_interruptible: logger.info(f"[orchestrator] Word count {word_count} >= min_words {self.interrupt_min_words}, triggering interruption") await self._trigger_interruption() return True else: logger.info(f"[orchestrator] Word count threshold met but utterance is not 
interruptible") else: logger.info(f"[orchestrator] Word count {word_count} < min_words {self.interrupt_min_words}, ignoring") return False async def _trigger_interruption(self) -> None: """Trigger interruption with optional pause/resume support""" if self._is_interrupted: return if self.agent and self.agent.session and self.agent.session.current_utterance: if not self.agent.session.current_utterance.is_interruptible: logger.info("Interruption disabled for current utterance") return self._is_interrupted = True can_resume = self.resume_on_false_interrupt and self.speech_generation and self.speech_generation.can_pause() if can_resume: logger.info("Pausing TTS for potential resume") self._false_interrupt_paused_speech = True self._is_in_false_interrupt_pause = True if self.speech_generation: await self.speech_generation.pause() self._start_false_interrupt_timer() else: logger.info("Performing full interruption") await self._interrupt_pipeline() def _start_false_interrupt_timer(self): """Start timer to detect false interrupts""" if self._false_interrupt_timer: self._false_interrupt_timer.cancel() if self.false_interrupt_pause_duration is None: return logger.info(f"Starting false interrupt timer for {self.false_interrupt_pause_duration}s") loop = asyncio.get_event_loop() self._false_interrupt_timer = loop.call_later( self.false_interrupt_pause_duration, lambda: asyncio.create_task(self._on_false_interrupt_timeout()) ) def _cancel_false_interrupt_timer(self): """Cancel false interrupt timer""" if self._false_interrupt_timer: logger.info("Cancelling false interrupt timer") self._false_interrupt_timer.cancel() self._false_interrupt_timer = None async def _on_false_interrupt_timeout(self): """Handle false interrupt timeout. Uses real-time VAD probability (via FRAME_PROCESSED) to distinguish genuine speech from transient noise. 
""" logger.info(f"False interrupt timeout reached after {self.false_interrupt_pause_duration}s") self._false_interrupt_timer = None vad_prob = 0.0 vad_energy = 0.0 if self.speech_understanding: vad_prob = self.speech_understanding.current_vad_probability vad_energy = self.speech_understanding.current_vad_energy if self._is_user_speaking: if vad_prob >= 0.3: logger.info( f"User still speaking (vad_prob={vad_prob:.3f}, " f"energy={vad_energy:.4f}) - confirming real interruption" ) self._is_in_false_interrupt_pause = False self._false_interrupt_paused_speech = False await self._interrupt_pipeline() return else: logger.info( f"User flagged as speaking but vad_prob={vad_prob:.3f} " f"is low - treating as false interruption" ) if self._is_in_false_interrupt_pause and self.speech_generation and self.speech_generation.can_pause(): logger.info("Resuming agent speech - false interruption detected") self._is_interrupted = False self._is_in_false_interrupt_pause = False self._false_interrupt_paused_speech = False await self.speech_generation.resume() async def _interrupt_pipeline(self) -> None: """Interrupt all components""" logger.info( "[ctx-add] _interrupt_pipeline ENTER (gen_id=%s already_interrupted=%s partial_len=%d)", self._generation_id, self._is_interrupted, len(self._partial_response or ""), ) self.agent.session._emit_agent_state(AgentState.LISTENING) self._is_interrupted = True self._cancel_false_interrupt_timer() self._is_in_false_interrupt_pause = False self._false_interrupt_paused_speech = False metrics_collector.on_interrupted() if self.agent and self.agent.session and self.agent.session.current_utterance: if self.agent.session.current_utterance.is_interruptible: self.agent.session.current_utterance.interrupt() cleanup_awaitables = [] if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: cleanup_awaitables.append(self.agent.session.stop_thinking_audio()) if self.speech_generation: 
cleanup_awaitables.append(self.speech_generation.interrupt()) if self.content_generation: cleanup_awaitables.append(self.content_generation.cancel()) if cleanup_awaitables: await asyncio.gather(*cleanup_awaitables, return_exceptions=True) if self._current_generation_task and not self._current_generation_task.done(): self._current_generation_task.cancel() truncated_response = "" source = "none" sg_spoken_len = len(self.speech_generation.spoken_transcript) if self.speech_generation and self.speech_generation.spoken_transcript else 0 partial_len = len(self._partial_response or "") if self.speech_generation and self.speech_generation.spoken_transcript: truncated_response = self.speech_generation.spoken_transcript source = "spoken_transcript" elif self._partial_response: truncated_response = self._partial_response source = "partial_response(LLM)" logger.info( "[ctx-add] INTERRUPT source=%s spoken_len=%d partial_len=%d chosen_len=%d preview=%r", source, sg_spoken_len, partial_len, len(truncated_response), truncated_response[:80], ) if truncated_response and metrics_collector.current_turn: if not metrics_collector.current_turn.agent_speech: metrics_collector.set_agent_response(truncated_response) else: metrics_collector.emit_agent_transcript_transport( metrics_collector.current_turn.agent_speech, type="final" ) if truncated_response and self.agent: last = self.agent.chat_context.items[-1] if self.agent.chat_context.items else None last_info = ( f"role={getattr(last, 'role', None)} id={getattr(last, 'id', None)} " f"len={len(getattr(last, 'content', '') or '') if isinstance(getattr(last, 'content', None), str) else 'list'}" if last else "none" ) logger.info( "[ctx-add] SITE-B interrupt add (source=%s len=%d last_item=%s)", source, len(truncated_response), last_info, ) msg = self.agent.chat_context.add_message( role=ChatRole.ASSISTANT, content=truncated_response ) msg.interrupted = True logger.info("[ctx-add] SITE-B added msg id=%s interrupted=True", msg.id) if 
metrics_collector.current_turn: logger.info("[orchestrator] Completing interrupted turn") metrics_collector.complete_turn() self._partial_response = "" async def interrupt(self) -> None: """Public method to interrupt the pipeline""" await self._interrupt_pipeline() async def _cancel_preemptive_generation(self) -> None: """Cancel preemptive generation. If `_interrupt_pipeline()` already ran this turn (`_is_interrupted` set), skip the redundant content/speech cleanup it already did and only do the preemptive-task-specific work. Cuts ~200-500ms off the post-interrupt path when a barge-in lands during preemptive generation. """ logger.info("Cancelling preemptive generation") self._preemptive_cancelled = True self._preemptive_authorized.set() if self._preemptive_generation_task and not self._preemptive_generation_task.done(): self._preemptive_generation_task.cancel() try: await self._preemptive_generation_task except asyncio.CancelledError: logger.info("Preemptive task cancelled successfully") self._preemptive_generation_task = None if not self._is_interrupted: redundant_cleanups = [] if self.content_generation: redundant_cleanups.append(self.content_generation.cancel()) if self.speech_generation: redundant_cleanups.append(self.speech_generation.interrupt()) if redundant_cleanups: await asyncio.gather(*redundant_cleanups, return_exceptions=True) if self._current_generation_task and not self._current_generation_task.done(): self._current_generation_task.cancel() if self.speech_understanding: self.speech_understanding.clear_preemptive_state() logger.info("Preemptive generation cancelled") async def _run_vmd_check(self) -> None: """Run voicemail detection check""" try: if not self.voice_mail_detector: return await asyncio.sleep(self.voice_mail_detector.duration) is_voicemail = await self.voice_mail_detector.detect(self._vmd_buffer.strip()) self.voice_mail_detection_done = True if is_voicemail: await self._interrupt_pipeline() self.emit("voicemail_result", {"is_voicemail": 
is_voicemail}) except Exception as e: logger.error(f"Error in VMD check: {e}") self.voice_mail_detection_done = True self.emit("voicemail_result", {"is_voicemail": False}) finally: self._vmd_check_task = None self._vmd_buffer = "" async def cleanup(self) -> None: """Cleanup all components""" logger.info("Cleaning up pipeline orchestrator") if self._vmd_check_task and not self._vmd_check_task.done(): self._vmd_check_task.cancel() if self._false_interrupt_timer: self._false_interrupt_timer.cancel() if self.speech_understanding: await self.speech_understanding.cleanup() if self.content_generation: await self.content_generation.cleanup() if self.speech_generation: await self.speech_generation.cleanup() self.agent = None self.avatar = None self.graph_adapter = None self.voice_mail_detector = None logger.info("Pipeline orchestrator cleaned up")Orchestrates the execution of speech understanding, content generation, and speech generation components.
Supports various component chains:
1. Full Chain: VAD → STT → TurnD → LLM → TTS
2. No TTS: VAD → STT → TurnD → LLM (text output)
3. No STT: LLM → TTS (text input)
4. Hybrid: VAD → STT → [User Processing] → TTS
Events:
- transcript_ready: Transcript is ready for processing
- content_generated: LLM has generated content
- synthesis_complete: TTS synthesis is complete
- voicemail_result: Voicemail detection result
- error: Error occurred in pipeline
Ancestors
- EventEmitter
- typing.Generic
Methods
async def cleanup(self) ‑> None-
Expand source code
async def cleanup(self) -> None:
    """Tear down the orchestrator and release its collaborators.

    Cancels the voicemail-detection task if it is still running and the
    false-interrupt timer if one is set, awaits ``cleanup()`` on the speech
    understanding, content generation and speech generation components (in
    that order, skipping any that are not configured), and finally drops the
    references to the agent, avatar, graph adapter and voicemail detector.
    """
    logger.info("Cleaning up pipeline orchestrator")

    vmd_task = self._vmd_check_task
    if vmd_task and not vmd_task.done():
        vmd_task.cancel()

    timer = self._false_interrupt_timer
    if timer:
        timer.cancel()

    # Resolve each attribute only when its turn comes, so a component is
    # looked up after the previous one has finished cleaning up.
    for component_name in (
        "speech_understanding",
        "content_generation",
        "speech_generation",
    ):
        component = getattr(self, component_name)
        if component:
            await component.cleanup()

    self.agent = self.avatar = self.graph_adapter = self.voice_mail_detector = None
    logger.info("Pipeline orchestrator cleaned up")
def get_latest_transcript(self) ‑> str-
Expand source code
def get_latest_transcript(self) -> str:
    """Return the transcript accumulated so far (for hybrid scenarios).

    Reads ``_accumulated_transcript`` from the speech understanding
    component; yields an empty string when no such component is configured.
    """
    understanding = self.speech_understanding
    return understanding._accumulated_transcript if understanding else ""
async def handle_stt_event(self, text: str, confidence: float = 1.0) ‑> bool-
Expand source code
async def handle_stt_event(self, text: str, confidence: float = 1.0) -> bool:
    """Handle STT event for interruption (word-based).

    Only processes interruptions when the agent is actively speaking or
    generating. Respects interrupt_min_words, interrupt_min_confidence,
    and interrupt_mode configuration.

    Args:
        text: Transcript text reported by STT. Empty or whitespace-only
            text is ignored.
        confidence: Confidence reported for the transcript. Transcripts
            below ``interrupt_min_confidence`` are ignored when that
            threshold is greater than zero.

    Returns:
        True if interruption was triggered, False otherwise.
    """
    if not text or not text.strip():
        return False

    # Only consider interruption when agent is actively speaking or thinking
    session = self.agent.session if self.agent else None
    agent_state = session.agent_state if session else None
    if agent_state not in (AgentState.SPEAKING, AgentState.THINKING):
        return False

    if self.interrupt_min_confidence > 0.0 and confidence < self.interrupt_min_confidence:
        logger.info(
            f"[orchestrator] Ignoring low-confidence transcript "
            f"(confidence={confidence:.3f} < {self.interrupt_min_confidence:.3f}): {text!r}"
        )
        return False

    word_count = len(text.split())

    # The session is known to exist past the state check above, so the
    # current utterance is looked up once and reused below.
    utterance = session.current_utterance

    # Debug: log exact state of current_utterance
    if utterance:
        logger.info(
            f"[orchestrator] handle_stt_event: word_count={word_count}, "
            f"min_words={self.interrupt_min_words}, mode={self.interrupt_mode}, "
            f"utterance_id={utterance.id}, interruptible={utterance.is_interruptible}, "
            f"done={utterance.done()}"
        )
    else:
        logger.info(
            f"[orchestrator] handle_stt_event: word_count={word_count}, "
            f"min_words={self.interrupt_min_words}, mode={self.interrupt_mode}, "
            f"no current_utterance"
        )

    enough_words = word_count >= self.interrupt_min_words

    # A qualifying transcript during a false-interrupt pause confirms the
    # interruption regardless of interrupt_mode.
    if self.resume_on_false_interrupt and self._is_in_false_interrupt_pause and enough_words:
        logger.info("STT transcript received while in paused state, confirming real interruption")
        self._cancel_false_interrupt_timer()
        self._is_in_false_interrupt_pause = False
        self._false_interrupt_paused_speech = False
        await self._interrupt_pipeline()
        return True

    if self.interrupt_mode not in ("STT_ONLY", "HYBRID"):
        return False

    if not enough_words:
        logger.info(
            f"[orchestrator] Word count {word_count} < min_words "
            f"{self.interrupt_min_words}, ignoring"
        )
        return False

    if utterance:
        if utterance.is_interruptible:
            logger.info(
                f"[orchestrator] Word count {word_count} >= min_words "
                f"{self.interrupt_min_words}, triggering interruption"
            )
            await self._trigger_interruption()
            return True
        logger.info("[orchestrator] Word count threshold met but utterance is not interruptible")

    return False
Only processes interruptions when the agent is actively speaking or generating. Respects interrupt_min_words, interrupt_min_confidence, and interrupt_mode configuration.
Returns
True if interruption was triggered, False otherwise.
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None:
    """Interrupt the pipeline on behalf of an external caller.

    Public entry point that awaits the internal ``_interrupt_pipeline()``
    routine and returns once it has finished.
    """
    await self._interrupt_pipeline()
async def process_audio(self, audio_data: bytes) ‑> None-
Expand source code
async def process_audio(self, audio_data: bytes) -> None:
    """Feed incoming audio into the pipeline.

    The bytes are handed to the speech understanding component; when no
    such component is configured the call is a no-op.

    Args:
        audio_data: Raw audio bytes
    """
    if not self.speech_understanding:
        return
    await self.speech_understanding.process_audio(audio_data)
Args
audio_data- Raw audio bytes
async def process_text(self, text: str) ‑> None-
Expand source code
async def process_text(self, text: str) -> None: """ Process text input directly (bypasses STT). Starts a metrics turn and records the text as the user transcript, optionally routes the input through the graph adapter (which may substitute the prompt or skip the LLM), appends the text to the agent chat context as a USER message, accumulates the streamed output of the content generation component, stores a non-empty response as an ASSISTANT message, emits "content_generated", runs the LLM hooks, and hands the resulting text to the speech generation component when one is configured. The no-TTS branch emits IDLE agent/user states, completes the metrics turn and emits "synthesis_complete". Returns early (after a warning) when no agent is attached, and when the graph adapter signals a skip. NOTE(review): indentation is lost in this listing, so the exact nesting of the graph-decision, TTS and no-TTS branches should be confirmed against the module source. Args: text: User text input """ if not self.agent: logger.warning("No agent available for text processing") return metrics_collector.start_turn() metrics_collector.set_user_transcript(text) if self.graph_adapter: self.graph_adapter.set_agent(self.agent) graph_prompt, skip_llm = await self.graph_adapter.handle_input(text) if skip_llm: return if graph_prompt is not None: text = graph_prompt self.agent.chat_context.add_message( role=ChatRole.USER, content=text ) if not self.speech_understanding: metrics_collector.on_user_speech_start() metrics_collector.on_user_speech_end() if not self.speech_generation: metrics_collector.on_agent_speech_start() if self.content_generation: self.agent.session._emit_agent_state(AgentState.THINKING) full_response = "" graph_response_text = None async for response_chunk in self.content_generation.generate(text): if response_chunk.content: full_response += response_chunk.content if self.graph_adapter and response_chunk.metadata: if response_chunk.metadata.get("graph_response"): graph_response_text = response_chunk.metadata.get("graph_response") if self.graph_adapter and graph_response_text: new_response, skip = await self.graph_adapter.handle_decision( self.agent, graph_response_text ) if skip: return if new_response is not None: full_response = new_response if full_response: self.agent.chat_context.add_message( role=ChatRole.ASSISTANT, content=full_response ) self.emit("content_generated", {"text": full_response}) if self.hooks and self.hooks.has_llm_hooks(): try: await self.hooks.trigger_llm({"text": full_response}) except Exception as e: logger.error(f"Error in legacy LLM hook: {e}", exc_info=True) tts_text = full_response if self.hooks and self.hooks.has_llm_stream_hook(): async def _single_chunk(): yield full_response parts = [] async for chunk in self.hooks.process_llm_stream(_single_chunk()): parts.append(chunk) if parts: tts_text = 
"".join(parts) if self.speech_generation: await self.speech_generation.synthesize(tts_text) else: # No TTS - complete the turn after LLM self.agent.session._emit_agent_state(AgentState.IDLE) self.agent.session._emit_user_state(UserState.IDLE) metrics_collector.on_agent_speech_end() metrics_collector.set_agent_response(full_response) metrics_collector.complete_turn() self.emit("synthesis_complete", {})Process text input directly (bypasses STT).
Args
text- User text input
async def reply_with_context(self,
instructions: str,
wait_for_playback: bool,
handle: UtteranceHandle,
frames: list[av.VideoFrame] | None = None) ‑> None-
Expand source code
async def reply_with_context(
    self,
    instructions: str,
    wait_for_playback: bool,
    handle: UtteranceHandle,
    frames: list[av.VideoFrame] | None = None
) -> None:
    """Produce a reply from the given instructions plus the current chat context.

    The instructions (and any video frames, wrapped as image content) are
    appended to the agent chat context as a USER message, then generation
    and synthesis are delegated to ``_generate_and_synthesize``. When the
    agent has a knowledge base, its answer to the instructions is passed
    along as ``context_prefix``. The handle is always marked done on exit
    unless it already reports done.

    Args:
        instructions: Instructions to add to chat context
        wait_for_playback: If True, the VAD/STT handlers of the speech
            understanding component are swapped for no-ops while the
            reply is produced and restored afterwards
        handle: Utterance handle
        frames: Optional video frames for vision
    """
    if not self.agent:
        handle._mark_done()
        return

    # Original handlers, keyed by "vad" / "stt", kept so they can be put
    # back in the ``finally`` clause.
    saved_handlers = {}
    if wait_for_playback and self.speech_understanding:
        understanding = self.speech_understanding
        if understanding.vad:
            saved_handlers['vad'] = understanding._on_vad_event
            understanding._on_vad_event = lambda x: None
        if understanding.stt:
            saved_handlers['stt'] = understanding._on_stt_transcript
            understanding._on_stt_transcript = lambda x: None

    try:
        context_prefix = None
        knowledge_base = self.agent.knowledge_base
        if knowledge_base:
            # A falsy knowledge-base result means "no prefix".
            context_prefix = (await knowledge_base.process_query(instructions)) or None

        message_content = [instructions]
        message_content.extend(
            ImageContent(image=frame, inference_detail="auto")
            for frame in frames or ()
        )

        # Plain string when there are no frames, multi-part list otherwise.
        self.agent.chat_context.add_message(
            role=ChatRole.USER,
            content=message_content if len(message_content) > 1 else instructions,
        )

        await self._generate_and_synthesize(
            instructions, handle, context_prefix=context_prefix
        )
    finally:
        if wait_for_playback and self.speech_understanding:
            for key, attribute in (("vad", "_on_vad_event"), ("stt", "_on_stt_transcript")):
                if key in saved_handlers:
                    setattr(self.speech_understanding, attribute, saved_handlers[key])

        if not handle.done():
            handle._mark_done()
Args
instructions- Instructions to add to chat context
wait_for_playback- If True, disable VAD/STT during response
handle- Utterance handle
frames- Optional video frames for vision
async def say(self,
message: str,
handle: UtteranceHandle,
audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None) ‑> None-
Expand source code
async def say(
    self,
    message: str,
    handle: UtteranceHandle,
    audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None,
) -> None:
    """Speak a message directly through TTS (for initial messages).

    Waits until the speech generation component reports either
    ``last_audio_byte`` or ``synthesis_interrupted`` before returning, and
    always marks the handle done. Without a speech generation component
    the handle is marked done immediately.

    Args:
        message: Message to synthesize
        handle: Utterance handle to track
        audio_data: Optional pre-synthesized PCM bytes. When provided,
            bypasses TTS synthesis entirely and streams these bytes to
            the agent audio track. Chunker and TTS text filter are also
            skipped on this path.
    """
    if not self.speech_generation:
        handle._mark_done()
        return

    finished = asyncio.Event()

    def _signal_finished(_data: Any = None) -> None:
        finished.set()

    # Either event ends the wait below; both listeners are removed again
    # in the ``finally`` clause.
    completion_events = ("last_audio_byte", "synthesis_interrupted")
    for event_name in completion_events:
        self.speech_generation.on(event_name, _signal_finished)

    try:
        if audio_data is not None:
            metrics_collector.set_agent_response(message, emit_transport=True)
            await self.speech_generation.synthesize_audio_bytes(
                audio_data, message, handle=handle
            )
            await finished.wait()
            return

        tts_hook_active = self.hooks and self.hooks.has_tts_stream_hook()
        if not tts_hook_active:
            # Transport emission is enabled only when the TTS does not
            # report ``supports_word_timestamps``.
            emit_now = not getattr(
                self.speech_generation.tts, "supports_word_timestamps", False
            )
            metrics_collector.set_agent_response(message, emit_transport=emit_now)

        if self._chunker is None and self._text_filter is None:
            await self.speech_generation.synthesize(message)
        else:
            async def _single_message() -> AsyncIterator[str]:
                yield message

            stream: AsyncIterator[Any] = _single_message()
            if self._text_filter is not None:
                stream = self._text_filter.filter(stream)
            if self._chunker is not None:
                stream = _pipe_through_chunk_stream(
                    stream,
                    self._chunker,
                    language=self._chunking_language,
                )
            await self.speech_generation.synthesize(stream)

        await finished.wait()
    finally:
        for event_name in completion_events:
            self.speech_generation.off(event_name, _signal_finished)
        handle._mark_done()
Args
message- Message to synthesize
handle- Utterance handle to track
audio_data- Optional pre-synthesized PCM bytes. When provided, bypasses TTS synthesis entirely and streams these bytes to the agent audio track. Chunker and TTS text filter are also skipped on this path.
def set_audio_track(self, audio_track: Any) ‑> None-
Expand source code
def set_audio_track(self, audio_track: Any) -> None: """Set audio track for TTS output. Passes ``audio_track`` to ``speech_generation.set_audio_track`` when a speech generation component is configured, and copies this orchestrator's ``interrupt_fade_duration`` onto a non-None track that exposes an ``interrupt_fade_duration`` attribute. NOTE(review): indentation is lost in this listing, so confirm whether the fade-duration copy is nested under the speech-generation check.""" if self.speech_generation: self.speech_generation.set_audio_track(audio_track) if audio_track is not None and hasattr(audio_track, "interrupt_fade_duration"): audio_track.interrupt_fade_duration = self.interrupt_fade_durationSet audio track for TTS output
def set_voice_mail_detector(self, detector: VoiceMailDetector | None) ‑> None-
Expand source code
def set_voice_mail_detector(self, detector: VoiceMailDetector | None) -> None:
    """Install (or clear) the voicemail detector and reset detection state.

    Besides storing ``detector``, this marks detection as not yet done and
    empties the text buffer collected for voicemail detection.

    Args:
        detector: Detector to use, or None to disable voicemail detection.
    """
    self.voice_mail_detector = detector
    self.voice_mail_detection_done, self._vmd_buffer = False, ""
async def start(self) ‑> None-
Expand source code
async def start(self) -> None:
    """Start every configured pipeline component.

    Awaits ``start()`` on the speech understanding, content generation and
    speech generation components, in that order, skipping any that are not
    configured, then logs that the orchestrator has started.
    """
    # Resolve each attribute only when its turn comes, so a component is
    # looked up after the previous one has finished starting.
    for component_name in (
        "speech_understanding",
        "content_generation",
        "speech_generation",
    ):
        component = getattr(self, component_name)
        if component:
            await component.start()

    logger.info("PipelineOrchestrator started")
Inherited members