Module agents.pipeline_utils

Helper functions for pipeline component management.

Functions

def check_mode_shift(pipeline, llm: Any, stt: Any, tts: Any) ‑> bool
Expand source code
def check_mode_shift(pipeline, llm: Any, stt: Any, tts: Any) -> bool:
    """
    Report whether the requested component changes amount to a mode shift.

    A shift is reported when:
    - a new LLM is supplied whose realtime-ness (isinstance of
      RealtimeBaseModel) differs from pipeline._is_realtime_mode, or
    - the pipeline is in realtime mode and a supplied stt/tts would turn a
      present component into None or a None component into a present one.

    Arguments equal to the NO_CHANGE sentinel are ignored.

    Returns:
        True if any of the conditions above holds, otherwise False.
    """
    realtime_now = pipeline._is_realtime_mode

    if llm is not NO_CHANGE and isinstance(llm, RealtimeBaseModel) != realtime_now:
        return True

    # stt/tts presence only matters while the pipeline is in realtime mode.
    if not realtime_now:
        return False

    # Attributes are looked up lazily so an untouched component is never read.
    for attr, incoming in (("stt", stt), ("tts", tts)):
        if incoming is NO_CHANGE:
            continue
        if (getattr(pipeline, attr) is None) != (incoming is None):
            return True

    return False

Check if component changes trigger a mode shift.

async def cleanup_pipeline(pipeline, llm_changing: bool = False) ‑> None
Expand source code
async def cleanup_pipeline(pipeline, llm_changing: bool = False) -> None:
    """
    Tear down the pipeline's running pieces and reset their references to None.

    Order of work: the orchestrator, then speech generation, then the
    individual components, and finally (only when the LLM is being replaced)
    the realtime model.

    Args:
        pipeline: Pipeline instance whose attributes are cleaned up.
        llm_changing: When True, 'llm' is left out of the component sweep and
            pipeline._realtime_model is cleaned up instead.
    """
    orchestrator = pipeline.orchestrator
    if orchestrator:
        logger.info("Closing existing orchestrator")
        await orchestrator.cleanup()
        pipeline.orchestrator = None

    speech_generation = pipeline.speech_generation
    if speech_generation:
        logger.info("Closing existing speech generation")
        await speech_generation.cleanup()
        pipeline.speech_generation = None

    targets = ('stt', 'tts', 'vad', 'turn_detector', 'denoise')
    if not llm_changing:
        targets += ('llm',)

    for name in targets:
        component = getattr(pipeline, name, None)
        if not component:
            continue
        logger.info(f"Closing component: {name}")
        try:
            # Prefer aclose(); fall back to cleanup() when that is all there is.
            if hasattr(component, 'aclose'):
                await component.aclose()
            elif hasattr(component, 'cleanup'):
                await component.cleanup()
        except Exception as exc:
            # A failing component is logged and does not stop the sweep.
            logger.warning(f"Error cleaning up component {name}: {exc}")
        setattr(pipeline, name, None)

    if not llm_changing:
        return

    realtime_model = pipeline._realtime_model
    if not realtime_model:
        return

    logger.info("Closing previous realtime model")
    try:
        await realtime_model.cleanup()
    except Exception as exc:
        logger.warning(f"Error cleaning up realtime model: {exc}")
    pipeline._realtime_model = None

Close existing pipeline execution and all components.

def register_stt_transcript_listener(stt: Any, container: Any) ‑> None
Expand source code
def register_stt_transcript_listener(stt: Any, container: Any) -> None:
    """
    Post-swap hook: subscribe the container's transcript handler on the STT.

    Does nothing when stt is None or has no 'on_stt_transcript' attribute;
    otherwise calls stt.on_stt_transcript(container._on_stt_transcript).
    """
    if stt is not None and hasattr(stt, 'on_stt_transcript'):
        stt.on_stt_transcript(container._on_stt_transcript)

Hook to register STT transcript listener after swap.

async def swap_component_in_orchestrator(pipeline,
component_name: str,
new_value: Any,
orchestrator_attr: str,
lock_attr: str = None,
post_swap_hook: Callable[[Any, Any], None] = None) ‑> None
Expand source code
async def swap_component_in_orchestrator(
    pipeline,
    component_name: str,
    new_value: Any,
    orchestrator_attr: str,
    lock_attr: str = None,
    post_swap_hook: Callable[[Any, Any], None] = None
) -> None:
    """
    Generic component swap with orchestrator lock.

    The currently installed component (if truthy) is closed with aclose()
    and replaced by new_value on the pipeline. When the pipeline has an
    orchestrator exposing orchestrator_attr, the same attribute is also set
    on that container, under the container's lock if lock_attr is given.
    Without such a container only the pipeline attribute is replaced.

    Args:
        pipeline: Pipeline instance
        component_name: Name of the component attribute (e.g., 'stt', 'vad')
        new_value: New component instance; NO_CHANGE makes the call a no-op
        orchestrator_attr: Orchestrator sub-component name (e.g., 'speech_understanding')
        lock_attr: Lock attribute name on the container (e.g., 'stt_lock')
        post_swap_hook: Optional callback after swap: hook(new_value, container);
            only invoked on the orchestrator path
    """
    # Check the sentinel before logging so a no-op is not reported as a change.
    if new_value is NO_CHANGE:
        return
    logger.info(f"Changing {component_name} component")

    container = None
    if pipeline.orchestrator:
        container = getattr(pipeline.orchestrator, orchestrator_attr, None)

    if not container:
        # Fallback: no orchestrator container, swap the pipeline attribute only.
        old_value = getattr(pipeline, component_name)
        if old_value:
            await old_value.aclose()
        setattr(pipeline, component_name, new_value)
        return

    async def _do_swap():
        # Read the current component only here, i.e. after the lock (if any)
        # has been acquired. A value captured before waiting on the lock could
        # already have been closed and replaced by another swap.
        old_value = getattr(pipeline, component_name)
        if old_value:
            await old_value.aclose()
        setattr(pipeline, component_name, new_value)
        setattr(container, component_name, new_value)

        if post_swap_hook:
            post_swap_hook(new_value, container)

    if lock_attr:
        async with getattr(container, lock_attr):
            await _do_swap()
    else:
        await _do_swap()

