Module agents.pipeline_orchestrator

Classes

class PipelineOrchestrator (agent: Agent | None = None,
stt: STT | None = None,
llm: LLM | None = None,
tts: TTS | None = None,
vad: VAD | None = None,
turn_detector: EOU | None = None,
denoise: Denoise | None = None,
avatar: Any | None = None,
mode: "Literal['ADAPTIVE', 'DEFAULT']" = 'DEFAULT',
min_speech_wait_timeout: tuple[float, float] = (0.5, 0.8),
interrupt_mode: "Literal['VAD_ONLY', 'STT_ONLY', 'HYBRID']" = 'HYBRID',
interrupt_min_duration: float = 0.5,
interrupt_min_words: int = 2,
false_interrupt_pause_duration: float = 2.0,
resume_on_false_interrupt: bool = False,
conversational_graph: Any | None = None,
max_context_items: int | None = None,
voice_mail_detector: VoiceMailDetector | None = None,
hooks: "PipelineHooks | None" = None)
Expand source code
class PipelineOrchestrator(EventEmitter[Literal[
    "transcript_ready", 
    "content_generated", 
    "synthesis_complete",
    "voicemail_result",
    "error"
]]):
    """
    Orchestrates the execution of speech understanding, content generation, and speech generation components.
    
    Supports various component chains:
    1. Full Chain: VAD → STT → TurnD → LLM → TTS
    2. No TTS: VAD → STT → TurnD → LLM (text output)
    3. No STT: LLM → TTS (text input)
    4. Hybrid: VAD → STT → [User Processing] → TTS
    
    Events:
    - transcript_ready: Transcript is ready for processing
    - content_generated: LLM has generated content
    - synthesis_complete: TTS synthesis is complete
    - voicemail_result: Voicemail detection result
    - error: Error occurred in pipeline
    """
    
    def __init__(
        self,
        agent: Agent | None = None,
        stt: STT | None = None,
        llm: LLM | None = None,
        tts: TTS | None = None,
        vad: VAD | None = None,
        turn_detector: EOU | None = None,
        denoise: Denoise | None = None,
        avatar: Any | None = None,
        mode: Literal["ADAPTIVE", "DEFAULT"] = "DEFAULT",
        min_speech_wait_timeout: tuple[float, float] = (0.5, 0.8),
        interrupt_mode: Literal["VAD_ONLY", "STT_ONLY", "HYBRID"] = "HYBRID",
        interrupt_min_duration: float = 0.5,
        interrupt_min_words: int = 2,
        false_interrupt_pause_duration: float = 2.0,
        resume_on_false_interrupt: bool = False,
        conversational_graph: Any | None = None,
        max_context_items: int | None = None,
        voice_mail_detector: VoiceMailDetector | None = None,
        hooks: "PipelineHooks | None" = None,
    ) -> None:
        """Wire up the pipeline stages from whichever components are provided.

        Each stage is created only when its components are present, so partial
        chains (no STT, no TTS, text-only, …) are supported:
        - SpeechUnderstanding when any of stt/vad/turn_detector is given
        - ContentGeneration when llm is given
        - SpeechGeneration when tts is given

        Args:
            agent: Owning agent; supplies chat context and session state.
            stt: Speech-to-text component, or None.
            llm: Language model component, or None.
            tts: Text-to-speech component, or None.
            vad: Voice-activity detector, or None.
            turn_detector: End-of-utterance detector, or None.
            denoise: Denoise component forwarded to speech understanding.
            avatar: Avatar sink forwarded to speech generation.
            mode: Speech-understanding mode ("ADAPTIVE" or "DEFAULT").
            min_speech_wait_timeout: (min, max) wait timeouts in seconds;
                unpacked into min/max for SpeechUnderstanding.
            interrupt_mode: Interruption source — VAD duration, STT word
                count, or both.
            interrupt_min_duration: Seconds of continuous user speech before
                a VAD-based interruption triggers.
            interrupt_min_words: Word-count threshold for STT-based interruption.
            false_interrupt_pause_duration: Pause length used when deciding a
                false interrupt.
            resume_on_false_interrupt: Resume agent speech after a false interrupt.
            conversational_graph: Optional graph driving structured conversations.
            max_context_items: Cap on chat-context items passed to the LLM.
            voice_mail_detector: Optional voicemail detector.
            hooks: Optional PipelineHooks callbacks.
        """
        super().__init__()
        
        self.agent = agent
        self.avatar = avatar
        self.conversational_graph = conversational_graph
        self.voice_mail_detector = voice_mail_detector
        self.voice_mail_detection_done = False
        self._vmd_buffer = ""  # accumulated transcript text fed to VMD
        self._vmd_check_task: asyncio.Task | None = None
        self.hooks = hooks
        
        # Interruption configuration
        self.interrupt_mode = interrupt_mode
        self.interrupt_min_duration = interrupt_min_duration
        self.interrupt_min_words = interrupt_min_words
        self.false_interrupt_pause_duration = false_interrupt_pause_duration
        self.resume_on_false_interrupt = resume_on_false_interrupt
        
        # Interruption state
        self._generation_id = 0  # monotonically increasing id; stale generations compare unequal
        self._is_interrupted = False
        self._is_in_false_interrupt_pause = False
        self._false_interrupt_paused_speech = False
        self._false_interrupt_timer: asyncio.TimerHandle | None = None
        self._is_user_speaking = False
        self._interruption_check_task: asyncio.Task | None = None
        
        # Component modules
        self.speech_understanding: SpeechUnderstanding | None = None
        self.content_generation: ContentGeneration | None = None
        self.speech_generation: SpeechGeneration | None = None
        
        # Initialize components based on what's available
        if stt or vad or turn_detector:
            self.speech_understanding = SpeechUnderstanding(
                agent=agent,
                stt=stt,
                vad=vad,
                turn_detector=turn_detector,
                denoise=denoise,
                mode=mode,
                min_speech_wait_timeout=min_speech_wait_timeout[0],
                max_speech_wait_timeout=min_speech_wait_timeout[1],
                hooks=hooks,
            )
            # Setup event listeners with sync wrappers (EventEmitter handlers
            # are sync-only; _wrap_async schedules the coroutine as a task).
            self.speech_understanding.on("transcript_final", self._wrap_async(self._on_transcript_final))
            self.speech_understanding.on("transcript_preflight", self._wrap_async(self._on_transcript_preflight))
            self.speech_understanding.on("transcript_interim", self._wrap_async(self._on_transcript_interim))
            self.speech_understanding.on("speech_started", self._wrap_async(self._on_speech_started))
            self.speech_understanding.on("speech_stopped", self._wrap_async(self._on_speech_stopped))
            self.speech_understanding.on("turn_resumed", self._wrap_async(self._on_turn_resumed))
        
        if llm:
            self.content_generation = ContentGeneration(
                agent=agent,
                llm=llm,
                conversational_graph=conversational_graph,
                max_context_items=max_context_items,
            )
            # Setup event listeners (logging only; no-ops for chunk events)
            self.content_generation.on("generation_started", lambda data: logger.info("Content generation started"))
            self.content_generation.on("generation_chunk", lambda data: None)
            self.content_generation.on("generation_complete", lambda data: logger.info("Content generation complete"))
        
        if tts:
            self.speech_generation = SpeechGeneration(
                agent=agent,
                tts=tts,
                avatar=avatar,
                hooks=hooks,
            )
            # Setup event listeners (logging only)
            self.speech_generation.on("synthesis_started", lambda data: logger.info("Speech synthesis started"))
            self.speech_generation.on("first_audio_byte", lambda data: logger.info("First audio byte ready"))
            self.speech_generation.on("last_audio_byte", lambda data: logger.info("Synthesis complete"))
            self.speech_generation.on("synthesis_interrupted", lambda data: logger.info("Synthesis interrupted"))
        
        # Generation tasks
        self._current_generation_task: asyncio.Task | None = None
        self._partial_response = ""  # best-effort text captured if a generation is interrupted
        
        # Preemptive generation (speculative LLM/TTS run before the final transcript)
        self._preemptive_generation_task: asyncio.Task | None = None
        self._preemptive_authorized = asyncio.Event()
        self._preemptive_cancelled = False
    
    def _wrap_async(self, async_func):
        """
        Wrap an async function to be compatible with EventEmitter's sync-only handlers.

        The returned wrapper schedules ``async_func`` with ``asyncio.create_task``
        and keeps a strong reference to the task until it completes. The event
        loop holds only weak references to tasks, so a dropped ``create_task``
        result can be garbage-collected mid-execution — the previous version
        discarded it.

        Args:
            async_func: The async function to wrap

        Returns:
            A sync function that schedules the async function as a task
        """
        # Closure-local registry of in-flight tasks; entries remove themselves
        # on completion via the done callback.
        pending: set[asyncio.Task] = set()

        def sync_wrapper(*args, **kwargs):
            task = asyncio.create_task(async_func(*args, **kwargs))
            pending.add(task)
            task.add_done_callback(pending.discard)

        return sync_wrapper
    
    def set_audio_track(self, audio_track: Any) -> None:
        """Forward the given audio track to the TTS output stage, if one exists."""
        synthesis = self.speech_generation
        if synthesis is None:
            return
        synthesis.set_audio_track(audio_track)
    
    def set_voice_mail_detector(self, detector: VoiceMailDetector | None) -> None:
        """Install (or clear) the voicemail detector and reset detection state."""
        self.voice_mail_detector = detector
        # Fresh detector means detection must run again from an empty buffer.
        self._vmd_buffer = ""
        self.voice_mail_detection_done = False
    
    async def start(self) -> None:
        """Start every configured pipeline stage, in processing order."""
        stages = (
            self.speech_understanding,
            self.content_generation,
            self.speech_generation,
        )
        for stage in stages:
            if stage is not None:
                await stage.start()

        logger.info("PipelineOrchestrator started")
    
    async def process_audio(self, audio_data: bytes) -> None:
        """
        Feed incoming raw audio into the speech-understanding stage.

        No-op when no speech-understanding stage is configured.

        Args:
            audio_data: Raw audio bytes
        """
        understanding = self.speech_understanding
        if understanding is None:
            return
        await understanding.process_audio(audio_data)
    
    async def process_text(self, text: str) -> None:
        """
        Process text input directly (bypasses STT).

        Generates an LLM response for ``text`` and, when TTS is configured,
        synthesizes it; otherwise the turn is completed right after generation.

        Args:
            text: User text input
        """
        if not self.agent:
            logger.warning("No agent available for text processing")
            return

        metrics_collector.start_turn()
        metrics_collector.set_user_transcript(text)

        self.agent.chat_context.add_message(
            role=ChatRole.USER,
            content=text
        )

        if not self.speech_understanding:
            # Text-only input: emit synthetic speech start/end markers so
            # turn metrics stay well-formed without an STT stage.
            metrics_collector.on_user_speech_start()
            metrics_collector.on_user_speech_end()
            
            if not self.speech_generation:
                # No TTS either: the generated text itself counts as agent "speech".
                metrics_collector.on_agent_speech_start()

        if self.content_generation:
            self.agent.session._emit_agent_state(AgentState.THINKING)
            # Drain the LLM stream into one string (no incremental TTS here).
            full_response = ""
            async for response_chunk in self.content_generation.generate(text):
                if response_chunk.content:
                    full_response += response_chunk.content

            if full_response:
                self.agent.chat_context.add_message(
                    role=ChatRole.ASSISTANT,
                    content=full_response
                )

                # LLM hooks may rewrite the response before emission/synthesis.
                if self.hooks and self.hooks.has_llm_hooks():
                    modified = await self.hooks.trigger_llm({"text": full_response})
                    if modified is not None:
                        full_response = modified

                self.emit("content_generated", {"text": full_response})

                if self.speech_generation:
                    await self.speech_generation.synthesize(full_response)
                else:
                    # No TTS - complete the turn after LLM
                    self.agent.session._emit_agent_state(AgentState.IDLE)
                    self.agent.session._emit_user_state(UserState.IDLE)
                    metrics_collector.on_agent_speech_end()
                    metrics_collector.set_agent_response(full_response)
                    metrics_collector.complete_turn()

                self.emit("synthesis_complete", {})
    
    def get_latest_transcript(self) -> str:
        """Return the transcript accumulated so far, or "" when STT is absent."""
        understanding = self.speech_understanding
        return understanding._accumulated_transcript if understanding else ""
    
    async def _on_transcript_final(self, data: dict) -> None:
        """Handle final transcript from speech understanding.

        If the agent is mid-response, the transcript is treated as a possible
        interruption (gated by word count and utterance interruptibility);
        otherwise it flows into normal or preemptive turn processing.
        """
        text = data["text"]
        is_preemptive = data.get("is_preemptive", False)

        agent_state = self.agent.session.agent_state if self.agent and self.agent.session else None
        is_agent_active = agent_state in (AgentState.SPEAKING, AgentState.THINKING)

        if is_agent_active:
            word_count = len(text.strip().split()) if text.strip() else 0
            logger.info(f"[orchestrator] Final transcript while agent is {agent_state.value} | word_count={word_count}, min_words={self.interrupt_min_words}")

            # Too few words: treat as noise/backchannel, not an interruption.
            if word_count < self.interrupt_min_words:
                logger.info(f"[orchestrator] Transcript '{text}' below min_words threshold ({word_count} < {self.interrupt_min_words}), dropping")
                return

            # Word count threshold met — check if utterance is interruptible
            if self.agent.session.current_utterance and not self.agent.session.current_utterance.is_interruptible:
                logger.info(f"[orchestrator] Transcript '{text}' meets min_words but utterance is not interruptible, dropping")
                return

            # Interrupt the current pipeline and start a new turn
            logger.info(f"[orchestrator] Word count {word_count} >= min_words {self.interrupt_min_words}, interrupting pipeline")
            await self._interrupt_pipeline()

            metrics_collector.start_turn()
            metrics_collector.set_user_transcript(text)

        # Accumulate text for voicemail detection until the first decision is made;
        # the timer task is started lazily on first non-empty transcript.
        if self.voice_mail_detector and not self.voice_mail_detection_done and text.strip():
            self._vmd_buffer += f" {text}"
            if not self._vmd_check_task:
                logger.info("Starting Voice Mail Detection Timer")
                self._vmd_check_task = asyncio.create_task(self._run_vmd_check())
        
        self.emit("transcript_ready", {
            "text": text,
            "is_final": True,
            "is_preemptive": is_preemptive,
            "metadata": data.get("metadata", {})
        })
        
        # Preemptive finals reconcile against the speculative generation;
        # regular finals start a fresh generation.
        if is_preemptive:
            await self._handle_preemptive_final(text)
        else:
            await self._process_final_transcript(text)
    
    async def _on_transcript_preflight(self, data: dict) -> None:
        """Handle preflight transcript for preemptive generation.

        Speculatively starts LLM/TTS on the preliminary transcript; playback is
        held until `_handle_preemptive_final` confirms the final transcript
        matches and sets `_preemptive_authorized`.
        """
        preflight_text = data["text"]

        if self.agent and self.content_generation:
            user_text = preflight_text
            # Optionally prepend knowledge-base context to the user text.
            if self.agent.knowledge_base:
                kb_context = await self.agent.knowledge_base.process_query(preflight_text)
                if kb_context:
                    user_text = f"{kb_context}\n\nUser: {preflight_text}"
            
            self.agent.chat_context.add_message(
                role=ChatRole.USER,
                content=user_text
            )
            
            if self.agent.session:
                # Supersede any in-flight utterance before starting the new one.
                if self.agent.session.current_utterance and not self.agent.session.current_utterance.done():
                    self.agent.session.current_utterance.interrupt()
                
                handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
                self.agent.session.current_utterance = handle
            else:
                # No session to track the utterance — use an already-done placeholder.
                handle = UtteranceHandle(utterance_id="utt_fallback")
                handle._mark_done()
            
            # Reset authorization gate: playback waits until the final transcript confirms.
            self._preemptive_authorized.clear()
            self._preemptive_cancelled = False
            if metrics_collector:
                metrics_collector.on_stt_preflight_end()
            self._preemptive_generation_task = asyncio.create_task(
                self._generate_and_synthesize(user_text, handle, wait_for_authorization=True)
            )
    
    async def _handle_preemptive_final(self, final_text: str) -> None:
        """Handle final transcript when preemptive generation is active.

        On a match, authorizes the speculative generation to play out; on a
        mismatch, cancels it, rolls back the speculative user message, and
        reprocesses the final transcript normally.
        """
        if not self.speech_understanding:
            return
        
        if self.speech_understanding.check_preemptive_match(final_text):
            logger.info("Preemptive generation MATCH - authorizing playback")
            
            # Release the tts_consumer waiting on this event.
            self._preemptive_authorized.set()
            
            if self._preemptive_generation_task:
                try:
                    await asyncio.wait_for(self._preemptive_generation_task, timeout=30.0)
                    logger.info("Preemptive generation completed successfully")
                except asyncio.TimeoutError:
                    logger.error("Preemptive playback timeout")
                except Exception as e:
                    logger.error(f"Error in preemptive playback: {e}")
        else:
            logger.info("Preemptive generation MISMATCH - cancelling")
            
            await self._cancel_preemptive_generation()
            
            # Roll back the speculative user message added during preflight so
            # normal processing can append the corrected one.
            if self.agent and self.agent.chat_context.messages:
                if self.agent.chat_context.messages[-1].role == ChatRole.USER:
                    self.agent.chat_context.messages.pop()
            
            await self._process_final_transcript(final_text)
        
        self.speech_understanding.clear_preemptive_state()
        self._preemptive_generation_task = None
    
    async def _on_transcript_interim(self, data: dict) -> None:
        """Handle interim transcript — check for STT-based interruption"""
        interim = data.get("text", "")
        if interim and interim.strip():
            await self.handle_stt_event(interim)
    
    async def _on_speech_started(self, data: dict) -> None:
        """Handle speech started event"""
        logger.info("[orchestrator] _on_speech_started fired")
        self._is_user_speaking = True

        session = self.agent.session if self.agent else None
        current_state = session.agent_state if session else None
        if current_state != AgentState.SPEAKING:
            return

        # Only one duration monitor at a time; skip if one is already running.
        monitor = self._interruption_check_task
        if monitor is not None and not monitor.done():
            return

        logger.info("User started speaking during agent response, initiating interruption monitoring")
        self._interruption_check_task = asyncio.create_task(
            self._monitor_interruption_duration()
        )
    
    async def _on_speech_stopped(self, data: dict) -> None:
        """Handle speech stopped event"""
        logger.info("[orchestrator] _on_speech_stopped fired")
        self._is_user_speaking = False

        monitor = self._interruption_check_task
        if monitor is None or monitor.done():
            return
        logger.info("User stopped speaking, cancelling interruption check")
        monitor.cancel()
    
    async def _on_turn_resumed(self, data: dict) -> None:
        """Handle turn resumed event"""
        # A resumed turn invalidates any speculative generation in flight.
        await self._cancel_preemptive_generation()
    
    async def _process_final_transcript(self, user_text: str) -> None:
        """Process final transcript through the full pipeline.

        Enriches the text (knowledge base, conversational graph), records it
        in metrics and chat context, rotates the current utterance handle, and
        kicks off generation/synthesis as a background task.
        """
        if not self.agent:
            logger.warning("No agent available")
            return
        
        if self.hooks and self.hooks.has_user_turn_start_hooks():
            await self.hooks.trigger_user_turn_start(user_text)

        # Optionally prepend knowledge-base context to what the LLM sees.
        final_user_text = user_text
        if self.agent.knowledge_base:
            kb_context = await self.agent.knowledge_base.process_query(user_text)
            if kb_context:
                final_user_text = f"{kb_context}\n\nUser: {user_text}"

        # The graph may rewrite the input entirely (structured conversations).
        if self.conversational_graph:
            final_user_text = self.conversational_graph.handle_input(user_text)

        # Metrics record the raw user text, not the enriched LLM input.
        if not metrics_collector.current_turn:
            metrics_collector.start_turn()
        metrics_collector.set_user_transcript(user_text)

        self.agent.chat_context.add_message(
            role=ChatRole.USER,
            content=final_user_text
        )
        
        if self.agent.session:
            # Interrupt the previous utterance (if allowed) before installing a new handle.
            if self.agent.session.current_utterance and not self.agent.session.current_utterance.done():
                if self.agent.session.current_utterance.is_interruptible:
                    self.agent.session.current_utterance.interrupt()
                else:
                    logger.info("Current utterance is not interruptible")
            
            handle = UtteranceHandle(utterance_id=f"utt_{uuid.uuid4().hex[:8]}")
            self.agent.session.current_utterance = handle
        else:
            # No session to track the utterance — use an already-done placeholder.
            handle = UtteranceHandle(utterance_id="utt_fallback")
            handle._mark_done()

        # Supersede any still-running generation before starting the new one.
        if self._current_generation_task and not self._current_generation_task.done():
            self._current_generation_task.cancel()
        self._current_generation_task = asyncio.create_task(self._generate_and_synthesize(final_user_text, handle))
    
    
    async def _generate_and_synthesize(
        self, 
        user_text: str, 
        handle: UtteranceHandle,
        wait_for_authorization: bool = False
    ) -> None:
        """Generate LLM response and synthesize with TTS.

        Runs two cooperating tasks joined by a bounded queue:

        - ``collector`` drains the LLM stream and pushes text chunks onto the
          queue (``None`` is the end-of-stream sentinel).
        - ``tts_consumer`` feeds those chunks to the TTS stage as an async
          generator; in preemptive mode it first waits for playback
          authorization.

        Fixes vs. previous version:
        - ``metadata`` is initialized before the stream loop, so an empty LLM
          stream no longer raises UnboundLocalError at the graph check.
        - the per-chunk metadata fallback is ``{}`` (not the chunk itself), so
          ``metadata.get(...)`` is always safe.
        - the collector result is read only when the task finished cleanly;
          ``gather(return_exceptions=True)`` swallows collector failures and
          ``.result()`` would otherwise re-raise them here.

        Args:
            user_text: Final user text to send to the LLM.
            handle: Utterance handle used to observe interruption.
            wait_for_authorization: When True (preemptive generation), TTS
                playback is held until ``_preemptive_authorized`` is set.
        """
        # Tag this run so callbacks from superseded generations can be ignored.
        self._generation_id += 1
        my_generation_id = self._generation_id
        self._is_interrupted = False
        full_response = ""
        self._partial_response = ""
        
        try:
            if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
                await self.agent.session.start_thinking_audio()

            if self.speech_generation:
                self.speech_generation.reset_interrupt()
            
            if not self.content_generation:
                logger.warning("No content generation available")
                return
            
            if not self.speech_generation:
                logger.warning("No speech generation available")
                # No TTS: the text response itself counts as agent "speech".
                metrics_collector.on_agent_speech_start()
                
            self.agent.session._emit_agent_state(AgentState.THINKING)
            llm_stream = self.content_generation.generate(user_text)
            
            # Bounded queue provides backpressure between LLM and TTS.
            q = asyncio.Queue(maxsize=50)
            
            async def collector():
                """Drain LLM chunks into the queue; return the full response text."""
                response_parts = []
                # Initialized up-front so an empty stream cannot leave it unbound.
                metadata = {}

                def _safe_set_agent_response(text: str) -> None:
                    """Only set agent response if this collector still owns the current generation."""
                    if text and self._generation_id == my_generation_id and metrics_collector.current_turn:
                        metrics_collector.set_agent_response(text)

                try:
                    async for chunk in llm_stream:
                        if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled):
                            logger.info("LLM collection interrupted")
                            await q.put(None)
                            full = "".join(response_parts)
                            _safe_set_agent_response(full)
                            return full

                        content = chunk.content if hasattr(chunk, "content") else chunk
                        # Fall back to {} so metadata.get() below is always safe.
                        metadata = getattr(chunk, "metadata", {}) or {}

                        if content:
                            response_parts.append(content)
                            await q.put(content)

                        self._partial_response = "".join(response_parts)

                    if not handle.interrupted:
                        await q.put(None)  # end-of-stream sentinel for the TTS side

                    # Let the conversational graph act on the final chunk's metadata.
                    if self.conversational_graph and isinstance(metadata, dict) and metadata.get("graph_response"):
                        _ = await self.conversational_graph.handle_decision(self.agent, metadata.get("graph_response"))

                    full = "".join(response_parts)
                    _safe_set_agent_response(full)
                    return full

                except asyncio.CancelledError:
                    logger.info("LLM collection cancelled")
                    await q.put(None)
                    full = "".join(response_parts)
                    _safe_set_agent_response(full)
                    return full
            
            async def tts_consumer():
                """Consume LLM chunks and send to TTS"""
                if wait_for_authorization:
                    try:
                        await asyncio.wait_for(
                            self._preemptive_authorized.wait(),
                            timeout=10.0
                        )
                        
                        if self._preemptive_cancelled:
                            logger.info("Preemptive generation cancelled during authorization wait")
                            return
                    
                    except asyncio.TimeoutError:
                        logger.error("Authorization timeout - cancelling preemptive generation")
                        self._preemptive_cancelled = True
                        return
                
                async def tts_stream_gen():
                    """Yield queued chunks until the sentinel or an interruption."""
                    while True:
                        if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled):
                            break
                        
                        try:
                            # Short timeout keeps the interruption flags responsive.
                            chunk = await asyncio.wait_for(q.get(), timeout=0.1)
                            if chunk is None:
                                break
                            yield chunk
                            
                        except asyncio.TimeoutError:
                            if handle.interrupted or (wait_for_authorization and self._preemptive_cancelled):
                                break
                            continue
                
                if self.speech_generation:
                    try:
                        await self.speech_generation.synthesize(tts_stream_gen())
                    except asyncio.CancelledError:
                        if self.speech_generation:
                            await self.speech_generation.interrupt()
            
            collector_task = asyncio.create_task(collector())
            tts_task = asyncio.create_task(tts_consumer())
            
            try:
                await asyncio.gather(collector_task, tts_task, return_exceptions=True)
            except asyncio.CancelledError:
                if not collector_task.done():
                    collector_task.cancel()
                if not tts_task.done():
                    tts_task.cancel()
            
            # Use the collector's result only if it finished cleanly; otherwise
            # fall back to whatever partial text was streamed before the stop.
            if (
                collector_task.done()
                and not collector_task.cancelled()
                and collector_task.exception() is None
                and not self._is_interrupted
            ):
                full_response = collector_task.result()
            else:
                full_response = self._partial_response

            if self._is_interrupted or self._generation_id != my_generation_id:
                logger.info("[orchestrator] Skipping post-gather updates — turn was interrupted or generation superseded")
            elif full_response and self.agent:
                self.agent.chat_context.add_message(
                    role=ChatRole.ASSISTANT,
                    content=full_response
                )
                # LLM hooks may rewrite the response before emission.
                if self.hooks and self.hooks.has_llm_hooks():
                    modified = await self.hooks.trigger_llm({"text": full_response})
                    if modified is not None:
                        full_response = modified

                self.emit("content_generated", {"text": full_response})

                # For no-TTS modes, complete the turn here since there's no speech_generation
                if not self.speech_generation:
                    self.agent.session._emit_user_state(UserState.IDLE)
                    self.agent.session._emit_agent_state(AgentState.IDLE)
                    metrics_collector.on_agent_speech_end()
                    metrics_collector.complete_turn()

        finally:
            if self.hooks and self.hooks.has_user_turn_end_hooks():
                await self.hooks.trigger_user_turn_end()

            # Always resolve the handle so awaiting callers never hang.
            if not handle.done():
                handle._mark_done()

            if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
                await self.agent.session.stop_thinking_audio()
    
    async def say(self, message: str, handle: UtteranceHandle) -> None:
        """
        Direct TTS synthesis (for initial messages).

        Always resolves ``handle``, including when no TTS stage is configured —
        previously the handle was only marked done when ``speech_generation``
        existed, so callers awaiting it in a no-TTS setup would hang forever.

        Args:
            message: Message to synthesize
            handle: Utterance handle to track
        """
        try:
            if self.speech_generation:
                metrics_collector.set_agent_response(message)
                await self.speech_generation.synthesize(message)
        finally:
            # Resolve the handle unconditionally so it cannot leak.
            handle._mark_done()
    
    async def reply_with_context(
        self,
        instructions: str,
        wait_for_playback: bool,
        handle: UtteranceHandle,
        frames: list[av.VideoFrame] | None = None
    ) -> None:
        """
        Generate a reply using instructions and current chat context.
        
        Args:
            instructions: Instructions to add to chat context
            wait_for_playback: If True, disable VAD/STT during response
            handle: Utterance handle
            frames: Optional video frames for vision
        """
        if not self.agent:
            handle._mark_done()
            return
        
        # Temporarily swap the VAD/STT event handlers for no-ops so the agent's
        # own speech cannot trigger transcripts/interruptions; originals are
        # restored in the finally block below.
        original_handlers = {}
        if wait_for_playback and self.speech_understanding:
            if self.speech_understanding.vad:
                original_handlers['vad'] = self.speech_understanding._on_vad_event
                self.speech_understanding._on_vad_event = lambda x: None
            
            if self.speech_understanding.stt:
                original_handlers['stt'] = self.speech_understanding._on_stt_transcript
                self.speech_understanding._on_stt_transcript = lambda x: None
        
        try:
            # Optionally prepend knowledge-base context to the instructions.
            final_instructions = instructions
            if self.agent.knowledge_base:
                kb_context = await self.agent.knowledge_base.process_query(instructions)
                if kb_context:
                    final_instructions = f"{kb_context}\n\nUser: {instructions}"
            
            # Attach video frames as image parts when vision input is provided.
            content_parts = [final_instructions]
            if frames:
                for frame in frames:
                    image_part = ImageContent(image=frame, inference_detail="auto")
                    content_parts.append(image_part)
            
            self.agent.chat_context.add_message(
                role=ChatRole.USER,
                content=content_parts if len(content_parts) > 1 else final_instructions
            )
            
            await self._generate_and_synthesize(final_instructions, handle)
        
        finally:
            # Restore the original VAD/STT handlers swapped out above.
            if wait_for_playback and self.speech_understanding:
                if 'vad' in original_handlers:
                    self.speech_understanding._on_vad_event = original_handlers['vad']
                if 'stt' in original_handlers:
                    self.speech_understanding._on_stt_transcript = original_handlers['stt']
            
            if not handle.done():
                handle._mark_done()
    
    async def _monitor_interruption_duration(self) -> None:
        """Monitor user speech duration during agent response"""
        # Duration-based interruption only applies in VAD-driven modes.
        if self.interrupt_mode not in ("VAD_ONLY", "HYBRID"):
            return

        try:
            await asyncio.sleep(self.interrupt_min_duration)

            session = self.agent.session if self.agent else None
            utterance = session.current_utterance if session else None
            if utterance and utterance.is_interruptible:
                logger.info(f"User speech duration exceeded {self.interrupt_min_duration}s threshold, triggering interruption")
                await self._trigger_interruption()

        except asyncio.CancelledError:
            logger.debug("Interruption monitoring cancelled")
    
    async def handle_stt_event(self, text: str) -> bool:
        """Handle STT event for interruption (word-based).
        
        Only processes interruptions when the agent is actively speaking or generating.
        Respects interrupt_min_words and interrupt_mode configuration.
        
        Returns:
            True if interruption was triggered, False otherwise.
        """
        if not text or not text.strip():
            return False
        
        # Only consider interruption when agent is actively speaking or thinking
        agent_state = self.agent.session.agent_state if self.agent and self.agent.session else None
        if agent_state not in (AgentState.SPEAKING, AgentState.THINKING):
            return False
        
        word_count = len(text.strip().split())
        
        # Debug: log exact state of current_utterance
        if self.agent and self.agent.session and self.agent.session.current_utterance:
            cu = self.agent.session.current_utterance
            logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={self.interrupt_min_words}, mode={self.interrupt_mode}, utterance_id={cu.id}, interruptible={cu.is_interruptible}, done={cu.done()}")
        else:
            logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={self.interrupt_min_words}, mode={self.interrupt_mode}, no current_utterance")
        
        # A confident transcript arriving during a false-interrupt pause
        # upgrades the pause into a real interruption.
        if self.resume_on_false_interrupt and self._is_in_false_interrupt_pause and word_count >= self.interrupt_min_words:
            logger.info(f"STT transcript received while in paused state, confirming real interruption")
            self._cancel_false_interrupt_timer()
            self._is_in_false_interrupt_pause = False
            self._false_interrupt_paused_speech = False
            await self._interrupt_pipeline()
            return True
        
        # Word-count based interruption (STT_ONLY / HYBRID modes only).
        if self.interrupt_mode in ("STT_ONLY", "HYBRID"):
            if word_count >= self.interrupt_min_words:
                if self.agent and self.agent.session and self.agent.session.current_utterance:
                    if self.agent.session.current_utterance.is_interruptible:
                        logger.info(f"[orchestrator] Word count {word_count} >= min_words {self.interrupt_min_words}, triggering interruption")
                        await self._trigger_interruption()
                        return True
                    else:
                        logger.info(f"[orchestrator] Word count threshold met but utterance is not interruptible")
            else:
                logger.info(f"[orchestrator] Word count {word_count} < min_words {self.interrupt_min_words}, ignoring")
        
        return False
    
    async def _trigger_interruption(self) -> None:
        """Trigger interruption with optional pause/resume support"""
        # Already interrupted: nothing more to do.
        if self._is_interrupted:
            return
        
        session = self.agent.session if self.agent and self.agent.session else None
        utterance = session.current_utterance if session else None
        if utterance is not None and not utterance.is_interruptible:
            logger.info("Interruption disabled for current utterance")
            return
        
        self._is_interrupted = True
        
        # Pause-and-wait is only possible when resume-on-false-interrupt is
        # enabled and the speech generator supports pausing mid-utterance.
        supports_pause = bool(
            self.resume_on_false_interrupt
            and self.speech_generation
            and self.speech_generation.can_pause()
        )
        
        if not supports_pause:
            logger.info("Performing full interruption")
            await self._interrupt_pipeline()
            return
        
        logger.info("Pausing TTS for potential resume")
        self._false_interrupt_paused_speech = True
        self._is_in_false_interrupt_pause = True
        if self.speech_generation:
            await self.speech_generation.pause()
        self._start_false_interrupt_timer()
    
    def _start_false_interrupt_timer(self):
        """Start (or restart) the timer that detects false interrupts.
        
        Must be called from within the running event loop (it is invoked from
        the async interruption path). When false_interrupt_pause_duration is
        None the feature is disabled and no timer is scheduled.
        """
        # Restart semantics: drop any previously scheduled timer first, and
        # clear the reference so we never hold a cancelled handle.
        if self._false_interrupt_timer:
            self._false_interrupt_timer.cancel()
            self._false_interrupt_timer = None
        
        if self.false_interrupt_pause_duration is None:
            return
        
        logger.info(f"Starting false interrupt timer for {self.false_interrupt_pause_duration}s")
        # get_running_loop() instead of the deprecated get_event_loop():
        # this method only runs from coroutine context, where a loop exists.
        loop = asyncio.get_running_loop()
        self._false_interrupt_timer = loop.call_later(
            self.false_interrupt_pause_duration,
            lambda: asyncio.create_task(self._on_false_interrupt_timeout())
        )
    
    def _cancel_false_interrupt_timer(self):
        """Cancel false interrupt timer"""
        timer = self._false_interrupt_timer
        if not timer:
            return
        logger.info("Cancelling false interrupt timer")
        timer.cancel()
        self._false_interrupt_timer = None
    
    async def _on_false_interrupt_timeout(self):
        """Handle false interrupt timeout"""
        logger.info(f"False interrupt timeout reached after {self.false_interrupt_pause_duration}s")
        self._false_interrupt_timer = None
        
        if self._is_user_speaking:
            # The user kept talking through the whole pause window: the
            # interruption was genuine, so abandon the paused speech.
            logger.info("User still speaking - confirming real interruption")
            self._is_in_false_interrupt_pause = False
            self._false_interrupt_paused_speech = False
            await self._interrupt_pipeline()
            return
        
        # Silence during the window means a false interrupt: resume playback.
        can_resume = (
            self._is_in_false_interrupt_pause
            and self.speech_generation
            and self.speech_generation.can_pause()
        )
        if can_resume:
            logger.info("Resuming agent speech - false interruption detected")
            self._is_interrupted = False
            self._is_in_false_interrupt_pause = False
            self._false_interrupt_paused_speech = False
            await self.speech_generation.resume()
    
    async def _interrupt_pipeline(self) -> None:
        """Interrupt all components.
        
        Cancels any pending false-interrupt pause, stops speech synthesis and
        content generation, flushes the partial agent response into metrics
        and chat history, and completes the current metrics turn.
        """
        # Guard the session access: every other access in this method is
        # guarded, and cleanup() sets self.agent to None, so an unguarded
        # dereference here could raise AttributeError.
        if self.agent and self.agent.session:
            self.agent.session._emit_agent_state(AgentState.LISTENING)
        self._is_interrupted = True
        self._cancel_false_interrupt_timer()
        self._is_in_false_interrupt_pause = False
        self._false_interrupt_paused_speech = False
        metrics_collector.on_interrupted()

        if self.agent and self.agent.session and self.agent.session.current_utterance:
            if self.agent.session.current_utterance.is_interruptible:
                self.agent.session.current_utterance.interrupt()
        
        if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
            await self.agent.session.stop_thinking_audio()
        
        if self.speech_generation:
            await self.speech_generation.interrupt()
        
        if self.content_generation:
            await self.content_generation.cancel()

        if self._current_generation_task and not self._current_generation_task.done():
            self._current_generation_task.cancel()

        # Preserve whatever the agent managed to say before the interrupt:
        # record it as this turn's agent speech (if none was recorded yet)
        # and append it to the chat history.
        if self._partial_response and metrics_collector.current_turn:
            if not metrics_collector.current_turn.agent_speech:
                metrics_collector.set_agent_response(self._partial_response)

        if self._partial_response and self.agent:
            self.agent.chat_context.add_message(
                role=ChatRole.ASSISTANT,
                content=self._partial_response
            )

        if metrics_collector.current_turn:
            logger.info("[orchestrator] Completing interrupted turn")
            metrics_collector.complete_turn()

        self._partial_response = ""

    async def interrupt(self) -> None:
        """Public entry point: interrupt the pipeline immediately.
        
        Delegates to the internal full-interruption path (no false-interrupt
        pause is attempted).
        """
        await self._interrupt_pipeline()
    
    async def _cancel_preemptive_generation(self) -> None:
        """Cancel any in-flight preemptive generation and reset related state.
        
        Marks the preemptive run as cancelled, unblocks anything waiting on
        the authorization event, awaits the cancelled task so it finishes
        unwinding, then stops downstream content/speech generation and clears
        the speech-understanding preemptive state.
        """
        logger.info("Cancelling preemptive generation")
        self._preemptive_cancelled = True
        # Set the event so a task blocked on authorization wakes up and can
        # observe the cancelled flag instead of waiting forever.
        self._preemptive_authorized.set() 
        
        if self._preemptive_generation_task and not self._preemptive_generation_task.done():
            self._preemptive_generation_task.cancel()
            try:
                # Await the task so cancellation completes before tearing
                # down the components it may still be using.
                await self._preemptive_generation_task
            except asyncio.CancelledError:
                logger.info("Preemptive task cancelled successfully")

        self._preemptive_generation_task = None
        
        if self.content_generation:
            await self.content_generation.cancel()

        if self._current_generation_task and not self._current_generation_task.done():
            self._current_generation_task.cancel()
             
        if self.speech_generation:
            await self.speech_generation.interrupt()
        
        if self.speech_understanding:
            self.speech_understanding.clear_preemptive_state()
        
        logger.info("Preemptive generation cancelled")
    
    async def _run_vmd_check(self) -> None:
        """Run voicemail detection check.
        
        Sleeps for the detector's configured duration (the transcript buffer
        is presumably filled elsewhere during this window — confirm against
        the STT path), then classifies the buffered text. On a voicemail
        verdict the pipeline is interrupted. Always emits "voicemail_result",
        reporting False on any detection error.
        """
        try:
            if not self.voice_mail_detector:
                # Early return still flows through `finally`, which clears
                # the task handle and the transcript buffer.
                return
            
            # Give the transcript buffer time to accumulate speech.
            await asyncio.sleep(self.voice_mail_detector.duration)
            
            is_voicemail = await self.voice_mail_detector.detect(self._vmd_buffer.strip())
            self.voice_mail_detection_done = True
            
            if is_voicemail:
                # Stop the agent from talking to an answering machine.
                await self._interrupt_pipeline()
            
            self.emit("voicemail_result", {"is_voicemail": is_voicemail})
        
        except Exception as e:
            logger.error(f"Error in VMD check: {e}")
            self.voice_mail_detection_done = True
            # Fail open: report "not voicemail" so the call continues.
            self.emit("voicemail_result", {"is_voicemail": False})
        
        finally:
            self._vmd_check_task = None
            self._vmd_buffer = ""
    
    async def cleanup(self) -> None:
        """Cleanup all components"""
        logger.info("Cleaning up pipeline orchestrator")
        
        # Stop background work first.
        pending_vmd = self._vmd_check_task
        if pending_vmd and not pending_vmd.done():
            pending_vmd.cancel()
        
        timer = self._false_interrupt_timer
        if timer:
            timer.cancel()
        
        # Tear down the three pipeline stages in order.
        for stage in (self.speech_understanding, self.content_generation, self.speech_generation):
            if stage:
                await stage.cleanup()
        
        # Drop references so the orchestrator no longer pins these objects.
        self.agent = None
        self.avatar = None
        self.conversational_graph = None
        self.voice_mail_detector = None
        
        logger.info("Pipeline orchestrator cleaned up")

