Module agents.pipeline_orchestrator

Classes

class PipelineOrchestrator (agent: Agent | None = None,
stt: STT | None = None,
llm: LLM | None = None,
tts: TTS | None = None,
vad: VAD | None = None,
turn_detector: EOU | None = None,
denoise: Denoise | None = None,
avatar: Any | None = None,
mode: "Literal['ADAPTIVE', 'DEFAULT']" = 'DEFAULT',
min_speech_wait_timeout: tuple[float, float] = (0.5, 0.8),
interrupt_mode: "Literal['VAD_ONLY', 'STT_ONLY', 'HYBRID']" = 'HYBRID',
interrupt_min_duration: float = 0.5,
interrupt_min_words: int = 2,
interrupt_min_confidence: float = 0.0,
false_interrupt_pause_duration: float = 2.0,
resume_on_false_interrupt: bool = False,
interrupt_fade_duration: float = 0.4,
graph_adapter: Any | None = None,
context_window: Any | None = None,
voice_mail_detector: VoiceMailDetector | None = None,
hooks: "PipelineHooks | None" = None,
chunker: "SentenceChunker | None" = None,
text_filter: "TextFilter | None" = None,
chunking_language: str = 'auto')
Expand source code
class PipelineOrchestrator(EventEmitter[Literal[
    "transcript_ready", 
    "content_generated", 
    "synthesis_complete",
    "voicemail_result",
    "error"
]]):
    """
    Orchestrates the execution of speech understanding, content generation, and speech generation components.
    
    Supports various component chains:
    1. Full Chain: VAD → STT → TurnD → LLM → TTS
    2. No TTS: VAD → STT → TurnD → LLM (text output)
    3. No STT: LLM → TTS (text input)
    4. Hybrid: VAD → STT → [User Processing] → TTS
    
    Events:
    - transcript_ready: Transcript is ready for processing
    - content_generated: LLM has generated content
    - synthesis_complete: TTS synthesis is complete
    - voicemail_result: Voicemail detection result
    - error: Error occurred in pipeline
    """
    
    def __init__(
        self,
        agent: Agent | None = None,
        stt: STT | None = None,
        llm: LLM | None = None,
        tts: TTS | None = None,
        vad: VAD | None = None,
        turn_detector: EOU | None = None,
        denoise: Denoise | None = None,
        avatar: Any | None = None,
        mode: Literal["ADAPTIVE", "DEFAULT"] = "DEFAULT",
        min_speech_wait_timeout: tuple[float, float] = (0.5, 0.8),
        interrupt_mode: Literal["VAD_ONLY", "STT_ONLY", "HYBRID"] = "HYBRID",
        interrupt_min_duration: float = 0.5,
        interrupt_min_words: int = 2,
        interrupt_min_confidence: float = 0.0,
        false_interrupt_pause_duration: float = 2.0,
        resume_on_false_interrupt: bool = False,
        interrupt_fade_duration: float = 0.4,
        graph_adapter: Any | None = None,
        context_window: Any | None = None,
        voice_mail_detector: VoiceMailDetector | None = None,
        hooks: "PipelineHooks | None" = None,
        chunker: "SentenceChunker | None" = None,
        text_filter: "TextFilter | None" = None,
        chunking_language: str = "auto",
    ) -> None:
        """
        Store configuration and build whichever pipeline stages were supplied.

        Stage construction rules:
            - ``SpeechUnderstanding``: built when any of ``stt``, ``vad`` or
              ``turn_detector`` is given; its events are forwarded to this
              orchestrator's ``_on_*`` handlers.
            - ``ContentGeneration``: built when ``llm`` is given.
            - ``SpeechGeneration``: built when ``tts`` is given.
        Stages that are not built remain ``None``.

        Args:
            min_speech_wait_timeout: Pair whose first item becomes speech
                understanding's ``min_speech_wait_timeout`` and whose second item
                becomes its ``max_speech_wait_timeout``.
            chunker, text_filter, chunking_language: Optional cascade-mode
                text chunking / filtering settings, stored for later use.
            The ``interrupt_*``, ``false_interrupt_pause_duration`` and
            ``resume_on_false_interrupt`` arguments are stored unchanged for the
            interruption logic.
        """
        super().__init__()

        # Collaborators.
        self.agent = agent
        self.avatar = avatar
        self.graph_adapter = graph_adapter
        self.hooks = hooks

        # Voicemail detection state.
        self.voice_mail_detector = voice_mail_detector
        self.voice_mail_detection_done = False
        self._vmd_buffer = ""
        self._vmd_check_task: asyncio.Task | None = None

        # Text chunking / filtering (cascade-mode only).
        self._chunker = chunker
        self._text_filter = text_filter
        self._chunking_language = chunking_language

        # Interruption tuning.
        self.interrupt_mode = interrupt_mode
        self.interrupt_min_duration = interrupt_min_duration
        self.interrupt_min_words = interrupt_min_words
        self.interrupt_min_confidence = interrupt_min_confidence
        self.false_interrupt_pause_duration = false_interrupt_pause_duration
        self.resume_on_false_interrupt = resume_on_false_interrupt
        self.interrupt_fade_duration = interrupt_fade_duration

        # Interruption runtime state.
        self._generation_id = 0
        self._is_interrupted = False
        self._is_in_false_interrupt_pause = False
        self._false_interrupt_paused_speech = False
        self._false_interrupt_timer: asyncio.TimerHandle | None = None
        self._is_user_speaking = False
        self._interruption_check_task: asyncio.Task | None = None
        self._speech_start_energy: float = 0.0

        # Generation bookkeeping, regular and preemptive.
        self._current_generation_task: asyncio.Task | None = None
        self._partial_response = ""
        self._preemptive_generation_task: asyncio.Task | None = None
        self._preemptive_authorized = asyncio.Event()
        self._preemptive_cancelled = False

        # Pipeline stages (filled in below only when their inputs exist).
        self.speech_understanding: SpeechUnderstanding | None = None
        self.content_generation: ContentGeneration | None = None
        self.speech_generation: SpeechGeneration | None = None

        if stt or vad or turn_detector:
            understanding = SpeechUnderstanding(
                agent=agent,
                stt=stt,
                vad=vad,
                turn_detector=turn_detector,
                denoise=denoise,
                mode=mode,
                min_speech_wait_timeout=min_speech_wait_timeout[0],
                max_speech_wait_timeout=min_speech_wait_timeout[1],
                hooks=hooks,
            )
            self.speech_understanding = understanding
            # Handlers are async; _wrap_async adapts them to EventEmitter's sync callbacks.
            understanding_routes = (
                ("transcript_final", self._on_transcript_final),
                ("transcript_preflight", self._on_transcript_preflight),
                ("transcript_interim", self._on_transcript_interim),
                ("speech_started", self._on_speech_started),
                ("speech_stopped", self._on_speech_stopped),
                ("turn_resumed", self._on_turn_resumed),
            )
            for event_name, handler in understanding_routes:
                understanding.on(event_name, self._wrap_async(handler))

        if llm:
            generation = ContentGeneration(
                agent=agent,
                llm=llm,
                graph_adapter=graph_adapter,
                context_window=context_window,
            )
            self.content_generation = generation
            generation.on("generation_started", lambda data: logger.info("Content generation started"))
            generation.on("generation_chunk", lambda data: None)
            generation.on("generation_complete", lambda data: logger.info("Content generation complete"))

        if tts:
            synthesis = SpeechGeneration(
                agent=agent,
                tts=tts,
                avatar=avatar,
                hooks=hooks,
            )
            self.speech_generation = synthesis
            synthesis.on("synthesis_started", lambda data: logger.info("Speech synthesis started"))
            synthesis.on("first_audio_byte", lambda data: logger.info("First audio byte ready"))
            synthesis.on("last_audio_byte", lambda data: logger.info("Synthesis complete"))
            synthesis.on("synthesis_interrupted", lambda data: logger.info("Synthesis interrupted"))
    
    def _wrap_async(self, async_func):
        """
        Wrap an async function to be compatible with EventEmitter's sync-only handlers.

        Each invocation of the returned wrapper schedules ``async_func`` as a
        new task on the running event loop and returns ``None`` immediately.

        The event loop keeps only weak references to tasks, so every scheduled
        task is held in a set until it finishes, which stops it from being
        garbage-collected mid-run. An exception raised by a handler is logged
        here rather than surfacing as "Task exception was never retrieved".

        Args:
            async_func: The async function to wrap

        Returns:
            A sync function that schedules the async function as a task
        """
        pending: set = set()
        handler_name = getattr(async_func, "__name__", repr(async_func))

        def _on_task_done(task: asyncio.Task) -> None:
            pending.discard(task)
            if task.cancelled():
                return
            exc = task.exception()
            if exc is not None:
                logger.error(
                    f"Unhandled error in event handler {handler_name}: {exc}",
                    exc_info=exc,
                )

        def sync_wrapper(*args, **kwargs):
            task = asyncio.create_task(async_func(*args, **kwargs))
            # Strong reference until completion; released in _on_task_done.
            pending.add(task)
            task.add_done_callback(_on_task_done)

        return sync_wrapper
    
    def set_audio_track(self, audio_track: Any) -> None:
        """
        Attach the audio track used for TTS output.

        The track is handed to speech generation when that stage exists. If the
        track exposes an ``interrupt_fade_duration`` attribute, it is overwritten
        with this orchestrator's configured fade duration.
        """
        generator = self.speech_generation
        if generator:
            generator.set_audio_track(audio_track)

        accepts_fade = audio_track is not None and hasattr(audio_track, "interrupt_fade_duration")
        if accepts_fade:
            audio_track.interrupt_fade_duration = self.interrupt_fade_duration
    
    def set_voice_mail_detector(self, detector: VoiceMailDetector | None) -> None:
        """
        Configure (or clear) voicemail detection and reset its per-call state.

        Any still-running detection task is cancelled and the task reference is
        cleared. ``_on_transcript_final`` starts a new check only while
        ``_vmd_check_task`` is falsy, so leaving a stale task in place would stop
        the newly configured detector from ever starting.

        Args:
            detector: The detector to use, or ``None`` to disable detection.
        """
        stale_check = self._vmd_check_task
        if stale_check is not None and not stale_check.done():
            stale_check.cancel()
        self._vmd_check_task = None

        self.voice_mail_detector = detector
        self.voice_mail_detection_done = False
        self._vmd_buffer = ""
    
    async def start(self) -> None:
        """Start every configured stage in pipeline order: understanding, generation, synthesis."""
        for stage_attr in ("speech_understanding", "content_generation", "speech_generation"):
            stage = getattr(self, stage_attr)
            if stage:
                await stage.start()

        logger.info("PipelineOrchestrator started")
    
    async def process_audio(self, audio_data: bytes) -> None:
        """
        Feed raw audio into the speech-understanding stage.

        The audio is silently ignored when no speech-understanding stage was
        configured.

        Args:
            audio_data: Raw audio bytes
        """
        understanding = self.speech_understanding
        if not understanding:
            return
        await understanding.process_audio(audio_data)
    
    async def process_text(self, text: str) -> None:
        """
        Process text input directly (bypasses STT).

        Flow:
            1. Start a metrics turn for the text.
            2. Optionally route the text through the graph adapter, which may
               rewrite it or skip the LLM.
            3. Append the text to the agent's chat context as a USER message.
            4. If a content-generation stage exists, collect the complete LLM
               response, apply any graph decision, and record it as an
               ASSISTANT message.
            5. Either synthesize the response with TTS or, without TTS, close
               the turn immediately.
        Unlike the streaming transcript path, nothing is spoken until the whole
        LLM response has been collected.

        Args:
            text: User text input

        NOTE(review): after ``THINKING`` is emitted, an empty LLM response or a
        graph-adapter skip returns without emitting ``IDLE`` or completing the
        metrics turn. Confirm that the agent state is reset elsewhere.
        """
        if not self.agent:
            logger.warning("No agent available for text processing")
            return

        metrics_collector.start_turn()
        metrics_collector.set_user_transcript(text)

        # The graph adapter can end processing here (skip_llm) or replace the
        # text that is sent to the LLM.
        if self.graph_adapter:
            self.graph_adapter.set_agent(self.agent)
            graph_prompt, skip_llm = await self.graph_adapter.handle_input(text)
            if skip_llm:
                return
            if graph_prompt is not None:
                text = graph_prompt

        self.agent.chat_context.add_message(
            role=ChatRole.USER,
            content=text
        )

        # With no speech-understanding stage, user speech start/end are recorded
        # back-to-back so the metrics turn still carries those markers.
        if not self.speech_understanding:
            metrics_collector.on_user_speech_start()
            metrics_collector.on_user_speech_end()
            
            if not self.speech_generation:
                metrics_collector.on_agent_speech_start()

        if self.content_generation:
            # NOTE(review): assumes self.agent.session is set; other handlers in
            # this class check self.agent.session before using it.
            self.agent.session._emit_agent_state(AgentState.THINKING)
            full_response = ""
            graph_response_text = None
            # Drain the whole LLM stream, remembering any graph decision carried
            # in chunk metadata.
            async for response_chunk in self.content_generation.generate(text):
                if response_chunk.content:
                    full_response += response_chunk.content
                if self.graph_adapter and response_chunk.metadata:
                    if response_chunk.metadata.get("graph_response"):
                        graph_response_text = response_chunk.metadata.get("graph_response")

            # The adapter can veto the response (skip) or substitute a new one.
            if self.graph_adapter and graph_response_text:
                new_response, skip = await self.graph_adapter.handle_decision(
                    self.agent, graph_response_text
                )
                if skip:
                    return
                if new_response is not None:
                    full_response = new_response

            if full_response:
                self.agent.chat_context.add_message(
                    role=ChatRole.ASSISTANT,
                    content=full_response
                )

                self.emit("content_generated", {"text": full_response})

                # Legacy LLM hooks are best-effort: failures are logged, not raised.
                if self.hooks and self.hooks.has_llm_hooks():
                    try:
                        await self.hooks.trigger_llm({"text": full_response})
                    except Exception as e:
                        logger.error(f"Error in legacy LLM hook: {e}", exc_info=True)

                # A streaming LLM hook receives the complete response as one chunk.
                # Its joined output, if non-empty, is what gets spoken; the chat
                # context and metrics still use the unmodified full_response.
                tts_text = full_response
                if self.hooks and self.hooks.has_llm_stream_hook():
                    async def _single_chunk():
                        yield full_response
                    parts = []
                    async for chunk in self.hooks.process_llm_stream(_single_chunk()):
                        parts.append(chunk)
                    if parts:
                        tts_text = "".join(parts)

                if self.speech_generation:
                    await self.speech_generation.synthesize(tts_text)
                else:
                    # No TTS - complete the turn after LLM
                    self.agent.session._emit_agent_state(AgentState.IDLE)
                    self.agent.session._emit_user_state(UserState.IDLE)
                    metrics_collector.on_agent_speech_end()
                    metrics_collector.set_agent_response(full_response)
                    metrics_collector.complete_turn()

                # Emitted on both branches above, including the no-TTS path.
                self.emit("synthesis_complete", {})
    
    def get_latest_transcript(self) -> str:
        """Return speech understanding's accumulated transcript, or ``""`` when that stage is absent."""
        understanding = self.speech_understanding
        return understanding._accumulated_transcript if understanding else ""
    
    async def _on_transcript_final(self, data: dict) -> None:
        """
        Handle a final transcript from speech understanding.

        The agent counts as busy when it is SPEAKING or THINKING, or when its
        audio is still playing for an utterance that has not finished. While it
        is busy, the transcript is treated as a possible barge-in:
            - It is dropped during non-interruptible playback.
            - It is dropped when it has fewer than ``interrupt_min_words`` words.
            - Otherwise the running pipeline is interrupted (unless an active
              preemptive task will handle it) and a new metrics turn is started.

        A transcript that is not dropped feeds voicemail detection, is
        re-emitted as ``transcript_ready``, and is routed to the preemptive or
        the normal processing path.

        Args:
            data: Payload with a required ``text`` key and optional
                ``is_preemptive`` and ``metadata`` keys.
        """
        text = data["text"]
        is_preemptive = data.get("is_preemptive", False)

        # Snapshot the session once. Nothing up to the interruption decision
        # below awaits, so these values cannot go stale in between.
        session = self.agent.session if self.agent and self.agent.session else None
        agent_state = session.agent_state if session else None
        cu = session.current_utterance if session else None
        is_playing_audio = bool(self.speech_generation and self.speech_generation.is_speaking)
        # Playback only keeps the turn "active" while the current utterance is unfinished.
        playback_counts_for_turn = is_playing_audio and not (cu is not None and cu.done())
        is_agent_active = (agent_state in (AgentState.SPEAKING, AgentState.THINKING)) or playback_counts_for_turn

        if is_agent_active:
            # str.split() with no arguments already ignores leading/trailing whitespace.
            word_count = len(text.split())
            # agent_state is None when there is no agent/session but local playback
            # is still running, so .value must not be dereferenced unconditionally.
            state_label = getattr(agent_state, "value", agent_state)
            logger.info(f"[orchestrator] Final transcript while agent is {state_label} | word_count={word_count}, min_words={self.interrupt_min_words}")

            if cu and not cu.is_interruptible and (not cu.done() or is_playing_audio):
                logger.info(
                    f"[orchestrator] Transcript '{text}' received during non-interruptible playback, dropping"
                )
                return

            if word_count < self.interrupt_min_words:
                logger.info(f"[orchestrator] Transcript '{text}' below min_words threshold ({word_count} < {self.interrupt_min_words}), dropping")
                return

            # Word count threshold met — check if utterance is interruptible.
            # cu is the session's current utterance (nothing above awaited);
            # using it also avoids dereferencing a missing agent/session.
            if cu and not cu.is_interruptible:
                logger.info(f"[orchestrator] Transcript '{text}' meets min_words but utterance is not interruptible, dropping")
                return

            has_active_preemptive = (
                is_preemptive
                and self._preemptive_generation_task is not None
                and not self._preemptive_generation_task.done()
            )

            if has_active_preemptive:
                logger.info(
                    f"[orchestrator] Word count {word_count} >= min_words "
                    f"{self.interrupt_min_words}, deferring to preemptive handler"
                )
            else:
                logger.info(f"[orchestrator] Word count {word_count} >= min_words {self.interrupt_min_words}, interrupting pipeline")
                await self._interrupt_pipeline()

            metrics_collector.start_turn()
            metrics_collector.set_user_transcript(text)

        # Accumulate text for voicemail detection; the check task is started once.
        if self.voice_mail_detector and not self.voice_mail_detection_done and text.strip():
            self._vmd_buffer += f" {text}"
            if not self._vmd_check_task:
                logger.info("Starting Voice Mail Detection Timer")
                self._vmd_check_task = asyncio.create_task(self._run_vmd_check())

        self.emit("transcript_ready", {
            "text": text,
            "is_final": True,
            "is_preemptive": is_preemptive,
            "metadata": data.get("metadata", {})
        })

        if is_preemptive:
            await self._handle_preemptive_final(text)
        else:
            await self._process_final_transcript(text)
    
    async def _on_transcript_preflight(self, data: dict) -> None:
        """
        Handle preflight transcript for preemptive generation.

        Starts LLM generation and TTS from a not-yet-final transcript. The
        spawned ``_generate_and_synthesize`` task runs with
        ``wait_for_authorization=True``, so playback is gated on
        ``self._preemptive_authorized``; ``_handle_preemptive_final`` sets that
        event when the final transcript matches.

        Args:
            data: Event payload; only ``data["text"]`` is read here.

        Side effects (only when both an agent and content generation exist):
            - Appends the preflight text to the chat context as a USER message.
              ``_handle_preemptive_final`` removes the trailing USER message on
              a mismatch.
            - Interrupts any unfinished current utterance and installs a fresh
              ``UtteranceHandle`` on the session.
            - Replaces ``self._preemptive_generation_task``.
        """
        preflight_text = data["text"]

        # Nothing to pre-generate without an agent and an LLM stage.
        if self.agent and self.content_generation:
            context_prefix = None
            if self.agent.knowledge_base:
                kb_context = await self.agent.knowledge_base.process_query(preflight_text)
                if kb_context:
                    context_prefix = kb_context

            self.agent.chat_context.add_message(
                role=ChatRole.USER,
                content=preflight_text
            )

            if self.agent.session:
                # Unlike _process_final_transcript, is_interruptible is not
                # consulted here: any unfinished utterance is interrupted.
                if self.agent.session.current_utterance and not self.agent.session.current_utterance.done():
                    self.agent.session.current_utterance.interrupt()

                handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
                self.agent.session.current_utterance = handle
            else:
                # No session to attach to: use a handle that is already marked done.
                handle = UtteranceHandle(utterance_id="utt_fallback")
                handle._mark_done()

            # Re-arm the authorization gate for this new preemptive attempt.
            self._preemptive_authorized.clear()
            self._preemptive_cancelled = False
            if metrics_collector:
                metrics_collector.on_stt_preflight_end()
            # NOTE(review): a previous preemptive task is overwritten here without
            # being cancelled; only the unfinished current utterance (if any) was
            # interrupted above. Confirm that is enough to stop the old task.
            self._preemptive_generation_task = asyncio.create_task(
                self._generate_and_synthesize(preflight_text, handle, wait_for_authorization=True, context_prefix=context_prefix)
            )
    
    async def _handle_preemptive_final(self, final_text: str) -> None:
        """
        Resolve an in-flight preemptive generation against the final transcript.

        Match:
            Authorize the waiting preemptive task, then wait up to 30 s for it
            to finish. On timeout, ``asyncio.wait_for`` cancels the task.
        Mismatch:
            Cancel the preemptive generation, drop the trailing USER message
            that the preflight handler appended, and process the final
            transcript normally.

        The preemptive state in speech understanding and the stored task
        reference are cleared in a ``finally`` block. They are therefore also
        reset when the awaited task was cancelled (``CancelledError`` is not an
        ``Exception``) or when normal processing raises.

        Args:
            final_text: The final transcript text.
        """
        if not self.speech_understanding:
            return

        try:
            if self.speech_understanding.check_preemptive_match(final_text):
                logger.info("Preemptive generation MATCH - authorizing playback")

                self._preemptive_authorized.set()

                if self._preemptive_generation_task:
                    try:
                        await asyncio.wait_for(self._preemptive_generation_task, timeout=30.0)
                        logger.info("Preemptive generation completed successfully")
                    except asyncio.TimeoutError:
                        logger.error("Preemptive playback timeout")
                    except Exception as e:
                        logger.error(f"Error in preemptive playback: {e}", exc_info=True)
            else:
                logger.info("Preemptive generation MISMATCH - cancelling")

                await self._cancel_preemptive_generation()

                # Undo the USER message added by the preflight handler so the
                # final text is not recorded twice.
                if self.agent:
                    msgs = self.agent.chat_context.messages()
                    if msgs and msgs[-1].role == ChatRole.USER:
                        self.agent.chat_context._items.remove(msgs[-1])

                await self._process_final_transcript(final_text)
        finally:
            self.speech_understanding.clear_preemptive_state()
            self._preemptive_generation_task = None
    
    async def _on_transcript_interim(self, data: dict) -> None:
        """
        Forward non-blank interim transcripts to the STT-based interruption check.

        Args:
            data: Payload whose ``text`` defaults to ``""`` and whose
                ``confidence`` defaults to ``1.0`` when absent.
        """
        interim_text = data.get("text", "")
        interim_confidence = data.get("confidence", 1.0)
        if not (interim_text and interim_text.strip()):
            return
        await self.handle_stt_event(interim_text, confidence=interim_confidence)
    
    async def _on_speech_started(self, data: dict) -> None:
        """
        Record the start of user speech and, if the agent is busy, begin interruption monitoring.

        The agent counts as busy when its state is SPEAKING or THINKING, when the
        session's current utterance is unfinished, or when the TTS lock is held.
        At most one ``_monitor_interruption_duration`` task runs at a time.

        Args:
            data: VAD payload. ``confidence``, ``energy`` and ``speech_duration``
                default to ``0.0``; non-dict payloads use those defaults.
        """
        payload = data if isinstance(data, dict) else {}
        vad_confidence = payload.get("confidence", 0.0)
        vad_energy = payload.get("energy", 0.0)
        vad_speech_dur = payload.get("speech_duration", 0.0)
        logger.info(
            f"[orchestrator] _on_speech_started fired "
            f"| confidence={vad_confidence:.3f} energy={vad_energy:.4f} "
            f"speech_dur={vad_speech_dur:.3f}s"
        )
        self._is_user_speaking = True
        self._speech_start_energy = vad_energy

        session = self.agent.session if self.agent and self.agent.session else None
        agent_state = session.agent_state if session else None
        utterance = session.current_utterance if session else None
        tts_busy = bool(self.speech_generation and self.speech_generation.tts_lock.locked())

        agent_busy = (
            agent_state in (AgentState.SPEAKING, AgentState.THINKING)
            or (utterance is not None and not utterance.done())
            or tts_busy
        )
        if not agent_busy:
            return

        monitor = self._interruption_check_task
        if monitor is not None and not monitor.done():
            return  # a monitoring task is already in flight

        logger.info("User started speaking during agent response, initiating interruption monitoring")
        self._interruption_check_task = asyncio.create_task(
            self._monitor_interruption_duration()
        )

    async def _on_speech_stopped(self, data: dict) -> None:
        """
        Record the end of user speech and cancel any pending interruption check.

        Args:
            data: VAD payload. ``speech_duration`` (default ``0.0``) and
                ``has_audio`` (default ``False``) are only logged; non-dict
                payloads use those defaults.
        """
        payload = data if isinstance(data, dict) else {}
        speech_dur = payload.get("speech_duration", 0.0)
        has_audio = payload.get("has_audio", False)
        logger.info(
            f"[orchestrator] _on_speech_stopped fired "
            f"| speech_dur={speech_dur:.3f}s has_audio={has_audio}"
        )
        self._is_user_speaking = False

        monitor = self._interruption_check_task
        if monitor is None or monitor.done():
            return
        logger.info("User stopped speaking, cancelling interruption check")
        monitor.cancel()
    
    async def _on_turn_resumed(self, data: dict) -> None:
        """Cancel any preemptive generation when speech understanding reports ``turn_resumed``; ``data`` is unused."""
        await self._cancel_preemptive_generation()
    
    async def _process_final_transcript(self, user_text: str) -> None:
        """
        Run a final user transcript through the full pipeline.

        Steps:
            1. Fire user-turn-start hooks.
            2. Query the knowledge base for an optional context prefix.
            3. Record the transcript in metrics, starting a turn if none exists.
            4. Append the transcript to the chat context as a USER message.
            5. Install a fresh utterance handle, interrupting the previous one
               when it is interruptible.
            6. Cancel any running generation task and schedule
               ``_generate_and_synthesize`` for this transcript.

        NOTE(review): a non-interruptible unfinished utterance is only logged. A
        new handle still replaces it and the running generation task is still
        cancelled. Confirm this is intended.

        Args:
            user_text: The final transcript text.
        """
        if not self.agent:
            logger.warning("No agent available")
            return

        hooks = self.hooks
        if hooks and hooks.has_user_turn_start_hooks():
            await hooks.trigger_user_turn_start(user_text)

        knowledge_base = self.agent.knowledge_base
        context_prefix = None
        if knowledge_base:
            # An empty/falsy lookup result means "no prefix".
            context_prefix = await knowledge_base.process_query(user_text) or None

        if not metrics_collector.current_turn:
            metrics_collector.start_turn()
        metrics_collector.set_user_transcript(user_text)

        # No awaits below this point, so the agent/session aliases stay current.
        agent = self.agent
        agent.chat_context.add_message(
            role=ChatRole.USER,
            content=user_text
        )

        session = agent.session
        if session:
            previous = session.current_utterance
            if previous and not previous.done():
                if previous.is_interruptible:
                    previous.interrupt()
                else:
                    logger.info("Current utterance is not interruptible")

            handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
            session.current_utterance = handle
        else:
            handle = UtteranceHandle(utterance_id="utt_fallback")
            handle._mark_done()

        running = self._current_generation_task
        if running and not running.done():
            running.cancel()
        self._current_generation_task = asyncio.create_task(
            self._generate_and_synthesize(user_text, handle, context_prefix=context_prefix)
        )
    
    
    async def _generate_and_synthesize(
        self,
        user_text: str,
        handle: UtteranceHandle,
        wait_for_authorization: bool = False,
        context_prefix: str | None = None
    ) -> None:
        """Generate LLM response and synthesize with TTS"""
        self._generation_id += 1
        my_generation_id = self._generation_id
        self._is_interrupted = False
        full_response = ""
        self._partial_response = ""

        if self.speech_generation:
            self.speech_generation.spoken_transcript = ""
        
        
        try:
            if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
                await self.agent.session.start_thinking_audio()

            if self.speech_generation:
                self.speech_generation.reset_interrupt()    
            
            if not self.content_generation and not self.graph_adapter:
                logger.warning("No content generation available")
                return
            
            if not self.speech_generation:
                logger.warning("No speech generation available")
                metrics_collector.on_agent_speech_start()

            if self.graph_adapter:
                self.graph_adapter.set_agent(self.agent)
                graph_prompt, skip_llm = await self.graph_adapter.handle_input(user_text)
                if skip_llm:
                    return
                if graph_prompt is not None:
                    user_text = graph_prompt
                    msgs = self.agent.chat_context.messages()
                    for msg in reversed(msgs):
                        if msg.role == ChatRole.USER:
                            msg.content = user_text
                            break

            self.agent.session._emit_agent_state(AgentState.THINKING)
            llm_stream = self.content_generation.generate(user_text, context_prefix=context_prefix)
            
            q = asyncio.Queue(maxsize=50)
            graph_response_text = None

            async def collector():
                """Collect LLM chunks and feed queue for TTS"""
                nonlocal graph_response_text
                response_parts = []

                def _safe_set_agent_response(text: str) -> None:
                    """Only set agent response if this collector still owns the current generation."""
                    if self.hooks and self.hooks.has_tts_stream_hook():
                        return
                    if text and self._generation_id == my_generation_id and metrics_collector.current_turn:
                        metrics_collector.set_agent_response(text)

                try:
                    async for chunk in llm_stream:
                        if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled):
                            logger.info("LLM collection interrupted")
                            await q.put(None)
                            full = "".join(response_parts)
                            _safe_set_agent_response(full)
                            return full

                        content = chunk.content if hasattr(chunk, "content") else chunk
                        metadata = chunk.metadata if hasattr(chunk, "metadata") else chunk

                        if content:
                            response_parts.append(content)
                            await q.put(content)
                            logger.debug("[chunking] LLM delta → queue: %r", content)

                        if self.graph_adapter and metadata and isinstance(metadata, dict):
                            if metadata.get("graph_response"):
                                graph_response_text = metadata.get("graph_response")

                        self._partial_response = "".join(response_parts)

                    if not handle.interrupted:
                        await q.put(None)

                    full = "".join(response_parts)
                    _safe_set_agent_response(full)
                    return full

                except asyncio.CancelledError:
                    logger.info("LLM collection cancelled")
                    await q.put(None)
                    full = "".join(response_parts)
                    _safe_set_agent_response(full)
                    return full
            
            async def tts_consumer():
                """Consume LLM chunks and send to TTS"""
                if wait_for_authorization:
                    try:
                        await asyncio.wait_for(
                            self._preemptive_authorized.wait(),
                            timeout=10.0
                        )
                        
                        if self._preemptive_cancelled:
                            logger.info("Preemptive generation cancelled during authorization wait")
                            return
                    
                    except asyncio.TimeoutError:
                        logger.error("Authorization timeout - cancelling preemptive generation")
                        self._preemptive_cancelled = True
                        return
                
                async def tts_stream_gen():
                    """Generate TTS stream from queue"""
                    while True:
                        if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled):
                            break

                        try:
                            chunk = await asyncio.wait_for(q.get(), timeout=0.1)
                            if chunk is None:
                                break
                            yield chunk

                        except asyncio.TimeoutError:
                            if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled):
                                break
                            continue

                if self.speech_generation:
                    playback_done = asyncio.Event()

                    def _on_playback_done(_data: Any = None) -> None:
                        playback_done.set()

                    self.speech_generation.on("last_audio_byte", _on_playback_done)
                    self.speech_generation.on("synthesis_interrupted", _on_playback_done)
                    try:
                        text_source = tts_stream_gen()
                        if self._text_filter is not None:
                            text_source = self._text_filter.filter(text_source)
                        if self._chunker is not None:
                            text_source = _pipe_through_chunk_stream(
                                text_source,
                                self._chunker,
                                language=self._chunking_language,
                            )
                        if self.hooks:
                            text_source = self.hooks.process_llm_stream(text_source)
                        await self.speech_generation.synthesize(text_source)
                        await playback_done.wait()
                    except asyncio.CancelledError:
                        if self.speech_generation:
                            await self.speech_generation.interrupt()
                    finally:
                        self.speech_generation.off("last_audio_byte", _on_playback_done)
                        self.speech_generation.off("synthesis_interrupted", _on_playback_done)
            
            collector_task = asyncio.create_task(collector())
            tts_task = asyncio.create_task(tts_consumer())
            
            try:
                await asyncio.gather(collector_task, tts_task, return_exceptions=True)
            except asyncio.CancelledError:
                if not collector_task.done():
                    collector_task.cancel()
                if not tts_task.done():
                    tts_task.cancel()
            
            if not collector_task.cancelled() and not self._is_interrupted:
                full_response = collector_task.result()
            else:
                full_response = self._partial_response

            if self.graph_adapter and graph_response_text and not self._is_interrupted:
                new_response, skip = await self.graph_adapter.handle_decision(
                    self.agent, graph_response_text
                )
                if skip:
                    return
                if new_response is not None:
                    full_response = new_response

            if self._is_interrupted or self._generation_id != my_generation_id:
                logger.info(
                    "[ctx-add] SKIP post-gather add (gen_id=%s my_gen_id=%s is_interrupted=%s partial_len=%d full_len=%d)",
                    self._generation_id, my_generation_id, self._is_interrupted,
                    len(self._partial_response or ""), len(full_response or ""),
                )
            elif full_response and self.agent:
                logger.info(
                    "[ctx-add] SITE-A post-gather add (gen_id=%s is_interrupted=%s len=%d preview=%r)",
                    my_generation_id, self._is_interrupted, len(full_response), full_response,
                )
                self.agent.chat_context.add_message(
                    role=ChatRole.ASSISTANT,
                    content=full_response
                )
                self.emit("content_generated", {"text": full_response})

                if self.hooks and self.hooks.has_llm_hooks():
                    try:
                        await self.hooks.trigger_llm({"text": full_response})
                    except Exception as e:
                        logger.error(f"Error in legacy LLM hook: {e}", exc_info=True)

                # For no-TTS modes, complete the turn here since there's no speech_generation
                if not self.speech_generation:
                    self.agent.session._emit_user_state(UserState.IDLE)
                    self.agent.session._emit_agent_state(AgentState.IDLE)
                    metrics_collector.on_agent_speech_end()
                    metrics_collector.complete_turn()

        finally:
            if self.hooks and self.hooks.has_user_turn_end_hooks():
                await self.hooks.trigger_user_turn_end()

            if not handle.done():
                handle._mark_done()

            if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
                await self.agent.session.stop_thinking_audio()
    
    async def say(
        self,
        message: str,
        handle: UtteranceHandle,
        audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None,
    ) -> None:
        """
        Direct TTS synthesis (for initial messages).

        Synthesizes ``message`` (or streams ``audio_data``) and waits until
        speech generation reports ``last_audio_byte`` or
        ``synthesis_interrupted`` before returning. ``handle`` is marked done
        on every exit path unless it is already done.

        Args:
            message: Message to synthesize
            handle: Utterance handle to track
            audio_data: Optional pre-synthesized PCM bytes. When provided,
                bypasses TTS synthesis entirely and streams these bytes to
                the agent audio track. Chunker and TTS text filter are
                also skipped on this path.
        """
        if not self.speech_generation:
            if not handle.done():
                handle._mark_done()
            return

        playback_done = asyncio.Event()

        def _on_playback_done(_data: Any = None) -> None:
            playback_done.set()

        # Either event ends the wait: normal end of playback, or an interruption.
        self.speech_generation.on("last_audio_byte", _on_playback_done)
        self.speech_generation.on("synthesis_interrupted", _on_playback_done)

        try:
            if audio_data is not None:
                metrics_collector.set_agent_response(message, emit_transport=True)
                await self.speech_generation.synthesize_audio_bytes(
                    audio_data, message, handle=handle
                )
                await playback_done.wait()
                return

            # With a TTS stream hook installed the agent response is presumably
            # recorded elsewhere; otherwise record it here. Transcript transport
            # is emitted up front only when the TTS lacks word timestamps.
            if not (self.hooks and self.hooks.has_tts_stream_hook()):
                tts = self.speech_generation.tts
                emit = not bool(getattr(tts, "supports_word_timestamps", False))
                metrics_collector.set_agent_response(message, emit_transport=emit)

            if self._chunker is not None or self._text_filter is not None:
                # Wrap the single message in an async stream so the filter and
                # chunker, which operate on streams, can be applied.
                async def _string_iter() -> AsyncIterator[str]:
                    yield message
                text_source: AsyncIterator[Any] = _string_iter()
                if self._text_filter is not None:
                    text_source = self._text_filter.filter(text_source)
                if self._chunker is not None:
                    text_source = _pipe_through_chunk_stream(
                        text_source,
                        self._chunker,
                        language=self._chunking_language,
                    )
                await self.speech_generation.synthesize(text_source)
            else:
                await self.speech_generation.synthesize(message)

            await playback_done.wait()
        finally:
            self.speech_generation.off("last_audio_byte", _on_playback_done)
            self.speech_generation.off("synthesis_interrupted", _on_playback_done)
            # The handle is shared with synthesize_audio_bytes and may already be
            # done (e.g. after an interruption); guard like the rest of this class.
            if not handle.done():
                handle._mark_done()
    
    async def reply_with_context(
        self,
        instructions: str,
        wait_for_playback: bool,
        handle: UtteranceHandle,
        frames: list[av.VideoFrame] | None = None
    ) -> None:
        """
        Append ``instructions`` to the chat context as a user turn and reply to it.

        Any knowledge-base context found for the instructions is forwarded to
        ``_generate_and_synthesize`` as ``context_prefix``. With video frames
        the user message is a list holding the instructions followed by one
        ``ImageContent`` per frame; without frames it is the plain string.

        Args:
            instructions: Instructions to add to chat context
            wait_for_playback: If True, the VAD and STT handlers of speech
                understanding are replaced with no-ops for the duration of
                the reply and restored afterwards.
            handle: Utterance handle; marked done on exit if not already done.
            frames: Optional video frames for vision
        """
        if not self.agent:
            handle._mark_done()
            return

        # Original handlers keyed by component ('vad' / 'stt'); restored in finally.
        muted_handlers: dict[str, Any] = {}
        if wait_for_playback and self.speech_understanding:
            understanding = self.speech_understanding
            if understanding.vad:
                muted_handlers['vad'] = understanding._on_vad_event
                understanding._on_vad_event = lambda x: None
            if understanding.stt:
                muted_handlers['stt'] = understanding._on_stt_transcript
                understanding._on_stt_transcript = lambda x: None

        try:
            context_prefix = None
            knowledge_base = self.agent.knowledge_base
            if knowledge_base:
                # A falsy KB result means "no extra context".
                context_prefix = await knowledge_base.process_query(instructions) or None

            if frames:
                message_content = [instructions] + [
                    ImageContent(image=frame, inference_detail="auto") for frame in frames
                ]
            else:
                message_content = instructions

            self.agent.chat_context.add_message(role=ChatRole.USER, content=message_content)

            await self._generate_and_synthesize(instructions, handle, context_prefix=context_prefix)
        finally:
            if wait_for_playback and self.speech_understanding:
                if 'vad' in muted_handlers:
                    self.speech_understanding._on_vad_event = muted_handlers['vad']
                if 'stt' in muted_handlers:
                    self.speech_understanding._on_stt_transcript = muted_handlers['stt']

            if not handle.done():
                handle._mark_done()
    
    async def _monitor_interruption_duration(self) -> None:
        """Monitor user speech duration during agent response.

        Polls the real-time VAD probability from SpeechUnderstanding
        every 50 ms instead of a single blind sleep.  This lets the
        pipeline react the instant sustained speech crosses the
        configured duration threshold, and avoids false-triggering
        if the user stops speaking or the probability drops midway.
        """
        if self.interrupt_mode not in ("VAD_ONLY", "HYBRID"):
            return

        try:
            elapsed = 0.0
            poll_interval = 0.05

            while elapsed < self.interrupt_min_duration:
                await asyncio.sleep(poll_interval)
                elapsed += poll_interval

                if not self._is_user_speaking:
                    logger.debug(
                        f"User stopped speaking at {elapsed:.2f}s "
                        f"(< {self.interrupt_min_duration}s), aborting interruption check"
                    )
                    return

                if self.speech_understanding:
                    prob = self.speech_understanding.current_vad_probability
                    if prob < 0.15:
                        logger.debug(
                            f"VAD probability dropped to {prob:.3f} during "
                            f"interruption monitoring, resetting elapsed timer"
                        )
                        elapsed = 0.0

            if self.agent and self.agent.session and self.agent.session.current_utterance:
                if self.agent.session.current_utterance.is_interruptible:
                    prob = 0.0
                    if self.speech_understanding:
                        prob = self.speech_understanding.current_vad_probability
                    if self.interrupt_min_confidence > 0.0 and prob < self.interrupt_min_confidence:
                        logger.info(
                            f"[orchestrator] VAD probability {prob:.3f} below "
                            f"interrupt_min_confidence {self.interrupt_min_confidence:.3f} "
                            f"at trigger time, aborting interruption"
                        )
                        return
                    logger.info(
                        f"User speech exceeded {self.interrupt_min_duration}s "
                        f"threshold (vad_prob={prob:.3f}), triggering interruption"
                    )
                    await self._trigger_interruption()

        except asyncio.CancelledError:
            logger.debug("Interruption monitoring cancelled")
    
    async def handle_stt_event(self, text: str, confidence: float = 1.0) -> bool:
        """Handle an STT transcript as a possible word-based interruption.

        Acts only while the agent is SPEAKING or THINKING. Transcripts whose
        confidence is below ``interrupt_min_confidence`` (when that is > 0)
        are ignored.

        If a false-interrupt pause is active and the transcript has at least
        ``interrupt_min_words`` words, the interruption is confirmed at once
        through ``_interrupt_pipeline()``. Otherwise, in ``STT_ONLY`` or
        ``HYBRID`` mode, an interruptible current utterance is interrupted
        through ``_trigger_interruption()`` once the word threshold is met.

        Returns:
            True if interruption was triggered, False otherwise.
        """
        if not text or not text.strip():
            return False

        session = self.agent.session if self.agent else None
        agent_state = session.agent_state if session else None
        if agent_state not in (AgentState.SPEAKING, AgentState.THINKING):
            return False

        min_confidence = self.interrupt_min_confidence
        if min_confidence > 0.0 and confidence < min_confidence:
            logger.info(
                f"[orchestrator] Ignoring low-confidence transcript "
                f"(confidence={confidence:.3f} < {min_confidence:.3f}): {text!r}"
            )
            return False

        # str.split() with no separator already ignores surrounding whitespace.
        word_count = len(text.split())
        min_words = self.interrupt_min_words
        enough_words = word_count >= min_words
        utterance = session.current_utterance

        if utterance:
            logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={min_words}, mode={self.interrupt_mode}, utterance_id={utterance.id}, interruptible={utterance.is_interruptible}, done={utterance.done()}")
        else:
            logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={min_words}, mode={self.interrupt_mode}, no current_utterance")

        # Speech is paused awaiting the false-interrupt verdict; real words settle it.
        if enough_words and self.resume_on_false_interrupt and self._is_in_false_interrupt_pause:
            logger.info("STT transcript received while in paused state, confirming real interruption")
            self._cancel_false_interrupt_timer()
            self._is_in_false_interrupt_pause = False
            self._false_interrupt_paused_speech = False
            await self._interrupt_pipeline()
            return True

        if self.interrupt_mode not in ("STT_ONLY", "HYBRID"):
            return False

        if not enough_words:
            logger.info(f"[orchestrator] Word count {word_count} < min_words {min_words}, ignoring")
            return False

        if not utterance:
            return False

        if not utterance.is_interruptible:
            logger.info("[orchestrator] Word count threshold met but utterance is not interruptible")
            return False

        logger.info(f"[orchestrator] Word count {word_count} >= min_words {min_words}, triggering interruption")
        await self._trigger_interruption()
        return True
    
    async def _trigger_interruption(self) -> None:
        """Trigger interruption with optional pause/resume support"""
        if self._is_interrupted:
            return
        
        if self.agent and self.agent.session and self.agent.session.current_utterance:
            if not self.agent.session.current_utterance.is_interruptible:
                logger.info("Interruption disabled for current utterance")
                return
        
        self._is_interrupted = True
        
        can_resume = self.resume_on_false_interrupt and self.speech_generation and self.speech_generation.can_pause()
        
        if can_resume:
            logger.info("Pausing TTS for potential resume")
            self._false_interrupt_paused_speech = True
            self._is_in_false_interrupt_pause = True
            if self.speech_generation:
                await self.speech_generation.pause()
            self._start_false_interrupt_timer()
        else:
            logger.info("Performing full interruption")
            await self._interrupt_pipeline()
    
    def _start_false_interrupt_timer(self):
        """Start timer to detect false interrupts"""
        if self._false_interrupt_timer:
            self._false_interrupt_timer.cancel()
        
        if self.false_interrupt_pause_duration is None:
            return
        
        logger.info(f"Starting false interrupt timer for {self.false_interrupt_pause_duration}s")
        loop = asyncio.get_event_loop()
        self._false_interrupt_timer = loop.call_later(
            self.false_interrupt_pause_duration,
            lambda: asyncio.create_task(self._on_false_interrupt_timeout())
        )
    
    def _cancel_false_interrupt_timer(self):
        """Cancel false interrupt timer"""
        if self._false_interrupt_timer:
            logger.info("Cancelling false interrupt timer")
            self._false_interrupt_timer.cancel()
            self._false_interrupt_timer = None
    
    async def _on_false_interrupt_timeout(self):
        """Handle false interrupt timeout.

        Runs once a false-interrupt pause has lasted
        ``false_interrupt_pause_duration`` seconds. If the pause has already
        been resolved, the timeout is ignored.

        Otherwise, real-time VAD probability (via FRAME_PROCESSED) is used to
        distinguish genuine speech from transient noise:

        - If the user is still flagged as speaking with a VAD probability of
          at least 0.3, the interruption is confirmed via
          ``_interrupt_pipeline()``.
        - Otherwise, if speech generation can pause, the paused speech is
          resumed and the interrupted flag is cleared.
        """
        logger.info(f"False interrupt timeout reached after {self.false_interrupt_pause_duration}s")
        self._false_interrupt_timer = None

        # Cancelling the TimerHandle does not stop a timeout task that was
        # already scheduled. If the pause was resolved meanwhile (e.g. an STT
        # transcript confirmed the interruption), acting now would run the
        # interrupt path a second time or resume stale speech.
        if not self._is_in_false_interrupt_pause:
            logger.info("False interrupt pause already resolved, ignoring timeout")
            return

        vad_prob = 0.0
        vad_energy = 0.0
        if self.speech_understanding:
            vad_prob = self.speech_understanding.current_vad_probability
            vad_energy = self.speech_understanding.current_vad_energy

        if self._is_user_speaking:
            if vad_prob >= 0.3:
                logger.info(
                    f"User still speaking (vad_prob={vad_prob:.3f}, "
                    f"energy={vad_energy:.4f}) - confirming real interruption"
                )
                self._is_in_false_interrupt_pause = False
                self._false_interrupt_paused_speech = False
                await self._interrupt_pipeline()
                return
            else:
                logger.info(
                    f"User flagged as speaking but vad_prob={vad_prob:.3f} "
                    f"is low - treating as false interruption"
                )

        if self._is_in_false_interrupt_pause and self.speech_generation and self.speech_generation.can_pause():
            logger.info("Resuming agent speech - false interruption detected")
            self._is_interrupted = False
            self._is_in_false_interrupt_pause = False
            self._false_interrupt_paused_speech = False
            await self.speech_generation.resume()
    
    async def _interrupt_pipeline(self) -> None:
        """Interrupt all components and record what was said before the cut.

        In order:

        1. Switch the agent to LISTENING when an agent/session is attached.
           Mark the pipeline interrupted, clear false-interrupt pause state
           and notify metrics.
        2. Interrupt the current utterance if it is interruptible.
        3. Concurrently stop thinking audio, speech generation and content
           generation. Their exceptions are collected by ``gather``, not
           raised. Then cancel the in-flight generation task.
        4. Store the truncated reply as an interrupted ASSISTANT message: the
           spoken transcript if any, otherwise the partial LLM text. Then
           complete the metrics turn and reset ``_partial_response``.
        """
        logger.info(
            "[ctx-add] _interrupt_pipeline ENTER (gen_id=%s already_interrupted=%s partial_len=%d)",
            self._generation_id, self._is_interrupted, len(self._partial_response or ""),
        )
        # Guarded like the rest of this method: the public interrupt() can be
        # reached after cleanup() has set self.agent to None.
        if self.agent and self.agent.session:
            self.agent.session._emit_agent_state(AgentState.LISTENING)
        self._is_interrupted = True
        self._cancel_false_interrupt_timer()
        self._is_in_false_interrupt_pause = False
        self._false_interrupt_paused_speech = False
        metrics_collector.on_interrupted()

        if self.agent and self.agent.session and self.agent.session.current_utterance:
            if self.agent.session.current_utterance.is_interruptible:
                self.agent.session.current_utterance.interrupt()

        cleanup_awaitables = []
        if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
            cleanup_awaitables.append(self.agent.session.stop_thinking_audio())
        if self.speech_generation:
            cleanup_awaitables.append(self.speech_generation.interrupt())
        if self.content_generation:
            cleanup_awaitables.append(self.content_generation.cancel())

        if cleanup_awaitables:
            # return_exceptions=True: one failing component must not stop the others.
            await asyncio.gather(*cleanup_awaitables, return_exceptions=True)

        if self._current_generation_task and not self._current_generation_task.done():
            self._current_generation_task.cancel()

        # Prefer what was actually played to the user; fall back to the raw LLM
        # text when nothing has been spoken yet.
        truncated_response = ""
        source = "none"
        sg_spoken_len = len(self.speech_generation.spoken_transcript) if self.speech_generation and self.speech_generation.spoken_transcript else 0
        partial_len = len(self._partial_response or "")
        if self.speech_generation and self.speech_generation.spoken_transcript:
            truncated_response = self.speech_generation.spoken_transcript
            source = "spoken_transcript"
        elif self._partial_response:
            truncated_response = self._partial_response
            source = "partial_response(LLM)"
        logger.info(
            "[ctx-add] INTERRUPT source=%s spoken_len=%d partial_len=%d chosen_len=%d preview=%r",
            source, sg_spoken_len, partial_len, len(truncated_response), truncated_response[:80],
        )

        if truncated_response and metrics_collector.current_turn:
            if not metrics_collector.current_turn.agent_speech:
                metrics_collector.set_agent_response(truncated_response)
            else:
                metrics_collector.emit_agent_transcript_transport(
                    metrics_collector.current_turn.agent_speech, type="final"
                )

        if truncated_response and self.agent:
            last = self.agent.chat_context.items[-1] if self.agent.chat_context.items else None
            last_info = (
                f"role={getattr(last, 'role', None)} id={getattr(last, 'id', None)} "
                f"len={len(getattr(last, 'content', '') or '') if isinstance(getattr(last, 'content', None), str) else 'list'}"
                if last else "none"
            )
            logger.info(
                "[ctx-add] SITE-B interrupt add (source=%s len=%d last_item=%s)",
                source, len(truncated_response), last_info,
            )
            msg = self.agent.chat_context.add_message(
                role=ChatRole.ASSISTANT,
                content=truncated_response
            )
            msg.interrupted = True
            logger.info("[ctx-add] SITE-B added msg id=%s interrupted=True", msg.id)

        if metrics_collector.current_turn:
            logger.info("[orchestrator] Completing interrupted turn")
            metrics_collector.complete_turn()

        self._partial_response = ""

    async def interrupt(self) -> None:
        """Interrupt the pipeline on behalf of external callers.

        Thin public wrapper that delegates to ``_interrupt_pipeline()``.
        """
        return await self._interrupt_pipeline()
    
    async def _cancel_preemptive_generation(self) -> None:
        """Cancel preemptive generation.

        Sets ``_preemptive_cancelled`` and ``_preemptive_authorized``, then
        cancels and awaits the preemptive task.

        If `_interrupt_pipeline()` already ran this turn (`_is_interrupted` set),
        skip the redundant content/speech cleanup it already did and only do the
        preemptive-task-specific work. Cuts ~200-500ms off the post-interrupt
        path when a barge-in lands during preemptive generation.

        An ordinary exception raised by the preemptive task while it unwinds
        is logged instead of propagated. The task reference is therefore
        always cleared and the speech-understanding preemptive state always
        reset.
        """
        logger.info("Cancelling preemptive generation")
        self._preemptive_cancelled = True
        # Presumably releases anything waiting for authorization so it can
        # observe _preemptive_cancelled.
        self._preemptive_authorized.set()

        task = self._preemptive_generation_task
        if task and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                logger.info("Preemptive task cancelled successfully")
            except Exception as e:
                # Cleanup inside the task can itself fail; don't let that abort
                # the remaining cancellation steps below.
                logger.error(f"Preemptive task failed while cancelling: {e}", exc_info=True)

        self._preemptive_generation_task = None

        if not self._is_interrupted:
            redundant_cleanups = []
            if self.content_generation:
                redundant_cleanups.append(self.content_generation.cancel())
            if self.speech_generation:
                redundant_cleanups.append(self.speech_generation.interrupt())
            if redundant_cleanups:
                await asyncio.gather(*redundant_cleanups, return_exceptions=True)

            if self._current_generation_task and not self._current_generation_task.done():
                self._current_generation_task.cancel()

        if self.speech_understanding:
            self.speech_understanding.clear_preemptive_state()

        logger.info("Preemptive generation cancelled")
    
    async def _run_vmd_check(self) -> None:
        """Run voicemail detection check"""
        try:
            if not self.voice_mail_detector:
                return
            
            await asyncio.sleep(self.voice_mail_detector.duration)
            
            is_voicemail = await self.voice_mail_detector.detect(self._vmd_buffer.strip())
            self.voice_mail_detection_done = True
            
            if is_voicemail:
                await self._interrupt_pipeline()
            
            self.emit("voicemail_result", {"is_voicemail": is_voicemail})
        
        except Exception as e:
            logger.error(f"Error in VMD check: {e}")
            self.voice_mail_detection_done = True
            self.emit("voicemail_result", {"is_voicemail": False})
        
        finally:
            self._vmd_check_task = None
            self._vmd_buffer = ""
    
    async def cleanup(self) -> None:
        """Cleanup all components.

        Steps:

        1. Cancel the voicemail-detection task and the false-interrupt timer.
        2. Clean up speech understanding, content generation and speech
           generation, in that order.
        3. Drop references to the agent, avatar, graph adapter and voicemail
           detector.

        Every component cleanup is attempted even if an earlier one raises,
        so one faulty component cannot leak the others. The first error is
        re-raised after all cleanups have run and references are dropped.
        """
        logger.info("Cleaning up pipeline orchestrator")

        if self._vmd_check_task and not self._vmd_check_task.done():
            self._vmd_check_task.cancel()

        if self._false_interrupt_timer:
            self._false_interrupt_timer.cancel()
            # Don't keep a cancelled handle that later checks would treat as pending.
            self._false_interrupt_timer = None

        first_error: Exception | None = None
        for component in (self.speech_understanding, self.content_generation, self.speech_generation):
            if not component:
                continue
            try:
                await component.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up {type(component).__name__}: {e}", exc_info=True)
                if first_error is None:
                    first_error = e

        self.agent = None
        self.avatar = None
        self.graph_adapter = None
        self.voice_mail_detector = None

        if first_error is not None:
            raise first_error

        logger.info("Pipeline orchestrator cleaned up")