Generic component swap with orchestrator lock.

Args

pipeline
Pipeline instance
component_name
Name of the component attribute (e.g., 'stt', 'vad')
new_value
New component instance
orchestrator_attr
Orchestrator sub-component name (e.g., 'speech_understanding')
lock_attr
Lock attribute name (e.g., 'stt_lock')
post_swap_hook
Optional callback after swap: hook(new_value, container)
async def swap_llm(pipeline, new_llm: Any) ‑> None
Expand source code
async def swap_llm(pipeline, new_llm: Any) -> None:
    """
    Replace the pipeline's LLM, taking realtime models into account.

    A RealtimeBaseModel is stored in pipeline._realtime_model and exposed as
    pipeline.llm through a RealtimeLLMAdapter; any other model is stored
    directly and pipeline._realtime_model is reset to None. The previous LLM,
    if any, is closed with aclose() first. NO_CHANGE makes the call a no-op.

    When the orchestrator has a content_generation stage, the swap runs under
    its llm_lock and the stage's llm is updated as well.
    """
    if new_llm is NO_CHANGE:
        return

    orchestrator = pipeline.orchestrator
    generation = orchestrator.content_generation if orchestrator else None

    if not generation:
        # Direct swap
        current = pipeline.llm
        if current:
            if pipeline._realtime_model:
                # Clear the outgoing realtime model's audio track before closing.
                pipeline._realtime_model.audio_track = None
            await current.aclose()

        if not isinstance(new_llm, RealtimeBaseModel):
            pipeline.llm = new_llm
            pipeline._realtime_model = None
            return

        adapter = RealtimeLLMAdapter(new_llm)
        if pipeline.agent:
            adapter.set_agent(pipeline.agent)
        pipeline.llm = adapter
        pipeline._realtime_model = new_llm
        return

    async with generation.llm_lock:
        if pipeline.llm:
            await pipeline.llm.aclose()

        if isinstance(new_llm, RealtimeBaseModel):
            pipeline._realtime_model = new_llm
            pipeline.llm = RealtimeLLMAdapter(new_llm)
        else:
            pipeline._realtime_model = None
            pipeline.llm = new_llm

        # Keep the content generation stage pointing at the same LLM object.
        pipeline.orchestrator.content_generation.llm = pipeline.llm

Swap LLM component (handles realtime model logic).

If new_llm is RealtimeBaseModel, wraps it in RealtimeLLMAdapter.

async def swap_tts(pipeline, new_tts: Any) ‑> None
Expand source code
async def swap_tts(pipeline, new_tts: Any) -> None:
    """
    Replace the pipeline's TTS wherever it is currently held.

    Locations are tried in this order, and only the first match is used:
    - pipeline.speech_generation (Hybrid/Cascading mode)
    - pipeline.orchestrator.speech_generation
    - the pipeline attribute alone

    In every case the previous TTS (if any) is closed with aclose(),
    pipeline.tts is set to new_tts and pipeline._configure_components() is
    called; the first two cases do so while holding that holder's tts_lock.
    NO_CHANGE makes the call a no-op.
    """
    if new_tts is NO_CHANGE:
        return

    async def _retire_current() -> None:
        # Close whatever TTS is installed right now, if any.
        active = pipeline.tts
        if active:
            await active.aclose()

    if pipeline.speech_generation:
        async with pipeline.speech_generation.tts_lock:
            await _retire_current()
            pipeline.tts = new_tts
            pipeline.speech_generation.tts = new_tts
            pipeline._configure_components()
    elif pipeline.orchestrator and pipeline.orchestrator.speech_generation:
        async with pipeline.orchestrator.speech_generation.tts_lock:
            await _retire_current()
            pipeline.tts = new_tts
            pipeline.orchestrator.speech_generation.tts = new_tts
            pipeline._configure_components()
    else:
        # Fallback: direct swap only
        await _retire_current()
        pipeline.tts = new_tts
        pipeline._configure_components()

Swap TTS component (handles multiple possible locations).

TTS can be in:

- pipeline.speech_generation (Hybrid/Cascading mode)
- pipeline.orchestrator.speech_generation
- Direct attribute only