Module videosdk.plugins.openai.realtime_api
Classes
class OpenAIRealtime (*,
api_key: str | None = None,
model: str,
config: OpenAIRealtimeConfig | None = None,
base_url: str | None = None)
    class OpenAIRealtime(RealtimeBaseModel[OpenAIEventTypes]):
        """OpenAI's realtime model implementation."""

        def __init__(
            self,
            *,
            api_key: str | None = None,
            model: str,
            config: OpenAIRealtimeConfig | None = None,
            base_url: str | None = None,
        ) -> None:
            """
            Initialize OpenAI realtime model.

            Args:
                api_key: OpenAI API key. If not provided, will attempt to read from
                    OPENAI_API_KEY env var
                model: The OpenAI model identifier to use (e.g. 'gpt-4', 'gpt-3.5-turbo')
                config: Optional configuration object for customizing model behavior.
                    Contains settings for:
                    - voice: Voice ID to use for audio output
                    - temperature: Sampling temperature for responses
                    - turn_detection: Settings for detecting user speech turns
                    - input_audio_transcription: Settings for audio transcription
                    - tool_choice: How tools should be selected ('auto' or 'none')
                    - modalities: List of enabled modalities ('text', 'audio')
                base_url: Base URL for OpenAI API. Defaults to 'https://api.openai.com/v1'

            Raises:
                ValueError: If no API key is provided and none found in environment variables
            """
            super().__init__()
            self.model = model
            self.api_key = api_key or os.getenv("OPENAI_API_KEY")
            self.base_url = base_url or OPENAI_BASE_URL
            if not self.api_key:
                self.emit(
                    "error",
                    "OpenAI API key must be provided or set in OPENAI_API_KEY environment variable",
                )
                raise ValueError(
                    "OpenAI API key must be provided or set in OPENAI_API_KEY environment variable"
                )
            self._http_session: Optional[aiohttp.ClientSession] = None
            self._session: Optional[OpenAISession] = None
            self._closing = False
            self._instructions: Optional[str] = None
            self._tools: Optional[List[FunctionTool]] = []
            self.loop = None
            self.audio_track: Optional[CustomAudioStreamTrack] = None
            self._formatted_tools: Optional[List[Dict[str, Any]]] = None
            self.config: OpenAIRealtimeConfig = config or OpenAIRealtimeConfig()
            self.input_sample_rate = 48000
            self.target_sample_rate = 16000
            self._agent_speaking = False

        def set_agent(self, agent: Agent) -> None:
            self._instructions = agent.instructions
            self._tools = agent.tools
            self.tools_formatted = self._format_tools_for_session(self._tools)
            self._formatted_tools = self.tools_formatted

        async def connect(self) -> None:
            headers = {"Agent": "VideoSDK Agents"}
            headers["Authorization"] = f"Bearer {self.api_key}"
            headers["OpenAI-Beta"] = "realtime=v1"
            url = self.process_base_url(self.base_url, self.model)
            self._session = await self._create_session(url, headers)
            await self._handle_websocket(self._session)
            await self.send_first_session_update()

        async def handle_audio_input(self, audio_data: bytes) -> None:
            """Handle incoming audio data from the user"""
            if self._session and not self._closing and "audio" in self.config.modalities:
                audio_data = np.frombuffer(audio_data, dtype=np.int16)
                audio_data = signal.resample(
                    audio_data,
                    int(len(audio_data) * self.target_sample_rate / self.input_sample_rate),
                )
                audio_data = audio_data.astype(np.int16).tobytes()
                base64_audio_data = base64.b64encode(audio_data).decode("utf-8")
                audio_event = {
                    "type": "input_audio_buffer.append",
                    "audio": base64_audio_data,
                }
                await self.send_event(audio_event)

        async def _ensure_http_session(self) -> aiohttp.ClientSession:
            """Ensure we have an HTTP session"""
            if not self._http_session:
                self._http_session = aiohttp.ClientSession()
            return self._http_session

        async def _create_session(self, url: str, headers: dict) -> OpenAISession:
            """Create a new WebSocket session"""
            http_session = await self._ensure_http_session()
            ws = await http_session.ws_connect(
                url,
                headers=headers,
                autoping=True,
                heartbeat=10,
                autoclose=False,
                timeout=30,
            )
            msg_queue: asyncio.Queue = asyncio.Queue()
            tasks: list[asyncio.Task] = []
            self._closing = False
            return OpenAISession(ws=ws, msg_queue=msg_queue, tasks=tasks)

        async def send_message(self, message: str) -> None:
            """Send a message to the OpenAI realtime API"""
            await self.send_event(
                {
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": "assistant",
                        "content": [
                            {
                                "type": "text",
                                "text": "Repeat the user's exact message back to them:"
                                + message
                                + "DO NOT ADD ANYTHING ELSE",
                            }
                        ],
                    },
                }
            )
            await self.create_response()

        async def handle_video_input(self, video_data: av.VideoFrame) -> None:
            if not self._session or self._closing:
                return
            try:
                if not video_data or not video_data.planes:
                    return
                processed_jpeg = encode_image(video_data, DEFAULT_IMAGE_ENCODE_OPTIONS)
                if not processed_jpeg or len(processed_jpeg) < 100:
                    logger.warning("Invalid JPEG data generated")
                    return
                base64_url = self.bytes_to_base64_url(processed_jpeg)
                content = [{"type": "input_image", "image_url": base64_url}]
                conversation_event = {
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": content,
                    },
                }
                await self.send_event(conversation_event)
            except Exception as e:
                self.emit("error", f"Video processing error: {str(e)}")

        async def send_message_with_frames(
            self, message: Optional[str], frames: list[av.VideoFrame]
        ) -> None:
            content = []
            if message:
                content.append({"type": "input_text", "text": message})
            for frame in frames:
                try:
                    processed_jpeg = encode_image(frame, DEFAULT_IMAGE_ENCODE_OPTIONS)
                    if not processed_jpeg or len(processed_jpeg) < 100:
                        logger.warning("Invalid JPEG data generated")
                        continue
                    base64_url = self.bytes_to_base64_url(processed_jpeg)
                    content.append({"type": "input_image", "image_url": base64_url})
                except Exception as e:
                    logger.error(f"Error processing frame: {e}")
            if not any(
                item.get("type") == "input_image" or item.get("type") == "input_text"
                for item in content
            ):
                logger.warning("No content to send.")
                return
            conversation_event = {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": content,
                },
            }
            await self.send_event(conversation_event)
            await self.create_response()

        async def create_response(self) -> None:
            """Create a response to the OpenAI realtime API"""
            if not self._session:
                self.emit("error", "No active WebSocket session")
                raise RuntimeError("No active WebSocket session")
            response_event = {
                "type": "response.create",
                "event_id": str(uuid.uuid4()),
                "response": {
                    "instructions": self._instructions,
                    "metadata": {"client_event_id": str(uuid.uuid4())},
                },
            }
            await self.send_event(response_event)

        async def _handle_websocket(self, session: OpenAISession) -> None:
            """Start WebSocket send/receive tasks"""
            session.tasks.extend(
                [
                    asyncio.create_task(self._send_loop(session), name="send_loop"),
                    asyncio.create_task(self._receive_loop(session), name="receive_loop"),
                ]
            )

        async def _send_loop(self, session: OpenAISession) -> None:
            """Send messages from queue to WebSocket"""
            try:
                while not self._closing:
                    msg = await session.msg_queue.get()
                    if isinstance(msg, dict):
                        await session.ws.send_json(msg)
                    else:
                        await session.ws.send_str(str(msg))
            except asyncio.CancelledError:
                pass
            finally:
                await self._cleanup_session(session)

        async def _receive_loop(self, session: OpenAISession) -> None:
            """Receive and process WebSocket messages"""
            try:
                while not self._closing:
                    msg = await session.ws.receive()
                    if msg.type == aiohttp.WSMsgType.CLOSED:
                        self.emit("error", f"WebSocket closed with reason: {msg.extra}")
                        break
                    elif msg.type == aiohttp.WSMsgType.ERROR:
                        self.emit("error", f"WebSocket error: {msg.data}")
                        break
                    elif msg.type == aiohttp.WSMsgType.TEXT:
                        await self._handle_message(json.loads(msg.data))
            except Exception as e:
                self.emit("error", f"WebSocket receive error: {str(e)}")
            finally:
                await self._cleanup_session(session)

        async def _handle_message(self, data: dict) -> None:
            """Handle incoming WebSocket messages"""
            try:
                event_type = data.get("type")
                if event_type == "input_audio_buffer.speech_started":
                    await self._handle_speech_started(data)
                elif event_type == "input_audio_buffer.speech_stopped":
                    await self._handle_speech_stopped(data)
                elif event_type == "response.created":
                    await self._handle_response_created(data)
                elif event_type == "response.output_item.added":
                    await self._handle_output_item_added(data)
                elif event_type == "response.content_part.added":
                    await self._handle_content_part_added(data)
                elif event_type == "response.text.delta":
                    await self._handle_text_delta(data)
                elif event_type == "response.audio.delta":
                    await self._handle_audio_delta(data)
                elif event_type == "response.audio_transcript.delta":
                    await self._handle_audio_transcript_delta(data)
                elif event_type == "response.done":
                    await self._handle_response_done(data)
                elif event_type == "error":
                    await self._handle_error(data)
                elif event_type == "response.function_call_arguments.delta":
                    await self._handle_function_call_arguments_delta(data)
                elif event_type == "response.function_call_arguments.done":
                    await self._handle_function_call_arguments_done(data)
                elif event_type == "response.output_item.done":
                    await self._handle_output_item_done(data)
                elif event_type == "conversation.item.input_audio_transcription.completed":
                    await self._handle_input_audio_transcription_completed(data)
                elif event_type == "response.text.done":
                    await self._handle_text_done(data)
            except Exception as e:
                self.emit("error", f"Error handling event {event_type}: {str(e)}")

        async def _handle_speech_started(self, data: dict) -> None:
            """Handle speech detection start"""
            if "audio" in self.config.modalities:
                self.emit("user_speech_started", {"type": "done"})
                await self.interrupt()
                if self.audio_track:
                    self.audio_track.interrupt()
                await realtime_metrics_collector.set_user_speech_start()

        async def _handle_speech_stopped(self, data: dict) -> None:
            """Handle speech detection end"""
            await realtime_metrics_collector.set_user_speech_end()
            self.emit("user_speech_ended", {})

        async def _handle_response_created(self, data: dict) -> None:
            """Handle initial response creation"""
            response_id = data.get("response", {}).get("id")

        async def _handle_output_item_added(self, data: dict) -> None:
            """Handle new output item addition"""

        async def _handle_output_item_done(self, data: dict) -> None:
            """Handle output item done"""
            try:
                item = data.get("item", {})
                if (
                    item.get("type") == "function_call"
                    and item.get("status") == "completed"
                ):
                    name = item.get("name")
                    arguments = json.loads(item.get("arguments", "{}"))
                    if name and self._tools:
                        for tool in self._tools:
                            tool_info = get_tool_info(tool)
                            if tool_info.name == name:
                                try:
                                    await realtime_metrics_collector.add_tool_call(name)
                                    result = await tool(**arguments)
                                    await self.send_event(
                                        {
                                            "type": "conversation.item.create",
                                            "item": {
                                                "type": "function_call_output",
                                                "call_id": item.get("call_id"),
                                                "output": json.dumps(result),
                                            },
                                        }
                                    )
                                    await self.send_event(
                                        {
                                            "type": "response.create",
                                            "event_id": str(uuid.uuid4()),
                                            "response": {
                                                "instructions": self._instructions,
                                                "metadata": {
                                                    "client_event_id": str(uuid.uuid4())
                                                },
                                            },
                                        }
                                    )
                                except Exception as e:
                                    self.emit(
                                        "error",
                                        f"Error executing function {name}: {e}",
                                    )
                                break
            except Exception as e:
                self.emit("error", f"Error handling output item done: {e}")

        async def _handle_content_part_added(self, data: dict) -> None:
            """Handle new content part"""

        async def _handle_text_delta(self, data: dict) -> None:
            """Handle text delta chunk"""
            pass

        async def _handle_audio_delta(self, data: dict) -> None:
            """Handle audio chunk"""
            if "audio" not in self.config.modalities:
                return
            try:
                if not self._agent_speaking:
                    await realtime_metrics_collector.set_agent_speech_start()
                    self._agent_speaking = True
                    self.emit("agent_speech_started", {})
                base64_audio_data = base64.b64decode(data.get("delta"))
                if base64_audio_data:
                    if self.audio_track and self.loop:
                        asyncio.create_task(
                            self.audio_track.add_new_bytes(base64_audio_data)
                        )
            except Exception as e:
                self.emit("error", f"Error handling audio delta: {e}")
                traceback.print_exc()

        async def interrupt(self) -> None:
            """Interrupt the current response and flush audio"""
            if self._session and not self._closing:
                cancel_event = {"type": "response.cancel", "event_id": str(uuid.uuid4())}
                await self.send_event(cancel_event)
                await realtime_metrics_collector.set_interrupted()
            if self.audio_track:
                self.audio_track.interrupt()
            if self._agent_speaking:
                self.emit("agent_speech_ended", {})
                await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
                self._agent_speaking = False

        async def _handle_audio_transcript_delta(self, data: dict) -> None:
            """Handle transcript chunk"""
            delta_content = data.get("delta", "")
            if not hasattr(self, "_current_audio_transcript"):
                self._current_audio_transcript = ""
            self._current_audio_transcript += delta_content

        async def _handle_input_audio_transcription_completed(self, data: dict) -> None:
            """Handle input audio transcription completion for user transcript"""
            transcript = data.get("transcript", "")
            if transcript:
                await realtime_metrics_collector.set_user_transcript(transcript)
                try:
                    self.emit(
                        "realtime_model_transcription",
                        {"role": "user", "text": transcript, "is_final": True},
                    )
                except Exception:
                    pass

        async def _handle_response_done(self, data: dict) -> None:
            """Handle response completion for agent transcript"""
            if (
                hasattr(self, "_current_audio_transcript")
                and self._current_audio_transcript
            ):
                await realtime_metrics_collector.set_agent_response(
                    self._current_audio_transcript
                )
                global_event_emitter.emit(
                    "text_response",
                    {"text": self._current_audio_transcript, "type": "done"},
                )
                try:
                    self.emit(
                        "realtime_model_transcription",
                        {
                            "role": "agent",
                            "text": self._current_audio_transcript,
                            "is_final": True,
                        },
                    )
                except Exception:
                    pass
                self._current_audio_transcript = ""
            self.emit("agent_speech_ended", {})
            await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
            self._agent_speaking = False
            pass

        async def _handle_function_call_arguments_delta(self, data: dict) -> None:
            """Handle function call arguments delta"""

        async def _handle_function_call_arguments_done(self, data: dict) -> None:
            """Handle function call arguments done"""

        async def _handle_error(self, data: dict) -> None:
            """Handle error events"""

        async def _cleanup_session(self, session: OpenAISession) -> None:
            """Clean up session resources"""
            if self._closing:
                return
            self._closing = True
            for task in session.tasks:
                if not task.done():
                    task.cancel()
                    try:
                        await asyncio.wait_for(task, timeout=1.0)  # Add timeout
                    except (asyncio.CancelledError, asyncio.TimeoutError):
                        pass
            if not session.ws.closed:
                try:
                    await session.ws.close()
                except Exception:
                    pass

        async def send_event(self, event: Dict[str, Any]) -> None:
            """Send an event to the WebSocket"""
            if self._session and not self._closing:
                await self._session.msg_queue.put(event)

        async def aclose(self) -> None:
            """Cleanup all resources"""
            if self._closing:
                return
            self._closing = True
            if self._session:
                await self._cleanup_session(self._session)
            if self._http_session and not self._http_session.closed:
                await self._http_session.close()

        async def send_first_session_update(self) -> None:
            """Send initial session update with default values after connection"""
            if not self._session:
                return
            turn_detection = None
            input_audio_transcription = None
            if "audio" in self.config.modalities:
                turn_detection = (
                    self.config.turn_detection.model_dump(
                        by_alias=True,
                        exclude_unset=True,
                        exclude_defaults=True,
                    )
                    if self.config.turn_detection
                    else None
                )
                input_audio_transcription = (
                    self.config.input_audio_transcription.model_dump(
                        by_alias=True,
                        exclude_unset=True,
                        exclude_defaults=True,
                    )
                    if self.config.input_audio_transcription
                    else None
                )
            session_update = {
                "type": "session.update",
                "session": {
                    "model": self.model,
                    "instructions": self._instructions
                    or "You are a helpful assistant that can answer questions and help with tasks.",
                    "temperature": self.config.temperature,
                    "tool_choice": self.config.tool_choice,
                    "tools": self._formatted_tools or [],
                    "modalities": self.config.modalities,
                    "max_response_output_tokens": "inf",
                },
            }
            if "audio" in self.config.modalities:
                session_update["session"]["voice"] = self.config.voice
                session_update["session"]["input_audio_format"] = DEFAULT_INPUT_AUDIO_FORMAT
                session_update["session"][
                    "output_audio_format"
                ] = DEFAULT_OUTPUT_AUDIO_FORMAT
                if turn_detection:
                    session_update["session"]["turn_detection"] = turn_detection
                if input_audio_transcription:
                    session_update["session"][
                        "input_audio_transcription"
                    ] = input_audio_transcription
            # Send the event
            await self.send_event(session_update)

        def process_base_url(self, url: str, model: str) -> str:
            if url.startswith("http"):
                url = url.replace("http", "ws", 1)
            parsed_url = urlparse(url)
            query_params = parse_qs(parsed_url.query)
            if not parsed_url.path or parsed_url.path.rstrip("/") in ["", "/v1", "/openai"]:
                path = parsed_url.path.rstrip("/") + "/realtime"
            else:
                path = parsed_url.path
            if "model" not in query_params:
                query_params["model"] = [model]
            new_query = urlencode(query_params, doseq=True)
            new_url = urlunparse(
                (parsed_url.scheme, parsed_url.netloc, path, "", new_query, "")
            )
            return new_url

        def _format_tools_for_session(
            self, tools: List[FunctionTool]
        ) -> List[Dict[str, Any]]:
            """Format tools for OpenAI session update"""
            oai_tools = []
            for tool in tools:
                if not is_function_tool(tool):
                    continue
                try:
                    tool_schema = build_openai_schema(tool)
                    oai_tools.append(tool_schema)
                except Exception as e:
                    self.emit("error", f"Failed to format tool {tool}: {e}")
                    continue
            return oai_tools

        async def send_text_message(self, message: str) -> None:
            """Send a text message to the OpenAI realtime API"""
            if not self._session:
                self.emit("error", "No active WebSocket session")
                raise RuntimeError("No active WebSocket session")
            await self.send_event(
                {
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": [{"type": "input_text", "text": message}],
                    },
                }
            )
            await self.create_response()

        def bytes_to_base64_url(self, image_bytes: bytes, fmt: str = "jpeg") -> str:
            mime = f"image/{fmt.lower()}"
            encoded = base64.b64encode(image_bytes).decode("utf-8")
            return f"data:{mime};base64,{encoded}"

OpenAI's realtime model implementation.
Initialize OpenAI realtime model.
Args
api_key- OpenAI API key. If not provided, it is read from the OPENAI_API_KEY environment variable.
model- The OpenAI model identifier to use (e.g. 'gpt-4', 'gpt-3.5-turbo').
config- Optional configuration object for customizing model behavior. Contains settings for:
- voice: Voice ID to use for audio output
- temperature: Sampling temperature for responses
- turn_detection: Settings for detecting user speech turns
- input_audio_transcription: Settings for audio transcription
- tool_choice: How tools should be selected ('auto' or 'none')
- modalities: List of enabled modalities ('text', 'audio')
base_url- Base URL for the OpenAI API. Defaults to 'https://api.openai.com/v1'.
Raises
ValueError- If no API key is provided and none is found in the environment.
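A minimal construction sketch (the model name is an example; OpenAIRealtimeConfig and its defaults are documented below):

    from videosdk.plugins.openai.realtime_api import (
        OpenAIRealtime,
        OpenAIRealtimeConfig,
    )

    model = OpenAIRealtime(
        api_key="sk-...",  # or omit and set OPENAI_API_KEY in the environment
        model="gpt-4o-realtime-preview",  # example model name
        config=OpenAIRealtimeConfig(voice="alloy", modalities=["text", "audio"]),
    )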
Ancestors
- videosdk.agents.realtime_base_model.RealtimeBaseModel
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
- abc.ABC
Methods
async def aclose(self) ‑> None

    async def aclose(self) -> None:
        """Cleanup all resources"""
        if self._closing:
            return
        self._closing = True
        if self._session:
            await self._cleanup_session(self._session)
        if self._http_session and not self._http_session.closed:
            await self._http_session.close()

Cleanup all resources
def bytes_to_base64_url(self, image_bytes: bytes, fmt: str = 'jpeg') ‑> str

    def bytes_to_base64_url(self, image_bytes: bytes, fmt: str = "jpeg") -> str:
        mime = f"image/{fmt.lower()}"
        encoded = base64.b64encode(image_bytes).decode("utf-8")
        return f"data:{mime};base64,{encoded}"
async def connect(self) ‑> None

    async def connect(self) -> None:
        headers = {"Agent": "VideoSDK Agents"}
        headers["Authorization"] = f"Bearer {self.api_key}"
        headers["OpenAI-Beta"] = "realtime=v1"
        url = self.process_base_url(self.base_url, self.model)
        self._session = await self._create_session(url, headers)
        await self._handle_websocket(self._session)
        await self.send_first_session_update()
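In the agent pipeline connect() is normally called for you; driving the model by hand looks roughly like this (a sketch, assuming a model constructed as in the example above):

    import asyncio

    async def main() -> None:
        await model.connect()       # opens the WebSocket and sends session.update
        try:
            await model.send_text_message("Hello!")
            await asyncio.sleep(5)  # give the streamed response time to arrive
        finally:
            await model.aclose()    # cancels tasks and closes the sockets

    asyncio.run(main())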
async def create_response(self) ‑> None

    async def create_response(self) -> None:
        """Create a response to the OpenAI realtime API"""
        if not self._session:
            self.emit("error", "No active WebSocket session")
            raise RuntimeError("No active WebSocket session")
        response_event = {
            "type": "response.create",
            "event_id": str(uuid.uuid4()),
            "response": {
                "instructions": self._instructions,
                "metadata": {"client_event_id": str(uuid.uuid4())},
            },
        }
        await self.send_event(response_event)

Create a response to the OpenAI realtime API
async def handle_audio_input(self, audio_data: bytes) ‑> None

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Handle incoming audio data from the user"""
        if self._session and not self._closing and "audio" in self.config.modalities:
            audio_data = np.frombuffer(audio_data, dtype=np.int16)
            audio_data = signal.resample(
                audio_data,
                int(len(audio_data) * self.target_sample_rate / self.input_sample_rate),
            )
            audio_data = audio_data.astype(np.int16).tobytes()
            base64_audio_data = base64.b64encode(audio_data).decode("utf-8")
            audio_event = {
                "type": "input_audio_buffer.append",
                "audio": base64_audio_data,
            }
            await self.send_event(audio_event)

Handle incoming audio data from the user
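The method expects raw 16-bit PCM at 48 kHz (input_sample_rate) and resamples it to 16 kHz before appending it to the input audio buffer. A sketch feeding 20 ms of silence (single-channel PCM is assumed here):

    import numpy as np

    async def feed_silence(model: OpenAIRealtime) -> None:
        # 20 ms at 48 kHz = 960 samples of 16-bit PCM
        frame = np.zeros(960, dtype=np.int16)
        await model.handle_audio_input(frame.tobytes())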
async def handle_video_input(self, video_data: av.VideoFrame) ‑> None

    async def handle_video_input(self, video_data: av.VideoFrame) -> None:
        if not self._session or self._closing:
            return
        try:
            if not video_data or not video_data.planes:
                return
            processed_jpeg = encode_image(video_data, DEFAULT_IMAGE_ENCODE_OPTIONS)
            if not processed_jpeg or len(processed_jpeg) < 100:
                logger.warning("Invalid JPEG data generated")
                return
            base64_url = self.bytes_to_base64_url(processed_jpeg)
            content = [{"type": "input_image", "image_url": base64_url}]
            conversation_event = {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": content,
                },
            }
            await self.send_event(conversation_event)
        except Exception as e:
            self.emit("error", f"Video processing error: {str(e)}")
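Frames are JPEG-encoded and sent as data-URL image content on a user message. A sketch with a synthetic frame (real frames come from the room's video track):

    import av
    import numpy as np

    async def send_black_frame(model: OpenAIRealtime) -> None:
        img = np.zeros((480, 640, 3), dtype=np.uint8)  # 640x480 black image
        frame = av.VideoFrame.from_ndarray(img, format="rgb24")
        await model.handle_video_input(frame)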
async def interrupt(self) ‑> None

    async def interrupt(self) -> None:
        """Interrupt the current response and flush audio"""
        if self._session and not self._closing:
            cancel_event = {"type": "response.cancel", "event_id": str(uuid.uuid4())}
            await self.send_event(cancel_event)
            await realtime_metrics_collector.set_interrupted()
        if self.audio_track:
            self.audio_track.interrupt()
        if self._agent_speaking:
            self.emit("agent_speech_ended", {})
            await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
            self._agent_speaking = False

Interrupt the current response and flush audio
def process_base_url(self, url: str, model: str) ‑> str

    def process_base_url(self, url: str, model: str) -> str:
        if url.startswith("http"):
            url = url.replace("http", "ws", 1)
        parsed_url = urlparse(url)
        query_params = parse_qs(parsed_url.query)
        if not parsed_url.path or parsed_url.path.rstrip("/") in ["", "/v1", "/openai"]:
            path = parsed_url.path.rstrip("/") + "/realtime"
        else:
            path = parsed_url.path
        if "model" not in query_params:
            query_params["model"] = [model]
        new_query = urlencode(query_params, doseq=True)
        new_url = urlunparse(
            (parsed_url.scheme, parsed_url.netloc, path, "", new_query, "")
        )
        return new_url
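The rewrite turns an HTTP(S) base URL into the matching WebSocket endpoint, appends /realtime to bare, /v1, or /openai paths, and adds the model query parameter if it is absent. For example (model name illustrative):

    url = model.process_base_url("https://api.openai.com/v1", "gpt-4o-realtime-preview")
    # wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview

    url = model.process_base_url("https://my-proxy.example.com/custom/path", "gpt-4o-realtime-preview")
    # wss://my-proxy.example.com/custom/path?model=gpt-4o-realtime-preview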
async def send_event(self, event: Dict[str, Any]) ‑> None

    async def send_event(self, event: Dict[str, Any]) -> None:
        """Send an event to the WebSocket"""
        if self._session and not self._closing:
            await self._session.msg_queue.put(event)

Send an event to the WebSocket
async def send_first_session_update(self) ‑> None

    async def send_first_session_update(self) -> None:
        """Send initial session update with default values after connection"""
        if not self._session:
            return
        turn_detection = None
        input_audio_transcription = None
        if "audio" in self.config.modalities:
            turn_detection = (
                self.config.turn_detection.model_dump(
                    by_alias=True,
                    exclude_unset=True,
                    exclude_defaults=True,
                )
                if self.config.turn_detection
                else None
            )
            input_audio_transcription = (
                self.config.input_audio_transcription.model_dump(
                    by_alias=True,
                    exclude_unset=True,
                    exclude_defaults=True,
                )
                if self.config.input_audio_transcription
                else None
            )
        session_update = {
            "type": "session.update",
            "session": {
                "model": self.model,
                "instructions": self._instructions
                or "You are a helpful assistant that can answer questions and help with tasks.",
                "temperature": self.config.temperature,
                "tool_choice": self.config.tool_choice,
                "tools": self._formatted_tools or [],
                "modalities": self.config.modalities,
                "max_response_output_tokens": "inf",
            },
        }
        if "audio" in self.config.modalities:
            session_update["session"]["voice"] = self.config.voice
            session_update["session"]["input_audio_format"] = DEFAULT_INPUT_AUDIO_FORMAT
            session_update["session"][
                "output_audio_format"
            ] = DEFAULT_OUTPUT_AUDIO_FORMAT
            if turn_detection:
                session_update["session"]["turn_detection"] = turn_detection
            if input_audio_transcription:
                session_update["session"][
                    "input_audio_transcription"
                ] = input_audio_transcription
        # Send the event
        await self.send_event(session_update)

Send initial session update with default values after connection
async def send_message(self, message: str) ‑> None

    async def send_message(self, message: str) -> None:
        """Send a message to the OpenAI realtime API"""
        await self.send_event(
            {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "assistant",
                    "content": [
                        {
                            "type": "text",
                            "text": "Repeat the user's exact message back to them:"
                            + message
                            + "DO NOT ADD ANYTHING ELSE",
                        }
                    ],
                },
            }
        )
        await self.create_response()

Send a message to the OpenAI realtime API
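Despite the generic name, this creates an assistant item instructing the model to repeat message verbatim, which makes it useful for playing scripted lines through the realtime voice. From within an async context:

    # Speak a fixed greeting (the model is told to echo it back word for word)
    await model.send_message("Hi there! How can I help you today?")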
async def send_message_with_frames(self, message: Optional[str], frames: list[av.VideoFrame]) ‑> None

    async def send_message_with_frames(
        self, message: Optional[str], frames: list[av.VideoFrame]
    ) -> None:
        content = []
        if message:
            content.append({"type": "input_text", "text": message})
        for frame in frames:
            try:
                processed_jpeg = encode_image(frame, DEFAULT_IMAGE_ENCODE_OPTIONS)
                if not processed_jpeg or len(processed_jpeg) < 100:
                    logger.warning("Invalid JPEG data generated")
                    continue
                base64_url = self.bytes_to_base64_url(processed_jpeg)
                content.append({"type": "input_image", "image_url": base64_url})
            except Exception as e:
                logger.error(f"Error processing frame: {e}")
        if not any(
            item.get("type") == "input_image" or item.get("type") == "input_text"
            for item in content
        ):
            logger.warning("No content to send.")
            return
        conversation_event = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": content,
            },
        }
        await self.send_event(conversation_event)
        await self.create_response()
async def send_text_message(self, message: str) ‑> None

    async def send_text_message(self, message: str) -> None:
        """Send a text message to the OpenAI realtime API"""
        if not self._session:
            self.emit("error", "No active WebSocket session")
            raise RuntimeError("No active WebSocket session")
        await self.send_event(
            {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": message}],
                },
            }
        )
        await self.create_response()

Send a text message to the OpenAI realtime API
def set_agent(self, agent: Agent) ‑> None

    def set_agent(self, agent: Agent) -> None:
        self._instructions = agent.instructions
        self._tools = agent.tools
        self.tools_formatted = self._format_tools_for_session(self._tools)
        self._formatted_tools = self.tools_formatted
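A sketch wiring instructions and a function tool to the model (it assumes the Agent base class and function_tool decorator exported by videosdk.agents accept these arguments):

    from videosdk.agents import Agent, function_tool

    @function_tool
    async def get_weather(city: str) -> dict:
        """Return a canned weather report for the given city."""
        return {"city": city, "forecast": "sunny"}

    class GreeterAgent(Agent):
        def __init__(self) -> None:
            super().__init__(
                instructions="You are a friendly voice assistant.",
                tools=[get_weather],
            )

    model.set_agent(GreeterAgent())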
class OpenAIRealtimeConfig (voice: str = 'alloy',
temperature: float = 0.8,
turn_detection: TurnDetection | None = <factory>,
input_audio_transcription: InputAudioTranscription | None = <factory>,
tool_choice: ToolChoice | None = 'auto',
modalities: list[str] = <factory>)
    @dataclass
    class OpenAIRealtimeConfig:
        """Configuration for the OpenAI realtime API

        Args:
            voice: Voice ID for audio output. Default is 'alloy'
            temperature: Controls randomness in response generation. Higher values
                (e.g. 0.8) make output more random, lower values make it more
                deterministic. Default is 0.8
            turn_detection: Configuration for detecting user speech turns. Contains
                settings for:
                - type: Detection type ('server_vad')
                - threshold: Voice activity detection threshold (0.0-1.0)
                - prefix_padding_ms: Padding before speech start (ms)
                - silence_duration_ms: Silence duration to mark end (ms)
                - create_response: Whether to generate response on turn
                - interrupt_response: Whether to allow interruption
            input_audio_transcription: Configuration for audio transcription. Contains:
                - model: Model to use for transcription
            tool_choice: How tools should be selected ('auto' or 'none'). Default is 'auto'
            modalities: List of enabled response types ["text", "audio"]. Default includes both
        """

        voice: str = DEFAULT_VOICE
        temperature: float = DEFAULT_TEMPERATURE
        turn_detection: TurnDetection | None = field(
            default_factory=lambda: DEFAULT_TURN_DETECTION
        )
        input_audio_transcription: InputAudioTranscription | None = field(
            default_factory=lambda: DEFAULT_INPUT_AUDIO_TRANSCRIPTION
        )
        tool_choice: ToolChoice | None = DEFAULT_TOOL_CHOICE
        modalities: list[str] = field(default_factory=lambda: ["text", "audio"])

Configuration for the OpenAI realtime API
Args
voice- Voice ID for audio output. Default is 'alloy'
temperature- Controls randomness in response generation. Higher values (e.g. 0.8) make output more random, lower values make it more deterministic. Default is 0.8
turn_detection- Configuration for detecting user speech turns. Contains settings for:
- type: Detection type ('server_vad')
- threshold: Voice activity detection threshold (0.0-1.0)
- prefix_padding_ms: Padding before speech start (ms)
- silence_duration_ms: Silence duration to mark the end of a turn (ms)
- create_response: Whether to generate a response when the turn ends
- interrupt_response: Whether the user may interrupt an in-progress response
input_audio_transcription- Configuration for audio transcription. Contains:
- model: Model to use for transcription
tool_choice- How tools should be selected ('auto' or 'none'). Default is 'auto'
modalities- List of enabled response types ["text", "audio"]. Default includes both
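A hedged construction example (TurnDetection and InputAudioTranscription come from openai.types.beta.realtime.session, as the instance variables below show; the field values are illustrative):

    from openai.types.beta.realtime.session import (
        InputAudioTranscription,
        TurnDetection,
    )
    from videosdk.plugins.openai.realtime_api import OpenAIRealtimeConfig

    config = OpenAIRealtimeConfig(
        voice="alloy",
        temperature=0.7,
        turn_detection=TurnDetection(
            type="server_vad",
            threshold=0.5,
            silence_duration_ms=500,
        ),
        input_audio_transcription=InputAudioTranscription(model="whisper-1"),
        modalities=["text", "audio"],
    )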
Instance variables
var input_audio_transcription : openai.types.beta.realtime.session.InputAudioTranscription | None
var modalities : list[str]
var temperature : float
var tool_choice : Literal['auto', 'required', 'none'] | None
var turn_detection : openai.types.beta.realtime.session.TurnDetection | None
var voice : str
class OpenAISession (ws: aiohttp.ClientWebSocketResponse,
msg_queue: asyncio.Queue[Dict[str, Any]],
tasks: list[asyncio.Task])
    @dataclass
    class OpenAISession:
        """Represents an OpenAI WebSocket session"""

        ws: aiohttp.ClientWebSocketResponse
        msg_queue: asyncio.Queue[Dict[str, Any]]
        tasks: list[asyncio.Task]

Represents an OpenAI WebSocket session
Instance variables
var msg_queue : asyncio.queues.Queue[typing.Dict[str, typing.Any]]
var tasks : list[_asyncio.Task]
var ws : aiohttp.client_ws.ClientWebSocketResponse