Module agents.realtime_pipeline

Classes

class RealTimePipeline (model: RealtimeBaseModel,
avatar: Any | None = None,
denoise: Denoise | None = None)
Expand source code
class RealTimePipeline(Pipeline, EventEmitter[Literal["realtime_start", "realtime_end", "user_audio_input_data", "user_speech_started", "realtime_model_transcription"]]):
    """
    RealTime pipeline implementation that processes data in real-time.
    Inherits from Pipeline base class and adds realtime-specific events.
    """
    
    def __init__(
        self,
        model: RealtimeBaseModel,
        avatar: Any | None = None,
        denoise: Denoise | None = None,
    ) -> None:
        """
        Initialize the realtime pipeline.
        
        Args:
            model: Instance of RealtimeBaseModel to process data
            config: Configuration dictionary with settings like:
                   - response_modalities: List of enabled modalities
                   - silence_threshold_ms: Silence threshold in milliseconds
        """
        self.model = model
        self.model.audio_track = None
        self.agent = None
        self.avatar = avatar
        self.vision = False
        self._vision_lock = asyncio.Lock()
        self.denoise = denoise
        # Tracks the in-flight utterance; set by send_message()/reply_with_context().
        self._current_utterance_handle = None
        super().__init__()
        self.model.on("error", self.on_model_error)
        self.model.on("realtime_model_transcription", self.on_realtime_model_transcription)
        self.model.on("agent_speech_ended", self._on_agent_speech_ended)
    
    def set_agent(self, agent: Agent) -> None:
        self.agent = agent
        if hasattr(self.model, 'set_agent'):
            self.model.set_agent(agent)

    def _configure_components(self) -> None:
        """Configure pipeline components with the loop"""
        if self.loop:
            self.model.loop = self.loop
            job_context = get_current_job_context()
            
            if job_context and job_context.room:
                requested_vision = getattr(job_context.room, 'vision', False)
                self.vision = requested_vision
                
                model_name = self.model.__class__.__name__
                if requested_vision and model_name not in ('GeminiRealtime', 'OpenAIRealtime'):
                    logger.warning(f"Vision mode requested but {model_name} doesn't support video input. Only GeminiRealtime and OpenAIRealtime support vision. Disabling vision.")
                    self.vision = False
                
                if self.avatar:
                    self.model.audio_track = getattr(job_context.room, 'agent_audio_track', None) or job_context.room.audio_track
                elif self.audio_track:
                    self.model.audio_track = self.audio_track

    async def start(self, **kwargs: Any) -> None:
        """
        Start the realtime pipeline processing.
        Overrides the abstract start method from Pipeline base class.
        
        Args:
            **kwargs: Additional arguments for pipeline configuration
        """
        await self.model.connect()
        self.model.on("user_speech_started", self.on_user_speech_started)
        self.model.on("user_speech_ended", lambda data: asyncio.create_task(self.on_user_speech_ended(data)))
        self.model.on("agent_speech_started", lambda data: asyncio.create_task(self.on_agent_speech_started(data)))
        self.model.on("agent_speech_ended", self._on_agent_speech_ended)

    async def send_message(self, message: str, handle: UtteranceHandle) -> None:
        """
        Send a message through the realtime pipeline and track the utterance handle.
        """
        self._current_utterance_handle = handle
        try:
            await self.model.send_message(message)
        except Exception as e:
            logger.error(f"Error sending message: {e}")
            handle._mark_done()

    async def send_text_message(self, message: str) -> None:
        """
        Send a text message through the realtime model.
        This method specifically handles text-only input when modalities is ["text"].
        """
        if hasattr(self.model, 'send_text_message'):
            await self.model.send_text_message(message)
        else:
            await self.model.send_message(message)
    
    def _on_agent_speech_ended(self, data: dict) -> None:
        """
        Handle agent speech ended event and mark utterance as done, forwarding to agent if handler exists.
        """
        if self._current_utterance_handle and not self._current_utterance_handle.done():
            self._current_utterance_handle._mark_done()
        if self.agent and hasattr(self.agent, 'on_agent_speech_ended'):
            self.agent.on_agent_speech_ended(data)
    
    async def on_audio_delta(self, audio_data: bytes):
        """
        Handle incoming audio data from the user
        """
        if self.denoise:
            audio_data = await self.denoise.denoise(audio_data)
        await self.model.handle_audio_input(audio_data)

    async def on_video_delta(self, video_data: av.VideoFrame):
        """Handle incoming video data from the user"""
        if self._vision_lock.locked():
            logger.info("Vision lock is locked, skipping video data")
            return
            
        if self.vision:
            self._recent_frames.append(video_data)
            if len(self._recent_frames) > self._max_frames_buffer:
                self._recent_frames.pop(0)
            await self.model.handle_video_input(video_data)

    def on_user_speech_started(self, data: dict) -> None:
        """
        Handle user speech started event
        """
        self._notify_speech_started()
        # self.interrupt() # Not sure yet whether this affects utterance handling.
        if self.agent and self.agent.session:
            self.agent.session._emit_user_state(UserState.SPEAKING)
            self.agent.session._emit_agent_state(AgentState.LISTENING)
            
    def interrupt(self) -> None:
        """
        Interrupt the realtime pipeline
        """
        if self.model:
            asyncio.create_task(self.model.interrupt())
        if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
            asyncio.create_task(self.agent.session.stop_thinking_audio())
        if self._current_utterance_handle and not self._current_utterance_handle.done():
            self._current_utterance_handle.interrupt()
        if self.avatar and hasattr(self.avatar, 'interrupt'):
            asyncio.create_task(self.avatar.interrupt())

    async def leave(self) -> None:
        """
        Leave the realtime pipeline.
        """
        if self.room is not None:
            await self.room.leave()

    def on_model_error(self, error: Exception):
        """
        Handle errors emitted from the model and send to realtime metrics cascading_metrics_collector.
        """
        error_data = {"message": str(error), "timestamp": time.time()}
        realtime_metrics_collector.set_realtime_model_error(error_data)
        logger.error(f"Realtime model error: {error_data}")

    def on_realtime_model_transcription(self, data: dict) -> None:
        """
        Handle realtime model transcription event
        """
        try:
            self.emit("realtime_model_transcription", data)
        except Exception as e:
            logger.error(f"Failed to emit realtime model transcription event: {e}")
    

    async def cleanup(self):
        """Cleanup resources"""
        logger.info("Cleaning up realtime pipeline")
        if hasattr(self, 'room') and self.room is not None:
            try:
                await self.room.leave()
            except Exception as e:
                logger.error(f"Error while leaving room during cleanup: {e}")
            try:
                if hasattr(self.room, 'cleanup'):
                    await self.room.cleanup()
            except Exception as e:
                logger.error(f"Error while cleaning up room: {e}")
            self.room = None
        
        if hasattr(self, 'model') and self.model is not None:
            try:
                await self.model.aclose()
            except Exception as e:
                logger.error(f"Error while closing model during cleanup: {e}")
            self.model = None
        
        if self._current_utterance_handle:
            self._current_utterance_handle.interrupt()
            self._current_utterance_handle = None
        
        if hasattr(self, 'avatar') and self.avatar is not None:
            try:
                if hasattr(self.avatar, 'cleanup'):
                    await self.avatar.cleanup()
                elif hasattr(self.avatar, 'aclose'):
                    await self.avatar.aclose()
            except Exception as e:
                logger.error(f"Error while cleaning up avatar: {e}")
            self.avatar = None
        
        if hasattr(self, 'denoise') and self.denoise is not None:
            try:
                await self.denoise.aclose()
            except Exception as e:
                logger.error(f"Error while cleaning up denoise: {e}")
            self.denoise = None
        
        self.agent = None
        self.vision = False
        self.model = None
        self.avatar = None
        self.denoise = None
        self._current_utterance_handle = None
        
        logger.info("Realtime pipeline cleaned up")
        await super().cleanup()

    async def on_user_speech_ended(self, data: dict) -> None:
        """
        Handle user speech ended event
        """
        if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
            await self.agent.session.start_thinking_audio()

    async def on_agent_speech_started(self, data: dict) -> None:
        """
        Handle agent speech started event
        """
        if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
            await self.agent.session.stop_thinking_audio()

    async def reply_with_context(self, instructions: str, wait_for_playback: bool, handle: UtteranceHandle, frames: list[av.VideoFrame] | None = None) -> None:
        """
        Generate a reply using instructions and optional frames.
        
        Args:
            instructions: Instructions/text to send to the model
            wait_for_playback: If True, wait for playback to complete (for realtime, this is handled by the model)
            handle: UtteranceHandle to track the utterance
            frames: Optional list of VideoFrame objects to include in the reply
        """
        self._current_utterance_handle = handle
        
        if frames and hasattr(self.model, 'send_message_with_frames'):
            async with self._vision_lock:
                await self.model.send_message_with_frames(instructions, frames)
        elif hasattr(self.model, 'send_text_message'):
            await self.model.send_text_message(instructions)
        else:
            await self.model.send_message(instructions)

RealTime pipeline implementation that processes data in real-time. Inherits from Pipeline base class and adds realtime-specific events.

Initialize the realtime pipeline.

Args

model
Instance of RealtimeBaseModel to process data
avatar
Optional avatar component used to render agent output
denoise
Optional Denoise instance applied to incoming user audio
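
A minimal usage sketch, assuming a concrete RealtimeBaseModel implementation; GeminiRealtime is named elsewhere in this module, but its constructor arguments and the agent object below are placeholders:

import asyncio

from agents.realtime_pipeline import RealTimePipeline

async def main() -> None:
    model = GeminiRealtime()                  # placeholder: any RealtimeBaseModel subclass
    pipeline = RealTimePipeline(model=model)  # avatar and denoise are optional
    pipeline.set_agent(agent)                 # placeholder: agent construction is framework-specific
    await pipeline.start()                    # connects the model and registers speech event handlers

asyncio.run(main())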

Ancestors

Pipeline
EventEmitter

Methods

async def cleanup(self)
Expand source code
async def cleanup(self):
    """Cleanup resources"""
    logger.info("Cleaning up realtime pipeline")
    if hasattr(self, 'room') and self.room is not None:
        try:
            await self.room.leave()
        except Exception as e:
            logger.error(f"Error while leaving room during cleanup: {e}")
        try:
            if hasattr(self.room, 'cleanup'):
                await self.room.cleanup()
        except Exception as e:
            logger.error(f"Error while cleaning up room: {e}")
        self.room = None
    
    if hasattr(self, 'model') and self.model is not None:
        try:
            await self.model.aclose()
        except Exception as e:
            logger.error(f"Error while closing model during cleanup: {e}")
        self.model = None
    
    if self._current_utterance_handle:
        self._current_utterance_handle.interrupt()
        self._current_utterance_handle = None
    
    if hasattr(self, 'avatar') and self.avatar is not None:
        try:
            if hasattr(self.avatar, 'cleanup'):
                await self.avatar.cleanup()
            elif hasattr(self.avatar, 'aclose'):
                await self.avatar.aclose()
        except Exception as e:
            logger.error(f"Error while cleaning up avatar: {e}")
        self.avatar = None
    
    if hasattr(self, 'denoise') and self.denoise is not None:
        try:
            await self.denoise.aclose()
        except Exception as e:
            logger.error(f"Error while cleaning up denoise: {e}")
        self.denoise = None
    
    self.agent = None
    self.vision = False
    self.model = None
    self.avatar = None
    self.denoise = None
    self._current_utterance_handle = None
    
    logger.info("Realtime pipeline cleaned up")
    await super().cleanup()

Cleanup resources
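
Every resource in cleanup() is guarded with hasattr/None checks, so it is safe to await even when start() failed partway through. A typical pattern (sketch):

async def run_session(pipeline: RealTimePipeline) -> None:
    try:
        await pipeline.start()
        ...
    finally:
        # Safe even if start() raised before the model connected.
        await pipeline.cleanup()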

def interrupt(self) ‑> None
Expand source code
def interrupt(self) -> None:
    """
    Interrupt the realtime pipeline
    """
    if self.model:
        asyncio.create_task(self.model.interrupt())
    if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
        asyncio.create_task(self.agent.session.stop_thinking_audio())
    if self._current_utterance_handle and not self._current_utterance_handle.done():
        self._current_utterance_handle.interrupt()
    if self.avatar and hasattr(self.avatar, 'interrupt'):
        asyncio.create_task(self.avatar.interrupt())

Interrupt the realtime pipeline
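
Because interrupt() schedules its work with asyncio.create_task(), it must be called from code already running inside the event loop, for example (sketch):

async def on_barge_in(pipeline: RealTimePipeline) -> None:
    # Cancels model output, thinking audio, the current utterance, and the
    # avatar (if any) without awaiting their completion.
    pipeline.interrupt()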

async def leave(self) ‑> None
Expand source code
async def leave(self) -> None:
    """
    Leave the realtime pipeline.
    """
    if self.room is not None:
        await self.room.leave()

Leave the realtime pipeline.

async def on_agent_speech_started(self, data: dict) ‑> None
Expand source code
async def on_agent_speech_started(self, data: dict) -> None:
    """
    Handle agent speech started event
    """
    if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
        await self.agent.session.stop_thinking_audio()

Handle agent speech started event

def on_model_error(self, error: Exception)
Expand source code
def on_model_error(self, error: Exception):
    """
    Handle errors emitted from the model and send to realtime metrics cascading_metrics_collector.
    """
    error_data = {"message": str(error), "timestamp": time.time()}
    realtime_metrics_collector.set_realtime_model_error(error_data)
    logger.error(f"Realtime model error: {error_data}")

Handle errors emitted from the model and report them to the realtime_metrics_collector.

def on_realtime_model_transcription(self, data: dict) ‑> None
Expand source code
def on_realtime_model_transcription(self, data: dict) -> None:
    """
    Handle realtime model transcription event
    """
    try:
        self.emit("realtime_model_transcription", data)
    except Exception as e:
        logger.error(f"Failed to emit realtime model transcription event: {e}")

Handle realtime model transcription event
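
Since RealTimePipeline is itself an EventEmitter, callers can subscribe to the re-emitted event; the payload schema of data is model-specific and not documented here (sketch):

def print_transcript(data: dict) -> None:
    print("transcript event:", data)

pipeline.on("realtime_model_transcription", print_transcript)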

async def on_user_speech_ended(self, data: dict) ‑> None
Expand source code
async def on_user_speech_ended(self, data: dict) -> None:
    """
    Handle user speech ended event
    """
    if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
        await self.agent.session.start_thinking_audio()

Handle user speech ended event

def on_user_speech_started(self, data: dict) ‑> None
Expand source code
def on_user_speech_started(self, data: dict) -> None:
    """
    Handle user speech started event
    """
    self._notify_speech_started()
    # self.interrupt() # Not sure yet whether this affects utterance handling.
    if self.agent and self.agent.session:
        self.agent.session._emit_user_state(UserState.SPEAKING)
        self.agent.session._emit_agent_state(AgentState.LISTENING)

Handle user speech started event

async def reply_with_context(self,
instructions: str,
wait_for_playback: bool,
handle: UtteranceHandle,
frames: list[av.VideoFrame] | None = None) ‑> None
Expand source code
async def reply_with_context(self, instructions: str, wait_for_playback: bool, handle: UtteranceHandle, frames: list[av.VideoFrame] | None = None) -> None:
    """
    Generate a reply using instructions and optional frames.
    
    Args:
        instructions: Instructions/text to send to the model
        wait_for_playback: If True, wait for playback to complete (for realtime, this is handled by the model)
        handle: UtteranceHandle to track the utterance
        frames: Optional list of VideoFrame objects to include in the reply
    """
    self._current_utterance_handle = handle
    
    if frames and hasattr(self.model, 'send_message_with_frames'):
        async with self._vision_lock:
            await self.model.send_message_with_frames(instructions, frames)
    elif hasattr(self.model, 'send_text_message'):
        await self.model.send_text_message(instructions)
    else:
        await self.model.send_message(instructions)

Generate a reply using instructions and optional frames.

Args

instructions
Instructions/text to send to the model
wait_for_playback
If True, wait for playback to complete (for realtime, this is handled by the model)
handle
UtteranceHandle to track the utterance
frames
Optional list of VideoFrame objects to include in the reply
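
A hedged sketch of a vision-assisted reply; how an UtteranceHandle is created is framework-specific, so make_handle() below is a hypothetical placeholder:

async def describe_frames(pipeline: RealTimePipeline, frames: list) -> None:
    handle = make_handle()  # hypothetical: normally supplied by the agent session
    await pipeline.reply_with_context(
        instructions="Describe what the user is showing you.",
        wait_for_playback=False,  # playback completion is handled by the model
        handle=handle,
        frames=frames,  # only sent if the model implements send_message_with_frames
    )
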
async def send_message(self, message: str, handle: UtteranceHandle) ‑> None
Expand source code
async def send_message(self, message: str, handle: UtteranceHandle) -> None:
    """
    Send a message through the realtime pipeline and track the utterance handle.
    """
    self._current_utterance_handle = handle
    try:
        await self.model.send_message(message)
    except Exception as e:
        logger.error(f"Error sending message: {e}")
        handle._mark_done()

Send a message through the realtime pipeline and track the utterance handle.

async def send_text_message(self, message: str) ‑> None
Expand source code
async def send_text_message(self, message: str) -> None:
    """
    Send a text message through the realtime model.
    This method specifically handles text-only input when modalities is ["text"].
    """
    if hasattr(self.model, 'send_text_message'):
        await self.model.send_text_message(message)
    else:
        await self.model.send_message(message)

Send a text message through the realtime model. This method specifically handles text-only input when modalities is ["text"].
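
Because the method falls back to send_message() when the model lacks a dedicated text channel, callers can use a single entry point regardless of the configured modalities (sketch):

async def ask(pipeline: RealTimePipeline) -> None:
    # Works whether or not the model implements send_text_message().
    await pipeline.send_text_message("Summarize the conversation so far.")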

def set_agent(self, agent: Agent) ‑> None
Expand source code
def set_agent(self, agent: Agent) -> None:
    self.agent = agent
    if hasattr(self.model, 'set_agent'):
        self.model.set_agent(agent)

async def start(self, **kwargs: Any) ‑> None
Expand source code
async def start(self, **kwargs: Any) -> None:
    """
    Start the realtime pipeline processing.
    Overrides the abstract start method from Pipeline base class.
    
    Args:
        **kwargs: Additional arguments for pipeline configuration
    """
    await self.model.connect()
    self.model.on("user_speech_started", self.on_user_speech_started)
    self.model.on("user_speech_ended", lambda data: asyncio.create_task(self.on_user_speech_ended(data)))
    self.model.on("agent_speech_started", lambda data: asyncio.create_task(self.on_agent_speech_started(data)))
    self.model.on("agent_speech_ended", self._on_agent_speech_ended)

Start the realtime pipeline processing. Overrides the abstract start method from Pipeline base class.

Args

**kwargs
Additional arguments for pipeline configuration

Inherited members