Orchestrates the execution of speech understanding, content generation, and speech generation components.

Supports various component chains:

1. Full Chain: VAD → STT → TurnD → LLM → TTS
2. No TTS: VAD → STT → TurnD → LLM (text output)
3. No STT: LLM → TTS (text input)
4. Hybrid: VAD → STT → [User Processing] → TTS

Events:

- transcript_ready: Transcript is ready for processing
- content_generated: LLM has generated content
- synthesis_complete: TTS synthesis is complete
- voicemail_result: Voicemail detection result
- error: Error occurred in pipeline

Ancestors

Methods

async def cleanup(self) ‑> None
Expand source code
async def cleanup(self) -> None:
    """Cleanup all components"""
    logger.info("Cleaning up pipeline orchestrator")
    
    if self._vmd_check_task and not self._vmd_check_task.done():
        self._vmd_check_task.cancel()
    
    if self._false_interrupt_timer:
        self._false_interrupt_timer.cancel()
    
    if self.speech_understanding:
        await self.speech_understanding.cleanup()
    
    if self.content_generation:
        await self.content_generation.cleanup()
    
    if self.speech_generation:
        await self.speech_generation.cleanup()
    
    self.agent = None
    self.avatar = None
    self.conversational_graph = None
    self.voice_mail_detector = None
    
    logger.info("Pipeline orchestrator cleaned up")