Orchestrates the execution of speech understanding, content generation, and speech generation components.

Supports various component chains:

1. Full Chain: VAD → STT → TurnD → LLM → TTS
2. No TTS: VAD → STT → TurnD → LLM (text output)
3. No STT: LLM → TTS (text input)
4. Hybrid: VAD → STT → [User Processing] → TTS

Events:

- transcript_ready: Transcript is ready for processing
- content_generated: LLM has generated content
- synthesis_complete: TTS synthesis is complete
- voicemail_result: Voicemail detection result
- error: Error occurred in pipeline

Ancestors

Methods

async def cleanup(self) ‑> None
Expand source code
async def cleanup(self) -> None:
    """Cleanup all components.

    Steps:

    1. Cancel the voicemail-detection task and the false-interrupt timer.
    2. Clean up speech understanding, content generation and speech
       generation, in that order.
    3. Drop references to the agent, avatar, graph adapter and voicemail
       detector.

    Every component cleanup is attempted even if an earlier one raises, so
    one faulty component cannot leak the others. The first error is
    re-raised after all cleanups have run and references are dropped.
    """
    logger.info("Cleaning up pipeline orchestrator")

    if self._vmd_check_task and not self._vmd_check_task.done():
        self._vmd_check_task.cancel()

    if self._false_interrupt_timer:
        self._false_interrupt_timer.cancel()
        # Don't keep a cancelled handle that later checks would treat as pending.
        self._false_interrupt_timer = None

    first_error: Exception | None = None
    for component in (self.speech_understanding, self.content_generation, self.speech_generation):
        if not component:
            continue
        try:
            await component.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up {type(component).__name__}: {e}", exc_info=True)
            if first_error is None:
                first_error = e

    self.agent = None
    self.avatar = None
    self.graph_adapter = None
    self.voice_mail_detector = None

    if first_error is not None:
        raise first_error

    logger.info("Pipeline orchestrator cleaned up")

