Module agents.realtime_pipeline
Classes
class RealTimePipeline (model: RealtimeBaseModel,
avatar: Any | None = None,
denoise: Denoise | None = None)-
Expand source code
class RealTimePipeline(
    Pipeline,
    EventEmitter[
        Literal[
            "realtime_start",
            "realtime_end",
            "user_audio_input_data",
            "user_speech_started",
            "realtime_model_transcription",
        ]
    ],
):
    """
    RealTime pipeline implementation that processes data in real-time.
    Inherits from Pipeline base class and adds realtime-specific events.
    """

    def __init__(
        self,
        model: RealtimeBaseModel,
        avatar: Any | None = None,
        denoise: Denoise | None = None,
    ) -> None:
        """
        Initialize the realtime pipeline.

        Args:
            model: Instance of RealtimeBaseModel to process data
            avatar: Optional avatar component; when set, model audio is routed
                through the room's agent audio track
            denoise: Optional Denoise instance applied to incoming user audio
        """
        self.model = model
        self.model.audio_track = None
        self.agent = None
        self.avatar = avatar
        self.vision = False
        self._vision_lock = asyncio.Lock()
        self.denoise = denoise
        super().__init__()
        self.model.on("error", self.on_model_error)
        self.model.on("realtime_model_transcription", self.on_realtime_model_transcription)
        self.model.on("agent_speech_ended", self._on_agent_speech_ended)

    def set_agent(self, agent: Agent) -> None:
        self.agent = agent
        if hasattr(self.model, 'set_agent'):
            self.model.set_agent(agent)

    def _configure_components(self) -> None:
        """Configure pipeline components with the loop"""
        if self.loop:
            self.model.loop = self.loop
            job_context = get_current_job_context()
            if job_context and job_context.room:
                requested_vision = getattr(job_context.room, 'vision', False)
                self.vision = requested_vision
                model_name = self.model.__class__.__name__
                if requested_vision and model_name != 'GeminiRealtime' and model_name != "OpenAIRealtime":
                    logger.warning(
                        f"Vision mode requested but {model_name} doesn't support video input. "
                        "Only GeminiRealtime and OpenAIRealtime support vision. Disabling vision."
                    )
                    self.vision = False
                if self.avatar:
                    self.model.audio_track = (
                        getattr(job_context.room, 'agent_audio_track', None)
                        or job_context.room.audio_track
                    )
                elif self.audio_track:
                    self.model.audio_track = self.audio_track

    async def start(self, **kwargs: Any) -> None:
        """
        Start the realtime pipeline processing.
        Overrides the abstract start method from Pipeline base class.

        Args:
            **kwargs: Additional arguments for pipeline configuration
        """
        await self.model.connect()
        self.model.on("user_speech_started", self.on_user_speech_started)
        self.model.on("user_speech_ended", lambda data: asyncio.create_task(self.on_user_speech_ended(data)))
        self.model.on("agent_speech_started", lambda data: asyncio.create_task(self.on_agent_speech_started(data)))
        self.model.on("agent_speech_ended", self._on_agent_speech_ended)

    async def send_message(self, message: str, handle: UtteranceHandle) -> None:
        """
        Send a message through the realtime pipeline and track the utterance handle.
        """
        self._current_utterance_handle = handle
        try:
            await self.model.send_message(message)
        except Exception as e:
            logger.error(f"Error sending message: {e}")
            handle._mark_done()

    async def send_text_message(self, message: str) -> None:
        """
        Send a text message through the realtime model.
        This method specifically handles text-only input when the configured
        modalities are ["text"].
        """
        if hasattr(self.model, 'send_text_message'):
            await self.model.send_text_message(message)
        else:
            await self.model.send_message(message)

    def _on_agent_speech_ended(self, data: dict) -> None:
        """
        Handle agent speech ended event and mark utterance as done,
        forwarding to agent if handler exists.
        """
        if self._current_utterance_handle and not self._current_utterance_handle.done():
            self._current_utterance_handle._mark_done()
        if self.agent and hasattr(self.agent, 'on_agent_speech_ended'):
            self.agent.on_agent_speech_ended(data)

    async def on_audio_delta(self, audio_data: bytes):
        """
        Handle incoming audio data from the user
        """
        if self.denoise:
            audio_data = await self.denoise.denoise(audio_data)
        await self.model.handle_audio_input(audio_data)

    async def on_video_delta(self, video_data: av.VideoFrame):
        """Handle incoming video data from the user"""
        if self._vision_lock.locked():
            logger.info("Vision lock is locked, skipping video data")
            return
        if self.vision:
            self._recent_frames.append(video_data)
            if len(self._recent_frames) > self._max_frames_buffer:
                self._recent_frames.pop(0)
            await self.model.handle_video_input(video_data)

    def on_user_speech_started(self, data: dict) -> None:
        """
        Handle user speech started event
        """
        self._notify_speech_started()
        # self.interrupt()  # Not sure yet whether this affects utterance handling.
        if self.agent and self.agent.session:
            self.agent.session._emit_user_state(UserState.SPEAKING)
            self.agent.session._emit_agent_state(AgentState.LISTENING)

    def interrupt(self) -> None:
        """
        Interrupt the realtime pipeline
        """
        if self.model:
            asyncio.create_task(self.model.interrupt())
        if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
            asyncio.create_task(self.agent.session.stop_thinking_audio())
        if self._current_utterance_handle and not self._current_utterance_handle.done():
            self._current_utterance_handle.interrupt()
        if self.avatar and hasattr(self.avatar, 'interrupt'):
            asyncio.create_task(self.avatar.interrupt())

    async def leave(self) -> None:
        """
        Leave the realtime pipeline.
        """
        if self.room is not None:
            await self.room.leave()

    def on_model_error(self, error: Exception):
        """
        Handle errors emitted from the model and forward them to the
        realtime_metrics_collector.
        """
        error_data = {"message": str(error), "timestamp": time.time()}
        realtime_metrics_collector.set_realtime_model_error(error_data)
        logger.error(f"Realtime model error: {error_data}")

    def on_realtime_model_transcription(self, data: dict) -> None:
        """
        Handle realtime model transcription event
        """
        try:
            self.emit("realtime_model_transcription", data)
        except Exception:
            logger.error(f"Realtime model transcription: {data}")

    async def cleanup(self):
        """Cleanup resources"""
        logger.info("Cleaning up realtime pipeline")
        if hasattr(self, 'room') and self.room is not None:
            try:
                await self.room.leave()
            except Exception as e:
                logger.error(f"Error while leaving room during cleanup: {e}")
            try:
                if hasattr(self.room, 'cleanup'):
                    await self.room.cleanup()
            except Exception as e:
                logger.error(f"Error while cleaning up room: {e}")
            self.room = None
        if hasattr(self, 'model') and self.model is not None:
            try:
                await self.model.aclose()
            except Exception as e:
                logger.error(f"Error while closing model during cleanup: {e}")
            self.model = None
        if self._current_utterance_handle:
            self._current_utterance_handle.interrupt()
            self._current_utterance_handle = None
        if hasattr(self, 'avatar') and self.avatar is not None:
            try:
                if hasattr(self.avatar, 'cleanup'):
                    await self.avatar.cleanup()
                elif hasattr(self.avatar, 'aclose'):
                    await self.avatar.aclose()
            except Exception as e:
                logger.error(f"Error while cleaning up avatar: {e}")
            self.avatar = None
        if hasattr(self, 'denoise') and self.denoise is not None:
            try:
                await self.denoise.aclose()
            except Exception as e:
                logger.error(f"Error while cleaning up denoise: {e}")
            self.denoise = None
        self.agent = None
        self.vision = False
        self.model = None
        self.avatar = None
        self.denoise = None
        self._current_utterance_handle = None
        logger.info("Realtime pipeline cleaned up")
        await super().cleanup()

    async def on_user_speech_ended(self, data: dict) -> None:
        """
        Handle user speech ended event
        """
        if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
            await self.agent.session.start_thinking_audio()

    async def on_agent_speech_started(self, data: dict) -> None:
        """
        Handle agent speech started event
        """
        if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
            await self.agent.session.stop_thinking_audio()

    async def reply_with_context(
        self,
        instructions: str,
        wait_for_playback: bool,
        handle: UtteranceHandle,
        frames: list[av.VideoFrame] | None = None,
    ) -> None:
        """
        Generate a reply using instructions and optional frames.

        Args:
            instructions: Instructions/text to send to the model
            wait_for_playback: If True, wait for playback to complete
                (for realtime, this is handled by the model)
            handle: UtteranceHandle to track the utterance
            frames: Optional list of VideoFrame objects to include in the reply
        """
        self._current_utterance_handle = handle
        if frames and hasattr(self.model, 'send_message_with_frames'):
            async with self._vision_lock:
                await self.model.send_message_with_frames(instructions, frames)
        elif hasattr(self.model, 'send_text_message'):
            await self.model.send_text_message(instructions)
        else:
            await self.model.send_message(instructions)

RealTime pipeline implementation that processes data in real-time. Inherits from Pipeline base class and adds realtime-specific events.
Initialize the realtime pipeline.
Args
model- Instance of RealtimeBaseModel to process data
avatar- Optional avatar component; when set, model audio is routed through the room's agent audio track
denoise- Optional Denoise instance applied to incoming user audio
Ancestors
- Pipeline
- EventEmitter
- typing.Generic
- abc.ABC
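A minimal usage sketch follows. The concrete model class, its constructor arguments, and the handler name are assumptions for illustration; any RealtimeBaseModel implementation would do.

    import asyncio

    from agents.realtime_pipeline import RealTimePipeline

    async def main() -> None:
        # Hypothetical: GeminiRealtime stands in for any RealtimeBaseModel
        # implementation; its import and constructor arguments are assumed.
        model = GeminiRealtime()
        pipeline = RealTimePipeline(model=model)

        # The pipeline is an EventEmitter, so callers can subscribe to the
        # events listed in the class signature.
        def on_transcription(data: dict) -> None:
            print("transcript:", data)

        pipeline.on("realtime_model_transcription", on_transcription)

        await pipeline.start()

    asyncio.run(main())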
Methods
async def cleanup(self)-
Expand source code
async def cleanup(self): """Cleanup resources""" logger.info("Cleaning up realtime pipeline") if hasattr(self, 'room') and self.room is not None: try: await self.room.leave() except Exception as e: logger.error(f"Error while leaving room during cleanup: {e}") try: if hasattr(self.room, 'cleanup'): await self.room.cleanup() except Exception as e: logger.error(f"Error while cleaning up room: {e}") self.room = None if hasattr(self, 'model') and self.model is not None: try: await self.model.aclose() except Exception as e: logger.error(f"Error while closing model during cleanup: {e}") self.model = None if self._current_utterance_handle: self._current_utterance_handle.interrupt() self._current_utterance_handle = None if hasattr(self, 'avatar') and self.avatar is not None: try: if hasattr(self.avatar, 'cleanup'): await self.avatar.cleanup() elif hasattr(self.avatar, 'aclose'): await self.avatar.aclose() except Exception as e: logger.error(f"Error while cleaning up avatar: {e}") self.avatar = None if hasattr(self, 'denoise') and self.denoise is not None: try: await self.denoise.aclose() except Exception as e: logger.error(f"Error while cleaning up denoise: {e}") self.denoise = None self.agent = None self.vision = False self.model = None self.avatar = None self.denoise = None self._current_utterance_handle = None logger.info("Realtime pipeline cleaned up") await super().cleanup()Cleanup resources
def interrupt(self) ‑> None-
Expand source code
def interrupt(self) -> None:
    """
    Interrupt the realtime pipeline
    """
    if self.model:
        asyncio.create_task(self.model.interrupt())
    if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
        asyncio.create_task(self.agent.session.stop_thinking_audio())
    if self._current_utterance_handle and not self._current_utterance_handle.done():
        self._current_utterance_handle.interrupt()
    if self.avatar and hasattr(self.avatar, 'interrupt'):
        asyncio.create_task(self.avatar.interrupt())

Interrupt the realtime pipeline
async def leave(self) ‑> None-
Expand source code
async def leave(self) -> None:
    """
    Leave the realtime pipeline.
    """
    if self.room is not None:
        await self.room.leave()

Leave the realtime pipeline.
async def on_agent_speech_started(self, data: dict) ‑> None-
Expand source code
async def on_agent_speech_started(self, data: dict) -> None:
    """
    Handle agent speech started event
    """
    if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
        await self.agent.session.stop_thinking_audio()

Handle agent speech started event
def on_model_error(self, error: Exception)-
Expand source code
def on_model_error(self, error: Exception):
    """
    Handle errors emitted from the model and forward them to the
    realtime_metrics_collector.
    """
    error_data = {"message": str(error), "timestamp": time.time()}
    realtime_metrics_collector.set_realtime_model_error(error_data)
    logger.error(f"Realtime model error: {error_data}")

Handle errors emitted from the model and forward them to the realtime_metrics_collector.
def on_realtime_model_transcription(self, data: dict) ‑> None-
Expand source code
def on_realtime_model_transcription(self, data: dict) -> None:
    """
    Handle realtime model transcription event
    """
    try:
        self.emit("realtime_model_transcription", data)
    except Exception:
        logger.error(f"Realtime model transcription: {data}")

Handle realtime model transcription event
async def on_user_speech_ended(self, data: dict) ‑> None-
Expand source code
async def on_user_speech_ended(self, data: dict) -> None:
    """
    Handle user speech ended event
    """
    if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled:
        await self.agent.session.start_thinking_audio()

Handle user speech ended event
def on_user_speech_started(self, data: dict) ‑> None-
Expand source code
def on_user_speech_started(self, data: dict) -> None:
    """
    Handle user speech started event
    """
    self._notify_speech_started()
    # self.interrupt()  # Not sure yet whether this affects utterance handling.
    if self.agent and self.agent.session:
        self.agent.session._emit_user_state(UserState.SPEAKING)
        self.agent.session._emit_agent_state(AgentState.LISTENING)

Handle user speech started event
async def reply_with_context(self,
instructions: str,
wait_for_playback: bool,
handle: UtteranceHandle,
frames: list[av.VideoFrame] | None = None) ‑> None-
Expand source code
async def reply_with_context(
    self,
    instructions: str,
    wait_for_playback: bool,
    handle: UtteranceHandle,
    frames: list[av.VideoFrame] | None = None,
) -> None:
    """
    Generate a reply using instructions and optional frames.

    Args:
        instructions: Instructions/text to send to the model
        wait_for_playback: If True, wait for playback to complete
            (for realtime, this is handled by the model)
        handle: UtteranceHandle to track the utterance
        frames: Optional list of VideoFrame objects to include in the reply
    """
    self._current_utterance_handle = handle
    if frames and hasattr(self.model, 'send_message_with_frames'):
        async with self._vision_lock:
            await self.model.send_message_with_frames(instructions, frames)
    elif hasattr(self.model, 'send_text_message'):
        await self.model.send_text_message(instructions)
    else:
        await self.model.send_message(instructions)

Generate a reply using instructions and optional frames.
Args
instructions- Instructions/text to send to the model
wait_for_playback- If True, wait for playback to complete (for realtime, this is handled by the model)
handle- UtteranceHandle to track the utterance
frames- Optional list of VideoFrame objects to include in the reply
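A hedged sketch of a vision-enabled reply. This module does not show how an UtteranceHandle is created, so handle and recent_frames are assumed to come from the surrounding session layer:

    # `handle` (UtteranceHandle) and `recent_frames` (list[av.VideoFrame])
    # are assumptions supplied by the caller's session layer.
    await pipeline.reply_with_context(
        instructions="Describe what you can see in these frames.",
        wait_for_playback=False,  # playback completion is handled by the model
        handle=handle,
        frames=recent_frames,
    )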
async def send_message(self, message: str, handle: UtteranceHandle) ‑> None-
Expand source code
async def send_message(self, message: str, handle: UtteranceHandle) -> None:
    """
    Send a message through the realtime pipeline and track the utterance handle.
    """
    self._current_utterance_handle = handle
    try:
        await self.model.send_message(message)
    except Exception as e:
        logger.error(f"Error sending message: {e}")
        handle._mark_done()

Send a message through the realtime pipeline and track the utterance handle.
async def send_text_message(self, message: str) ‑> None-
Expand source code
async def send_text_message(self, message: str) -> None:
    """
    Send a text message through the realtime model.
    This method specifically handles text-only input when the configured
    modalities are ["text"].
    """
    if hasattr(self.model, 'send_text_message'):
        await self.model.send_text_message(message)
    else:
        await self.model.send_message(message)

Send a text message through the realtime model. This method specifically handles text-only input when the configured modalities are ["text"].
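For example, under a text-only configuration, a caller might send plain text; the method falls back to send_message() when the model lacks a dedicated text entry point:

    # Text-only input; falls back to model.send_message() if the model
    # does not implement send_text_message().
    await pipeline.send_text_message("Summarize the last user request.")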
def set_agent(self, agent: Agent) ‑> None-
Expand source code
def set_agent(self, agent: Agent) -> None:
    self.agent = agent
    if hasattr(self.model, 'set_agent'):
        self.model.set_agent(agent)

async def start(self, **kwargs: Any) ‑> None-
Expand source code
async def start(self, **kwargs: Any) -> None:
    """
    Start the realtime pipeline processing.
    Overrides the abstract start method from Pipeline base class.

    Args:
        **kwargs: Additional arguments for pipeline configuration
    """
    await self.model.connect()
    self.model.on("user_speech_started", self.on_user_speech_started)
    self.model.on("user_speech_ended", lambda data: asyncio.create_task(self.on_user_speech_ended(data)))
    self.model.on("agent_speech_started", lambda data: asyncio.create_task(self.on_agent_speech_started(data)))
    self.model.on("agent_speech_ended", self._on_agent_speech_ended)

Start the realtime pipeline processing. Overrides the abstract start method from Pipeline base class.
Args
**kwargs- Additional arguments for pipeline configuration
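Note that start() wraps the async speech handlers in asyncio.create_task, so they run fire-and-forget; callers only need to await start() itself before sending input:

    await pipeline.start()                      # connects the model and wires events
    await pipeline.send_text_message("Hello!")  # safe once start() has returned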
Inherited members