Cleanup all components

def get_latest_transcript(self) ‑> str
Expand source code
def get_latest_transcript(self) -> str:
    """Get the latest accumulated transcript (for hybrid scenarios)"""
    understanding = self.speech_understanding
    return understanding._accumulated_transcript if understanding else ""

Get the latest accumulated transcript (for hybrid scenarios)

async def handle_stt_event(self, text: str) ‑> bool
Expand source code
async def handle_stt_event(self, text: str) -> bool:
    """Handle STT event for interruption (word-based).
    
    Only processes interruptions when the agent is actively speaking or generating.
    Respects interrupt_min_words and interrupt_mode configuration.
    
    Returns:
        True if interruption was triggered, False otherwise.
    """
    if not text or not text.strip():
        return False
    
    # Only consider interruption when agent is actively speaking or thinking
    agent_state = self.agent.session.agent_state if self.agent and self.agent.session else None
    if agent_state not in (AgentState.SPEAKING, AgentState.THINKING):
        return False
    
    word_count = len(text.strip().split())
    
    # Debug: log exact state of current_utterance
    if self.agent and self.agent.session and self.agent.session.current_utterance:
        cu = self.agent.session.current_utterance
        logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={self.interrupt_min_words}, mode={self.interrupt_mode}, utterance_id={cu.id}, interruptible={cu.is_interruptible}, done={cu.done()}")
    else:
        logger.info(f"[orchestrator] handle_stt_event: word_count={word_count}, min_words={self.interrupt_min_words}, mode={self.interrupt_mode}, no current_utterance")
    
    if self.resume_on_false_interrupt and self._is_in_false_interrupt_pause and word_count >= self.interrupt_min_words:
        logger.info(f"STT transcript received while in paused state, confirming real interruption")
        self._cancel_false_interrupt_timer()
        self._is_in_false_interrupt_pause = False
        self._false_interrupt_paused_speech = False
        await self._interrupt_pipeline()
        return True
    
    if self.interrupt_mode in ("STT_ONLY", "HYBRID"):
        if word_count >= self.interrupt_min_words:
            if self.agent and self.agent.session and self.agent.session.current_utterance:
                if self.agent.session.current_utterance.is_interruptible:
                    logger.info(f"[orchestrator] Word count {word_count} >= min_words {self.interrupt_min_words}, triggering interruption")
                    await self._trigger_interruption()
                    return True
                else:
                    logger.info(f"[orchestrator] Word count threshold met but utterance is not interruptible")
        else:
            logger.info(f"[orchestrator] Word count {word_count} < min_words {self.interrupt_min_words}, ignoring")
    
    return False