Cleanup all components

def get_latest_transcript(self) ‑> str
Expand source code
def get_latest_transcript(self) -> str:
    """Return the transcript accumulated by speech understanding.

    Returns an empty string when no speech-understanding component is
    configured (useful for hybrid scenarios).
    """
    understanding = self.speech_understanding
    return understanding._accumulated_transcript if understanding else ""

Get the latest accumulated transcript (for hybrid scenarios)

async def handle_stt_event(self, text: str, confidence: float = 1.0) ‑> bool
Expand source code
async def handle_stt_event(self, text: str, confidence: float = 1.0) -> bool:
    """Handle an STT transcript as a possible word-based interruption.

    Acts only while the agent is SPEAKING or THINKING. Transcripts whose
    confidence is below ``interrupt_min_confidence`` (when that is > 0) are
    ignored.

    If a false-interrupt pause is active and the transcript has at least
    ``interrupt_min_words`` words, the interruption is confirmed at once
    through ``_interrupt_pipeline()``. Otherwise, in ``STT_ONLY`` or
    ``HYBRID`` mode, an interruptible current utterance is interrupted
    through ``_trigger_interruption()`` once the word threshold is met.

    Returns:
        True if interruption was triggered, False otherwise.
    """
    if not text or not text.strip():
        return False

    session = self.agent.session if self.agent else None
    agent_state = session.agent_state if session else None
    if agent_state not in (AgentState.SPEAKING, AgentState.THINKING):
        return False

    min_confidence = self.interrupt_min_confidence
    if min_confidence > 0.0 and confidence < min_confidence:
        logger.info(
            f"[orchestrator] Ignoring low-confidence transcript "
            f"(confidence={confidence:.3f} < {min_confidence:.3f}): {text!r}"
        )
        return False

    # str.split() with no separator already ignores surrounding whitespace.
    word_count = len(text.split())
    min_words = self.interrupt_min_words
    enough_words = word_count >= min_words
    utterance = session.current_utterance

    if utterance:
        logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={min_words}, mode={self.interrupt_mode}, utterance_id={utterance.id}, interruptible={utterance.is_interruptible}, done={utterance.done()}")
    else:
        logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={min_words}, mode={self.interrupt_mode}, no current_utterance")

    # Speech is paused awaiting the false-interrupt verdict; real words settle it.
    if enough_words and self.resume_on_false_interrupt and self._is_in_false_interrupt_pause:
        logger.info("STT transcript received while in paused state, confirming real interruption")
        self._cancel_false_interrupt_timer()
        self._is_in_false_interrupt_pause = False
        self._false_interrupt_paused_speech = False
        await self._interrupt_pipeline()
        return True

    if self.interrupt_mode not in ("STT_ONLY", "HYBRID"):
        return False

    if not enough_words:
        logger.info(f"[orchestrator] Word count {word_count} < min_words {min_words}, ignoring")
        return False

    if not utterance:
        return False

    if not utterance.is_interruptible:
        logger.info("[orchestrator] Word count threshold met but utterance is not interruptible")
        return False

    logger.info(f"[orchestrator] Word count {word_count} >= min_words {min_words}, triggering interruption")
    await self._trigger_interruption()
    return True

