Module agents.realtime_llm_adapter
Classes
class RealtimeLLMAdapter (realtime_model: RealtimeBaseModel, agent: Agent | None = None)-
Expand source code
class RealtimeLLMAdapter(EventEmitter): """ Wraps a RealtimeBaseModel to expose an LLM-compatible interface. This allows realtime models (like OpenAI Realtime API, Gemini Live) to be used in place of standard LLM components in the pipeline architecture. Key differences from standard LLMs: - Realtime models handle their own audio I/O (STT + TTS built-in) - They maintain their own conversation state - Function calling may work differently This wrapper primarily delegates to the underlying realtime model and provides adapter methods to make it look like an LLM from the pipeline's perspective. """ def __init__(self, realtime_model: RealtimeBaseModel, agent: Agent | None = None): super().__init__() self.realtime_model = realtime_model self.agent = agent self._is_realtime = True self.audio_track = None self.loop = None self.realtime_model.on("error", lambda error: self.emit("error", error)) self.realtime_model.on("user_speech_started", lambda data: self.emit("user_speech_started", data)) self.realtime_model.on("user_speech_ended", lambda data: self.emit("user_speech_ended", data)) self.realtime_model.on("agent_speech_started", lambda data: self.emit("agent_speech_started", data)) self.realtime_model.on("agent_speech_ended", lambda data: self.emit("agent_speech_ended", data)) self.realtime_model.on("realtime_model_transcription", lambda data: self.emit("realtime_model_transcription", data)) self.realtime_model.on("llm_text_output", lambda data: self.emit("llm_text_output", data)) def set_agent(self, agent: Agent) -> None: """Set the agent for this wrapper""" self.agent = agent if hasattr(self.realtime_model, 'set_agent'): self.realtime_model.set_agent(agent) async def connect(self) -> None: """Connect the realtime model""" await self.realtime_model.connect() async def chat( self, context: ChatContext, tools: list[Any] | None = None, conversational_graph: Any | None = None, **kwargs ) -> AsyncIterator[ResponseChunk]: """ Adapter method for LLM compatibility. For realtime models, the chat method is less relevant since they handle audio I/O directly. This method exists for interface compatibility but yields minimal content since the actual response happens through audio. Args: context: Chat context (may be ignored by realtime models) tools: Available function tools conversational_graph: Optional conversational graph **kwargs: Additional arguments Yields: ResponseChunk objects (mostly empty for realtime models) """ logger.info("RealtimeLLMAdapter.chat() called - realtime models handle I/O directly") async def empty_gen(): yield ResponseChunk(content="", metadata={"realtime_mode": True}, role=ChatRole.ASSISTANT) async for chunk in empty_gen(): yield chunk async def handle_audio_input(self, audio_data: bytes) -> None: """ Process incoming audio through the realtime model. Args: audio_data: Raw audio bytes """ await self.realtime_model.handle_audio_input(audio_data) async def handle_video_input(self, video_frame: Any) -> None: """ Process incoming video through the realtime model (if supported). Args: video_frame: Video frame data """ if hasattr(self.realtime_model, 'handle_video_input'): await self.realtime_model.handle_video_input(video_frame) else: logger.warning(f"Realtime model {type(self.realtime_model).__name__} does not support video input") async def send_message(self, message: str) -> None: """ Send a text message to the realtime model. Args: message: Text message to send """ await self.realtime_model.send_message(message) async def send_text_message(self, message: str) -> None: """ Send a text-only message (for models supporting text modality). Args: message: Text message to send """ if hasattr(self.realtime_model, 'send_text_message'): await self.realtime_model.send_text_message(message) else: await self.realtime_model.send_message(message) async def send_message_with_frames(self, message: str, frames: list[Any]) -> None: """ Send a message with video frames (for vision-enabled models). Args: message: Text message frames: List of video frames """ if hasattr(self.realtime_model, 'send_message_with_frames'): await self.realtime_model.send_message_with_frames(message, frames) else: logger.warning(f"Realtime model {type(self.realtime_model).__name__} does not support frames") await self.send_message(message) async def interrupt(self) -> None: """Interrupt the realtime model's current response""" if hasattr(self.realtime_model, 'interrupt'): await self.realtime_model.interrupt() async def cancel_current_generation(self) -> None: """Cancel the current generation (LLM compatibility method)""" await self.interrupt() def on_user_speech_started(self, callback) -> None: """Register callback for user speech started event""" self.realtime_model.on("user_speech_started", callback) def on_user_speech_ended(self, callback) -> None: """Register callback for user speech ended event""" self.realtime_model.on("user_speech_ended", callback) def on_agent_speech_started(self, callback) -> None: """Register callback for agent speech started event""" self.realtime_model.on("agent_speech_started", callback) def on_agent_speech_ended(self, callback) -> None: """Register callback for agent speech ended event""" self.realtime_model.on("agent_speech_ended", callback) def on_transcription(self, callback) -> None: """Register callback for transcription events""" self.realtime_model.on("realtime_model_transcription", callback) @property def current_utterance(self): """Get current utterance handle""" return getattr(self.realtime_model, 'current_utterance', None) @current_utterance.setter def current_utterance(self, value): """Set current utterance handle""" if hasattr(self.realtime_model, 'current_utterance'): self.realtime_model.current_utterance = value async def aclose(self) -> None: """Close and cleanup the realtime model""" await self.realtime_model.aclose() async def cleanup(self) -> None: """Cleanup resources""" await self.aclose()Wraps a RealtimeBaseModel to expose an LLM-compatible interface.
This allows realtime models (like OpenAI Realtime API, Gemini Live) to be used in place of standard LLM components in the pipeline architecture.
Key differences from standard LLMs: - Realtime models handle their own audio I/O (STT + TTS built-in) - They maintain their own conversation state - Function calling may work differently
This wrapper primarily delegates to the underlying realtime model and provides adapter methods to make it look like an LLM from the pipeline's perspective.
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop current_utterance-
Expand source code
@property def current_utterance(self): """Get current utterance handle""" return getattr(self.realtime_model, 'current_utterance', None)Get current utterance handle
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Close and cleanup the realtime model""" await self.realtime_model.aclose()Close and cleanup the realtime model
async def cancel_current_generation(self) ‑> None-
Expand source code
async def cancel_current_generation(self) -> None: """Cancel the current generation (LLM compatibility method)""" await self.interrupt()Cancel the current generation (LLM compatibility method)
async def chat(self,
context: ChatContext,
tools: list[Any] | None = None,
conversational_graph: Any | None = None,
**kwargs) ‑> AsyncIterator[ResponseChunk]-
Expand source code
async def chat( self, context: ChatContext, tools: list[Any] | None = None, conversational_graph: Any | None = None, **kwargs ) -> AsyncIterator[ResponseChunk]: """ Adapter method for LLM compatibility. For realtime models, the chat method is less relevant since they handle audio I/O directly. This method exists for interface compatibility but yields minimal content since the actual response happens through audio. Args: context: Chat context (may be ignored by realtime models) tools: Available function tools conversational_graph: Optional conversational graph **kwargs: Additional arguments Yields: ResponseChunk objects (mostly empty for realtime models) """ logger.info("RealtimeLLMAdapter.chat() called - realtime models handle I/O directly") async def empty_gen(): yield ResponseChunk(content="", metadata={"realtime_mode": True}, role=ChatRole.ASSISTANT) async for chunk in empty_gen(): yield chunkAdapter method for LLM compatibility.
For realtime models, the chat method is less relevant since they handle audio I/O directly. This method exists for interface compatibility but yields minimal content since the actual response happens through audio.
Args
context- Chat context (may be ignored by realtime models)
tools- Available function tools
conversational_graph- Optional conversational graph
**kwargs- Additional arguments
Yields
ResponseChunk objects (mostly empty for realtime models)
async def cleanup(self) ‑> None-
Expand source code
async def cleanup(self) -> None: """Cleanup resources""" await self.aclose()Cleanup resources
async def connect(self) ‑> None-
Expand source code
async def connect(self) -> None: """Connect the realtime model""" await self.realtime_model.connect()Connect the realtime model
async def handle_audio_input(self, audio_data: bytes) ‑> None-
Expand source code
async def handle_audio_input(self, audio_data: bytes) -> None: """ Process incoming audio through the realtime model. Args: audio_data: Raw audio bytes """ await self.realtime_model.handle_audio_input(audio_data)Process incoming audio through the realtime model.
Args
audio_data- Raw audio bytes
async def handle_video_input(self, video_frame: Any) ‑> None-
Expand source code
async def handle_video_input(self, video_frame: Any) -> None: """ Process incoming video through the realtime model (if supported). Args: video_frame: Video frame data """ if hasattr(self.realtime_model, 'handle_video_input'): await self.realtime_model.handle_video_input(video_frame) else: logger.warning(f"Realtime model {type(self.realtime_model).__name__} does not support video input")Process incoming video through the realtime model (if supported).
Args
video_frame- Video frame data
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None: """Interrupt the realtime model's current response""" if hasattr(self.realtime_model, 'interrupt'): await self.realtime_model.interrupt()Interrupt the realtime model's current response
def on_agent_speech_ended(self, callback) ‑> None-
Expand source code
def on_agent_speech_ended(self, callback) -> None: """Register callback for agent speech ended event""" self.realtime_model.on("agent_speech_ended", callback)Register callback for agent speech ended event
def on_agent_speech_started(self, callback) ‑> None-
Expand source code
def on_agent_speech_started(self, callback) -> None: """Register callback for agent speech started event""" self.realtime_model.on("agent_speech_started", callback)Register callback for agent speech started event
def on_transcription(self, callback) ‑> None-
Expand source code
def on_transcription(self, callback) -> None: """Register callback for transcription events""" self.realtime_model.on("realtime_model_transcription", callback)Register callback for transcription events
def on_user_speech_ended(self, callback) ‑> None-
Expand source code
def on_user_speech_ended(self, callback) -> None: """Register callback for user speech ended event""" self.realtime_model.on("user_speech_ended", callback)Register callback for user speech ended event
def on_user_speech_started(self, callback) ‑> None-
Expand source code
def on_user_speech_started(self, callback) -> None: """Register callback for user speech started event""" self.realtime_model.on("user_speech_started", callback)Register callback for user speech started event
async def send_message(self, message: str) ‑> None-
Expand source code
async def send_message(self, message: str) -> None: """ Send a text message to the realtime model. Args: message: Text message to send """ await self.realtime_model.send_message(message)Send a text message to the realtime model.
Args
message- Text message to send
async def send_message_with_frames(self, message: str, frames: list[Any]) ‑> None-
Expand source code
async def send_message_with_frames(self, message: str, frames: list[Any]) -> None: """ Send a message with video frames (for vision-enabled models). Args: message: Text message frames: List of video frames """ if hasattr(self.realtime_model, 'send_message_with_frames'): await self.realtime_model.send_message_with_frames(message, frames) else: logger.warning(f"Realtime model {type(self.realtime_model).__name__} does not support frames") await self.send_message(message)Send a message with video frames (for vision-enabled models).
Args
message- Text message
frames- List of video frames
async def send_text_message(self, message: str) ‑> None-
Expand source code
async def send_text_message(self, message: str) -> None: """ Send a text-only message (for models supporting text modality). Args: message: Text message to send """ if hasattr(self.realtime_model, 'send_text_message'): await self.realtime_model.send_text_message(message) else: await self.realtime_model.send_message(message)Send a text-only message (for models supporting text modality).
Args
message- Text message to send
def set_agent(self, agent: Agent) ‑> None-
Expand source code
def set_agent(self, agent: Agent) -> None: """Set the agent for this wrapper""" self.agent = agent if hasattr(self.realtime_model, 'set_agent'): self.realtime_model.set_agent(agent)Set the agent for this wrapper
Inherited members