Package videosdk.plugins.openai

Sub-modules

videosdk.plugins.openai.llm
videosdk.plugins.openai.realtime_api
videosdk.plugins.openai.stt
videosdk.plugins.openai.tts

Classes

class OpenAILLM (*,
api_key: str | None = None,
model: str = 'gpt-4o-mini',
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_completion_tokens: int | None = None)
class OpenAILLM(LLM):
    
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "gpt-4o-mini",
        base_url: str | None = None,
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_completion_tokens: int | None = None,
    ) -> None:
        """Initialize the OpenAI LLM plugin.

        Args:
            api_key (Optional[str], optional): OpenAI API key. Defaults to None.
            model (str): The model to use for the LLM plugin. Defaults to "gpt-4o-mini".
            base_url (Optional[str], optional): The base URL for the OpenAI API. Defaults to None.
            temperature (float): The temperature to use for the LLM plugin. Defaults to 0.7.
            tool_choice (ToolChoice): The tool choice to use for the LLM plugin. Defaults to "auto".
            max_completion_tokens (Optional[int], optional): The maximum completion tokens to use for the LLM plugin. Defaults to None.
        """
        super().__init__()
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OpenAI API key must be provided either through api_key parameter or OPENAI_API_KEY environment variable")
        
        self.model = model
        self.temperature = temperature
        self.tool_choice = tool_choice
        self.max_completion_tokens = max_completion_tokens
        self._cancelled = False
        
        self._client = openai.AsyncOpenAI(
            api_key=self.api_key,
            base_url=base_url or None,
            max_retries=0,
            http_client=httpx.AsyncClient(
                timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=50,
                    max_keepalive_connections=50,
                    keepalive_expiry=120,
                ),
            ),
        )

    @staticmethod
    def azure(
        *,
        model: str = "gpt-4o-mini",
        azure_endpoint: str | None = None,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | None = None,
        azure_ad_token: str | None = None,
        organization: str | None = None,
        project: str | None = None,
        base_url: str | None = None,
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_completion_tokens: int | None = None,
        timeout: httpx.Timeout | None = None,
    ) -> "OpenAILLM":
        """
        Create a new instance of Azure OpenAI LLM.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `AZURE_OPENAI_API_KEY`
        - `organization` from `OPENAI_ORG_ID`
        - `project` from `OPENAI_PROJECT_ID`
        - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
        - `api_version` from `OPENAI_API_VERSION`
        - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
        - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)
        """
        
        azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
        azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
        api_version = api_version or os.getenv("OPENAI_API_VERSION")
        api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
        azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
        organization = organization or os.getenv("OPENAI_ORG_ID")
        project = project or os.getenv("OPENAI_PROJECT_ID")
        
        if not azure_deployment:
            azure_deployment = model
        
        if not azure_endpoint:
            raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable")
        
        if not api_key and not azure_ad_token:
            raise ValueError("Either API key or Azure AD token must be provided")
        
        azure_client = openai.AsyncAzureOpenAI(
            max_retries=0,
            azure_endpoint=azure_endpoint,
            azure_deployment=azure_deployment,
            api_version=api_version,
            api_key=api_key,
            azure_ad_token=azure_ad_token,
            organization=organization,
            project=project,
            base_url=base_url,
            timeout=timeout
            if timeout
            else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
        )
        
        instance = OpenAILLM(
            model=model,
            temperature=temperature,
            tool_choice=tool_choice,
            max_completion_tokens=max_completion_tokens,
        )
        instance._client = azure_client
        return instance

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        **kwargs: Any
    ) -> AsyncIterator[LLMResponse]:
        """
        Implement chat functionality using OpenAI's chat completion API
        
        Args:
            messages: ChatContext containing conversation history
            tools: Optional list of function tools available to the model
            **kwargs: Additional arguments passed to the OpenAI API
            
        Yields:
            LLMResponse objects containing the model's responses
        """
        self._cancelled = False
        
        def _format_content(content: Union[str, List[ChatContent]]):
            if isinstance(content, str):
                return content

            formatted_parts = []
            for part in content:
                if isinstance(part, str):
                    formatted_parts.append({"type": "text", "text": part})
                elif isinstance(part, ImageContent):
                    image_url_data = {"url": part.to_data_url()}
                    if part.inference_detail != "auto":
                        image_url_data["detail"] = part.inference_detail
                    formatted_parts.append(
                        {
                            "type": "image_url",
                            "image_url": image_url_data,
                        }
                    )
            return formatted_parts

        completion_params = {
            "model": self.model,
            "messages": [
                {
                    "role": msg.role.value,
                    "content": _format_content(msg.content),
                    **({"name": msg.name} if hasattr(msg, "name") else {}),
                }
                if isinstance(msg, ChatMessage)
                else {
                    "role": "assistant",
                    "content": None,
                    "function_call": {"name": msg.name, "arguments": msg.arguments},
                }
                if isinstance(msg, FunctionCall)
                else {
                    "role": "function",
                    "name": msg.name,
                    "content": msg.output,
                }
                if isinstance(msg, FunctionCallOutput)
                else None
                for msg in messages.items
                if msg is not None
            ],
            "temperature": self.temperature,
            "stream": True,
            "max_tokens": self.max_completion_tokens,
        }
        if tools:
            formatted_tools = []
            for tool in tools:
                if not is_function_tool(tool):
                    continue
                try:
                    tool_schema = build_openai_schema(tool)
                    formatted_tools.append(tool_schema)
                except Exception as e:
                    self.emit("error", f"Failed to format tool {tool}: {e}")
                    continue
            
            if formatted_tools:
                completion_params["functions"] = formatted_tools
                completion_params["function_call"] = self.tool_choice
        completion_params.update(kwargs)
        try:
            response_stream = await self._client.chat.completions.create(**completion_params)
            
            current_content = ""
            current_function_call = None

            async for chunk in response_stream:
                if self._cancelled:
                    break
                
                if not chunk.choices:
                    continue
                    
                delta = chunk.choices[0].delta
                if delta.function_call:
                    if current_function_call is None:
                        current_function_call = {
                            "name": delta.function_call.name or "",
                            "arguments": delta.function_call.arguments or ""
                        }
                    else:
                        if delta.function_call.name:
                            current_function_call["name"] += delta.function_call.name
                        if delta.function_call.arguments:
                            current_function_call["arguments"] += delta.function_call.arguments
                elif current_function_call is not None:  
                    try:
                        args = json.loads(current_function_call["arguments"])
                        current_function_call["arguments"] = args
                    except json.JSONDecodeError:
                        self.emit("error", f"Failed to parse function arguments: {current_function_call['arguments']}")
                        current_function_call["arguments"] = {}
                    
                    yield LLMResponse(
                        content="",
                        role=ChatRole.ASSISTANT,
                        metadata={"function_call": current_function_call}
                    )
                    current_function_call = None
                
                elif delta.content is not None:
                    current_content = delta.content
                    yield LLMResponse(
                        content=current_content,
                        role=ChatRole.ASSISTANT
                    )

        except Exception as e:
            if not self._cancelled:
                self.emit("error", e)
            raise

    async def cancel_current_generation(self) -> None:
        self._cancelled = True

    async def aclose(self) -> None:
        """Cleanup resources by closing the HTTP client"""
        await self.cancel_current_generation()
        if self._client:
            await self._client.close()

Base class for LLM implementations.

Initialize the OpenAI LLM plugin.

Args

api_key : Optional[str], optional
OpenAI API key. Defaults to None.
model : str
The model to use for the LLM plugin. Defaults to "gpt-4o-mini".
base_url : Optional[str], optional
The base URL for the OpenAI API. Defaults to None.
temperature : float
The temperature to use for the LLM plugin. Defaults to 0.7.
tool_choice : ToolChoice
The tool choice to use for the LLM plugin. Defaults to "auto".
max_completion_tokens : Optional[int], optional
The maximum completion tokens to use for the LLM plugin. Defaults to None.
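
A minimal usage sketch (assuming the class is re-exported from videosdk.plugins.openai and that OPENAI_API_KEY is set in the environment):

    from videosdk.plugins.openai import OpenAILLM

    # api_key is omitted, so the key is read from the OPENAI_API_KEY environment variable.
    llm = OpenAILLM(
        model="gpt-4o-mini",
        temperature=0.3,
        max_completion_tokens=512,
    )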

Ancestors

  • videosdk.agents.llm.llm.LLM
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Static methods

def azure(*,
model: str = 'gpt-4o-mini',
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_completion_tokens: int | None = None,
timeout: httpx.Timeout | None = None) ‑> OpenAILLM
@staticmethod
def azure(
    *,
    model: str = "gpt-4o-mini",
    azure_endpoint: str | None = None,
    azure_deployment: str | None = None,
    api_version: str | None = None,
    api_key: str | None = None,
    azure_ad_token: str | None = None,
    organization: str | None = None,
    project: str | None = None,
    base_url: str | None = None,
    temperature: float = 0.7,
    tool_choice: ToolChoice = "auto",
    max_completion_tokens: int | None = None,
    timeout: httpx.Timeout | None = None,
) -> "OpenAILLM":
    """
    Create a new instance of Azure OpenAI LLM.

    This automatically infers the following arguments from their corresponding environment variables if they are not provided:
    - `api_key` from `AZURE_OPENAI_API_KEY`
    - `organization` from `OPENAI_ORG_ID`
    - `project` from `OPENAI_PROJECT_ID`
    - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
    - `api_version` from `OPENAI_API_VERSION`
    - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
    - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)
    """
    
    azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
    azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
    api_version = api_version or os.getenv("OPENAI_API_VERSION")
    api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
    azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
    organization = organization or os.getenv("OPENAI_ORG_ID")
    project = project or os.getenv("OPENAI_PROJECT_ID")
    
    if not azure_deployment:
        azure_deployment = model
    
    if not azure_endpoint:
        raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable")
    
    if not api_key and not azure_ad_token:
        raise ValueError("Either API key or Azure AD token must be provided")
    
    azure_client = openai.AsyncAzureOpenAI(
        max_retries=0,
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        api_version=api_version,
        api_key=api_key,
        azure_ad_token=azure_ad_token,
        organization=organization,
        project=project,
        base_url=base_url,
        timeout=timeout
        if timeout
        else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
    )
    
    instance = OpenAILLM(
        model=model,
        temperature=temperature,
        tool_choice=tool_choice,
        max_completion_tokens=max_completion_tokens,
    )
    instance._client = azure_client
    return instance

Create a new instance of Azure OpenAI LLM.

This automatically infers the following arguments from their corresponding environment variables if they are not provided:
- api_key from AZURE_OPENAI_API_KEY
- organization from OPENAI_ORG_ID
- project from OPENAI_PROJECT_ID
- azure_ad_token from AZURE_OPENAI_AD_TOKEN
- api_version from OPENAI_API_VERSION
- azure_endpoint from AZURE_OPENAI_ENDPOINT
- azure_deployment from AZURE_OPENAI_DEPLOYMENT (if not provided, uses model as deployment name)
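
A construction sketch for the Azure-backed variant, assuming AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, and OPENAI_API_VERSION are set in the environment:

    from videosdk.plugins.openai import OpenAILLM

    # The deployment name falls back to the model name when
    # AZURE_OPENAI_DEPLOYMENT is not set.
    llm = OpenAILLM.azure(
        model="gpt-4o-mini",
        temperature=0.7,
    )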

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Cleanup resources by closing the HTTP client"""
    await self.cancel_current_generation()
    if self._client:
        await self._client.close()

Cleanup resources by closing the HTTP client

async def cancel_current_generation(self) ‑> None
async def cancel_current_generation(self) -> None:
    self._cancelled = True

Cancel the current LLM generation if active. This implementation sets an internal cancellation flag, which stops the streaming loop in chat() at the next received chunk.

async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMResponse]:
    """
    Implement chat functionality using OpenAI's chat completion API
    
    Args:
        messages: ChatContext containing conversation history
        tools: Optional list of function tools available to the model
        **kwargs: Additional arguments passed to the OpenAI API
        
    Yields:
        LLMResponse objects containing the model's responses
    """
    self._cancelled = False
    
    def _format_content(content: Union[str, List[ChatContent]]):
        if isinstance(content, str):
            return content

        formatted_parts = []
        for part in content:
            if isinstance(part, str):
                formatted_parts.append({"type": "text", "text": part})
            elif isinstance(part, ImageContent):
                image_url_data = {"url": part.to_data_url()}
                if part.inference_detail != "auto":
                    image_url_data["detail"] = part.inference_detail
                formatted_parts.append(
                    {
                        "type": "image_url",
                        "image_url": image_url_data,
                    }
                )
        return formatted_parts

    completion_params = {
        "model": self.model,
        "messages": [
            {
                "role": msg.role.value,
                "content": _format_content(msg.content),
                **({"name": msg.name} if hasattr(msg, "name") else {}),
            }
            if isinstance(msg, ChatMessage)
            else {
                "role": "assistant",
                "content": None,
                "function_call": {"name": msg.name, "arguments": msg.arguments},
            }
            if isinstance(msg, FunctionCall)
            else {
                "role": "function",
                "name": msg.name,
                "content": msg.output,
            }
            if isinstance(msg, FunctionCallOutput)
            else None
            for msg in messages.items
            if msg is not None
        ],
        "temperature": self.temperature,
        "stream": True,
        "max_tokens": self.max_completion_tokens,
    }
    if tools:
        formatted_tools = []
        for tool in tools:
            if not is_function_tool(tool):
                continue
            try:
                tool_schema = build_openai_schema(tool)
                formatted_tools.append(tool_schema)
            except Exception as e:
                self.emit("error", f"Failed to format tool {tool}: {e}")
                continue
        
        if formatted_tools:
            completion_params["functions"] = formatted_tools
            completion_params["function_call"] = self.tool_choice
    completion_params.update(kwargs)
    try:
        response_stream = await self._client.chat.completions.create(**completion_params)
        
        current_content = ""
        current_function_call = None

        async for chunk in response_stream:
            if self._cancelled:
                break
            
            if not chunk.choices:
                continue
                
            delta = chunk.choices[0].delta
            if delta.function_call:
                if current_function_call is None:
                    current_function_call = {
                        "name": delta.function_call.name or "",
                        "arguments": delta.function_call.arguments or ""
                    }
                else:
                    if delta.function_call.name:
                        current_function_call["name"] += delta.function_call.name
                    if delta.function_call.arguments:
                        current_function_call["arguments"] += delta.function_call.arguments
            elif current_function_call is not None:  
                try:
                    args = json.loads(current_function_call["arguments"])
                    current_function_call["arguments"] = args
                except json.JSONDecodeError:
                    self.emit("error", f"Failed to parse function arguments: {current_function_call['arguments']}")
                    current_function_call["arguments"] = {}
                
                yield LLMResponse(
                    content="",
                    role=ChatRole.ASSISTANT,
                    metadata={"function_call": current_function_call}
                )
                current_function_call = None
            
            elif delta.content is not None:
                current_content = delta.content
                yield LLMResponse(
                    content=current_content,
                    role=ChatRole.ASSISTANT
                )

    except Exception as e:
        if not self._cancelled:
            self.emit("error", e)
        raise

Implement chat functionality using OpenAI's chat completion API

Args

messages
ChatContext containing conversation history
tools
Optional list of function tools available to the model
**kwargs
Additional arguments passed to the OpenAI API

Yields

LLMResponse objects containing the model's responses
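
A hedged sketch of consuming the stream, assuming an existing ChatContext holding the conversation history (its construction is not covered by this module):

    import asyncio

    async def stream_reply(llm, chat_context):
        # Text arrives as incremental deltas in response.content; completed
        # function calls show up in response.metadata["function_call"].
        async for response in llm.chat(chat_context):
            print(response.content, end="", flush=True)

    # asyncio.run(stream_reply(llm, chat_context))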

class OpenAIRealtime (*,
api_key: str | None = None,
model: str,
config: OpenAIRealtimeConfig | None = None,
base_url: str | None = None)
class OpenAIRealtime(RealtimeBaseModel[OpenAIEventTypes]):
    """OpenAI's realtime model implementation."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str,
        config: OpenAIRealtimeConfig | None = None,
        base_url: str | None = None,
    ) -> None:
        """
        Initialize OpenAI realtime model.

        Args:
            api_key: OpenAI API key. If not provided, will attempt to read from OPENAI_API_KEY env var
            model: The OpenAI model identifier to use (e.g. 'gpt-4o-realtime-preview')
            config: Optional configuration object for customizing model behavior. Contains settings for:
                   - voice: Voice ID to use for audio output
                   - temperature: Sampling temperature for responses
                   - turn_detection: Settings for detecting user speech turns
                   - input_audio_transcription: Settings for audio transcription
                   - tool_choice: How tools should be selected ('auto' or 'none')
                   - modalities: List of enabled modalities ('text', 'audio')
            base_url: Base URL for OpenAI API. Defaults to 'https://api.openai.com/v1'

        Raises:
            ValueError: If no API key is provided and none found in environment variables
        """
        super().__init__()
        self.model = model
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.base_url = base_url or OPENAI_BASE_URL
        if not self.api_key:
            self.emit(
                "error",
                "OpenAI API key must be provided or set in OPENAI_API_KEY environment variable",
            )
            raise ValueError(
                "OpenAI API key must be provided or set in OPENAI_API_KEY environment variable"
            )
        self._http_session: Optional[aiohttp.ClientSession] = None
        self._session: Optional[OpenAISession] = None
        self._closing = False
        self._instructions: Optional[str] = None
        self._tools: Optional[List[FunctionTool]] = []
        self.loop = None
        self.audio_track: Optional[CustomAudioStreamTrack] = None
        self._formatted_tools: Optional[List[Dict[str, Any]]] = None
        self.config: OpenAIRealtimeConfig = config or OpenAIRealtimeConfig()
        self.input_sample_rate = 48000
        self.target_sample_rate = 16000
        self._agent_speaking = False

    def set_agent(self, agent: Agent) -> None:
        self._instructions = agent.instructions
        self._tools = agent.tools
        self.tools_formatted = self._format_tools_for_session(self._tools)
        self._formatted_tools = self.tools_formatted

    async def connect(self) -> None:
        headers = {"Agent": "VideoSDK Agents"}
        headers["Authorization"] = f"Bearer {self.api_key}"
        headers["OpenAI-Beta"] = "realtime=v1"

        url = self.process_base_url(self.base_url, self.model)

        self._session = await self._create_session(url, headers)
        await self._handle_websocket(self._session)
        await self.send_first_session_update()

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Handle incoming audio data from the user"""
        if self._session and not self._closing and "audio" in self.config.modalities:
            audio_data = np.frombuffer(audio_data, dtype=np.int16)
            audio_data = signal.resample(
                audio_data,
                int(len(audio_data) * self.target_sample_rate / self.input_sample_rate),
            )
            audio_data = audio_data.astype(np.int16).tobytes()
            base64_audio_data = base64.b64encode(audio_data).decode("utf-8")
            audio_event = {
                "type": "input_audio_buffer.append",
                "audio": base64_audio_data,
            }
            await self.send_event(audio_event)

    async def _ensure_http_session(self) -> aiohttp.ClientSession:
        """Ensure we have an HTTP session"""
        if not self._http_session:
            self._http_session = aiohttp.ClientSession()
        return self._http_session

    async def _create_session(self, url: str, headers: dict) -> OpenAISession:
        """Create a new WebSocket session"""

        http_session = await self._ensure_http_session()
        ws = await http_session.ws_connect(
            url,
            headers=headers,
            autoping=True,
            heartbeat=10,
            autoclose=False,
            timeout=30,
        )
        msg_queue: asyncio.Queue = asyncio.Queue()
        tasks: list[asyncio.Task] = []

        self._closing = False

        return OpenAISession(ws=ws, msg_queue=msg_queue, tasks=tasks)

    async def send_message(self, message: str) -> None:
        """Send a message to the OpenAI realtime API"""
        await self.send_event(
            {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "assistant",
                    "content": [
                        {
                            "type": "text",
                            "text": "Repeat the user's exact message back to them:"
                            + message
                            + "DO NOT ADD ANYTHING ELSE",
                        }
                    ],
                },
            }
        )
        await self.create_response()

    async def create_response(self) -> None:
        """Create a response to the OpenAI realtime API"""
        if not self._session:
            self.emit("error", "No active WebSocket session")
            raise RuntimeError("No active WebSocket session")

        response_event = {
            "type": "response.create",
            "event_id": str(uuid.uuid4()),
            "response": {
                "instructions": self._instructions,
                "metadata": {"client_event_id": str(uuid.uuid4())},
            },
        }

        await self.send_event(response_event)

    async def _handle_websocket(self, session: OpenAISession) -> None:
        """Start WebSocket send/receive tasks"""
        session.tasks.extend(
            [
                asyncio.create_task(self._send_loop(session), name="send_loop"),
                asyncio.create_task(self._receive_loop(session), name="receive_loop"),
            ]
        )

    async def _send_loop(self, session: OpenAISession) -> None:
        """Send messages from queue to WebSocket"""
        try:
            while not self._closing:
                msg = await session.msg_queue.get()
                if isinstance(msg, dict):
                    await session.ws.send_json(msg)
                else:
                    await session.ws.send_str(str(msg))
        except asyncio.CancelledError:
            pass
        finally:
            await self._cleanup_session(session)

    async def _receive_loop(self, session: OpenAISession) -> None:
        """Receive and process WebSocket messages"""
        try:
            while not self._closing:
                msg = await session.ws.receive()

                if msg.type == aiohttp.WSMsgType.CLOSED:
                    self.emit("error", f"WebSocket closed with reason: {msg.extra}")
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    self.emit("error", f"WebSocket error: {msg.data}")
                    break
                elif msg.type == aiohttp.WSMsgType.TEXT:
                    await self._handle_message(json.loads(msg.data))
        except Exception as e:
            self.emit("error", f"WebSocket receive error: {str(e)}")
        finally:
            await self._cleanup_session(session)

    async def _handle_message(self, data: dict) -> None:
        """Handle incoming WebSocket messages"""
        try:
            event_type = data.get("type")

            if event_type == "input_audio_buffer.speech_started":
                await self._handle_speech_started(data)

            elif event_type == "input_audio_buffer.speech_stopped":
                await self._handle_speech_stopped(data)

            elif event_type == "response.created":
                await self._handle_response_created(data)

            elif event_type == "response.output_item.added":
                await self._handle_output_item_added(data)

            elif event_type == "response.content_part.added":
                await self._handle_content_part_added(data)

            elif event_type == "response.text.delta":
                await self._handle_text_delta(data)

            elif event_type == "response.audio.delta":
                await self._handle_audio_delta(data)

            elif event_type == "response.audio_transcript.delta":
                await self._handle_audio_transcript_delta(data)

            elif event_type == "response.done":
                await self._handle_response_done(data)

            elif event_type == "error":
                await self._handle_error(data)

            elif event_type == "response.function_call_arguments.delta":
                await self._handle_function_call_arguments_delta(data)

            elif event_type == "response.function_call_arguments.done":
                await self._handle_function_call_arguments_done(data)

            elif event_type == "response.output_item.done":
                await self._handle_output_item_done(data)

            elif event_type == "conversation.item.input_audio_transcription.completed":
                await self._handle_input_audio_transcription_completed(data)

            elif event_type == "response.text.done":
                await self._handle_text_done(data)

        except Exception as e:
            self.emit("error", f"Error handling event {event_type}: {str(e)}")

    async def _handle_speech_started(self, data: dict) -> None:
        """Handle speech detection start"""
        if "audio" in self.config.modalities:
            self.emit("user_speech_started", {"type": "done"})
            await self.interrupt()
            if self.audio_track:
                self.audio_track.interrupt()
        await realtime_metrics_collector.set_user_speech_start()

    async def _handle_speech_stopped(self, data: dict) -> None:
        """Handle speech detection end"""
        await realtime_metrics_collector.set_user_speech_end()

    async def _handle_response_created(self, data: dict) -> None:
        """Handle initial response creation"""
        response_id = data.get("response", {}).get("id")

    async def _handle_output_item_added(self, data: dict) -> None:
        """Handle new output item addition"""

    async def _handle_output_item_done(self, data: dict) -> None:
        """Handle output item done"""
        try:
            item = data.get("item", {})
            if (
                item.get("type") == "function_call"
                and item.get("status") == "completed"
            ):
                name = item.get("name")
                arguments = json.loads(item.get("arguments", "{}"))

                if name and self._tools:
                    for tool in self._tools:
                        tool_info = get_tool_info(tool)
                        if tool_info.name == name:
                            try:
                                await realtime_metrics_collector.add_tool_call(name)
                                result = await tool(**arguments)
                                await self.send_event(
                                    {
                                        "type": "conversation.item.create",
                                        "item": {
                                            "type": "function_call_output",
                                            "call_id": item.get("call_id"),
                                            "output": json.dumps(result),
                                        },
                                    }
                                )

                                await self.send_event(
                                    {
                                        "type": "response.create",
                                        "event_id": str(uuid.uuid4()),
                                        "response": {
                                            "instructions": self._instructions,
                                            "metadata": {
                                                "client_event_id": str(uuid.uuid4())
                                            },
                                        },
                                    }
                                )

                            except Exception as e:
                                self.emit(
                                    "error", f"Error executing function {name}: {e}"
                                )
                            break
        except Exception as e:
            self.emit("error", f"Error handling output item done: {e}")

    async def _handle_content_part_added(self, data: dict) -> None:
        """Handle new content part"""

    async def _handle_text_delta(self, data: dict) -> None:
        """Handle text delta chunk"""
        pass

    async def _handle_audio_delta(self, data: dict) -> None:
        """Handle audio chunk"""
        if "audio" not in self.config.modalities:
            return

        try:
            if not self._agent_speaking:
                await realtime_metrics_collector.set_agent_speech_start()
                self._agent_speaking = True
            base64_audio_data = base64.b64decode(data.get("delta"))
            if base64_audio_data:
                if self.audio_track and self.loop:
                    asyncio.create_task(
                        self.audio_track.add_new_bytes(base64_audio_data)
                    )
        except Exception as e:
            self.emit("error", f"Error handling audio delta: {e}")
            traceback.print_exc()

    async def interrupt(self) -> None:
        """Interrupt the current response and flush audio"""
        if self._session and not self._closing:
            cancel_event = {"type": "response.cancel", "event_id": str(uuid.uuid4())}
            await self.send_event(cancel_event)
            await realtime_metrics_collector.set_interrupted()
        if self.audio_track:
            self.audio_track.interrupt()
        if self._agent_speaking:
            await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
            self._agent_speaking = False

    async def _handle_audio_transcript_delta(self, data: dict) -> None:
        """Handle transcript chunk"""
        delta_content = data.get("delta", "")
        if not hasattr(self, "_current_audio_transcript"):
            self._current_audio_transcript = ""
        self._current_audio_transcript += delta_content

    async def _handle_input_audio_transcription_completed(self, data: dict) -> None:
        """Handle input audio transcription completion for user transcript"""
        transcript = data.get("transcript", "")
        if transcript:
            await realtime_metrics_collector.set_user_transcript(transcript)
            try:
                self.emit(
                    "realtime_model_transcription",
                    {"role": "user", "text": transcript, "is_final": True},
                )
            except Exception:
                pass

    async def _handle_response_done(self, data: dict) -> None:
        """Handle response completion for agent transcript"""
        if (
            hasattr(self, "_current_audio_transcript")
            and self._current_audio_transcript
        ):
            await realtime_metrics_collector.set_agent_response(
                self._current_audio_transcript
            )
            global_event_emitter.emit(
                "text_response",
                {"text": self._current_audio_transcript, "type": "done"},
            )
            try:
                self.emit(
                    "realtime_model_transcription",
                    {
                        "role": "agent",
                        "text": self._current_audio_transcript,
                        "is_final": True,
                    },
                )
            except Exception:
                pass
            self._current_audio_transcript = ""
        await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
        self._agent_speaking = False
        pass

    async def _handle_function_call_arguments_delta(self, data: dict) -> None:
        """Handle function call arguments delta"""

    async def _handle_function_call_arguments_done(self, data: dict) -> None:
        """Handle function call arguments done"""

    async def _handle_error(self, data: dict) -> None:
        """Handle error events"""

    async def _cleanup_session(self, session: OpenAISession) -> None:
        """Clean up session resources"""
        if self._closing:
            return

        self._closing = True

        for task in session.tasks:
            if not task.done():
                task.cancel()
                try:
                    await asyncio.wait_for(task, timeout=1.0)  # Add timeout
                except (asyncio.CancelledError, asyncio.TimeoutError):
                    pass

        if not session.ws.closed:
            try:
                await session.ws.close()
            except Exception:
                pass

    async def send_event(self, event: Dict[str, Any]) -> None:
        """Send an event to the WebSocket"""
        if self._session and not self._closing:
            await self._session.msg_queue.put(event)

    async def aclose(self) -> None:
        """Cleanup all resources"""
        if self._closing:
            return

        self._closing = True

        if self._session:
            await self._cleanup_session(self._session)

        if self._http_session and not self._http_session.closed:
            await self._http_session.close()

    async def send_first_session_update(self) -> None:
        """Send initial session update with default values after connection"""
        if not self._session:
            return

        turn_detection = None
        input_audio_transcription = None

        if "audio" in self.config.modalities:
            turn_detection = (
                self.config.turn_detection.model_dump(
                    by_alias=True,
                    exclude_unset=True,
                    exclude_defaults=True,
                )
                if self.config.turn_detection
                else None
            )
            input_audio_transcription = (
                self.config.input_audio_transcription.model_dump(
                    by_alias=True,
                    exclude_unset=True,
                    exclude_defaults=True,
                )
                if self.config.input_audio_transcription
                else None
            )

        session_update = {
            "type": "session.update",
            "session": {
                "model": self.model,
                "instructions": self._instructions
                or "You are a helpful assistant that can answer questions and help with tasks.",
                "temperature": self.config.temperature,
                "tool_choice": self.config.tool_choice,
                "tools": self._formatted_tools or [],
                "modalities": self.config.modalities,
                "max_response_output_tokens": "inf",
            },
        }

        if "audio" in self.config.modalities:
            session_update["session"]["voice"] = self.config.voice
            session_update["session"]["input_audio_format"] = DEFAULT_INPUT_AUDIO_FORMAT
            session_update["session"][
                "output_audio_format"
            ] = DEFAULT_OUTPUT_AUDIO_FORMAT
            if turn_detection:
                session_update["session"]["turn_detection"] = turn_detection
            if input_audio_transcription:
                session_update["session"][
                    "input_audio_transcription"
                ] = input_audio_transcription

        # Send the event
        await self.send_event(session_update)

    def process_base_url(self, url: str, model: str) -> str:
        if url.startswith("http"):
            url = url.replace("http", "ws", 1)

        parsed_url = urlparse(url)
        query_params = parse_qs(parsed_url.query)

        if not parsed_url.path or parsed_url.path.rstrip("/") in ["", "/v1", "/openai"]:
            path = parsed_url.path.rstrip("/") + "/realtime"
        else:
            path = parsed_url.path

        if "model" not in query_params:
            query_params["model"] = [model]

        new_query = urlencode(query_params, doseq=True)
        new_url = urlunparse(
            (parsed_url.scheme, parsed_url.netloc, path, "", new_query, "")
        )

        return new_url

    def _format_tools_for_session(
        self, tools: List[FunctionTool]
    ) -> List[Dict[str, Any]]:
        """Format tools for OpenAI session update"""
        oai_tools = []
        for tool in tools:
            if not is_function_tool(tool):
                continue

            try:
                tool_schema = build_openai_schema(tool)
                oai_tools.append(tool_schema)
            except Exception as e:
                self.emit("error", f"Failed to format tool {tool}: {e}")
                continue

        return oai_tools

    async def send_text_message(self, message: str) -> None:
        """Send a text message to the OpenAI realtime API"""
        if not self._session:
            self.emit("error", "No active WebSocket session")
            raise RuntimeError("No active WebSocket session")

        await self.send_event(
            {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": message}],
                },
            }
        )
        await self.create_response()

OpenAI's realtime model implementation.

Initialize OpenAI realtime model.

Args

api_key
OpenAI API key. If not provided, will attempt to read from OPENAI_API_KEY env var
model
The OpenAI model identifier to use (e.g. 'gpt-4o-realtime-preview')
config
Optional configuration object for customizing model behavior. Contains settings for:
- voice: Voice ID to use for audio output
- temperature: Sampling temperature for responses
- turn_detection: Settings for detecting user speech turns
- input_audio_transcription: Settings for audio transcription
- tool_choice: How tools should be selected ('auto' or 'none')
- modalities: List of enabled modalities ('text', 'audio')
base_url
Base URL for OpenAI API. Defaults to 'https://api.openai.com/v1'

Raises

ValueError
If no API key is provided and none found in environment variables
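
A minimal construction sketch, assuming both names are re-exported from videosdk.plugins.openai and OPENAI_API_KEY is set (the realtime model name is illustrative; use any realtime-capable model available to your account):

    from videosdk.plugins.openai import OpenAIRealtime, OpenAIRealtimeConfig

    model = OpenAIRealtime(
        model="gpt-4o-realtime-preview",
        config=OpenAIRealtimeConfig(voice="alloy", modalities=["text", "audio"]),
    )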

Ancestors

  • videosdk.agents.realtime_base_model.RealtimeBaseModel
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic
  • abc.ABC

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Cleanup all resources"""
    if self._closing:
        return

    self._closing = True

    if self._session:
        await self._cleanup_session(self._session)

    if self._http_session and not self._http_session.closed:
        await self._http_session.close()

Cleanup all resources

async def connect(self) ‑> None
async def connect(self) -> None:
    headers = {"Agent": "VideoSDK Agents"}
    headers["Authorization"] = f"Bearer {self.api_key}"
    headers["OpenAI-Beta"] = "realtime=v1"

    url = self.process_base_url(self.base_url, self.model)

    self._session = await self._create_session(url, headers)
    await self._handle_websocket(self._session)
    await self.send_first_session_update()

async def create_response(self) ‑> None
async def create_response(self) -> None:
    """Create a response to the OpenAI realtime API"""
    if not self._session:
        self.emit("error", "No active WebSocket session")
        raise RuntimeError("No active WebSocket session")

    response_event = {
        "type": "response.create",
        "event_id": str(uuid.uuid4()),
        "response": {
            "instructions": self._instructions,
            "metadata": {"client_event_id": str(uuid.uuid4())},
        },
    }

    await self.send_event(response_event)

Create a response to the OpenAI realtime API

async def handle_audio_input(self, audio_data: bytes) ‑> None
async def handle_audio_input(self, audio_data: bytes) -> None:
    """Handle incoming audio data from the user"""
    if self._session and not self._closing and "audio" in self.config.modalities:
        audio_data = np.frombuffer(audio_data, dtype=np.int16)
        audio_data = signal.resample(
            audio_data,
            int(len(audio_data) * self.target_sample_rate / self.input_sample_rate),
        )
        audio_data = audio_data.astype(np.int16).tobytes()
        base64_audio_data = base64.b64encode(audio_data).decode("utf-8")
        audio_event = {
            "type": "input_audio_buffer.append",
            "audio": base64_audio_data,
        }
        await self.send_event(audio_event)

Handle incoming audio data from the user
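
For illustration, the method expects 48 kHz, 16-bit mono PCM and resamples it to 16 kHz before forwarding it to the realtime session; a hypothetical call with 10 ms of silence:

    import numpy as np

    async def send_silence(model):
        # 480 samples = 10 ms of 16-bit mono PCM at 48 kHz; real callers pass
        # captured microphone frames instead.
        pcm_chunk = np.zeros(480, dtype=np.int16).tobytes()
        await model.handle_audio_input(pcm_chunk)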

async def interrupt(self) ‑> None
async def interrupt(self) -> None:
    """Interrupt the current response and flush audio"""
    if self._session and not self._closing:
        cancel_event = {"type": "response.cancel", "event_id": str(uuid.uuid4())}
        await self.send_event(cancel_event)
        await realtime_metrics_collector.set_interrupted()
    if self.audio_track:
        self.audio_track.interrupt()
    if self._agent_speaking:
        await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
        self._agent_speaking = False

Interrupt the current response and flush audio

def process_base_url(self, url: str, model: str) ‑> str
def process_base_url(self, url: str, model: str) -> str:
    if url.startswith("http"):
        url = url.replace("http", "ws", 1)

    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)

    if not parsed_url.path or parsed_url.path.rstrip("/") in ["", "/v1", "/openai"]:
        path = parsed_url.path.rstrip("/") + "/realtime"
    else:
        path = parsed_url.path

    if "model" not in query_params:
        query_params["model"] = [model]

    new_query = urlencode(query_params, doseq=True)
    new_url = urlunparse(
        (parsed_url.scheme, parsed_url.netloc, path, "", new_query, "")
    )

    return new_url
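
A worked example of the rewrite, assuming an already constructed instance named model (no network call is made):

    # The scheme becomes wss, "/realtime" is appended to the bare "/v1" path,
    # and the model is added as a query parameter.
    url = model.process_base_url("https://api.openai.com/v1", "gpt-4o-realtime-preview")
    # url == "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview"
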
async def send_event(self, event: Dict[str, Any]) ‑> None
async def send_event(self, event: Dict[str, Any]) -> None:
    """Send an event to the WebSocket"""
    if self._session and not self._closing:
        await self._session.msg_queue.put(event)

Send an event to the WebSocket

async def send_first_session_update(self) ‑> None
async def send_first_session_update(self) -> None:
    """Send initial session update with default values after connection"""
    if not self._session:
        return

    turn_detection = None
    input_audio_transcription = None

    if "audio" in self.config.modalities:
        turn_detection = (
            self.config.turn_detection.model_dump(
                by_alias=True,
                exclude_unset=True,
                exclude_defaults=True,
            )
            if self.config.turn_detection
            else None
        )
        input_audio_transcription = (
            self.config.input_audio_transcription.model_dump(
                by_alias=True,
                exclude_unset=True,
                exclude_defaults=True,
            )
            if self.config.input_audio_transcription
            else None
        )

    session_update = {
        "type": "session.update",
        "session": {
            "model": self.model,
            "instructions": self._instructions
            or "You are a helpful assistant that can answer questions and help with tasks.",
            "temperature": self.config.temperature,
            "tool_choice": self.config.tool_choice,
            "tools": self._formatted_tools or [],
            "modalities": self.config.modalities,
            "max_response_output_tokens": "inf",
        },
    }

    if "audio" in self.config.modalities:
        session_update["session"]["voice"] = self.config.voice
        session_update["session"]["input_audio_format"] = DEFAULT_INPUT_AUDIO_FORMAT
        session_update["session"][
            "output_audio_format"
        ] = DEFAULT_OUTPUT_AUDIO_FORMAT
        if turn_detection:
            session_update["session"]["turn_detection"] = turn_detection
        if input_audio_transcription:
            session_update["session"][
                "input_audio_transcription"
            ] = input_audio_transcription

    # Send the event
    await self.send_event(session_update)

Send initial session update with default values after connection

async def send_message(self, message: str) ‑> None
async def send_message(self, message: str) -> None:
    """Send a message to the OpenAI realtime API"""
    await self.send_event(
        {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "assistant",
                "content": [
                    {
                        "type": "text",
                        "text": "Repeat the user's exact message back to them:"
                        + message
                        + "DO NOT ADD ANYTHING ELSE",
                    }
                ],
            },
        }
    )
    await self.create_response()

Send a message to the OpenAI realtime API

async def send_text_message(self, message: str) ‑> None
async def send_text_message(self, message: str) -> None:
    """Send a text message to the OpenAI realtime API"""
    if not self._session:
        self.emit("error", "No active WebSocket session")
        raise RuntimeError("No active WebSocket session")

    await self.send_event(
        {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": message}],
            },
        }
    )
    await self.create_response()

Send a text message to the OpenAI realtime API
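
A hypothetical flow once a model instance has been constructed:

    async def ask(model):
        await model.connect()
        # Queues a user message on the socket and asks the model to respond.
        await model.send_text_message("Summarize the meeting so far.")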

def set_agent(self, agent: Agent) ‑> None
def set_agent(self, agent: Agent) -> None:
    self._instructions = agent.instructions
    self._tools = agent.tools
    self.tools_formatted = self._format_tools_for_session(self._tools)
    self._formatted_tools = self.tools_formatted

class OpenAIRealtimeConfig (voice: str = 'alloy',
temperature: float = 0.8,
turn_detection: TurnDetection | None = <factory>,
input_audio_transcription: InputAudioTranscription | None = <factory>,
tool_choice: ToolChoice | None = 'auto',
modalities: list[str] = <factory>)
@dataclass
class OpenAIRealtimeConfig:
    """Configuration for the OpenAI realtime API

    Args:
        voice: Voice ID for audio output. Default is 'alloy'
        temperature: Controls randomness in response generation. Higher values (e.g. 0.8) make output more random,
                    lower values make it more deterministic. Default is 0.8
        turn_detection: Configuration for detecting user speech turns. Contains settings for:
                       - type: Detection type ('server_vad')
                       - threshold: Voice activity detection threshold (0.0-1.0)
                       - prefix_padding_ms: Padding before speech start (ms)
                       - silence_duration_ms: Silence duration to mark end (ms)
                       - create_response: Whether to generate response on turn
                       - interrupt_response: Whether to allow interruption
        input_audio_transcription: Configuration for audio transcription. Contains:
                                 - model: Model to use for transcription
        tool_choice: How tools should be selected ('auto' or 'none'). Default is 'auto'
        modalities: List of enabled response types ["text", "audio"]. Default includes both
    """

    voice: str = DEFAULT_VOICE
    temperature: float = DEFAULT_TEMPERATURE
    turn_detection: TurnDetection | None = field(
        default_factory=lambda: DEFAULT_TURN_DETECTION
    )
    input_audio_transcription: InputAudioTranscription | None = field(
        default_factory=lambda: DEFAULT_INPUT_AUDIO_TRANSCRIPTION
    )
    tool_choice: ToolChoice | None = DEFAULT_TOOL_CHOICE
    modalities: list[str] = field(default_factory=lambda: ["text", "audio"])

Configuration for the OpenAI realtime API

Args

voice
Voice ID for audio output. Default is 'alloy'
temperature
Controls randomness in response generation. Higher values (e.g. 0.8) make output more random, lower values make it more deterministic. Default is 0.8
turn_detection
Configuration for detecting user speech turns. Contains settings for:
- type: Detection type ('server_vad')
- threshold: Voice activity detection threshold (0.0-1.0)
- prefix_padding_ms: Padding before speech start (ms)
- silence_duration_ms: Silence duration to mark end (ms)
- create_response: Whether to generate response on turn
- interrupt_response: Whether to allow interruption
input_audio_transcription
Configuration for audio transcription. Contains:
- model: Model to use for transcription
tool_choice
How tools should be selected ('auto' or 'none'). Default is 'auto'
modalities
List of enabled response types ["text", "audio"]. Default includes both
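
A construction sketch, assuming the dataclass is re-exported from videosdk.plugins.openai; the default turn detection and transcription settings are kept unless overridden:

    from videosdk.plugins.openai import OpenAIRealtimeConfig

    # Text-only responses: when "audio" is absent from modalities, the session
    # update skips voice, audio formats, and turn detection.
    config = OpenAIRealtimeConfig(
        temperature=0.6,
        modalities=["text"],
    )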

Instance variables

var input_audio_transcription : openai.types.beta.realtime.session.InputAudioTranscription | None
var modalities : list[str]
var temperature : float
var tool_choice : Literal['auto', 'required', 'none'] | None
var turn_detection : openai.types.beta.realtime.session.TurnDetection | None
var voice : str
class OpenAISTT (*,
api_key: str | None = None,
model: str = 'gpt-4o-mini-transcribe',
base_url: str | None = None,
prompt: str | None = None,
language: str = 'en',
turn_detection: dict | None = None,
enable_streaming: bool = True,
silence_threshold: float = 0.01,
silence_duration: float = 0.8)
class OpenAISTT(BaseSTT):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "gpt-4o-mini-transcribe",
        base_url: str | None = None,
        prompt: str | None = None,
        language: str = "en",
        turn_detection: dict | None = None,
        enable_streaming: bool = True,
        silence_threshold: float = 0.01,
        silence_duration: float = 0.8,
    ) -> None:
        """Initialize the OpenAI STT plugin.

        Args:
            api_key (Optional[str], optional): OpenAI API key. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "whisper-1".
            base_url (Optional[str], optional): The base URL for the OpenAI API. Defaults to None.
            prompt (Optional[str], optional): The prompt for the STT plugin. Defaults to None.
            language (str): The language to use for the STT plugin. Defaults to "en".
            turn_detection (dict | None): The turn detection configuration for the STT plugin. Defaults to None.
            enable_streaming (bool): Whether to stream audio to the realtime WebSocket transcription API. Defaults to True.
            silence_threshold (float): Amplitude threshold (as a fraction of int16 full scale) used by the custom VAD in non-streaming mode. Defaults to 0.01.
            silence_duration (float): Seconds of silence that end an utterance in non-streaming mode. Defaults to 0.8.
        """
        super().__init__()
        
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OpenAI API key must be provided either through api_key parameter or OPENAI_API_KEY environment variable")
        
        self.model = model
        self.language = language
        self.prompt = prompt
        self.turn_detection = turn_detection or {
            "type": "server_vad",
            "threshold": 0.5,
            "prefix_padding_ms": 300,
            "silence_duration_ms": 500,
        }
        self.enable_streaming = enable_streaming
        
        # Custom VAD parameters for non-streaming mode
        self.silence_threshold_bytes = int(silence_threshold * 32767)
        self.silence_duration_frames = int(silence_duration * 48000)  # input_sample_rate
        
        self.client = openai.AsyncClient(
            max_retries=0,
            api_key=self.api_key,
            base_url=base_url or None,
            http_client=httpx.AsyncClient(
                timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=50,
                    max_keepalive_connections=50,
                    keepalive_expiry=120,
                ),
            ),
        )
        
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._current_text = ""
        self._last_interim_at = 0
        self.input_sample_rate = 48000
        self.target_sample_rate = 16000
        self._audio_buffer = bytearray()
        
        # Custom VAD state for non-streaming mode
        self._is_speaking = False
        self._silence_frames = 0
        
    @staticmethod
    def azure(
        *,
        model: str = "gpt-4o-mini-transcribe",
        language: str = "en",
        prompt: str | None = None,
        turn_detection: dict | None = None,
        azure_endpoint: str | None = None,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | None = None,
        azure_ad_token: str | None = None,
        organization: str | None = None,
        project: str | None = None,
        base_url: str | None = None,
        enable_streaming: bool = False,
        timeout: httpx.Timeout | None = None,
    ) -> "OpenAISTT":
        """
        Create a new instance of Azure OpenAI STT.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `AZURE_OPENAI_API_KEY`
        - `organization` from `OPENAI_ORG_ID`
        - `project` from `OPENAI_PROJECT_ID`
        - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
        - `api_version` from `OPENAI_API_VERSION`
        - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
        - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)
        """
        
        # Get values from environment variables if not provided
        azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
        azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
        api_version = api_version or os.getenv("OPENAI_API_VERSION")
        api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
        azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
        organization = organization or os.getenv("OPENAI_ORG_ID")
        project = project or os.getenv("OPENAI_PROJECT_ID")
        
        # If azure_deployment is not provided, use model as the deployment name
        if not azure_deployment:
            azure_deployment = model
        
        if not azure_endpoint:
            raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable")
        
        if not api_key and not azure_ad_token:
            raise ValueError("Either API key or Azure AD token must be provided")
        
        azure_client = openai.AsyncAzureOpenAI(
            max_retries=0,
            azure_endpoint=azure_endpoint,
            azure_deployment=azure_deployment,
            api_version=api_version,
            api_key=api_key,
            azure_ad_token=azure_ad_token,
            organization=organization,
            project=project,
            base_url=base_url,
            timeout=timeout
            if timeout
            else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
        )
        
        instance = OpenAISTT(
            model=model,
            language=language,
            prompt=prompt,
            turn_detection=turn_detection,
            enable_streaming=enable_streaming,
        )
        instance.client = azure_client
        return instance
        
    async def process_audio(
        self,
        audio_frames: bytes,
        language: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to OpenAI based on enabled mode"""
        
        if not self.enable_streaming:
            await self._transcribe_non_streaming(audio_frames)
            return
        
        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())
            
        try:
            audio_data = np.frombuffer(audio_frames, dtype=np.int16)
            audio_data = signal.resample(audio_data, int(len(audio_data) * self.target_sample_rate / self.input_sample_rate))
            audio_data = audio_data.astype(np.int16).tobytes()
            audio_data = base64.b64encode(audio_data).decode("utf-8")
            message = {
                "type": "input_audio_buffer.append",
                "audio": audio_data,
            }
            await self._ws.send_json(message)
        except Exception as e:
            print(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
                if self._ws_task:
                    self._ws_task.cancel()
                    self._ws_task = None

    async def _transcribe_non_streaming(self, audio_frames: bytes) -> None:
        """HTTP-based transcription using OpenAI audio/transcriptions API with custom VAD"""
        if not audio_frames:
            return
            
        self._audio_buffer.extend(audio_frames)
        
        # Custom VAD logic similar to other STT implementations
        is_silent_chunk = self._is_silent(audio_frames)
        
        if not is_silent_chunk:
            if not self._is_speaking:
                self._is_speaking = True
                global_event_emitter.emit("speech_started")
            self._silence_frames = 0
        else:
            if self._is_speaking:
                self._silence_frames += len(audio_frames) // 4  # Approximate frame count
                if self._silence_frames > self.silence_duration_frames:
                    global_event_emitter.emit("speech_stopped")
                    await self._process_audio_buffer()
                    self._is_speaking = False
                    self._silence_frames = 0

    def _is_silent(self, audio_chunk: bytes) -> bool:
        """Simple VAD: check if the max amplitude is below a threshold."""
        audio_data = np.frombuffer(audio_chunk, dtype=np.int16)
        return np.max(np.abs(audio_data)) < self.silence_threshold_bytes

    async def _process_audio_buffer(self) -> None:
        """Process the accumulated audio buffer with OpenAI transcription"""
        if not self._audio_buffer:
            return
            
        audio_data = bytes(self._audio_buffer)
        self._audio_buffer.clear()
        
        wav_bytes = self._audio_frames_to_wav_bytes(audio_data)
        
        try:
            resp = await self.client.audio.transcriptions.create(
                file=("audio.wav", wav_bytes, "audio/wav"),
                model=self.model,
                language=self.language,
                prompt=self.prompt or openai.NOT_GIVEN,
            )
            text = getattr(resp, "text", "")
            if text and self._transcript_callback:
                await self._transcript_callback(STTResponse(
                    event_type=SpeechEventType.FINAL,
                    data=SpeechData(text=text, language=self.language),
                    metadata={"model": self.model}
                ))
        except Exception as e:
            print(f"OpenAI transcription error: {str(e)}")
            self.emit("error", str(e))

    def _audio_frames_to_wav_bytes(self, audio_frames: bytes) -> bytes:
        """Convert audio frames to WAV bytes"""
        pcm = np.frombuffer(audio_frames, dtype=np.int16)
        resampled = signal.resample(pcm, int(len(pcm) * self.target_sample_rate / self.input_sample_rate))
        resampled = resampled.astype(np.int16)
        
        buf = io.BytesIO()
        with wave.open(buf, "wb") as wf:
            wf.setnchannels(1)  # Mono
            wf.setsampwidth(2)  # 16-bit PCM
            wf.setframerate(self.target_sample_rate)
            wf.writeframes(resampled.tobytes())
        
        return buf.getvalue()

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses"""
        if not self._ws:
            return
            
        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    error = f"WebSocket error: {self._ws.exception()}"
                    print(error)
                    self.emit("error", error)
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    print("WebSocket connection closed")
                    break
        except Exception as e:
            error = f"Error in WebSocket listener: {str(e)}"
            print(error)
            self.emit("error", error)
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None
                
    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with OpenAI's Realtime API"""
        
        if not self._session:
            self._session = aiohttp.ClientSession()
            
        config = {
            "type": "transcription_session.update",
            "session": {
                "input_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": self.model,
                    "prompt": self.prompt or "",
                    "language": self.language if self.language else None,
                },
                "turn_detection": self.turn_detection,
                "input_audio_noise_reduction": {
                    "type": "near_field"
                },
                "include": ["item.input_audio_transcription.logprobs"]
            }
        }
        
        query_params = {
            "intent": "transcription",
        }
        headers = {
            "User-Agent": "VideoSDK",
            "Authorization": f"Bearer {self.api_key}",
            "OpenAI-Beta": "realtime=v1",
        }
        
        base_url = str(self.client.base_url).rstrip('/')
        ws_url = f"{base_url}/realtime?{urlencode(query_params)}"
        if ws_url.startswith("http"):
            ws_url = ws_url.replace("http", "ws", 1)

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
            
            initial_response = await self._ws.receive_json()
            
            if initial_response.get("type") != "transcription_session.created":
                raise Exception(f"Expected session creation, got: {initial_response}")
            
            await self._ws.send_json(config)
            
            update_response = await self._ws.receive_json()
            
            if update_response.get("type") != "transcription_session.updated":
                raise Exception(f"Configuration update failed: {update_response}")
            
        except Exception as e:
            print(f"Error connecting to WebSocket: {str(e)}")
            if self._ws:
                await self._ws.close()
                self._ws = None
            raise
        
    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses"""
        responses = []
        
        try:
            msg_type = msg.get("type")
            if msg_type == "conversation.item.input_audio_transcription.delta":
                delta = msg.get("delta", "")
                if delta:
                    self._current_text += delta
                    current_time = asyncio.get_event_loop().time()
                    
                    if current_time - self._last_interim_at > 0.5:
                        responses.append(STTResponse(
                            event_type=SpeechEventType.INTERIM,
                            data=SpeechData(
                                text=self._current_text,
                                language=self.language,
                            ),
                            metadata={"model": self.model}
                        ))
                        self._last_interim_at = current_time
                        
            elif msg_type == "conversation.item.input_audio_transcription.completed":
                transcript = msg.get("transcript", "")
                if transcript:
                    responses.append(STTResponse(
                        event_type=SpeechEventType.FINAL,
                        data=SpeechData(
                            text=transcript,
                            language=self.language,
                        ),
                        metadata={"model": self.model}
                    ))
                    self._current_text = ""
            
            elif msg_type == "input_audio_buffer.speech_started":
                global_event_emitter.emit("speech_started")
            
            elif msg_type == "input_audio_buffer.speech_stopped":
                global_event_emitter.emit("speech_stopped")
                
        except Exception as e:
            print(f"Error handling WebSocket message: {str(e)}")
        
        return responses

    async def aclose(self) -> None:
        """Cleanup resources"""
        self._audio_buffer.clear()
        
        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None
            
        if self._ws:
            await self._ws.close()
            self._ws = None
            
        if self._session:
            await self._session.close()
            self._session = None
            
        await self.client.close()

    async def _ensure_ws_connection(self):
        """Ensure WebSocket is connected, reconnect if necessary"""
        if not self._ws or self._ws.closed:
            await self._connect_ws()

Base class for Speech-to-Text implementations

Initialize the OpenAI STT plugin.

Args

api_key : Optional[str], optional
OpenAI API key. Defaults to None.
model : str
The model to use for the STT plugin. Defaults to "gpt-4o-mini-transcribe".
base_url : Optional[str], optional
The base URL for the OpenAI API. Defaults to None.
prompt : Optional[str], optional
The prompt for the STT plugin. Defaults to None.
language : str
The language to use for the STT plugin. Defaults to "en".
turn_detection : dict | None
The turn detection configuration for the STT plugin. Defaults to None.
enable_streaming : bool
Whether to stream audio to the realtime WebSocket transcription API. Defaults to True.
silence_threshold : float
Amplitude threshold (as a fraction of int16 full scale) used by the custom VAD in non-streaming mode. Defaults to 0.01.
silence_duration : float
Seconds of silence that end an utterance in non-streaming mode. Defaults to 0.8.
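
A minimal usage sketch (not from the source): construct the plugin and feed it 48 kHz, 16-bit mono PCM. The package-level import path is an assumption; the class is defined in videosdk.plugins.openai.stt. In streaming mode, the first process_audio() call opens the realtime WebSocket shown in _connect_ws().

import asyncio
from videosdk.plugins.openai import OpenAISTT

async def main() -> None:
    # Reads OPENAI_API_KEY from the environment when api_key is not passed.
    stt = OpenAISTT(model="gpt-4o-mini-transcribe", language="en")
    try:
        # 20 ms of silence at 48 kHz, 16-bit mono (960 samples) -- illustrative only;
        # real input comes from the agent's audio capture pipeline.
        pcm_chunk = b"\x00\x00" * 960
        await stt.process_audio(pcm_chunk)
    finally:
        await stt.aclose()

asyncio.run(main())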

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Static methods

def azure(*,
model: str = 'gpt-4o-mini-transcribe',
language: str = 'en',
prompt: str | None = None,
turn_detection: dict | None = None,
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
enable_streaming: bool = False,
timeout: httpx.Timeout | None = None) ‑> OpenAISTT
Expand source code
@staticmethod
def azure(
    *,
    model: str = "gpt-4o-mini-transcribe",
    language: str = "en",
    prompt: str | None = None,
    turn_detection: dict | None = None,
    azure_endpoint: str | None = None,
    azure_deployment: str | None = None,
    api_version: str | None = None,
    api_key: str | None = None,
    azure_ad_token: str | None = None,
    organization: str | None = None,
    project: str | None = None,
    base_url: str | None = None,
    enable_streaming: bool = False,
    timeout: httpx.Timeout | None = None,
) -> "OpenAISTT":
    """
    Create a new instance of Azure OpenAI STT.

    This automatically infers the following arguments from their corresponding environment variables if they are not provided:
    - `api_key` from `AZURE_OPENAI_API_KEY`
    - `organization` from `OPENAI_ORG_ID`
    - `project` from `OPENAI_PROJECT_ID`
    - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
    - `api_version` from `OPENAI_API_VERSION`
    - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
    - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)
    """
    
    # Get values from environment variables if not provided
    azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
    azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
    api_version = api_version or os.getenv("OPENAI_API_VERSION")
    api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
    azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
    organization = organization or os.getenv("OPENAI_ORG_ID")
    project = project or os.getenv("OPENAI_PROJECT_ID")
    
    # If azure_deployment is not provided, use model as the deployment name
    if not azure_deployment:
        azure_deployment = model
    
    if not azure_endpoint:
        raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable")
    
    if not api_key and not azure_ad_token:
        raise ValueError("Either API key or Azure AD token must be provided")
    
    azure_client = openai.AsyncAzureOpenAI(
        max_retries=0,
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        api_version=api_version,
        api_key=api_key,
        azure_ad_token=azure_ad_token,
        organization=organization,
        project=project,
        base_url=base_url,
        timeout=timeout
        if timeout
        else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
    )
    
    instance = OpenAISTT(
        model=model,
        language=language,
        prompt=prompt,
        turn_detection=turn_detection,
        enable_streaming=enable_streaming,
    )
    instance.client = azure_client
    return instance

Create a new instance of Azure OpenAI STT.

This automatically infers the following arguments from their corresponding environment variables if they are not provided:
- api_key from AZURE_OPENAI_API_KEY
- organization from OPENAI_ORG_ID
- project from OPENAI_PROJECT_ID
- azure_ad_token from AZURE_OPENAI_AD_TOKEN
- api_version from OPENAI_API_VERSION
- azure_endpoint from AZURE_OPENAI_ENDPOINT
- azure_deployment from AZURE_OPENAI_DEPLOYMENT (if not provided, uses model as deployment name)
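
A hedged example of the Azure factory, assuming AZURE_OPENAI_ENDPOINT, OPENAI_API_VERSION, and either AZURE_OPENAI_API_KEY or AZURE_OPENAI_AD_TOKEN are set in the environment:

from videosdk.plugins.openai import OpenAISTT

stt = OpenAISTT.azure(
    model="gpt-4o-mini-transcribe",  # also used as the deployment name when
                                     # AZURE_OPENAI_DEPLOYMENT is not set
    language="en",
)

Note that, as the source above shows, azure() first constructs a plain OpenAISTT without an api_key, so OPENAI_API_KEY must also be present in the environment for that constructor check to pass.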

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    self._audio_buffer.clear()
    
    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
        
    if self._ws:
        await self._ws.close()
        self._ws = None
        
    if self._session:
        await self._session.close()
        self._session = None
        
    await self.client.close()

Cleanup resources

async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    language: Optional[str] = None,
    **kwargs: Any
) -> None:
    """Process audio frames and send to OpenAI based on enabled mode"""
    
    if not self.enable_streaming:
        await self._transcribe_non_streaming(audio_frames)
        return
    
    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())
        
    try:
        audio_data = np.frombuffer(audio_frames, dtype=np.int16)
        audio_data = signal.resample(audio_data, int(len(audio_data) * self.target_sample_rate / self.input_sample_rate))
        audio_data = audio_data.astype(np.int16).tobytes()
        audio_data = base64.b64encode(audio_data).decode("utf-8")
        message = {
            "type": "input_audio_buffer.append",
            "audio": audio_data,
        }
        await self._ws.send_json(message)
    except Exception as e:
        print(f"Error in process_audio: {str(e)}")
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

Process audio frames and send to OpenAI based on enabled mode
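
With enable_streaming=False the method instead buffers audio, applies the simple amplitude-based VAD, and posts whole utterances to the audio/transcriptions endpoint (see _transcribe_non_streaming above). A rough sketch of that mode, with the frame source left as a placeholder:

from videosdk.plugins.openai import OpenAISTT

stt = OpenAISTT(
    enable_streaming=False,
    silence_threshold=0.01,  # fraction of int16 full scale treated as silence
    silence_duration=0.8,    # seconds of silence that close the utterance
)

async def feed(frames: list[bytes]) -> None:
    # frames: consecutive 48 kHz, 16-bit mono PCM chunks from the capture pipeline
    for chunk in frames:
        await stt.process_audio(chunk)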

class OpenAITTS (*,
api_key: str | None = None,
model: str = 'gpt-4o-mini-tts',
voice: str = 'ash',
speed: float = 1.0,
instructions: str | None = None,
base_url: str | None = None,
response_format: str = 'pcm')
Expand source code
class OpenAITTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE,
        speed: float = 1.0,
        instructions: str | None = None,
        base_url: str | None = None,
        response_format: str = "pcm",
    ) -> None:
        """Initialize the OpenAI TTS plugin.

        Args:
            api_key (Optional[str], optional): OpenAI API key. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to "gpt-4o-mini-tts".
            voice (str): The voice to use for the TTS plugin. Defaults to "ash".
            speed (float): The speed to use for the TTS plugin. Defaults to 1.0.
            instructions (Optional[str], optional): Additional instructions for the TTS plugin. Defaults to None.
            base_url (Optional[str], optional): Custom base URL for the OpenAI API. Defaults to None.
            response_format (str): The response format to use for the TTS plugin. Defaults to "pcm".
        """
        super().__init__(sample_rate=OPENAI_TTS_SAMPLE_RATE, num_channels=OPENAI_TTS_CHANNELS)

        self.model = model
        self.voice = voice
        self.speed = speed
        self.instructions = instructions
        self.audio_track = None
        self.loop = None
        self.response_format = response_format
        self._first_chunk_sent = False
        self._current_synthesis_task: asyncio.Task | None = None
        self._interrupted = False

        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError(
                "OpenAI API key must be provided either through api_key parameter or OPENAI_API_KEY environment variable")

        self._client = openai.AsyncClient(
            max_retries=0,
            api_key=self.api_key,
            base_url=base_url or None,
            http_client=httpx.AsyncClient(
                timeout=httpx.Timeout(
                    connect=15.0, read=5.0, write=5.0, pool=5.0),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=50,
                    max_keepalive_connections=50,
                    keepalive_expiry=120,
                ),
            ),
        )

    @staticmethod
    def azure(
        *,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE,
        speed: float = 1.0,
        instructions: str | None = None,
        azure_endpoint: str | None = None,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | None = None,
        azure_ad_token: str | None = None,
        organization: str | None = None,
        project: str | None = None,
        base_url: str | None = None,
        response_format: str = "pcm",
        timeout: httpx.Timeout | None = None,
    ) -> "OpenAITTS":
        """
        Create a new instance of Azure OpenAI TTS.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `AZURE_OPENAI_API_KEY`
        - `organization` from `OPENAI_ORG_ID`
        - `project` from `OPENAI_PROJECT_ID`
        - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
        - `api_version` from `OPENAI_API_VERSION`
        - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
        - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)
        """
        
        azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
        azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
        api_version = api_version or os.getenv("OPENAI_API_VERSION")
        api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
        azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
        organization = organization or os.getenv("OPENAI_ORG_ID")
        project = project or os.getenv("OPENAI_PROJECT_ID")
        
        if not azure_deployment:
            azure_deployment = model
        
        if not azure_endpoint:
            raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable")
        
        if not api_key and not azure_ad_token:
            raise ValueError("Either API key or Azure AD token must be provided")
        
        azure_client = openai.AsyncAzureOpenAI(
            max_retries=0,
            azure_endpoint=azure_endpoint,
            azure_deployment=azure_deployment,
            api_version=api_version,
            api_key=api_key,
            azure_ad_token=azure_ad_token,
            organization=organization,
            project=project,
            base_url=base_url,
            timeout=timeout
            if timeout
            else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
        )
        
        instance = OpenAITTS(
            model=model,
            voice=voice,
            speed=speed,
            instructions=instructions,
            response_format=response_format,
        )
        instance._client = azure_client
        return instance

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Convert text to speech using OpenAI's TTS API and stream to audio track

        Args:
            text: Text to convert to speech
            voice_id: Optional voice override
            **kwargs: Additional provider-specific arguments
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            self._interrupted = False

            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    if self._interrupted:
                        break
                    await self._synthesize_segment(segment, voice_id, **kwargs)
            else:
                if not self._interrupted:
                    await self._synthesize_segment(text, voice_id, **kwargs)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")

    async def _synthesize_segment(self, text: str, voice_id: Optional[str] = None, **kwargs: Any) -> None:
        """Synthesize a single text segment"""
        if not text.strip() or self._interrupted:
            return

        try:
            audio_data = b""
            async with self._client.audio.speech.with_streaming_response.create(
                model=self.model,
                voice=voice_id or self.voice,
                input=text,
                speed=self.speed,
                response_format=self.response_format,
                **({"instructions": self.instructions} if self.instructions else {}),
            ) as response:
                async for chunk in response.iter_bytes():
                    if self._interrupted:
                        break
                    if chunk:
                        audio_data += chunk

            if audio_data and not self._interrupted:
                await self._stream_audio_chunks(audio_data)

        except Exception as e:
            if not self._interrupted:
                self.emit("error", f"Segment synthesis failed: {str(e)}")

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Stream audio data in chunks for smooth playback"""
        chunk_size = int(OPENAI_TTS_SAMPLE_RATE *
                         OPENAI_TTS_CHANNELS * 2 * 20 / 1000)

        for i in range(0, len(audio_bytes), chunk_size):
            chunk = audio_bytes[i:i + chunk_size]

            if len(chunk) < chunk_size and len(chunk) > 0:
                padding_needed = chunk_size - len(chunk)
                chunk += b'\x00' * padding_needed

            if len(chunk) == chunk_size:
                if not self._first_chunk_sent and self._first_audio_callback:
                    self._first_chunk_sent = True
                    await self._first_audio_callback()

                asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                await asyncio.sleep(0.001)

    async def aclose(self) -> None:
        """Cleanup resources"""
        await self._client.close()
        await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt TTS synthesis"""
        self._interrupted = True
        if self._current_synthesis_task:
            self._current_synthesis_task.cancel()
        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the OpenAI TTS plugin.

Args

api_key : Optional[str], optional
OpenAI API key. Defaults to None.
model : str
The model to use for the TTS plugin. Defaults to "gpt-4o-mini-tts".
voice : str
The voice to use for the TTS plugin. Defaults to "ash".
speed : float
The speed to use for the TTS plugin. Defaults to 1.0.
instructions : Optional[str], optional
Additional instructions for the TTS plugin. Defaults to None.
base_url : Optional[str], optional
Custom base URL for the OpenAI API. Defaults to None.
response_format : str
The response format to use for the TTS plugin. Defaults to "pcm".
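
A minimal construction sketch (argument values are the defaults listed above; the package-level import path is an assumption, the class is defined in videosdk.plugins.openai.tts):

from videosdk.plugins.openai import OpenAITTS

tts = OpenAITTS(
    model="gpt-4o-mini-tts",
    voice="ash",
    speed=1.0,
    instructions="Speak in a calm, friendly tone.",  # optional steering prompt
    response_format="pcm",                           # raw 16-bit PCM for streaming
)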

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Static methods

def azure(*,
model: str = 'gpt-4o-mini-tts',
voice: str = 'ash',
speed: float = 1.0,
instructions: str | None = None,
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
response_format: str = 'pcm',
timeout: httpx.Timeout | None = None) ‑> OpenAITTS
Expand source code
@staticmethod
def azure(
    *,
    model: str = DEFAULT_MODEL,
    voice: str = DEFAULT_VOICE,
    speed: float = 1.0,
    instructions: str | None = None,
    azure_endpoint: str | None = None,
    azure_deployment: str | None = None,
    api_version: str | None = None,
    api_key: str | None = None,
    azure_ad_token: str | None = None,
    organization: str | None = None,
    project: str | None = None,
    base_url: str | None = None,
    response_format: str = "pcm",
    timeout: httpx.Timeout | None = None,
) -> "OpenAITTS":
    """
    Create a new instance of Azure OpenAI TTS.

    This automatically infers the following arguments from their corresponding environment variables if they are not provided:
    - `api_key` from `AZURE_OPENAI_API_KEY`
    - `organization` from `OPENAI_ORG_ID`
    - `project` from `OPENAI_PROJECT_ID`
    - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
    - `api_version` from `OPENAI_API_VERSION`
    - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
    - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)
    """
    
    azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
    azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
    api_version = api_version or os.getenv("OPENAI_API_VERSION")
    api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
    azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
    organization = organization or os.getenv("OPENAI_ORG_ID")
    project = project or os.getenv("OPENAI_PROJECT_ID")
    
    if not azure_deployment:
        azure_deployment = model
    
    if not azure_endpoint:
        raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable")
    
    if not api_key and not azure_ad_token:
        raise ValueError("Either API key or Azure AD token must be provided")
    
    azure_client = openai.AsyncAzureOpenAI(
        max_retries=0,
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        api_version=api_version,
        api_key=api_key,
        azure_ad_token=azure_ad_token,
        organization=organization,
        project=project,
        base_url=base_url,
        timeout=timeout
        if timeout
        else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
    )
    
    instance = OpenAITTS(
        model=model,
        voice=voice,
        speed=speed,
        instructions=instructions,
        response_format=response_format,
    )
    instance._client = azure_client
    return instance

Create a new instance of Azure OpenAI TTS.

This automatically infers the following arguments from their corresponding environment variables if they are not provided:
- api_key from AZURE_OPENAI_API_KEY
- organization from OPENAI_ORG_ID
- project from OPENAI_PROJECT_ID
- azure_ad_token from AZURE_OPENAI_AD_TOKEN
- api_version from OPENAI_API_VERSION
- azure_endpoint from AZURE_OPENAI_ENDPOINT
- azure_deployment from AZURE_OPENAI_DEPLOYMENT (if not provided, uses model as deployment name)
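
A hedged Azure example, assuming the same environment variables as for OpenAISTT.azure():

from videosdk.plugins.openai import OpenAITTS

tts = OpenAITTS.azure(
    model="gpt-4o-mini-tts",  # doubles as the deployment name when
                              # AZURE_OPENAI_DEPLOYMENT is not set
    voice="ash",
)

As with the STT factory, the source above constructs a plain OpenAITTS first, so OPENAI_API_KEY must still be set in the environment even when Azure credentials are used.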

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    await self._client.close()
    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt TTS synthesis"""
    self._interrupted = True
    if self._current_synthesis_task:
        self._current_synthesis_task.cancel()
    if self.audio_track:
        self.audio_track.interrupt()

Interrupt TTS synthesis

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """
    Convert text to speech using OpenAI's TTS API and stream to audio track

    Args:
        text: Text to convert to speech
        voice_id: Optional voice override
        **kwargs: Additional provider-specific arguments
    """
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._interrupted = False

        if isinstance(text, AsyncIterator):
            async for segment in segment_text(text):
                if self._interrupted:
                    break
                await self._synthesize_segment(segment, voice_id, **kwargs)
        else:
            if not self._interrupted:
                await self._synthesize_segment(text, voice_id, **kwargs)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")

Convert text to speech using OpenAI's TTS API and stream to audio track

Args

text
Text to convert to speech
voice_id
Optional voice override
**kwargs
Additional provider-specific arguments
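
An end-to-end sketch of synthesize(). The framework normally supplies audio_track and loop; the collecting track below is a toy stand-in that only implements the two members used by the source above (add_new_bytes() and interrupt()), and the package-level import path is an assumption.

import asyncio
from videosdk.plugins.openai import OpenAITTS

class CollectingTrack:
    """Toy audio track: just accumulates the 20 ms PCM chunks."""
    def __init__(self) -> None:
        self.buffer = bytearray()

    async def add_new_bytes(self, chunk: bytes) -> None:
        self.buffer.extend(chunk)

    def interrupt(self) -> None:
        self.buffer.clear()

async def main() -> None:
    tts = OpenAITTS()                     # uses OPENAI_API_KEY from the environment
    tts.loop = asyncio.get_running_loop()
    tts.audio_track = CollectingTrack()   # normally provided by the agent pipeline
    await tts.synthesize("Hello from VideoSDK!")
    await tts.aclose()

asyncio.run(main())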