Handle STT event for interruption (word-based).

Only processes interruptions when the agent is actively speaking or generating. Respects interrupt_min_words, interrupt_min_confidence, and interrupt_mode configuration.

Returns

True if interruption was triggered, False otherwise.

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt the pipeline on behalf of external callers.

    Thin public wrapper that delegates to ``_interrupt_pipeline()``.
    """
    return await self._interrupt_pipeline()

Public method to interrupt the pipeline

async def process_audio(self, audio_data: bytes) ‑> None
Expand source code
async def process_audio(self, audio_data: bytes) -> None:
    """
    Forward incoming audio to speech understanding, if one is configured.

    Without a speech-understanding component the audio is dropped.

    Args:
        audio_data: Raw audio bytes
    """
    understanding = self.speech_understanding
    if not understanding:
        return
    await understanding.process_audio(audio_data)

Process incoming audio through the pipeline.

Args

audio_data
Raw audio bytes
async def process_text(self, text: str) ‑> None
Expand source code
async def process_text(self, text: str) -> None:
    """
    Process text input directly (bypasses STT).

    Flow:

    1. Start a metrics turn and record the user transcript.
    2. Let the graph adapter (if any) rewrite or fully handle the input.
    3. Append the input to the chat context as a USER message.
    4. When content generation is configured, collect the complete LLM
       response, then store it as an ASSISTANT message, emit
       ``content_generated``, run legacy LLM hooks and the LLM-stream hook.
    5. Synthesize the result in a single call (TTS present), or close the
       turn (no TTS).

    The chunker and text filter are not applied on this path.

    Args:
        text: User text input
    """
    if not self.agent:
        logger.warning("No agent available for text processing")
        return

    metrics_collector.start_turn()
    metrics_collector.set_user_transcript(text)

    if self.graph_adapter:
        self.graph_adapter.set_agent(self.agent)
        graph_prompt, skip_llm = await self.graph_adapter.handle_input(text)
        # The graph adapter asked to skip the LLM for this input.
        if skip_llm:
            return
        # The graph may substitute its own prompt for the raw user text.
        if graph_prompt is not None:
            text = graph_prompt

    self.agent.chat_context.add_message(
        role=ChatRole.USER,
        content=text
    )

    # Without speech understanding there are presumably no VAD/STT events to
    # mark the user's speech, so record the boundaries here ...
    if not self.speech_understanding:
        metrics_collector.on_user_speech_start()
        metrics_collector.on_user_speech_end()
        
        # ... and, with no TTS either, open the agent-speech span; the matching
        # on_agent_speech_end() is in the no-TTS branch below.
        if not self.speech_generation:
            metrics_collector.on_agent_speech_start()

    if self.content_generation:
        self.agent.session._emit_agent_state(AgentState.THINKING)
        full_response = ""
        graph_response_text = None
        # The whole LLM stream is consumed before anything is spoken; this path
        # does not stream partial text into TTS.
        async for response_chunk in self.content_generation.generate(text):
            if response_chunk.content:
                full_response += response_chunk.content
            # Keep the last non-empty "graph_response" found in chunk metadata.
            if self.graph_adapter and response_chunk.metadata:
                if response_chunk.metadata.get("graph_response"):
                    graph_response_text = response_chunk.metadata.get("graph_response")

        if self.graph_adapter and graph_response_text:
            new_response, skip = await self.graph_adapter.handle_decision(
                self.agent, graph_response_text
            )
            # NOTE(review): this method returns here without resetting the
            # THINKING state or completing the metrics turn — confirm the graph
            # adapter handles both.
            if skip:
                return
            if new_response is not None:
                full_response = new_response

        # NOTE(review): when the LLM yields no text, nothing below runs and this
        # method never moves the agent out of THINKING — confirm another
        # component does.
        if full_response:
            self.agent.chat_context.add_message(
                role=ChatRole.ASSISTANT,
                content=full_response
            )

            self.emit("content_generated", {"text": full_response})

            if self.hooks and self.hooks.has_llm_hooks():
                try:
                    await self.hooks.trigger_llm({"text": full_response})
                except Exception as e:
                    logger.error(f"Error in legacy LLM hook: {e}", exc_info=True)

            # Only the TTS input is affected by the stream hook; the chat
            # context and metrics keep the unmodified full_response.
            tts_text = full_response
            if self.hooks and self.hooks.has_llm_stream_hook():
                # Feed the complete response through the stream hook as a
                # one-chunk async stream and rejoin whatever it yields; an
                # empty result keeps the original text.
                async def _single_chunk():
                    yield full_response
                parts = []
                async for chunk in self.hooks.process_llm_stream(_single_chunk()):
                    parts.append(chunk)
                if parts:
                    tts_text = "".join(parts)

            if self.speech_generation:
                await self.speech_generation.synthesize(tts_text)
            else:
                # No TTS - complete the turn after LLM
                self.agent.session._emit_agent_state(AgentState.IDLE)
                self.agent.session._emit_user_state(UserState.IDLE)
                metrics_collector.on_agent_speech_end()
                metrics_collector.set_agent_response(full_response)
                metrics_collector.complete_turn()

            # Emitted on both branches, including when no TTS is configured.
            self.emit("synthesis_complete", {})

Process text input directly (bypasses STT).

Args

text
User text input
async def reply_with_context(self,
instructions: str,
wait_for_playback: bool,
handle: UtteranceHandle,
frames: list[av.VideoFrame] | None = None) ‑> None
Expand source code
async def reply_with_context(
    self,
    instructions: str,
    wait_for_playback: bool,
    handle: UtteranceHandle,
    frames: list[av.VideoFrame] | None = None
) -> None:
    """
    Append ``instructions`` (and optional frames) to the chat context, then reply.

    When ``wait_for_playback`` is true and a speech-understanding component is
    present, its VAD and/or STT callback attributes are replaced by no-op
    lambdas for the duration of the call and restored in a ``finally`` block.

    Args:
        instructions: Text added to the chat context as a USER message. It is
            also used as the knowledge-base query and forwarded to
            ``_generate_and_synthesize``.
        wait_for_playback: If True, silence the VAD/STT callbacks while the
            reply is being generated.
        handle: Utterance handle. It is marked done on every exit path if it
            is not already done.
        frames: Optional video frames. Each one is attached as an
            ``ImageContent`` part with ``inference_detail="auto"``.
    """
    if not self.agent:
        handle._mark_done()
        return

    # (saved-key, component attribute, callback attribute) for each input path
    # that can be silenced.
    # NOTE(review): swapping the attribute only silences the component if its
    # dispatch looks the callback up on the instance at call time. The no-op is
    # also synchronous. Confirm both against the speech-understanding class.
    silenceable = (
        ("vad", "vad", "_on_vad_event"),
        ("stt", "stt", "_on_stt_transcript"),
    )
    saved_callbacks = {}
    if wait_for_playback and self.speech_understanding:
        understanding = self.speech_understanding
        for key, component_attr, callback_attr in silenceable:
            if getattr(understanding, component_attr):
                saved_callbacks[key] = getattr(understanding, callback_attr)
                setattr(understanding, callback_attr, lambda x: None)

    try:
        # Optional retrieval-augmented prefix; a falsy lookup result means "none".
        prefix = None
        kb = self.agent.knowledge_base
        if kb:
            retrieved = await kb.process_query(instructions)
            prefix = retrieved if retrieved else None

        images = (
            [ImageContent(image=frame, inference_detail="auto") for frame in frames]
            if frames
            else []
        )
        # Plain text when there are no images; otherwise a multi-part list.
        self.agent.chat_context.add_message(
            role=ChatRole.USER,
            content=[instructions, *images] if images else instructions,
        )

        await self._generate_and_synthesize(instructions, handle, context_prefix=prefix)

    finally:
        if wait_for_playback and self.speech_understanding:
            for key, _component_attr, callback_attr in silenceable:
                if key in saved_callbacks:
                    setattr(self.speech_understanding, callback_attr, saved_callbacks[key])

        if not handle.done():
            handle._mark_done()

Generate a reply using instructions and current chat context.

Args

instructions
Instructions to add to chat context
wait_for_playback
If True, disable VAD/STT during response
handle
Utterance handle
frames
Optional video frames for vision
async def say(self,
message: str,
handle: UtteranceHandle,
audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None) ‑> None
Expand source code
async def say(
    self,
    message: str,
    handle: UtteranceHandle,
    audio_data: bytes | bytearray | Iterable[bytes] | AsyncIterator[bytes] | None = None,
) -> None:
    """
    Speak ``message`` directly through the speech-generation component (no LLM).

    Intended for direct utterances such as initial messages. The coroutine
    returns only after the speech-generation component emits
    ``last_audio_byte`` or ``synthesis_interrupted``. ``handle`` is marked done
    on every exit, including when no speech-generation component is set.

    Args:
        message: Text to speak. On the ``audio_data`` path it is still passed
            to ``synthesize_audio_bytes`` and recorded as the agent response.
        handle: Utterance handle to mark done once playback ends.
        audio_data: Optional pre-synthesized PCM bytes. When provided, TTS is
            bypassed and the bytes are streamed to the agent audio track via
            ``synthesize_audio_bytes``. The chunker and text filter are skipped
            on this path.

    NOTE(review): the playback wait has no timeout. If neither event is ever
    emitted, this call does not return. Confirm that synthesis always emits one.
    """
    if not self.speech_generation:
        handle._mark_done()
        return

    finished = asyncio.Event()

    def _signal_finished(_data: Any = None) -> None:
        finished.set()

    # Either event means playback is over, whether it completed or was cut off.
    playback_events = ("last_audio_byte", "synthesis_interrupted")
    for event_name in playback_events:
        self.speech_generation.on(event_name, _signal_finished)

    try:
        if audio_data is not None:
            metrics_collector.set_agent_response(message, emit_transport=True)
            await self.speech_generation.synthesize_audio_bytes(
                audio_data, message, handle=handle
            )
        else:
            uses_stream_hook = self.hooks and self.hooks.has_tts_stream_hook()
            if not uses_stream_hook:
                # Transport emission is enabled only when the TTS does not
                # advertise word timestamps (presumably those drive it
                # elsewhere; confirm).
                word_timestamps = getattr(
                    self.speech_generation.tts, "supports_word_timestamps", False
                )
                metrics_collector.set_agent_response(
                    message, emit_transport=not word_timestamps
                )

            if self._chunker is None and self._text_filter is None:
                await self.speech_generation.synthesize(message)
            else:
                # Wrap the single message as an async stream so it can flow
                # through the optional filter and then the chunker.
                async def _one_shot() -> AsyncIterator[str]:
                    yield message

                stream: AsyncIterator[Any] = _one_shot()
                if self._text_filter is not None:
                    stream = self._text_filter.filter(stream)
                if self._chunker is not None:
                    stream = _pipe_through_chunk_stream(
                        stream,
                        self._chunker,
                        language=self._chunking_language,
                    )
                await self.speech_generation.synthesize(stream)

        await finished.wait()
    finally:
        for event_name in playback_events:
            self.speech_generation.off(event_name, _signal_finished)
        handle._mark_done()

Direct TTS synthesis (for initial messages).

Args

message
Message to synthesize
handle
Utterance handle to track
audio_data
Optional pre-synthesized PCM bytes. When provided, bypasses TTS synthesis entirely and streams these bytes to the agent audio track. Chunker and TTS text filter are also skipped on this path.
def set_audio_track(self, audio_track: Any) ‑> None
Expand source code
def set_audio_track(self, audio_track: Any) -> None:
    """
    Route TTS output to ``audio_track`` and push the interrupt fade setting onto it.

    The track is forwarded to the speech-generation component when one is set.
    The fade duration is copied to any non-None track that exposes an
    ``interrupt_fade_duration`` attribute, even when there is no
    speech-generation component.
    """
    generator = self.speech_generation
    if generator:
        generator.set_audio_track(audio_track)

    supports_fade = audio_track is not None and hasattr(audio_track, "interrupt_fade_duration")
    if not supports_fade:
        return
    audio_track.interrupt_fade_duration = self.interrupt_fade_duration

Set audio track for TTS output

def set_voice_mail_detector(self, detector: VoiceMailDetector | None) ‑> None
Expand source code
def set_voice_mail_detector(self, detector: VoiceMailDetector | None) -> None:
    """
    Install a voicemail detector, or clear it by passing ``None``.

    Also resets the detection-done flag and empties the ``_vmd_buffer`` text
    buffer, so the new detector starts from a clean state.
    """
    (
        self.voice_mail_detector,
        self.voice_mail_detection_done,
        self._vmd_buffer,
    ) = (detector, False, "")

Configure voicemail detection

async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """
    Start the pipeline components in a fixed order.

    The order is speech understanding, then content generation, then speech
    generation. Components that are unset (falsy) are skipped. Each attribute
    is read just before its component is started. A single info line is logged
    at the end.
    """
    for attr_name in ("speech_understanding", "content_generation", "speech_generation"):
        component = getattr(self, attr_name)
        if component:
            await component.start()

    logger.info("PipelineOrchestrator started")

Start all components

Inherited members