Package videosdk.plugins.openai
Sub-modules
videosdk.plugins.openai.llm
videosdk.plugins.openai.realtime_api
videosdk.plugins.openai.stt
videosdk.plugins.openai.tts
Classes
class OpenAILLM (*,
api_key: str | None = None,
model: str = 'gpt-4o-mini',
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_completion_tokens: int | None = None)
Expand source code
class OpenAILLM(LLM): def __init__( self, *, api_key: str | None = None, model: str = "gpt-4o-mini", base_url: str | None = None, temperature: float = 0.7, tool_choice: ToolChoice = "auto", max_completion_tokens: int | None = None, ) -> None: """Initialize the OpenAI LLM plugin. Args: api_key (Optional[str], optional): OpenAI API key. Defaults to None. model (str): The model to use for the LLM plugin. Defaults to "gpt-4o". base_url (Optional[str], optional): The base URL for the OpenAI API. Defaults to None. temperature (float): The temperature to use for the LLM plugin. Defaults to 0.7. tool_choice (ToolChoice): The tool choice to use for the LLM plugin. Defaults to "auto". max_completion_tokens (Optional[int], optional): The maximum completion tokens to use for the LLM plugin. Defaults to None. """ super().__init__() self.api_key = api_key or os.getenv("OPENAI_API_KEY") if not self.api_key: raise ValueError("OpenAI API key must be provided either through api_key parameter or OPENAI_API_KEY environment variable") self.model = model self.temperature = temperature self.tool_choice = tool_choice self.max_completion_tokens = max_completion_tokens self._cancelled = False self._client = openai.AsyncOpenAI( api_key=self.api_key, base_url=base_url or None, max_retries=0, http_client=httpx.AsyncClient( timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120, ), ), ) @staticmethod def azure( *, model: str = "gpt-4o-mini", azure_endpoint: str | None = None, azure_deployment: str | None = None, api_version: str | None = None, api_key: str | None = None, azure_ad_token: str | None = None, organization: str | None = None, project: str | None = None, base_url: str | None = None, temperature: float = 0.7, tool_choice: ToolChoice = "auto", max_completion_tokens: int | None = None, timeout: httpx.Timeout | None = None, ) -> "OpenAILLM": """ Create a new instance of Azure OpenAI LLM. 
This automatically infers the following arguments from their corresponding environment variables if they are not provided: - `api_key` from `AZURE_OPENAI_API_KEY` - `organization` from `OPENAI_ORG_ID` - `project` from `OPENAI_PROJECT_ID` - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN` - `api_version` from `OPENAI_API_VERSION` - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT` - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name) """ azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT") azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT") api_version = api_version or os.getenv("OPENAI_API_VERSION") api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY") azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN") organization = organization or os.getenv("OPENAI_ORG_ID") project = project or os.getenv("OPENAI_PROJECT_ID") if not azure_deployment: azure_deployment = model if not azure_endpoint: raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable") if not api_key and not azure_ad_token: raise ValueError("Either API key or Azure AD token must be provided") azure_client = openai.AsyncAzureOpenAI( max_retries=0, azure_endpoint=azure_endpoint, azure_deployment=azure_deployment, api_version=api_version, api_key=api_key, azure_ad_token=azure_ad_token, organization=organization, project=project, base_url=base_url, timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), ) instance = OpenAILLM( model=model, temperature=temperature, tool_choice=tool_choice, max_completion_tokens=max_completion_tokens, ) instance._client = azure_client return instance async def chat( self, messages: ChatContext, tools: list[FunctionTool] | None = None, **kwargs: Any ) -> AsyncIterator[LLMResponse]: """ Implement chat functionality using OpenAI's chat completion API Args: messages: ChatContext containing conversation history tools: Optional list of function tools available to the model **kwargs: Additional arguments passed to the OpenAI API Yields: LLMResponse objects containing the model's responses """ self._cancelled = False def _format_content(content: Union[str, List[ChatContent]]): if isinstance(content, str): return content formatted_parts = [] for part in content: if isinstance(part, str): formatted_parts.append({"type": "text", "text": part}) elif isinstance(part, ImageContent): image_url_data = {"url": part.to_data_url()} if part.inference_detail != "auto": image_url_data["detail"] = part.inference_detail formatted_parts.append( { "type": "image_url", "image_url": image_url_data, } ) return formatted_parts completion_params = { "model": self.model, "messages": [ { "role": msg.role.value, "content": _format_content(msg.content), **({"name": msg.name} if hasattr(msg, "name") else {}), } if isinstance(msg, ChatMessage) else { "role": "assistant", "content": None, "function_call": {"name": msg.name, "arguments": msg.arguments}, } if isinstance(msg, FunctionCall) else { "role": "function", "name": msg.name, "content": msg.output, } if isinstance(msg, FunctionCallOutput) else None for msg in messages.items if msg is not None ], "temperature": self.temperature, "stream": True, "max_tokens": self.max_completion_tokens, } if tools: formatted_tools = [] for tool in tools: if not is_function_tool(tool): continue try: tool_schema = build_openai_schema(tool) formatted_tools.append(tool_schema) except Exception as e: 
self.emit("error", f"Failed to format tool {tool}: {e}") continue if formatted_tools: completion_params["functions"] = formatted_tools completion_params["function_call"] = self.tool_choice completion_params.update(kwargs) try: response_stream = await self._client.chat.completions.create(**completion_params) current_content = "" current_function_call = None async for chunk in response_stream: if self._cancelled: break if not chunk.choices: continue delta = chunk.choices[0].delta if delta.function_call: if current_function_call is None: current_function_call = { "name": delta.function_call.name or "", "arguments": delta.function_call.arguments or "" } else: if delta.function_call.name: current_function_call["name"] += delta.function_call.name if delta.function_call.arguments: current_function_call["arguments"] += delta.function_call.arguments elif current_function_call is not None: try: args = json.loads(current_function_call["arguments"]) current_function_call["arguments"] = args except json.JSONDecodeError: self.emit("error", f"Failed to parse function arguments: {current_function_call['arguments']}") current_function_call["arguments"] = {} yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"function_call": current_function_call} ) current_function_call = None elif delta.content is not None: current_content = delta.content yield LLMResponse( content=current_content, role=ChatRole.ASSISTANT ) except Exception as e: if not self._cancelled: self.emit("error", e) raise async def cancel_current_generation(self) -> None: self._cancelled = True async def aclose(self) -> None: """Cleanup resources by closing the HTTP client""" await self.cancel_current_generation() if self._client: await self._client.close()
Base class for LLM implementations.
Initialize the OpenAI LLM plugin.
Args
api_key (Optional[str], optional): OpenAI API key. Defaults to None.
model (str): The model to use for the LLM plugin. Defaults to "gpt-4o-mini".
base_url (Optional[str], optional): The base URL for the OpenAI API. Defaults to None.
temperature (float): The temperature to use for the LLM plugin. Defaults to 0.7.
tool_choice (ToolChoice): The tool choice to use for the LLM plugin. Defaults to "auto".
max_completion_tokens (Optional[int], optional): The maximum completion tokens to use for the LLM plugin. Defaults to None.
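A minimal construction sketch, assuming the OPENAI_API_KEY environment variable is set (the constructor raises ValueError otherwise):

from videosdk.plugins.openai import OpenAILLM

llm = OpenAILLM(
    model="gpt-4o-mini",        # the default model
    temperature=0.7,
    max_completion_tokens=512,  # optional cap on each completion
)

Pass api_key explicitly to override the environment variable, or base_url to point at an OpenAI-compatible endpoint.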
Ancestors
- videosdk.agents.llm.llm.LLM
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Static methods
def azure(*,
model: str = 'gpt-4o-mini',
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_completion_tokens: int | None = None,
timeout: httpx.Timeout | None = None) ‑> OpenAILLM
Expand source code
@staticmethod def azure( *, model: str = "gpt-4o-mini", azure_endpoint: str | None = None, azure_deployment: str | None = None, api_version: str | None = None, api_key: str | None = None, azure_ad_token: str | None = None, organization: str | None = None, project: str | None = None, base_url: str | None = None, temperature: float = 0.7, tool_choice: ToolChoice = "auto", max_completion_tokens: int | None = None, timeout: httpx.Timeout | None = None, ) -> "OpenAILLM": """ Create a new instance of Azure OpenAI LLM. This automatically infers the following arguments from their corresponding environment variables if they are not provided: - `api_key` from `AZURE_OPENAI_API_KEY` - `organization` from `OPENAI_ORG_ID` - `project` from `OPENAI_PROJECT_ID` - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN` - `api_version` from `OPENAI_API_VERSION` - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT` - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name) """ azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT") azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT") api_version = api_version or os.getenv("OPENAI_API_VERSION") api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY") azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN") organization = organization or os.getenv("OPENAI_ORG_ID") project = project or os.getenv("OPENAI_PROJECT_ID") if not azure_deployment: azure_deployment = model if not azure_endpoint: raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable") if not api_key and not azure_ad_token: raise ValueError("Either API key or Azure AD token must be provided") azure_client = openai.AsyncAzureOpenAI( max_retries=0, azure_endpoint=azure_endpoint, azure_deployment=azure_deployment, api_version=api_version, api_key=api_key, azure_ad_token=azure_ad_token, organization=organization, project=project, base_url=base_url, timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), ) instance = OpenAILLM( model=model, temperature=temperature, tool_choice=tool_choice, max_completion_tokens=max_completion_tokens, ) instance._client = azure_client return instance
Create a new instance of Azure OpenAI LLM.
This automatically infers the following arguments from their corresponding environment variables if they are not provided:
- api_key from AZURE_OPENAI_API_KEY
- organization from OPENAI_ORG_ID
- project from OPENAI_PROJECT_ID
- azure_ad_token from AZURE_OPENAI_AD_TOKEN
- api_version from OPENAI_API_VERSION
- azure_endpoint from AZURE_OPENAI_ENDPOINT
- azure_deployment from AZURE_OPENAI_DEPLOYMENT (if not provided, uses model as the deployment name)
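A hedged sketch of the Azure variant; the endpoint, deployment, and API version below are placeholders, and each can instead come from the environment variables listed above:

import httpx
from videosdk.plugins.openai import OpenAILLM

azure_llm = OpenAILLM.azure(
    model="gpt-4o-mini",
    azure_endpoint="https://my-resource.openai.azure.com",  # placeholder endpoint
    azure_deployment="gpt-4o-mini",   # falls back to `model` when omitted
    api_version="2024-06-01",         # placeholder; use your resource's API version
    timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
)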
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources by closing the HTTP client"""
    await self.cancel_current_generation()
    if self._client:
        await self._client.close()
Cleanup resources by closing the HTTP client
async def cancel_current_generation(self) ‑> None
-
Expand source code
async def cancel_current_generation(self) -> None:
    self._cancelled = True
Cancel the current LLM generation if active. In this implementation the call sets an internal cancellation flag; the streaming chat loop checks the flag and stops yielding further chunks.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
Expand source code
async def chat( self, messages: ChatContext, tools: list[FunctionTool] | None = None, **kwargs: Any ) -> AsyncIterator[LLMResponse]: """ Implement chat functionality using OpenAI's chat completion API Args: messages: ChatContext containing conversation history tools: Optional list of function tools available to the model **kwargs: Additional arguments passed to the OpenAI API Yields: LLMResponse objects containing the model's responses """ self._cancelled = False def _format_content(content: Union[str, List[ChatContent]]): if isinstance(content, str): return content formatted_parts = [] for part in content: if isinstance(part, str): formatted_parts.append({"type": "text", "text": part}) elif isinstance(part, ImageContent): image_url_data = {"url": part.to_data_url()} if part.inference_detail != "auto": image_url_data["detail"] = part.inference_detail formatted_parts.append( { "type": "image_url", "image_url": image_url_data, } ) return formatted_parts completion_params = { "model": self.model, "messages": [ { "role": msg.role.value, "content": _format_content(msg.content), **({"name": msg.name} if hasattr(msg, "name") else {}), } if isinstance(msg, ChatMessage) else { "role": "assistant", "content": None, "function_call": {"name": msg.name, "arguments": msg.arguments}, } if isinstance(msg, FunctionCall) else { "role": "function", "name": msg.name, "content": msg.output, } if isinstance(msg, FunctionCallOutput) else None for msg in messages.items if msg is not None ], "temperature": self.temperature, "stream": True, "max_tokens": self.max_completion_tokens, } if tools: formatted_tools = [] for tool in tools: if not is_function_tool(tool): continue try: tool_schema = build_openai_schema(tool) formatted_tools.append(tool_schema) except Exception as e: self.emit("error", f"Failed to format tool {tool}: {e}") continue if formatted_tools: completion_params["functions"] = formatted_tools completion_params["function_call"] = self.tool_choice completion_params.update(kwargs) try: response_stream = await self._client.chat.completions.create(**completion_params) current_content = "" current_function_call = None async for chunk in response_stream: if self._cancelled: break if not chunk.choices: continue delta = chunk.choices[0].delta if delta.function_call: if current_function_call is None: current_function_call = { "name": delta.function_call.name or "", "arguments": delta.function_call.arguments or "" } else: if delta.function_call.name: current_function_call["name"] += delta.function_call.name if delta.function_call.arguments: current_function_call["arguments"] += delta.function_call.arguments elif current_function_call is not None: try: args = json.loads(current_function_call["arguments"]) current_function_call["arguments"] = args except json.JSONDecodeError: self.emit("error", f"Failed to parse function arguments: {current_function_call['arguments']}") current_function_call["arguments"] = {} yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"function_call": current_function_call} ) current_function_call = None elif delta.content is not None: current_content = delta.content yield LLMResponse( content=current_content, role=ChatRole.ASSISTANT ) except Exception as e: if not self._cancelled: self.emit("error", e) raise
Implement chat functionality using OpenAI's chat completion API
Args
messages
- ChatContext containing conversation history
tools
- Optional list of function tools available to the model
**kwargs
- Additional arguments passed to the OpenAI API
Yields
LLMResponse objects containing the model's responses
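A hedged sketch of consuming the stream; it assumes a ChatContext built elsewhere in the videosdk.agents framework and reads the metadata attribute defensively, since only completed function calls populate it:

from videosdk.plugins.openai import OpenAILLM

async def stream_reply(chat_ctx) -> None:
    # chat_ctx is assumed to be a videosdk.agents ChatContext that already
    # holds the conversation history; building it is outside this plugin.
    llm = OpenAILLM()
    try:
        async for response in llm.chat(chat_ctx):
            meta = getattr(response, "metadata", None)
            if meta and "function_call" in meta:
                # Completed tool calls arrive via metadata["function_call"].
                print("tool call:", meta["function_call"])
            else:
                # Text deltas stream in response.content.
                print(response.content, end="", flush=True)
    finally:
        await llm.aclose()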
class OpenAIRealtime (*,
api_key: str | None = None,
model: str,
config: OpenAIRealtimeConfig | None = None,
base_url: str | None = None)
Expand source code
class OpenAIRealtime(RealtimeBaseModel[OpenAIEventTypes]): """OpenAI's realtime model implementation.""" def __init__( self, *, api_key: str | None = None, model: str, config: OpenAIRealtimeConfig | None = None, base_url: str | None = None, ) -> None: """ Initialize OpenAI realtime model. Args: api_key: OpenAI API key. If not provided, will attempt to read from OPENAI_API_KEY env var model: The OpenAI model identifier to use (e.g. 'gpt-4', 'gpt-3.5-turbo') config: Optional configuration object for customizing model behavior. Contains settings for: - voice: Voice ID to use for audio output - temperature: Sampling temperature for responses - turn_detection: Settings for detecting user speech turns - input_audio_transcription: Settings for audio transcription - tool_choice: How tools should be selected ('auto' or 'none') - modalities: List of enabled modalities ('text', 'audio') base_url: Base URL for OpenAI API. Defaults to 'https://api.openai.com/v1' Raises: ValueError: If no API key is provided and none found in environment variables """ super().__init__() self.model = model self.api_key = api_key or os.getenv("OPENAI_API_KEY") self.base_url = base_url or OPENAI_BASE_URL if not self.api_key: self.emit( "error", "OpenAI API key must be provided or set in OPENAI_API_KEY environment variable", ) raise ValueError( "OpenAI API key must be provided or set in OPENAI_API_KEY environment variable" ) self._http_session: Optional[aiohttp.ClientSession] = None self._session: Optional[OpenAISession] = None self._closing = False self._instructions: Optional[str] = None self._tools: Optional[List[FunctionTool]] = [] self.loop = None self.audio_track: Optional[CustomAudioStreamTrack] = None self._formatted_tools: Optional[List[Dict[str, Any]]] = None self.config: OpenAIRealtimeConfig = config or OpenAIRealtimeConfig() self.input_sample_rate = 48000 self.target_sample_rate = 16000 self._agent_speaking = False def set_agent(self, agent: Agent) -> None: self._instructions = agent.instructions self._tools = agent.tools self.tools_formatted = self._format_tools_for_session(self._tools) self._formatted_tools = self.tools_formatted async def connect(self) -> None: headers = {"Agent": "VideoSDK Agents"} headers["Authorization"] = f"Bearer {self.api_key}" headers["OpenAI-Beta"] = "realtime=v1" url = self.process_base_url(self.base_url, self.model) self._session = await self._create_session(url, headers) await self._handle_websocket(self._session) await self.send_first_session_update() async def handle_audio_input(self, audio_data: bytes) -> None: """Handle incoming audio data from the user""" if self._session and not self._closing and "audio" in self.config.modalities: audio_data = np.frombuffer(audio_data, dtype=np.int16) audio_data = signal.resample( audio_data, int(len(audio_data) * self.target_sample_rate / self.input_sample_rate), ) audio_data = audio_data.astype(np.int16).tobytes() base64_audio_data = base64.b64encode(audio_data).decode("utf-8") audio_event = { "type": "input_audio_buffer.append", "audio": base64_audio_data, } await self.send_event(audio_event) async def _ensure_http_session(self) -> aiohttp.ClientSession: """Ensure we have an HTTP session""" if not self._http_session: self._http_session = aiohttp.ClientSession() return self._http_session async def _create_session(self, url: str, headers: dict) -> OpenAISession: """Create a new WebSocket session""" http_session = await self._ensure_http_session() ws = await http_session.ws_connect( url, headers=headers, autoping=True, heartbeat=10, 
autoclose=False, timeout=30, ) msg_queue: asyncio.Queue = asyncio.Queue() tasks: list[asyncio.Task] = [] self._closing = False return OpenAISession(ws=ws, msg_queue=msg_queue, tasks=tasks) async def send_message(self, message: str) -> None: """Send a message to the OpenAI realtime API""" await self.send_event( { "type": "conversation.item.create", "item": { "type": "message", "role": "assistant", "content": [ { "type": "text", "text": "Repeat the user's exact message back to them:" + message + "DO NOT ADD ANYTHING ELSE", } ], }, } ) await self.create_response() async def create_response(self) -> None: """Create a response to the OpenAI realtime API""" if not self._session: self.emit("error", "No active WebSocket session") raise RuntimeError("No active WebSocket session") response_event = { "type": "response.create", "event_id": str(uuid.uuid4()), "response": { "instructions": self._instructions, "metadata": {"client_event_id": str(uuid.uuid4())}, }, } await self.send_event(response_event) async def _handle_websocket(self, session: OpenAISession) -> None: """Start WebSocket send/receive tasks""" session.tasks.extend( [ asyncio.create_task(self._send_loop(session), name="send_loop"), asyncio.create_task(self._receive_loop(session), name="receive_loop"), ] ) async def _send_loop(self, session: OpenAISession) -> None: """Send messages from queue to WebSocket""" try: while not self._closing: msg = await session.msg_queue.get() if isinstance(msg, dict): await session.ws.send_json(msg) else: await session.ws.send_str(str(msg)) except asyncio.CancelledError: pass finally: await self._cleanup_session(session) async def _receive_loop(self, session: OpenAISession) -> None: """Receive and process WebSocket messages""" try: while not self._closing: msg = await session.ws.receive() if msg.type == aiohttp.WSMsgType.CLOSED: self.emit("error", f"WebSocket closed with reason: {msg.extra}") break elif msg.type == aiohttp.WSMsgType.ERROR: self.emit("error", f"WebSocket error: {msg.data}") break elif msg.type == aiohttp.WSMsgType.TEXT: await self._handle_message(json.loads(msg.data)) except Exception as e: self.emit("error", f"WebSocket receive error: {str(e)}") finally: await self._cleanup_session(session) async def _handle_message(self, data: dict) -> None: """Handle incoming WebSocket messages""" try: event_type = data.get("type") if event_type == "input_audio_buffer.speech_started": await self._handle_speech_started(data) elif event_type == "input_audio_buffer.speech_stopped": await self._handle_speech_stopped(data) elif event_type == "response.created": await self._handle_response_created(data) elif event_type == "response.output_item.added": await self._handle_output_item_added(data) elif event_type == "response.content_part.added": await self._handle_content_part_added(data) elif event_type == "response.text.delta": await self._handle_text_delta(data) elif event_type == "response.audio.delta": await self._handle_audio_delta(data) elif event_type == "response.audio_transcript.delta": await self._handle_audio_transcript_delta(data) elif event_type == "response.done": await self._handle_response_done(data) elif event_type == "error": await self._handle_error(data) elif event_type == "response.function_call_arguments.delta": await self._handle_function_call_arguments_delta(data) elif event_type == "response.function_call_arguments.done": await self._handle_function_call_arguments_done(data) elif event_type == "response.output_item.done": await self._handle_output_item_done(data) elif event_type == 
"conversation.item.input_audio_transcription.completed": await self._handle_input_audio_transcription_completed(data) elif event_type == "response.text.done": await self._handle_text_done(data) except Exception as e: self.emit("error", f"Error handling event {event_type}: {str(e)}") async def _handle_speech_started(self, data: dict) -> None: """Handle speech detection start""" if "audio" in self.config.modalities: self.emit("user_speech_started", {"type": "done"}) await self.interrupt() if self.audio_track: self.audio_track.interrupt() await realtime_metrics_collector.set_user_speech_start() async def _handle_speech_stopped(self, data: dict) -> None: """Handle speech detection end""" await realtime_metrics_collector.set_user_speech_end() async def _handle_response_created(self, data: dict) -> None: """Handle initial response creation""" response_id = data.get("response", {}).get("id") async def _handle_output_item_added(self, data: dict) -> None: """Handle new output item addition""" async def _handle_output_item_done(self, data: dict) -> None: """Handle output item done""" try: item = data.get("item", {}) if ( item.get("type") == "function_call" and item.get("status") == "completed" ): name = item.get("name") arguments = json.loads(item.get("arguments", "{}")) if name and self._tools: for tool in self._tools: tool_info = get_tool_info(tool) if tool_info.name == name: try: await realtime_metrics_collector.add_tool_call(name) result = await tool(**arguments) await self.send_event( { "type": "conversation.item.create", "item": { "type": "function_call_output", "call_id": item.get("call_id"), "output": json.dumps(result), }, } ) await self.send_event( { "type": "response.create", "event_id": str(uuid.uuid4()), "response": { "instructions": self._instructions, "metadata": { "client_event_id": str(uuid.uuid4()) }, }, } ) except Exception as e: self.emit( "error", f"Error executing function {name}: {e}" ) break except Exception as e: self.emit("error", f"Error handling output item done: {e}") async def _handle_content_part_added(self, data: dict) -> None: """Handle new content part""" async def _handle_text_delta(self, data: dict) -> None: """Handle text delta chunk""" pass async def _handle_audio_delta(self, data: dict) -> None: """Handle audio chunk""" if "audio" not in self.config.modalities: return try: if not self._agent_speaking: await realtime_metrics_collector.set_agent_speech_start() self._agent_speaking = True base64_audio_data = base64.b64decode(data.get("delta")) if base64_audio_data: if self.audio_track and self.loop: asyncio.create_task( self.audio_track.add_new_bytes(base64_audio_data) ) except Exception as e: self.emit("error", f"Error handling audio delta: {e}") traceback.print_exc() async def interrupt(self) -> None: """Interrupt the current response and flush audio""" if self._session and not self._closing: cancel_event = {"type": "response.cancel", "event_id": str(uuid.uuid4())} await self.send_event(cancel_event) await realtime_metrics_collector.set_interrupted() if self.audio_track: self.audio_track.interrupt() if self._agent_speaking: await realtime_metrics_collector.set_agent_speech_end(timeout=1.0) self._agent_speaking = False async def _handle_audio_transcript_delta(self, data: dict) -> None: """Handle transcript chunk""" delta_content = data.get("delta", "") if not hasattr(self, "_current_audio_transcript"): self._current_audio_transcript = "" self._current_audio_transcript += delta_content async def _handle_input_audio_transcription_completed(self, data: dict) -> 
None: """Handle input audio transcription completion for user transcript""" transcript = data.get("transcript", "") if transcript: await realtime_metrics_collector.set_user_transcript(transcript) try: self.emit( "realtime_model_transcription", {"role": "user", "text": transcript, "is_final": True}, ) except Exception: pass async def _handle_response_done(self, data: dict) -> None: """Handle response completion for agent transcript""" if ( hasattr(self, "_current_audio_transcript") and self._current_audio_transcript ): await realtime_metrics_collector.set_agent_response( self._current_audio_transcript ) global_event_emitter.emit( "text_response", {"text": self._current_audio_transcript, "type": "done"}, ) try: self.emit( "realtime_model_transcription", { "role": "agent", "text": self._current_audio_transcript, "is_final": True, }, ) except Exception: pass self._current_audio_transcript = "" await realtime_metrics_collector.set_agent_speech_end(timeout=1.0) self._agent_speaking = False pass async def _handle_function_call_arguments_delta(self, data: dict) -> None: """Handle function call arguments delta""" async def _handle_function_call_arguments_done(self, data: dict) -> None: """Handle function call arguments done""" async def _handle_error(self, data: dict) -> None: """Handle error events""" async def _cleanup_session(self, session: OpenAISession) -> None: """Clean up session resources""" if self._closing: return self._closing = True for task in session.tasks: if not task.done(): task.cancel() try: await asyncio.wait_for(task, timeout=1.0) # Add timeout except (asyncio.CancelledError, asyncio.TimeoutError): pass if not session.ws.closed: try: await session.ws.close() except Exception: pass async def send_event(self, event: Dict[str, Any]) -> None: """Send an event to the WebSocket""" if self._session and not self._closing: await self._session.msg_queue.put(event) async def aclose(self) -> None: """Cleanup all resources""" if self._closing: return self._closing = True if self._session: await self._cleanup_session(self._session) if self._http_session and not self._http_session.closed: await self._http_session.close() async def send_first_session_update(self) -> None: """Send initial session update with default values after connection""" if not self._session: return turn_detection = None input_audio_transcription = None if "audio" in self.config.modalities: turn_detection = ( self.config.turn_detection.model_dump( by_alias=True, exclude_unset=True, exclude_defaults=True, ) if self.config.turn_detection else None ) input_audio_transcription = ( self.config.input_audio_transcription.model_dump( by_alias=True, exclude_unset=True, exclude_defaults=True, ) if self.config.input_audio_transcription else None ) session_update = { "type": "session.update", "session": { "model": self.model, "instructions": self._instructions or "You are a helpful assistant that can answer questions and help with tasks.", "temperature": self.config.temperature, "tool_choice": self.config.tool_choice, "tools": self._formatted_tools or [], "modalities": self.config.modalities, "max_response_output_tokens": "inf", }, } if "audio" in self.config.modalities: session_update["session"]["voice"] = self.config.voice session_update["session"]["input_audio_format"] = DEFAULT_INPUT_AUDIO_FORMAT session_update["session"][ "output_audio_format" ] = DEFAULT_OUTPUT_AUDIO_FORMAT if turn_detection: session_update["session"]["turn_detection"] = turn_detection if input_audio_transcription: session_update["session"][ 
"input_audio_transcription" ] = input_audio_transcription # Send the event await self.send_event(session_update) def process_base_url(self, url: str, model: str) -> str: if url.startswith("http"): url = url.replace("http", "ws", 1) parsed_url = urlparse(url) query_params = parse_qs(parsed_url.query) if not parsed_url.path or parsed_url.path.rstrip("/") in ["", "/v1", "/openai"]: path = parsed_url.path.rstrip("/") + "/realtime" else: path = parsed_url.path if "model" not in query_params: query_params["model"] = [model] new_query = urlencode(query_params, doseq=True) new_url = urlunparse( (parsed_url.scheme, parsed_url.netloc, path, "", new_query, "") ) return new_url def _format_tools_for_session( self, tools: List[FunctionTool] ) -> List[Dict[str, Any]]: """Format tools for OpenAI session update""" oai_tools = [] for tool in tools: if not is_function_tool(tool): continue try: tool_schema = build_openai_schema(tool) oai_tools.append(tool_schema) except Exception as e: self.emit("error", f"Failed to format tool {tool}: {e}") continue return oai_tools async def send_text_message(self, message: str) -> None: """Send a text message to the OpenAI realtime API""" if not self._session: self.emit("error", "No active WebSocket session") raise RuntimeError("No active WebSocket session") await self.send_event( { "type": "conversation.item.create", "item": { "type": "message", "role": "user", "content": [{"type": "input_text", "text": message}], }, } ) await self.create_response()
OpenAI's realtime model implementation.
Initialize OpenAI realtime model.
Args
api_key
- OpenAI API key. If not provided, will attempt to read from OPENAI_API_KEY env var
model
- The OpenAI model identifier to use; this should be a realtime-capable model (e.g. 'gpt-4o-realtime-preview')
config
- Optional configuration object for customizing model behavior. Contains settings for:
  - voice: Voice ID to use for audio output
  - temperature: Sampling temperature for responses
  - turn_detection: Settings for detecting user speech turns
  - input_audio_transcription: Settings for audio transcription
  - tool_choice: How tools should be selected ('auto' or 'none')
  - modalities: List of enabled modalities ('text', 'audio')
base_url
- Base URL for OpenAI API. Defaults to 'https://api.openai.com/v1'
Raises
ValueError
- If no API key is provided and none found in environment variables
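A hedged construction sketch; the model name is an assumed realtime-capable identifier, and OpenAIRealtimeConfig is the configuration class documented further below:

from videosdk.plugins.openai import OpenAIRealtime, OpenAIRealtimeConfig

model = OpenAIRealtime(
    model="gpt-4o-realtime-preview",   # assumed realtime-capable model id
    config=OpenAIRealtimeConfig(
        voice="alloy",
        temperature=0.8,
        modalities=["text", "audio"],
    ),
)
# Typical lifecycle (driven by the agent session):
#   model.set_agent(agent)     # register instructions and tools
#   await model.connect()      # open the realtime WebSocket
#   await model.aclose()       # tear down the session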
Ancestors
- videosdk.agents.realtime_base_model.RealtimeBaseModel
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
- abc.ABC
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Cleanup all resources"""
    if self._closing:
        return
    self._closing = True
    if self._session:
        await self._cleanup_session(self._session)
    if self._http_session and not self._http_session.closed:
        await self._http_session.close()
Cleanup all resources
async def connect(self) ‑> None
-
Expand source code
async def connect(self) -> None:
    headers = {"Agent": "VideoSDK Agents"}
    headers["Authorization"] = f"Bearer {self.api_key}"
    headers["OpenAI-Beta"] = "realtime=v1"
    url = self.process_base_url(self.base_url, self.model)
    self._session = await self._create_session(url, headers)
    await self._handle_websocket(self._session)
    await self.send_first_session_update()
async def create_response(self) ‑> None
-
Expand source code
async def create_response(self) -> None:
    """Create a response to the OpenAI realtime API"""
    if not self._session:
        self.emit("error", "No active WebSocket session")
        raise RuntimeError("No active WebSocket session")
    response_event = {
        "type": "response.create",
        "event_id": str(uuid.uuid4()),
        "response": {
            "instructions": self._instructions,
            "metadata": {"client_event_id": str(uuid.uuid4())},
        },
    }
    await self.send_event(response_event)
Create a response to the OpenAI realtime API
async def handle_audio_input(self, audio_data: bytes) ‑> None
-
Expand source code
async def handle_audio_input(self, audio_data: bytes) -> None:
    """Handle incoming audio data from the user"""
    if self._session and not self._closing and "audio" in self.config.modalities:
        audio_data = np.frombuffer(audio_data, dtype=np.int16)
        audio_data = signal.resample(
            audio_data,
            int(len(audio_data) * self.target_sample_rate / self.input_sample_rate),
        )
        audio_data = audio_data.astype(np.int16).tobytes()
        base64_audio_data = base64.b64encode(audio_data).decode("utf-8")
        audio_event = {
            "type": "input_audio_buffer.append",
            "audio": base64_audio_data,
        }
        await self.send_event(audio_event)
Handle incoming audio data from the user
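The audio is expected as 16-bit mono PCM at 48 kHz (the class's input_sample_rate); the method resamples it to 16 kHz before base64-encoding it into an input_audio_buffer.append event. A hedged sketch of feeding one 20 ms frame:

import numpy as np

async def push_silence(model) -> None:
    # One 20 ms frame of 48 kHz, 16-bit mono PCM (all zeros here, just to show the format).
    frame = np.zeros(960, dtype=np.int16).tobytes()
    await model.handle_audio_input(frame)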
async def interrupt(self) ‑> None
-
Expand source code
async def interrupt(self) -> None:
    """Interrupt the current response and flush audio"""
    if self._session and not self._closing:
        cancel_event = {"type": "response.cancel", "event_id": str(uuid.uuid4())}
        await self.send_event(cancel_event)
        await realtime_metrics_collector.set_interrupted()
        if self.audio_track:
            self.audio_track.interrupt()
        if self._agent_speaking:
            await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
            self._agent_speaking = False
Interrupt the current response and flush audio
def process_base_url(self, url: str, model: str) ‑> str
-
Expand source code
def process_base_url(self, url: str, model: str) -> str:
    if url.startswith("http"):
        url = url.replace("http", "ws", 1)
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    if not parsed_url.path or parsed_url.path.rstrip("/") in ["", "/v1", "/openai"]:
        path = parsed_url.path.rstrip("/") + "/realtime"
    else:
        path = parsed_url.path
    if "model" not in query_params:
        query_params["model"] = [model]
    new_query = urlencode(query_params, doseq=True)
    new_url = urlunparse(
        (parsed_url.scheme, parsed_url.netloc, path, "", new_query, "")
    )
    return new_url
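For example, with the documented default base URL this yields wss://api.openai.com/v1/realtime?model=&lt;model&gt;; any custom http(s) base URL is likewise switched to the ws(s) scheme, given a /realtime path when its path is empty, /v1, or /openai, and has the model query parameter appended if missing.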
async def send_event(self, event: Dict[str, Any]) ‑> None
-
Expand source code
async def send_event(self, event: Dict[str, Any]) -> None:
    """Send an event to the WebSocket"""
    if self._session and not self._closing:
        await self._session.msg_queue.put(event)
Send an event to the WebSocket
async def send_first_session_update(self) ‑> None
-
Expand source code
async def send_first_session_update(self) -> None: """Send initial session update with default values after connection""" if not self._session: return turn_detection = None input_audio_transcription = None if "audio" in self.config.modalities: turn_detection = ( self.config.turn_detection.model_dump( by_alias=True, exclude_unset=True, exclude_defaults=True, ) if self.config.turn_detection else None ) input_audio_transcription = ( self.config.input_audio_transcription.model_dump( by_alias=True, exclude_unset=True, exclude_defaults=True, ) if self.config.input_audio_transcription else None ) session_update = { "type": "session.update", "session": { "model": self.model, "instructions": self._instructions or "You are a helpful assistant that can answer questions and help with tasks.", "temperature": self.config.temperature, "tool_choice": self.config.tool_choice, "tools": self._formatted_tools or [], "modalities": self.config.modalities, "max_response_output_tokens": "inf", }, } if "audio" in self.config.modalities: session_update["session"]["voice"] = self.config.voice session_update["session"]["input_audio_format"] = DEFAULT_INPUT_AUDIO_FORMAT session_update["session"][ "output_audio_format" ] = DEFAULT_OUTPUT_AUDIO_FORMAT if turn_detection: session_update["session"]["turn_detection"] = turn_detection if input_audio_transcription: session_update["session"][ "input_audio_transcription" ] = input_audio_transcription # Send the event await self.send_event(session_update)
Send initial session update with default values after connection
async def send_message(self, message: str) ‑> None
-
Expand source code
async def send_message(self, message: str) -> None:
    """Send a message to the OpenAI realtime API"""
    await self.send_event(
        {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "assistant",
                "content": [
                    {
                        "type": "text",
                        "text": "Repeat the user's exact message back to them:"
                        + message
                        + "DO NOT ADD ANYTHING ELSE",
                    }
                ],
            },
        }
    )
    await self.create_response()
Send a message to the OpenAI realtime API
async def send_text_message(self, message: str) ‑> None
-
Expand source code
async def send_text_message(self, message: str) -> None:
    """Send a text message to the OpenAI realtime API"""
    if not self._session:
        self.emit("error", "No active WebSocket session")
        raise RuntimeError("No active WebSocket session")
    await self.send_event(
        {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": message}],
            },
        }
    )
    await self.create_response()
Send a text message to the OpenAI realtime API
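A hedged usage sketch, assuming connect() has already completed:

async def ask(model) -> None:
    # Creates a user message item and immediately requests a response,
    # which streams back over the active WebSocket session.
    await model.send_text_message("What can you help me with today?")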
def set_agent(self, agent: Agent) ‑> None
-
Expand source code
def set_agent(self, agent: Agent) -> None:
    self._instructions = agent.instructions
    self._tools = agent.tools
    self.tools_formatted = self._format_tools_for_session(self._tools)
    self._formatted_tools = self.tools_formatted
class OpenAIRealtimeConfig (voice: str = 'alloy',
temperature: float = 0.8,
turn_detection: TurnDetection | None = <factory>,
input_audio_transcription: InputAudioTranscription | None = <factory>,
tool_choice: ToolChoice | None = 'auto',
modalities: list[str] = <factory>)
Expand source code
@dataclass
class OpenAIRealtimeConfig:
    """Configuration for the OpenAI realtime API

    Args:
        voice: Voice ID for audio output. Default is 'alloy'
        temperature: Controls randomness in response generation. Higher values
            (e.g. 0.8) make output more random, lower values make it more
            deterministic. Default is 0.8
        turn_detection: Configuration for detecting user speech turns. Contains
            settings for:
            - type: Detection type ('server_vad')
            - threshold: Voice activity detection threshold (0.0-1.0)
            - prefix_padding_ms: Padding before speech start (ms)
            - silence_duration_ms: Silence duration to mark end (ms)
            - create_response: Whether to generate response on turn
            - interrupt_response: Whether to allow interruption
        input_audio_transcription: Configuration for audio transcription. Contains:
            - model: Model to use for transcription
        tool_choice: How tools should be selected ('auto' or 'none'). Default is 'auto'
        modalities: List of enabled response types ["text", "audio"]. Default includes both
    """

    voice: str = DEFAULT_VOICE
    temperature: float = DEFAULT_TEMPERATURE
    turn_detection: TurnDetection | None = field(
        default_factory=lambda: DEFAULT_TURN_DETECTION
    )
    input_audio_transcription: InputAudioTranscription | None = field(
        default_factory=lambda: DEFAULT_INPUT_AUDIO_TRANSCRIPTION
    )
    tool_choice: ToolChoice | None = DEFAULT_TOOL_CHOICE
    modalities: list[str] = field(default_factory=lambda: ["text", "audio"])
Configuration for the OpenAI realtime API
Args
voice
- Voice ID for audio output. Default is 'alloy'
temperature
- Controls randomness in response generation. Higher values (e.g. 0.8) make output more random, lower values make it more deterministic. Default is 0.8
turn_detection
- Configuration for detecting user speech turns. Contains settings for:
  - type: Detection type ('server_vad')
  - threshold: Voice activity detection threshold (0.0-1.0)
  - prefix_padding_ms: Padding before speech start (ms)
  - silence_duration_ms: Silence duration to mark end (ms)
  - create_response: Whether to generate response on turn
  - interrupt_response: Whether to allow interruption
input_audio_transcription
- Configuration for audio transcription. Contains:
  - model: Model to use for transcription
tool_choice
- How tools should be selected ('auto' or 'none'). Default is 'auto'
modalities
- List of enabled response types ["text", "audio"]. Default includes both
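A hedged sketch of overriding the defaults; TurnDetection is the openai.types.beta.realtime.session model listed under the instance variables below, and the field names follow the description above:

from openai.types.beta.realtime.session import TurnDetection
from videosdk.plugins.openai import OpenAIRealtimeConfig

config = OpenAIRealtimeConfig(
    voice="alloy",
    temperature=0.7,
    turn_detection=TurnDetection(
        type="server_vad",
        threshold=0.5,
        prefix_padding_ms=300,
        silence_duration_ms=500,
    ),
    tool_choice="auto",
    modalities=["text", "audio"],
)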
Instance variables
var input_audio_transcription : openai.types.beta.realtime.session.InputAudioTranscription | None
var modalities : list[str]
var temperature : float
var tool_choice : Literal['auto', 'required', 'none'] | None
var turn_detection : openai.types.beta.realtime.session.TurnDetection | None
var voice : str
class OpenAISTT (*,
api_key: str | None = None,
model: str = 'gpt-4o-mini-transcribe',
base_url: str | None = None,
prompt: str | None = None,
language: str = 'en',
turn_detection: dict | None = None,
enable_streaming: bool = True,
silence_threshold: float = 0.01,
silence_duration: float = 0.8)
Expand source code
class OpenAISTT(BaseSTT): def __init__( self, *, api_key: str | None = None, model: str = "gpt-4o-mini-transcribe", base_url: str | None = None, prompt: str | None = None, language: str = "en", turn_detection: dict | None = None, enable_streaming: bool = True, silence_threshold: float = 0.01, silence_duration: float = 0.8, ) -> None: """Initialize the OpenAI STT plugin. Args: api_key (Optional[str], optional): OpenAI API key. Defaults to None. model (str): The model to use for the STT plugin. Defaults to "whisper-1". base_url (Optional[str], optional): The base URL for the OpenAI API. Defaults to None. prompt (Optional[str], optional): The prompt for the STT plugin. Defaults to None. language (str): The language to use for the STT plugin. Defaults to "en". turn_detection (dict | None): The turn detection for the STT plugin. Defaults to None. """ super().__init__() self.api_key = api_key or os.getenv("OPENAI_API_KEY") if not self.api_key: raise ValueError("OpenAI API key must be provided either through api_key parameter or OPENAI_API_KEY environment variable") self.model = model self.language = language self.prompt = prompt self.turn_detection = turn_detection or { "type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 500, } self.enable_streaming = enable_streaming # Custom VAD parameters for non-streaming mode self.silence_threshold_bytes = int(silence_threshold * 32767) self.silence_duration_frames = int(silence_duration * 48000) # input_sample_rate self.client = openai.AsyncClient( max_retries=0, api_key=self.api_key, base_url=base_url or None, http_client=httpx.AsyncClient( timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120, ), ), ) self._session: Optional[aiohttp.ClientSession] = None self._ws: Optional[aiohttp.ClientWebSocketResponse] = None self._ws_task: Optional[asyncio.Task] = None self._current_text = "" self._last_interim_at = 0 self.input_sample_rate = 48000 self.target_sample_rate = 16000 self._audio_buffer = bytearray() # Custom VAD state for non-streaming mode self._is_speaking = False self._silence_frames = 0 @staticmethod def azure( *, model: str = "gpt-4o-mini-transcribe", language: str = "en", prompt: str | None = None, turn_detection: dict | None = None, azure_endpoint: str | None = None, azure_deployment: str | None = None, api_version: str | None = None, api_key: str | None = None, azure_ad_token: str | None = None, organization: str | None = None, project: str | None = None, base_url: str | None = None, enable_streaming: bool = False, timeout: httpx.Timeout | None = None, ) -> "OpenAISTT": """ Create a new instance of Azure OpenAI STT. 
This automatically infers the following arguments from their corresponding environment variables if they are not provided: - `api_key` from `AZURE_OPENAI_API_KEY` - `organization` from `OPENAI_ORG_ID` - `project` from `OPENAI_PROJECT_ID` - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN` - `api_version` from `OPENAI_API_VERSION` - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT` - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name) """ # Get values from environment variables if not provided azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT") azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT") api_version = api_version or os.getenv("OPENAI_API_VERSION") api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY") azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN") organization = organization or os.getenv("OPENAI_ORG_ID") project = project or os.getenv("OPENAI_PROJECT_ID") # If azure_deployment is not provided, use model as the deployment name if not azure_deployment: azure_deployment = model if not azure_endpoint: raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable") if not api_key and not azure_ad_token: raise ValueError("Either API key or Azure AD token must be provided") azure_client = openai.AsyncAzureOpenAI( max_retries=0, azure_endpoint=azure_endpoint, azure_deployment=azure_deployment, api_version=api_version, api_key=api_key, azure_ad_token=azure_ad_token, organization=organization, project=project, base_url=base_url, timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), ) instance = OpenAISTT( model=model, language=language, prompt=prompt, turn_detection=turn_detection, enable_streaming=enable_streaming, ) instance.client = azure_client return instance async def process_audio( self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any ) -> None: """Process audio frames and send to OpenAI based on enabled mode""" if not self.enable_streaming: await self._transcribe_non_streaming(audio_frames) return if not self._ws: await self._connect_ws() self._ws_task = asyncio.create_task(self._listen_for_responses()) try: audio_data = np.frombuffer(audio_frames, dtype=np.int16) audio_data = signal.resample(audio_data, int(len(audio_data) * self.target_sample_rate / self.input_sample_rate)) audio_data = audio_data.astype(np.int16).tobytes() audio_data = base64.b64encode(audio_data).decode("utf-8") message = { "type": "input_audio_buffer.append", "audio": audio_data, } await self._ws.send_json(message) except Exception as e: print(f"Error in process_audio: {str(e)}") self.emit("error", str(e)) if self._ws: await self._ws.close() self._ws = None if self._ws_task: self._ws_task.cancel() self._ws_task = None async def _transcribe_non_streaming(self, audio_frames: bytes) -> None: """HTTP-based transcription using OpenAI audio/transcriptions API with custom VAD""" if not audio_frames: return self._audio_buffer.extend(audio_frames) # Custom VAD logic similar to other STT implementations is_silent_chunk = self._is_silent(audio_frames) if not is_silent_chunk: if not self._is_speaking: self._is_speaking = True global_event_emitter.emit("speech_started") self._silence_frames = 0 else: if self._is_speaking: self._silence_frames += len(audio_frames) // 4 # Approximate frame count if self._silence_frames > self.silence_duration_frames: global_event_emitter.emit("speech_stopped") 
await self._process_audio_buffer() self._is_speaking = False self._silence_frames = 0 def _is_silent(self, audio_chunk: bytes) -> bool: """Simple VAD: check if the max amplitude is below a threshold.""" audio_data = np.frombuffer(audio_chunk, dtype=np.int16) return np.max(np.abs(audio_data)) < self.silence_threshold_bytes async def _process_audio_buffer(self) -> None: """Process the accumulated audio buffer with OpenAI transcription""" if not self._audio_buffer: return audio_data = bytes(self._audio_buffer) self._audio_buffer.clear() wav_bytes = self._audio_frames_to_wav_bytes(audio_data) try: resp = await self.client.audio.transcriptions.create( file=("audio.wav", wav_bytes, "audio/wav"), model=self.model, language=self.language, prompt=self.prompt or openai.NOT_GIVEN, ) text = getattr(resp, "text", "") if text and self._transcript_callback: await self._transcript_callback(STTResponse( event_type=SpeechEventType.FINAL, data=SpeechData(text=text, language=self.language), metadata={"model": self.model} )) except Exception as e: print(f"OpenAI transcription error: {str(e)}") self.emit("error", str(e)) def _audio_frames_to_wav_bytes(self, audio_frames: bytes) -> bytes: """Convert audio frames to WAV bytes""" pcm = np.frombuffer(audio_frames, dtype=np.int16) resampled = signal.resample(pcm, int(len(pcm) * self.target_sample_rate / self.input_sample_rate)) resampled = resampled.astype(np.int16) buf = io.BytesIO() with wave.open(buf, "wb") as wf: wf.setnchannels(1) # Mono wf.setsampwidth(2) # 16-bit PCM wf.setframerate(self.target_sample_rate) wf.writeframes(resampled.tobytes()) return buf.getvalue() async def _listen_for_responses(self) -> None: """Background task to listen for WebSocket responses""" if not self._ws: return try: async for msg in self._ws: if msg.type == aiohttp.WSMsgType.TEXT: data = msg.json() responses = self._handle_ws_message(data) for response in responses: if self._transcript_callback: await self._transcript_callback(response) elif msg.type == aiohttp.WSMsgType.ERROR: error = f"WebSocket error: {self._ws.exception()}" print(error) self.emit("error", error) break elif msg.type == aiohttp.WSMsgType.CLOSED: print("WebSocket connection closed") break except Exception as e: error = f"Error in WebSocket listener: {str(e)}" print(error) self.emit("error", error) finally: if self._ws: await self._ws.close() self._ws = None async def _connect_ws(self) -> None: """Establish WebSocket connection with OpenAI's Realtime API""" if not self._session: self._session = aiohttp.ClientSession() config = { "type": "transcription_session.update", "session": { "input_audio_format": "pcm16", "input_audio_transcription": { "model": self.model, "prompt": self.prompt or "", "language": self.language if self.language else None, }, "turn_detection": self.turn_detection, "input_audio_noise_reduction": { "type": "near_field" }, "include": ["item.input_audio_transcription.logprobs"] } } query_params = { "intent": "transcription", } headers = { "User-Agent": "VideoSDK", "Authorization": f"Bearer {self.api_key}", "OpenAI-Beta": "realtime=v1", } base_url = str(self.client.base_url).rstrip('/') ws_url = f"{base_url}/realtime?{urlencode(query_params)}" if ws_url.startswith("http"): ws_url = ws_url.replace("http", "ws", 1) try: self._ws = await self._session.ws_connect(ws_url, headers=headers) initial_response = await self._ws.receive_json() if initial_response.get("type") != "transcription_session.created": raise Exception(f"Expected session creation, got: {initial_response}") await 
self._ws.send_json(config) update_response = await self._ws.receive_json() if update_response.get("type") != "transcription_session.updated": raise Exception(f"Configuration update failed: {update_response}") except Exception as e: print(f"Error connecting to WebSocket: {str(e)}") if self._ws: await self._ws.close() self._ws = None raise def _handle_ws_message(self, msg: dict) -> list[STTResponse]: """Handle incoming WebSocket messages and generate STT responses""" responses = [] try: msg_type = msg.get("type") if msg_type == "conversation.item.input_audio_transcription.delta": delta = msg.get("delta", "") if delta: self._current_text += delta current_time = asyncio.get_event_loop().time() if current_time - self._last_interim_at > 0.5: responses.append(STTResponse( event_type=SpeechEventType.INTERIM, data=SpeechData( text=self._current_text, language=self.language, ), metadata={"model": self.model} )) self._last_interim_at = current_time elif msg_type == "conversation.item.input_audio_transcription.completed": transcript = msg.get("transcript", "") if transcript: responses.append(STTResponse( event_type=SpeechEventType.FINAL, data=SpeechData( text=transcript, language=self.language, ), metadata={"model": self.model} )) self._current_text = "" elif msg_type == "input_audio_buffer.speech_started": global_event_emitter.emit("speech_started") elif msg_type == "input_audio_buffer.speech_stopped": global_event_emitter.emit("speech_stopped") except Exception as e: print(f"Error handling WebSocket message: {str(e)}") return responses async def aclose(self) -> None: """Cleanup resources""" self._audio_buffer.clear() if self._ws_task: self._ws_task.cancel() try: await self._ws_task except asyncio.CancelledError: pass self._ws_task = None if self._ws: await self._ws.close() self._ws = None if self._session: await self._session.close() self._session = None await self.client.close() async def _ensure_ws_connection(self): """Ensure WebSocket is connected, reconnect if necessary""" if not self._ws or self._ws.closed: await self._connect_ws()
Base class for Speech-to-Text implementations
Initialize the OpenAI STT plugin.
Args
api_key (Optional[str], optional): OpenAI API key. Defaults to None.
model (str): The model to use for the STT plugin. Defaults to "gpt-4o-mini-transcribe".
base_url (Optional[str], optional): The base URL for the OpenAI API. Defaults to None.
prompt (Optional[str], optional): The prompt for the STT plugin. Defaults to None.
language (str): The language to use for the STT plugin. Defaults to "en".
turn_detection (dict | None): The turn detection settings for the STT plugin. Defaults to None (server VAD defaults are applied).
enable_streaming (bool): Whether to stream audio over the realtime WebSocket API instead of batch HTTP transcription. Defaults to True.
silence_threshold (float): Amplitude threshold for the built-in VAD used in non-streaming mode. Defaults to 0.01.
silence_duration (float): Silence duration (in seconds) that ends an utterance in non-streaming mode. Defaults to 0.8.
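A minimal construction sketch, assuming OPENAI_API_KEY is set; the second instance shows the non-streaming (batch HTTP) mode driven by the built-in VAD:

from videosdk.plugins.openai import OpenAISTT

stt = OpenAISTT(
    model="gpt-4o-mini-transcribe",
    language="en",
    enable_streaming=True,    # stream audio over the realtime WebSocket
)

batch_stt = OpenAISTT(
    enable_streaming=False,   # use HTTP audio/transcriptions requests instead
    silence_threshold=0.01,   # amplitude threshold for the built-in VAD
    silence_duration=0.8,     # seconds of silence that ends an utterance
)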
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Static methods
def azure(*,
model: str = 'gpt-4o-mini-transcribe',
language: str = 'en',
prompt: str | None = None,
turn_detection: dict | None = None,
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
enable_streaming: bool = False,
timeout: httpx.Timeout | None = None) ‑> OpenAISTT
Expand source code
@staticmethod def azure( *, model: str = "gpt-4o-mini-transcribe", language: str = "en", prompt: str | None = None, turn_detection: dict | None = None, azure_endpoint: str | None = None, azure_deployment: str | None = None, api_version: str | None = None, api_key: str | None = None, azure_ad_token: str | None = None, organization: str | None = None, project: str | None = None, base_url: str | None = None, enable_streaming: bool = False, timeout: httpx.Timeout | None = None, ) -> "OpenAISTT": """ Create a new instance of Azure OpenAI STT. This automatically infers the following arguments from their corresponding environment variables if they are not provided: - `api_key` from `AZURE_OPENAI_API_KEY` - `organization` from `OPENAI_ORG_ID` - `project` from `OPENAI_PROJECT_ID` - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN` - `api_version` from `OPENAI_API_VERSION` - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT` - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name) """ # Get values from environment variables if not provided azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT") azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT") api_version = api_version or os.getenv("OPENAI_API_VERSION") api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY") azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN") organization = organization or os.getenv("OPENAI_ORG_ID") project = project or os.getenv("OPENAI_PROJECT_ID") # If azure_deployment is not provided, use model as the deployment name if not azure_deployment: azure_deployment = model if not azure_endpoint: raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable") if not api_key and not azure_ad_token: raise ValueError("Either API key or Azure AD token must be provided") azure_client = openai.AsyncAzureOpenAI( max_retries=0, azure_endpoint=azure_endpoint, azure_deployment=azure_deployment, api_version=api_version, api_key=api_key, azure_ad_token=azure_ad_token, organization=organization, project=project, base_url=base_url, timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), ) instance = OpenAISTT( model=model, language=language, prompt=prompt, turn_detection=turn_detection, enable_streaming=enable_streaming, ) instance.client = azure_client return instance
Create a new instance of Azure OpenAI STT.
This automatically infers the following arguments from their corresponding environment variables if they are not provided:
- api_key from AZURE_OPENAI_API_KEY
- organization from OPENAI_ORG_ID
- project from OPENAI_PROJECT_ID
- azure_ad_token from AZURE_OPENAI_AD_TOKEN
- api_version from OPENAI_API_VERSION
- azure_endpoint from AZURE_OPENAI_ENDPOINT
- azure_deployment from AZURE_OPENAI_DEPLOYMENT (if not provided, uses model as the deployment name)
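For example, with AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY and OPENAI_API_VERSION exported, a streaming Azure STT instance could be created like this (sketch):

from videosdk.plugins.openai import OpenAISTT

# azure_endpoint, api_key and api_version are read from the environment here;
# the deployment name falls back to the model name.
azure_stt = OpenAISTT.azure(
    model="gpt-4o-mini-transcribe",
    language="en",
    enable_streaming=True,
)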
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    self._audio_buffer.clear()
    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
    if self._ws:
        await self._ws.close()
        self._ws = None
    if self._session:
        await self._session.close()
        self._session = None
    await self.client.close()
Cleanup resources
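aclose() cancels the websocket listener task and closes the websocket, HTTP session and OpenAI client, so it is typically awaited once the agent session ends, for example (sketch):

from videosdk.plugins.openai import OpenAISTT

async def run_session(stt: OpenAISTT) -> None:
    try:
        ...  # stream audio via process_audio(...) while the session is active
    finally:
        await stt.aclose()  # release the websocket, background task and HTTP resources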
async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None
-
Expand source code
async def process_audio(
    self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any
) -> None:
    """Process audio frames and send to OpenAI based on enabled mode"""
    if not self.enable_streaming:
        await self._transcribe_non_streaming(audio_frames)
        return
    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())
    try:
        audio_data = np.frombuffer(audio_frames, dtype=np.int16)
        audio_data = signal.resample(
            audio_data,
            int(len(audio_data) * self.target_sample_rate / self.input_sample_rate),
        )
        audio_data = audio_data.astype(np.int16).tobytes()
        audio_data = base64.b64encode(audio_data).decode("utf-8")
        message = {
            "type": "input_audio_buffer.append",
            "audio": audio_data,
        }
        await self._ws.send_json(message)
    except Exception as e:
        print(f"Error in process_audio: {str(e)}")
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None
        if self._ws_task:
            self._ws_task.cancel()
            self._ws_task = None
Process audio frames and send to OpenAI based on enabled mode
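A usage sketch: in streaming mode each PCM frame is resampled, base64-encoded and appended to the realtime input buffer, so the caller only has to forward raw bytes. capture_frames() below is a hypothetical async generator yielding 16-bit PCM chunks.

from videosdk.plugins.openai import OpenAISTT

async def pipe_audio(stt: OpenAISTT) -> None:
    # capture_frames() is a hypothetical audio source, not part of this plugin.
    async for frame in capture_frames():
        await stt.process_audio(frame)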
class OpenAITTS (*,
api_key: str | None = None,
model: str = 'gpt-4o-mini-tts',
voice: str = 'ash',
speed: float = 1.0,
instructions: str | None = None,
base_url: str | None = None,
response_format: str = 'pcm')-
Expand source code
class OpenAITTS(TTS): def __init__( self, *, api_key: str | None = None, model: str = DEFAULT_MODEL, voice: str = DEFAULT_VOICE, speed: float = 1.0, instructions: str | None = None, base_url: str | None = None, response_format: str = "pcm", ) -> None: """Initialize the OpenAI TTS plugin. Args: api_key (Optional[str], optional): OpenAI API key. Defaults to None. model (str): The model to use for the TTS plugin. Defaults to "gpt-4o-mini-tts". voice (str): The voice to use for the TTS plugin. Defaults to "ash". speed (float): The speed to use for the TTS plugin. Defaults to 1.0. instructions (Optional[str], optional): Additional instructions for the TTS plugin. Defaults to None. base_url (Optional[str], optional): Custom base URL for the OpenAI API. Defaults to None. response_format (str): The response format to use for the TTS plugin. Defaults to "pcm". """ super().__init__(sample_rate=OPENAI_TTS_SAMPLE_RATE, num_channels=OPENAI_TTS_CHANNELS) self.model = model self.voice = voice self.speed = speed self.instructions = instructions self.audio_track = None self.loop = None self.response_format = response_format self._first_chunk_sent = False self._current_synthesis_task: asyncio.Task | None = None self._interrupted = False self.api_key = api_key or os.getenv("OPENAI_API_KEY") if not self.api_key: raise ValueError( "OpenAI API key must be provided either through api_key parameter or OPENAI_API_KEY environment variable") self._client = openai.AsyncClient( max_retries=0, api_key=self.api_key, base_url=base_url or None, http_client=httpx.AsyncClient( timeout=httpx.Timeout( connect=15.0, read=5.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120, ), ), ) @staticmethod def azure( *, model: str = DEFAULT_MODEL, voice: str = DEFAULT_VOICE, speed: float = 1.0, instructions: str | None = None, azure_endpoint: str | None = None, azure_deployment: str | None = None, api_version: str | None = None, api_key: str | None = None, azure_ad_token: str | None = None, organization: str | None = None, project: str | None = None, base_url: str | None = None, response_format: str = "pcm", timeout: httpx.Timeout | None = None, ) -> "OpenAITTS": """ Create a new instance of Azure OpenAI TTS. 
This automatically infers the following arguments from their corresponding environment variables if they are not provided: - `api_key` from `AZURE_OPENAI_API_KEY` - `organization` from `OPENAI_ORG_ID` - `project` from `OPENAI_PROJECT_ID` - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN` - `api_version` from `OPENAI_API_VERSION` - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT` - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name) """ azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT") azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT") api_version = api_version or os.getenv("OPENAI_API_VERSION") api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY") azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN") organization = organization or os.getenv("OPENAI_ORG_ID") project = project or os.getenv("OPENAI_PROJECT_ID") if not azure_deployment: azure_deployment = model if not azure_endpoint: raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable") if not api_key and not azure_ad_token: raise ValueError("Either API key or Azure AD token must be provided") azure_client = openai.AsyncAzureOpenAI( max_retries=0, azure_endpoint=azure_endpoint, azure_deployment=azure_deployment, api_version=api_version, api_key=api_key, azure_ad_token=azure_ad_token, organization=organization, project=project, base_url=base_url, timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), ) instance = OpenAITTS( model=model, voice=voice, speed=speed, instructions=instructions, response_format=response_format, ) instance._client = azure_client return instance def reset_first_audio_tracking(self) -> None: """Reset the first audio tracking state for next TTS task""" self._first_chunk_sent = False async def synthesize( self, text: AsyncIterator[str] | str, voice_id: Optional[str] = None, **kwargs: Any, ) -> None: """ Convert text to speech using OpenAI's TTS API and stream to audio track Args: text: Text to convert to speech voice_id: Optional voice override **kwargs: Additional provider-specific arguments """ try: if not self.audio_track or not self.loop: self.emit("error", "Audio track or event loop not set") return self._interrupted = False if isinstance(text, AsyncIterator): async for segment in segment_text(text): if self._interrupted: break await self._synthesize_segment(segment, voice_id, **kwargs) else: if not self._interrupted: await self._synthesize_segment(text, voice_id, **kwargs) except Exception as e: self.emit("error", f"TTS synthesis failed: {str(e)}") async def _synthesize_segment(self, text: str, voice_id: Optional[str] = None, **kwargs: Any) -> None: """Synthesize a single text segment""" if not text.strip() or self._interrupted: return try: audio_data = b"" async with self._client.audio.speech.with_streaming_response.create( model=self.model, voice=voice_id or self.voice, input=text, speed=self.speed, response_format=self.response_format, **({"instructions": self.instructions} if self.instructions else {}), ) as response: async for chunk in response.iter_bytes(): if self._interrupted: break if chunk: audio_data += chunk if audio_data and not self._interrupted: await self._stream_audio_chunks(audio_data) except Exception as e: if not self._interrupted: self.emit("error", f"Segment synthesis failed: {str(e)}") async def _stream_audio_chunks(self, audio_bytes: bytes) -> None: """Stream audio data 
in chunks for smooth playback""" chunk_size = int(OPENAI_TTS_SAMPLE_RATE * OPENAI_TTS_CHANNELS * 2 * 20 / 1000) for i in range(0, len(audio_bytes), chunk_size): chunk = audio_bytes[i:i + chunk_size] if len(chunk) < chunk_size and len(chunk) > 0: padding_needed = chunk_size - len(chunk) chunk += b'\x00' * padding_needed if len(chunk) == chunk_size: if not self._first_chunk_sent and self._first_audio_callback: self._first_chunk_sent = True await self._first_audio_callback() asyncio.create_task(self.audio_track.add_new_bytes(chunk)) await asyncio.sleep(0.001) async def aclose(self) -> None: """Cleanup resources""" await self._client.close() await super().aclose() async def interrupt(self) -> None: """Interrupt TTS synthesis""" self._interrupted = True if self._current_synthesis_task: self._current_synthesis_task.cancel() if self.audio_track: self.audio_track.interrupt()
Base class for Text-to-Speech implementations
Initialize the OpenAI TTS plugin.
Args
api_key : Optional[str], optional
    OpenAI API key. Defaults to None.
model : str
    The model to use for the TTS plugin. Defaults to "gpt-4o-mini-tts".
voice : str
    The voice to use for the TTS plugin. Defaults to "ash".
speed : float
    The speed to use for the TTS plugin. Defaults to 1.0.
instructions : Optional[str], optional
    Additional instructions for the TTS plugin. Defaults to None.
base_url : Optional[str], optional
    Custom base URL for the OpenAI API. Defaults to None.
response_format : str
    The response format to use for the TTS plugin. Defaults to "pcm".
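A minimal construction sketch (values are illustrative; when api_key is omitted the plugin falls back to the OPENAI_API_KEY environment variable):

from videosdk.plugins.openai import OpenAITTS

tts = OpenAITTS(
    voice="ash",
    speed=1.1,
    instructions="Speak in a calm, friendly tone.",  # illustrative instructions
)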
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Static methods
def azure(*,
model: str = 'gpt-4o-mini-tts',
voice: str = 'ash',
speed: float = 1.0,
instructions: str | None = None,
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
response_format: str = 'pcm',
timeout: httpx.Timeout | None = None) ‑> OpenAITTS-
Expand source code
@staticmethod
def azure(
    *,
    model: str = DEFAULT_MODEL,
    voice: str = DEFAULT_VOICE,
    speed: float = 1.0,
    instructions: str | None = None,
    azure_endpoint: str | None = None,
    azure_deployment: str | None = None,
    api_version: str | None = None,
    api_key: str | None = None,
    azure_ad_token: str | None = None,
    organization: str | None = None,
    project: str | None = None,
    base_url: str | None = None,
    response_format: str = "pcm",
    timeout: httpx.Timeout | None = None,
) -> "OpenAITTS":
    """
    Create a new instance of Azure OpenAI TTS.

    This automatically infers the following arguments from their corresponding
    environment variables if they are not provided:
    - `api_key` from `AZURE_OPENAI_API_KEY`
    - `organization` from `OPENAI_ORG_ID`
    - `project` from `OPENAI_PROJECT_ID`
    - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
    - `api_version` from `OPENAI_API_VERSION`
    - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
    - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)
    """
    azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
    azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
    api_version = api_version or os.getenv("OPENAI_API_VERSION")
    api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
    azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
    organization = organization or os.getenv("OPENAI_ORG_ID")
    project = project or os.getenv("OPENAI_PROJECT_ID")

    if not azure_deployment:
        azure_deployment = model

    if not azure_endpoint:
        raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable")
    if not api_key and not azure_ad_token:
        raise ValueError("Either API key or Azure AD token must be provided")

    azure_client = openai.AsyncAzureOpenAI(
        max_retries=0,
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        api_version=api_version,
        api_key=api_key,
        azure_ad_token=azure_ad_token,
        organization=organization,
        project=project,
        base_url=base_url,
        timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
    )

    instance = OpenAITTS(
        model=model,
        voice=voice,
        speed=speed,
        instructions=instructions,
        response_format=response_format,
    )
    instance._client = azure_client
    return instance
Create a new instance of Azure OpenAI TTS.
This automatically infers the following arguments from their corresponding environment variables if they are not provided:
- api_key from AZURE_OPENAI_API_KEY
- organization from OPENAI_ORG_ID
- project from OPENAI_PROJECT_ID
- azure_ad_token from AZURE_OPENAI_AD_TOKEN
- api_version from OPENAI_API_VERSION
- azure_endpoint from AZURE_OPENAI_ENDPOINT
- azure_deployment from AZURE_OPENAI_DEPLOYMENT (if not provided, uses model as the deployment name)
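For example, with the Azure variables exported (sketch; either AZURE_OPENAI_API_KEY or AZURE_OPENAI_AD_TOKEN must be available alongside AZURE_OPENAI_ENDPOINT):

from videosdk.plugins.openai import OpenAITTS

azure_tts = OpenAITTS.azure(
    model="gpt-4o-mini-tts",
    voice="ash",
    response_format="pcm",  # PCM output matches the plugin's streaming audio track
)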
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    await self._client.close()
    await super().aclose()
Cleanup resources
async def interrupt(self) ‑> None
-
Expand source code
async def interrupt(self) -> None:
    """Interrupt TTS synthesis"""
    self._interrupted = True
    if self._current_synthesis_task:
        self._current_synthesis_task.cancel()
    if self.audio_track:
        self.audio_track.interrupt()
Interrupt TTS synthesis
def reset_first_audio_tracking(self) ‑> None
-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    self._first_chunk_sent = False
Reset the first audio tracking state for next TTS task
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """
    Convert text to speech using OpenAI's TTS API and stream to audio track

    Args:
        text: Text to convert to speech
        voice_id: Optional voice override
        **kwargs: Additional provider-specific arguments
    """
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._interrupted = False

        if isinstance(text, AsyncIterator):
            async for segment in segment_text(text):
                if self._interrupted:
                    break
                await self._synthesize_segment(segment, voice_id, **kwargs)
        else:
            if not self._interrupted:
                await self._synthesize_segment(text, voice_id, **kwargs)
    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
Convert text to speech using OpenAI's TTS API and stream to audio track
Args
text
- Text to convert to speech
voice_id
- Optional voice override
**kwargs
- Additional provider-specific arguments
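A playback sketch, assuming the agent pipeline has already attached audio_track and loop to the plugin (otherwise synthesize() emits an "error" event and returns); interrupt() can be used for barge-in while a synthesis task is still running:

import asyncio
from videosdk.plugins.openai import OpenAITTS

async def speak(tts: OpenAITTS) -> None:
    task = asyncio.create_task(tts.synthesize("Hello from the agent!"))
    await asyncio.sleep(0.5)   # illustrative delay before barge-in
    await tts.interrupt()      # stop the in-flight synthesis and the audio track
    await task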