Handle STT event for interruption (word-based).

Only processes interruptions when the agent is actively speaking or generating. Respects interrupt_min_words and interrupt_mode configuration.

Returns

True if interruption was triggered, False otherwise.

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Public method to interrupt the pipeline"""
    await self._interrupt_pipeline()

Public method to interrupt the pipeline

async def process_audio(self, audio_data: bytes) ‑> None
Expand source code
async def process_audio(self, audio_data: bytes) -> None:
    """
    Process incoming audio through the pipeline.
    
    Forwards the raw bytes to the speech-understanding stage; a no-op when
    no speech understanding is configured.
    
    Args:
        audio_data: Raw audio bytes
    """
    understanding = self.speech_understanding
    if not understanding:
        return
    await understanding.process_audio(audio_data)

Process incoming audio through the pipeline.

Args

audio_data
Raw audio bytes
async def process_text(self, text: str) ‑> None
Expand source code
async def process_text(self, text: str) -> None:
    """
    Process text input directly (bypasses STT).

    Args:
        text: User text input
    """
    if not self.agent:
        logger.warning("No agent available for text processing")
        return

    # Open a new metrics turn and record the user's text as its transcript.
    metrics_collector.start_turn()
    metrics_collector.set_user_transcript(text)

    self.agent.chat_context.add_message(
        role=ChatRole.USER,
        content=text
    )

    # With no STT stage there are no real speech events, so synthesize the
    # start/end markers to keep turn metrics well-formed.
    if not self.speech_understanding:
        metrics_collector.on_user_speech_start()
        metrics_collector.on_user_speech_end()
        
        if not self.speech_generation:
            metrics_collector.on_agent_speech_start()

    if self.content_generation:
        self.agent.session._emit_agent_state(AgentState.THINKING)
        full_response = ""
        # Drain the LLM stream into a single response string.
        async for response_chunk in self.content_generation.generate(text):
            if response_chunk.content:
                full_response += response_chunk.content

        if full_response:
            # NOTE(review): the unmodified response is what lands in chat
            # history here; hook modifications below affect only the emitted
            # and synthesized text — confirm this is intended.
            self.agent.chat_context.add_message(
                role=ChatRole.ASSISTANT,
                content=full_response
            )

            # Give registered LLM hooks a chance to rewrite the response.
            if self.hooks and self.hooks.has_llm_hooks():
                modified = await self.hooks.trigger_llm({"text": full_response})
                if modified is not None:
                    full_response = modified

            self.emit("content_generated", {"text": full_response})

            if self.speech_generation:
                await self.speech_generation.synthesize(full_response)
            else:
                # No TTS - complete the turn after LLM
                self.agent.session._emit_agent_state(AgentState.IDLE)
                self.agent.session._emit_user_state(UserState.IDLE)
                metrics_collector.on_agent_speech_end()
                metrics_collector.set_agent_response(full_response)
                metrics_collector.complete_turn()

            self.emit("synthesis_complete", {})

Process text input directly (bypasses STT).

Args

text
User text input
async def reply_with_context(self,
instructions: str,
wait_for_playback: bool,
handle: UtteranceHandle,
frames: list[av.VideoFrame] | None = None) ‑> None
Expand source code
async def reply_with_context(
    self,
    instructions: str,
    wait_for_playback: bool,
    handle: UtteranceHandle,
    frames: list[av.VideoFrame] | None = None
) -> None:
    """
    Generate a reply using instructions and current chat context.
    
    Args:
        instructions: Instructions to add to chat context
        wait_for_playback: If True, disable VAD/STT during response
        handle: Utterance handle (always marked done on exit)
        frames: Optional video frames for vision
    """
    if not self.agent:
        # Nothing to generate with; release the handle so callers don't hang.
        handle._mark_done()
        return
    
    # Swap the VAD/STT event handlers for no-ops so incoming audio cannot
    # trigger interruption while this reply plays; restored in `finally`.
    original_handlers = {}
    if wait_for_playback and self.speech_understanding:
        if self.speech_understanding.vad:
            original_handlers['vad'] = self.speech_understanding._on_vad_event
            self.speech_understanding._on_vad_event = lambda x: None
        
        if self.speech_understanding.stt:
            original_handlers['stt'] = self.speech_understanding._on_stt_transcript
            self.speech_understanding._on_stt_transcript = lambda x: None
    
    try:
        # Enrich the instructions with knowledge-base context when available.
        final_instructions = instructions
        if self.agent.knowledge_base:
            kb_context = await self.agent.knowledge_base.process_query(instructions)
            if kb_context:
                final_instructions = f"{kb_context}\n\nUser: {instructions}"
        
        # Attach any video frames as image content parts (vision input).
        content_parts = [final_instructions]
        if frames:
            for frame in frames:
                image_part = ImageContent(image=frame, inference_detail="auto")
                content_parts.append(image_part)
        
        self.agent.chat_context.add_message(
            role=ChatRole.USER,
            content=content_parts if len(content_parts) > 1 else final_instructions
        )
        
        await self._generate_and_synthesize(final_instructions, handle)
    
    finally:
        # Restore the original VAD/STT handlers and guarantee the handle
        # completes even if generation raised.
        if wait_for_playback and self.speech_understanding:
            if 'vad' in original_handlers:
                self.speech_understanding._on_vad_event = original_handlers['vad']
            if 'stt' in original_handlers:
                self.speech_understanding._on_stt_transcript = original_handlers['stt']
        
        if not handle.done():
            handle._mark_done()

Generate a reply using instructions and current chat context.

Args

instructions
Instructions to add to chat context
wait_for_playback
If True, disable VAD/STT during response
handle
Utterance handle
frames
Optional video frames for vision
async def say(self, message: str, handle: UtteranceHandle) ‑> None
Expand source code
async def say(self, message: str, handle: UtteranceHandle) -> None:
    """
    Direct TTS synthesis (for initial messages).
    
    Args:
        message: Message to synthesize
        handle: Utterance handle to track (always marked done on exit)
    """
    if not self.speech_generation:
        # No TTS configured: still release the handle so callers awaiting it
        # do not hang (mirrors reply_with_context's no-agent path, which
        # also marks the handle done before returning).
        handle._mark_done()
        return
    
    try:
        metrics_collector.set_agent_response(message)
        await self.speech_generation.synthesize(message)
    finally:
        handle._mark_done()

Direct TTS synthesis (for initial messages).

Args

message
Message to synthesize
handle
Utterance handle to track
def set_audio_track(self, audio_track: Any) ‑> None
Expand source code
def set_audio_track(self, audio_track: Any) -> None:
    """Set audio track for TTS output"""
    generation = self.speech_generation
    if generation:
        generation.set_audio_track(audio_track)

Set audio track for TTS output

def set_voice_mail_detector(self, detector: VoiceMailDetector | None) ‑> None
Expand source code
def set_voice_mail_detector(self, detector: VoiceMailDetector | None) -> None:
    """Install (or clear) the voicemail detector and reset detection state."""
    # Reset detection bookkeeping whenever the detector changes.
    self._vmd_buffer = ""
    self.voice_mail_detection_done = False
    self.voice_mail_detector = detector

Configure voicemail detection

async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Start all components"""
    # Bring the pipeline stages up in understanding → content → speech order.
    for component in (self.speech_understanding, self.content_generation, self.speech_generation):
        if component:
            await component.start()
    
    logger.info("PipelineOrchestrator started")

Start all components

Inherited members