Module agents.realtime_pipeline
Classes
class RealTimePipeline(model: RealtimeBaseModel, avatar: Any | None = None, denoise: Denoise | None = None)
```python
class RealTimePipeline(
    Pipeline,
    EventEmitter[
        Literal[
            "realtime_start",
            "realtime_end",
            "user_audio_input_data",
            "user_speech_started",
            "realtime_model_transcription",
        ]
    ],
):
    """
    RealTime pipeline implementation that processes data in real-time.
    Inherits from Pipeline base class and adds realtime-specific events.
    """

    def __init__(
        self,
        model: RealtimeBaseModel,
        avatar: Any | None = None,
        denoise: Denoise | None = None,
    ) -> None:
        """
        Initialize the realtime pipeline.

        Args:
            model: Instance of RealtimeBaseModel to process data
            avatar: Optional avatar component; when set, the model's audio is
                routed to the room's agent audio track
            denoise: Optional Denoise instance applied to incoming user audio
                before it reaches the model
        """
        self.model = model
        self.model.audio_track = None
        self.agent = None
        self.avatar = avatar
        self.vision = False
        self.denoise = denoise
        self.background_audio: BackgroundAudioConfig | None = None
        self._background_audio_player: BackgroundAudio | None = None
        super().__init__()
        self.model.on("error", self.on_model_error)
        self.model.on("realtime_model_transcription", self.on_realtime_model_transcription)

    def set_agent(self, agent: Agent) -> None:
        self.agent = agent
        if hasattr(self.model, 'set_agent'):
            self.model.set_agent(agent)

    def _configure_components(self) -> None:
        """Configure pipeline components with the loop"""
        if self.loop:
            self.model.loop = self.loop
            job_context = get_current_job_context()
            if job_context and job_context.room:
                requested_vision = getattr(job_context.room, 'vision', False)
                self.vision = requested_vision
                model_name = self.model.__class__.__name__
                if requested_vision and model_name != 'GeminiRealtime':
                    logger.warning(
                        f"Vision mode requested but {model_name} doesn't support video input. "
                        "Only GeminiRealtime supports vision. Disabling vision."
                    )
                    self.vision = False
                if self.avatar:
                    self.model.audio_track = getattr(job_context.room, 'agent_audio_track', None) or job_context.room.audio_track
                elif self.audio_track:
                    self.model.audio_track = self.audio_track

    async def start(self, **kwargs: Any) -> None:
        """
        Start the realtime pipeline processing.
        Overrides the abstract start method from Pipeline base class.

        Args:
            **kwargs: Additional arguments for pipeline configuration
        """
        await self.model.connect()
        self.model.on("user_speech_started", self.on_user_speech_started)
        self.model.on("user_speech_ended", lambda data: asyncio.create_task(self.on_user_speech_ended(data)))
        self.model.on("agent_speech_started", lambda data: asyncio.create_task(self.on_agent_speech_started(data)))
        self.model.on("agent_speech_ended", lambda data: None)  # no-op handler; no agent_speech_ended callback is defined on this pipeline

    async def send_message(self, message: str) -> None:
        """
        Send a message through the realtime model.
        Delegates to the model's send_message implementation.
        """
        await self.model.send_message(message)

    async def send_text_message(self, message: str) -> None:
        """
        Send a text message through the realtime model.
        This method specifically handles text-only input when modalities is ["text"].
        """
        if hasattr(self.model, 'send_text_message'):
            await self.model.send_text_message(message)
        else:
            await self.model.send_message(message)

    async def on_audio_delta(self, audio_data: bytes):
        """
        Handle incoming audio data from the user
        """
        if self.denoise:
            audio_data = await self.denoise.denoise(audio_data)
        await self.model.handle_audio_input(audio_data)

    async def on_video_delta(self, video_data: av.VideoFrame):
        """
        Handle incoming video data from the user.
        The model's handle_video_input is expected to handle the av.VideoFrame.
        """
        if self.vision and hasattr(self.model, 'handle_video_input'):
            await self.model.handle_video_input(video_data)

    def on_user_speech_started(self, data: dict) -> None:
        """
        Handle user speech started event
        """
        self._notify_speech_started()
        if self.agent.session:
            self.agent.session._emit_user_state(UserState.SPEAKING)
            self.agent.session._emit_agent_state(AgentState.LISTENING)

    def interrupt(self) -> None:
        """
        Interrupt the realtime pipeline
        """
        if self.model:
            asyncio.create_task(self.model.interrupt())
        if self._background_audio_player:
            asyncio.create_task(self._background_audio_player.stop())

    async def leave(self) -> None:
        """
        Leave the realtime pipeline.
        """
        if self.room is not None:
            await self.room.leave()

    def on_model_error(self, error: Exception):
        """
        Handle errors emitted from the model and send them to the
        realtime_metrics_collector.
        """
        error_data = {"message": str(error), "timestamp": time.time()}
        realtime_metrics_collector.set_realtime_model_error(error_data)
        logger.error(f"Realtime model error: {error_data}")

    def on_realtime_model_transcription(self, data: dict) -> None:
        """
        Handle realtime model transcription event
        """
        try:
            self.emit("realtime_model_transcription", data)
        except Exception:
            logger.error(f"Realtime model transcription: {data}")

    async def cleanup(self):
        """Cleanup resources"""
        logger.info("Cleaning up realtime pipeline")
        if hasattr(self, 'room') and self.room is not None:
            try:
                await self.room.leave()
            except Exception as e:
                logger.error(f"Error while leaving room during cleanup: {e}")
            try:
                if hasattr(self.room, 'cleanup'):
                    await self.room.cleanup()
            except Exception as e:
                logger.error(f"Error while cleaning up room: {e}")
            self.room = None
        if hasattr(self, 'model') and self.model is not None:
            try:
                await self.model.aclose()
            except Exception as e:
                logger.error(f"Error while closing model during cleanup: {e}")
            self.model = None
        if self._background_audio_player:
            await self._stop_background_audio()
        if hasattr(self, 'avatar') and self.avatar is not None:
            try:
                if hasattr(self.avatar, 'cleanup'):
                    await self.avatar.cleanup()
                elif hasattr(self.avatar, 'aclose'):
                    await self.avatar.aclose()
            except Exception as e:
                logger.error(f"Error while cleaning up avatar: {e}")
            self.avatar = None
        if hasattr(self, 'denoise') and self.denoise is not None:
            try:
                await self.denoise.aclose()
            except Exception as e:
                logger.error(f"Error while cleaning up denoise: {e}")
            self.denoise = None
        self.agent = None
        self.vision = False
        self.model = None
        self.avatar = None
        self.denoise = None
        self.background_audio = None
        self._background_audio_player = None
        logger.info("Realtime pipeline cleaned up")
        await super().cleanup()

    async def _stop_background_audio(self):
        if self._background_audio_player:
            await self._background_audio_player.stop()
        self._background_audio_player = None

    async def on_user_speech_ended(self, data: dict) -> None:
        """
        Handle user speech ended event
        """
        if self.background_audio and self.model.audio_track:
            self._background_audio_player = BackgroundAudio(self.background_audio, self.model.audio_track)
            await self._background_audio_player.start()

    async def on_agent_speech_started(self, data: dict) -> None:
        """
        Handle agent speech started event
        """
        if self.background_audio:
            await self._stop_background_audio()
```
RealTime pipeline implementation that processes data in real-time. Inherits from Pipeline base class and adds realtime-specific events.
Initialize the realtime pipeline.
Args
model: Instance of RealtimeBaseModel to process data
avatar: Optional avatar component; when set, the model's audio is routed to the room's agent audio track
denoise: Optional Denoise instance applied to incoming user audio before it reaches the model
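For orientation, a minimal construction sketch; `MyRealtimeModel` is a hypothetical placeholder for any concrete `RealtimeBaseModel` implementation, and the import path is an assumption:

```python
# Minimal construction sketch. MyRealtimeModel is a hypothetical
# RealtimeBaseModel subclass; the import path is an assumption.
from agents.realtime_pipeline import RealTimePipeline

pipeline = RealTimePipeline(
    model=MyRealtimeModel(),  # any concrete RealtimeBaseModel
    avatar=None,              # optional avatar component
    denoise=None,             # optional Denoise applied to user audio
)
```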
Ancestors
- Pipeline
- EventEmitter
- typing.Generic
- abc.ABC
Methods
async def cleanup(self)
```python
async def cleanup(self):
    """Cleanup resources"""
    logger.info("Cleaning up realtime pipeline")
    if hasattr(self, 'room') and self.room is not None:
        try:
            await self.room.leave()
        except Exception as e:
            logger.error(f"Error while leaving room during cleanup: {e}")
        try:
            if hasattr(self.room, 'cleanup'):
                await self.room.cleanup()
        except Exception as e:
            logger.error(f"Error while cleaning up room: {e}")
        self.room = None
    if hasattr(self, 'model') and self.model is not None:
        try:
            await self.model.aclose()
        except Exception as e:
            logger.error(f"Error while closing model during cleanup: {e}")
        self.model = None
    if self._background_audio_player:
        await self._stop_background_audio()
    if hasattr(self, 'avatar') and self.avatar is not None:
        try:
            if hasattr(self.avatar, 'cleanup'):
                await self.avatar.cleanup()
            elif hasattr(self.avatar, 'aclose'):
                await self.avatar.aclose()
        except Exception as e:
            logger.error(f"Error while cleaning up avatar: {e}")
        self.avatar = None
    if hasattr(self, 'denoise') and self.denoise is not None:
        try:
            await self.denoise.aclose()
        except Exception as e:
            logger.error(f"Error while cleaning up denoise: {e}")
        self.denoise = None
    self.agent = None
    self.vision = False
    self.model = None
    self.avatar = None
    self.denoise = None
    self.background_audio = None
    self._background_audio_player = None
    logger.info("Realtime pipeline cleaned up")
    await super().cleanup()
```
Cleanup resources
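Since every attribute is guarded with `hasattr`/`None` checks before being closed, `cleanup()` is safe to call even after a partial startup. A sketch of the usual `try`/`finally` pairing; `pipeline` is assumed to be an already-constructed `RealTimePipeline`:

```python
# Sketch: always release resources, even when start() fails part-way.
import logging

logger = logging.getLogger(__name__)

async def run(pipeline):
    try:
        await pipeline.start()
    except Exception:
        logger.exception("pipeline failed to start")
    finally:
        await pipeline.cleanup()  # attribute-by-attribute guards make this safe
```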
def interrupt(self) -> None
```python
def interrupt(self) -> None:
    """
    Interrupt the realtime pipeline
    """
    if self.model:
        asyncio.create_task(self.model.interrupt())
    if self._background_audio_player:
        asyncio.create_task(self._background_audio_player.stop())
```
Interrupt the realtime pipeline
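Note that `interrupt()` is synchronous: it schedules `model.interrupt()` (and the background-audio stop) with `asyncio.create_task`, so it must be called while the event loop is running, for example from an event callback:

```python
# Sketch: interrupt the agent from a synchronous callback.
# on_stop_requested is a hypothetical handler fired inside the running loop.
def on_stop_requested(pipeline) -> None:
    pipeline.interrupt()  # schedules the model interrupt; returns immediately
```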
async def leave(self) -> None
```python
async def leave(self) -> None:
    """
    Leave the realtime pipeline.
    """
    if self.room is not None:
        await self.room.leave()
```
Leave the realtime pipeline.
async def on_agent_speech_started(self, data: dict) -> None
```python
async def on_agent_speech_started(self, data: dict) -> None:
    """
    Handle agent speech started event
    """
    if self.background_audio:
        await self._stop_background_audio()
```
Handle agent speech started event
def on_model_error(self, error: Exception)
```python
def on_model_error(self, error: Exception):
    """
    Handle errors emitted from the model and send them to the
    realtime_metrics_collector.
    """
    error_data = {"message": str(error), "timestamp": time.time()}
    realtime_metrics_collector.set_realtime_model_error(error_data)
    logger.error(f"Realtime model error: {error_data}")
```
Handle errors emitted from the model and send them to the realtime_metrics_collector.
def on_realtime_model_transcription(self, data: dict) -> None
```python
def on_realtime_model_transcription(self, data: dict) -> None:
    """
    Handle realtime model transcription event
    """
    try:
        self.emit("realtime_model_transcription", data)
    except Exception:
        logger.error(f"Realtime model transcription: {data}")
```
Handle realtime model transcription event
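Because `RealTimePipeline` is an `EventEmitter`, consumers can subscribe to the re-emitted event. A sketch; the payload keys are assumptions, since the dict shape depends on the underlying model:

```python
# Sketch: observe transcriptions re-emitted by the pipeline.
# The "role"/"text" keys are assumptions; the payload shape is model-specific.
def log_transcription(data: dict) -> None:
    print(f"[{data.get('role', '?')}] {data.get('text', '')}")

pipeline.on("realtime_model_transcription", log_transcription)
```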
async def on_user_speech_ended(self, data: dict) -> None
```python
async def on_user_speech_ended(self, data: dict) -> None:
    """
    Handle user speech ended event
    """
    if self.background_audio and self.model.audio_track:
        self._background_audio_player = BackgroundAudio(self.background_audio, self.model.audio_track)
        await self._background_audio_player.start()
```
Handle user speech ended event
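Playback only starts when `background_audio` has been set and the model exposes an audio track. A sketch, assuming `BackgroundAudioConfig` accepts a file path; its actual constructor may differ:

```python
# Sketch: play ambient audio after the user stops speaking.
# The BackgroundAudioConfig constructor arguments are an assumption.
pipeline.background_audio = BackgroundAudioConfig(file_path="ambience.wav")
```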
def on_user_speech_started(self, data: dict) -> None
```python
def on_user_speech_started(self, data: dict) -> None:
    """
    Handle user speech started event
    """
    self._notify_speech_started()
    if self.agent.session:
        self.agent.session._emit_user_state(UserState.SPEAKING)
        self.agent.session._emit_agent_state(AgentState.LISTENING)
```
Handle user speech started event
async def on_video_delta(self, video_data: av.VideoFrame)
```python
async def on_video_delta(self, video_data: av.VideoFrame):
    """
    Handle incoming video data from the user.
    The model's handle_video_input is expected to handle the av.VideoFrame.
    """
    if self.vision and hasattr(self.model, 'handle_video_input'):
        await self.model.handle_video_input(video_data)
```
Handle incoming video data from the user. The model's handle_video_input is expected to handle the av.VideoFrame.
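Frames are silently dropped unless vision was enabled for the room and the model implements `handle_video_input`. A sketch that feeds a synthetic PyAV frame; in practice frames come from the room's video source:

```python
# Sketch: forward a synthetic frame (inside an async context).
# Real frames arrive from the room's video source, not hand-built ones.
import av

async def send_test_frame(pipeline) -> None:
    frame = av.VideoFrame(640, 480, "rgb24")  # blank RGB frame
    await pipeline.on_video_delta(frame)      # no-op unless vision is enabled
```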
async def send_message(self, message: str) -> None
```python
async def send_message(self, message: str) -> None:
    """
    Send a message through the realtime model.
    Delegates to the model's send_message implementation.
    """
    await self.model.send_message(message)
```
Send a message through the realtime model. Delegates to the model's send_message implementation.
async def send_text_message(self, message: str) -> None
```python
async def send_text_message(self, message: str) -> None:
    """
    Send a text message through the realtime model.
    This method specifically handles text-only input when modalities is ["text"].
    """
    if hasattr(self.model, 'send_text_message'):
        await self.model.send_text_message(message)
    else:
        await self.model.send_message(message)
```
Send a text message through the realtime model. This method specifically handles text-only input when modalities is ["text"].
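In practice the fallback means callers can use this method unconditionally; models without a dedicated text path simply receive the generic `send_message`:

```python
# Sketch: send a text-only turn (inside an async context).
async def ask(pipeline) -> None:
    await pipeline.send_text_message("What's the weather in Paris?")
    # Falls back to the model's send_message(...) when the model
    # does not implement send_text_message.
```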
def set_agent(self, agent: Agent) -> None
```python
def set_agent(self, agent: Agent) -> None:
    self.agent = agent
    if hasattr(self.model, 'set_agent'):
        self.model.set_agent(agent)
```
async def start(self, **kwargs: Any) -> None
```python
async def start(self, **kwargs: Any) -> None:
    """
    Start the realtime pipeline processing.
    Overrides the abstract start method from Pipeline base class.

    Args:
        **kwargs: Additional arguments for pipeline configuration
    """
    await self.model.connect()
    self.model.on("user_speech_started", self.on_user_speech_started)
    self.model.on("user_speech_ended", lambda data: asyncio.create_task(self.on_user_speech_ended(data)))
    self.model.on("agent_speech_started", lambda data: asyncio.create_task(self.on_agent_speech_started(data)))
    self.model.on("agent_speech_ended", lambda data: None)  # no-op handler; no agent_speech_ended callback is defined on this pipeline
```
Start the realtime pipeline processing. Overrides the abstract start method from Pipeline base class.
Args
**kwargs: Additional arguments for pipeline configuration
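Putting the lifecycle together, a sketch of a minimal session; `MyRealtimeModel`, the import path, and the sleep-based session body are placeholders:

```python
# Sketch: minimal end-to-end session. MyRealtimeModel is hypothetical
# and the import path is an assumption.
import asyncio

from agents.realtime_pipeline import RealTimePipeline

async def main() -> None:
    pipeline = RealTimePipeline(model=MyRealtimeModel())
    try:
        await pipeline.start()               # connects the model, wires speech events
        await pipeline.send_message("Hi!")   # opening agent turn
        await asyncio.sleep(60)              # placeholder for the real session loop
    finally:
        await pipeline.cleanup()

asyncio.run(main())
```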
Inherited members