Module videosdk.plugins.openai.tts
Classes
class OpenAITTS (*,
api_key: str | None = None,
model: str = 'gpt-4o-mini-tts',
voice: str | dict[str, str] = 'ash',
speed: float = 1.0,
instructions: str | None = None,
language: str | None = None,
base_url: str | None = None,
response_format: str = 'pcm',
chunked_synthesis: bool = False)-
Expand source code
class OpenAITTS(TTS):
    """Text-to-speech plugin that streams audio from OpenAI's speech API.

    Audio bytes returned by the API are cut into 20 ms frames and handed to
    ``audio_track``. ``audio_track`` and ``loop`` start out as ``None`` and
    must be assigned before :meth:`synthesize` is called.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice: str | dict[str, str] = DEFAULT_VOICE,
        speed: float = 1.0,
        instructions: str | None = None,
        language: str | None = None,
        base_url: str | None = None,
        response_format: str = "pcm",
        chunked_synthesis: bool = False,
    ) -> None:
        """Initialize the OpenAI TTS plugin.

        Args:
            api_key (Optional[str], optional): OpenAI API key. Falls back to
                the ``OPENAI_API_KEY`` environment variable. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to
                "gpt-4o-mini-tts". Built-in options: "gpt-4o-mini-tts"
                (recommended, supports instructions), "tts-1" (low latency),
                "tts-1-hd" (higher quality).
            voice (str | dict): Built-in voice name (e.g. "marin", "cedar",
                "ash", "coral") or a custom voice reference dict
                {"id": "voice_xxx"}. Defaults to "ash". For best quality with
                gpt-4o-mini-tts, use "marin" or "cedar".
            speed (float): The speed to use for the TTS plugin. Defaults to 1.0.
            instructions (Optional[str], optional): Natural-language style
                control ("Speak in a cheerful tone", accent hints, etc.). Only
                honored by gpt-4o-mini-tts; ignored by tts-1 / tts-1-hd.
                Defaults to None.
            language (Optional[str], optional): ISO language hint (e.g. "hi",
                "mr", "fr"). Useful for non-English input or with custom
                voices. Defaults to None.
            base_url (Optional[str], optional): Custom base URL for the OpenAI
                API. Defaults to None.
            response_format (str): The response format to use for the TTS
                plugin. Defaults to "pcm".
            chunked_synthesis (bool): When ``True``, dispatch one POST per
                ``FlushMarker`` boundary received from the upstream pipeline.
                When ``False`` (default), the entire LLM stream is accumulated
                into a single POST — better for prosody continuity and request
                economics. Set ``True`` only for very long utterances (>30s)
                where sub-sentence TTFB matters more than cross-sentence
                prosody. Defaults to False.

        Raises:
            ValueError: If no API key is passed and ``OPENAI_API_KEY`` is unset.
        """
        super().__init__(sample_rate=OPENAI_TTS_SAMPLE_RATE, num_channels=OPENAI_TTS_CHANNELS)

        self.model = model
        self.voice = voice
        self.speed = speed
        self.instructions = instructions
        self.language = language
        self.audio_track = None
        self.loop = None
        self.response_format = response_format
        self.chunked_synthesis = chunked_synthesis
        self._first_chunk_sent = False
        self._current_synthesis_task: asyncio.Task | None = None
        self._interrupted = False

        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError(
                "OpenAI API key must be provided either through api_key parameter or OPENAI_API_KEY environment variable")

        self._client = openai.AsyncClient(
            max_retries=0,
            api_key=self.api_key,
            base_url=base_url or None,
            http_client=httpx.AsyncClient(
                timeout=httpx.Timeout(
                    connect=15.0, read=5.0, write=5.0, pool=5.0),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=50,
                    max_keepalive_connections=50,
                    keepalive_expiry=120,
                ),
            ),
        )

    @staticmethod
    def azure(
        *,
        model: str = DEFAULT_MODEL,
        voice: str | dict[str, str] = DEFAULT_VOICE,
        speed: float = 1.0,
        instructions: str | None = None,
        language: str | None = None,
        azure_endpoint: str | None = None,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | None = None,
        azure_ad_token: str | None = None,
        organization: str | None = None,
        project: str | None = None,
        base_url: str | None = None,
        response_format: str = "pcm",
        chunked_synthesis: bool = False,
        timeout: httpx.Timeout | None = None,
    ) -> "OpenAITTS":
        """
        Create a new instance of Azure OpenAI TTS.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `AZURE_OPENAI_API_KEY`
        - `organization` from `OPENAI_ORG_ID`
        - `project` from `OPENAI_PROJECT_ID`
        - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
        - `api_version` from `OPENAI_API_VERSION`
        - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
        - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)

        Raises:
            ValueError: If no Azure endpoint is available, or if neither an
                API key nor an Azure AD token is available.
        """
        azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
        azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
        api_version = api_version or os.getenv("OPENAI_API_VERSION")
        api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
        azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
        organization = organization or os.getenv("OPENAI_ORG_ID")
        project = project or os.getenv("OPENAI_PROJECT_ID")

        if not azure_deployment:
            azure_deployment = model

        if not azure_endpoint:
            raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable")

        if not api_key and not azure_ad_token:
            raise ValueError("Either API key or Azure AD token must be provided")

        azure_client = openai.AsyncAzureOpenAI(
            max_retries=0,
            azure_endpoint=azure_endpoint,
            azure_deployment=azure_deployment,
            api_version=api_version,
            api_key=api_key,
            azure_ad_token=azure_ad_token,
            organization=organization,
            project=project,
            base_url=base_url,
            timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
        )

        instance = OpenAITTS(
            # __init__ refuses to run without a key and would otherwise fall
            # back to (and require) OPENAI_API_KEY. Hand it the Azure
            # credential so Azure-only environments can construct the plugin.
            api_key=api_key or azure_ad_token,
            model=model,
            voice=voice,
            speed=speed,
            instructions=instructions,
            language=language,
            response_format=response_format,
            chunked_synthesis=chunked_synthesis,
        )
        # Replace the default client built by __init__ with the Azure one.
        instance._client = azure_client
        return instance

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str | dict[str, str]] = None,
        **kwargs: Any,
    ) -> None:
        """
        Convert text to speech using OpenAI's TTS API and stream to audio track.

        Args:
            text: Text to convert to speech, or async iterator yielding ``str``
                chunks and ``FlushMarker`` segment boundaries.
            voice_id: Optional voice override
            **kwargs: Additional provider-specific arguments

        Raises:
            RuntimeError: If ``audio_track`` or ``loop`` has not been set.
            Exception: Any synthesis error is reported through an "error"
                event and then re-raised.
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                raise RuntimeError("Audio track or event loop not set")

            self._interrupted = False

            if isinstance(text, str):
                await self._synthesize_segment(text, voice_id, **kwargs)
                return

            pending: list[str] = []
            async for chunk in text:
                if self._interrupted:
                    break
                if isinstance(chunk, FlushMarker):
                    # Boundaries only trigger a request in chunked mode;
                    # otherwise they are skipped and text keeps accumulating.
                    if self.chunked_synthesis and pending:
                        segment = "".join(pending)
                        pending = []
                        if segment.strip():
                            await self._synthesize_segment(segment, voice_id, **kwargs)
                    continue
                # Keep whitespace-only chunks: they separate neighbouring
                # tokens, and dropping them would glue words together. Blank
                # segments are still filtered by the strip() checks on the
                # joined text.
                if chunk:
                    pending.append(chunk)

            if pending and not self._interrupted:
                remainder = "".join(pending)
                if remainder.strip():
                    await self._synthesize_segment(remainder, voice_id, **kwargs)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise

    async def _queue_audio_frame(self, frame: bytes) -> None:
        """Fire the first-audio callback once, then enqueue ``frame`` on the audio track."""
        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()
        asyncio.create_task(self.audio_track.add_new_bytes(frame))

    async def _synthesize_segment(
        self,
        text: str,
        voice_id: Optional[str | dict[str, str]] = None,
        **kwargs: Any,
    ) -> None:
        """Synthesize a single text segment.

        Streams audio frames to the audio track as they arrive from OpenAI's
        chunked HTTP response. Maintains a leftover buffer between iterations
        so partial bytes don't get silence-padded mid-stream — padding only
        applies to the final frame at end-of-response.

        Args:
            text: Segment to synthesize; blank text is a no-op.
            voice_id: Optional voice override; falls back to ``self.voice``.
            **kwargs: Accepted for signature compatibility; not used here.
        """
        if not text.strip() or self._interrupted:
            return

        # 20ms frame @ 24kHz, 16-bit, mono = 960 bytes
        frame_size = int(
            OPENAI_TTS_SAMPLE_RATE * OPENAI_TTS_CHANNELS * 2 * 20 / 1000
        )
        leftover = bytearray()

        try:
            async with self._client.audio.speech.with_streaming_response.create(
                model=self.model,
                voice=voice_id or self.voice,
                input=text,
                speed=self.speed,
                response_format=self.response_format,
                **({"instructions": self.instructions} if self.instructions else {}),
                **({"extra_body": {"language": self.language}} if self.language else {}),
            ) as response:
                async for chunk in response.iter_bytes():
                    if self._interrupted:
                        break
                    if not chunk:
                        continue

                    leftover.extend(chunk)

                    # Emit complete 20ms frames as soon as they're available.
                    while len(leftover) >= frame_size and not self._interrupted:
                        frame = bytes(leftover[:frame_size])
                        del leftover[:frame_size]
                        await self._queue_audio_frame(frame)
                        await asyncio.sleep(0.001)

            # End of stream: zero-pad the final partial frame and emit.
            if leftover and not self._interrupted:
                frame = bytes(leftover) + b"\x00" * (frame_size - len(leftover))
                await self._queue_audio_frame(frame)

        except Exception as e:
            # An interruption is not reported as a synthesis error.
            if not self._interrupted:
                self.emit("error", f"Segment synthesis failed: {str(e)}")
            raise

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Stream audio data in chunks for smooth playback"""
        chunk_size = int(OPENAI_TTS_SAMPLE_RATE * OPENAI_TTS_CHANNELS * 2 * 20 / 1000)

        for i in range(0, len(audio_bytes), chunk_size):
            chunk = audio_bytes[i:i + chunk_size]

            # Only the last slice can be short; pad it with zero bytes.
            if len(chunk) < chunk_size and len(chunk) > 0:
                padding_needed = chunk_size - len(chunk)
                chunk += b'\x00' * padding_needed

            if len(chunk) == chunk_size:
                await self._queue_audio_frame(chunk)
                await asyncio.sleep(0.001)

    async def aclose(self) -> None:
        """Cleanup resources"""
        try:
            await self._client.close()
        finally:
            # Run the base-class cleanup even if closing the client fails.
            await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt TTS synthesis"""
        self._interrupted = True
        if self._current_synthesis_task:
            self._current_synthesis_task.cancel()
        if self.audio_track:
            self.audio_track.interrupt()
Initialize the OpenAI TTS plugin.
Args
api_key:Optional[str], optional- OpenAI API key. Defaults to None.
model:str- The model to use for the TTS plugin. Defaults to "gpt-4o-mini-tts". Built-in options: "gpt-4o-mini-tts" (recommended, supports instructions), "tts-1" (low latency), "tts-1-hd" (higher quality).
voice:str | dict- Built-in voice name (e.g. "marin", "cedar", "ash", "coral") or a custom voice reference dict {"id": "voice_xxx"}. Defaults to "ash". For best quality with gpt-4o-mini-tts, use "marin" or "cedar".
speed:float- The speed to use for the TTS plugin. Defaults to 1.0.
instructions:Optional[str], optional- Natural-language style control ("Speak in a cheerful tone", accent hints, etc.). Only honored by gpt-4o-mini-tts; ignored by tts-1 / tts-1-hd. Defaults to None.
language:Optional[str], optional- ISO language hint (e.g. "hi", "mr", "fr"). Useful for non-English input or with custom voices. Defaults to None.
base_url:Optional[str], optional- Custom base URL for the OpenAI API. Defaults to None.
response_format:str- The response format to use for the TTS plugin. Defaults to "pcm".
chunked_synthesis:bool- When True, dispatch one POST per FlushMarker boundary received from the upstream pipeline. When False (default), the entire LLM stream is accumulated into a single POST — better for prosody continuity and request economics. Set True only for very long utterances (>30s) where sub-sentence TTFB matters more than cross-sentence prosody. Defaults to False.
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Static methods
def azure(*,
model: str = 'gpt-4o-mini-tts',
voice: str | dict[str, str] = 'ash',
speed: float = 1.0,
instructions: str | None = None,
language: str | None = None,
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
response_format: str = 'pcm',
chunked_synthesis: bool = False,
timeout: httpx.Timeout | None = None) ‑> OpenAITTS-
Expand source code
@staticmethod
def azure(
    *,
    model: str = DEFAULT_MODEL,
    voice: str | dict[str, str] = DEFAULT_VOICE,
    speed: float = 1.0,
    instructions: str | None = None,
    language: str | None = None,
    azure_endpoint: str | None = None,
    azure_deployment: str | None = None,
    api_version: str | None = None,
    api_key: str | None = None,
    azure_ad_token: str | None = None,
    organization: str | None = None,
    project: str | None = None,
    base_url: str | None = None,
    response_format: str = "pcm",
    chunked_synthesis: bool = False,
    timeout: httpx.Timeout | None = None,
) -> "OpenAITTS":
    """
    Create a new instance of Azure OpenAI TTS.

    This automatically infers the following arguments from their corresponding environment variables if they are not provided:
    - `api_key` from `AZURE_OPENAI_API_KEY`
    - `organization` from `OPENAI_ORG_ID`
    - `project` from `OPENAI_PROJECT_ID`
    - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
    - `api_version` from `OPENAI_API_VERSION`
    - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
    - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)

    Raises:
        ValueError: If no Azure endpoint is available, or if neither an API
            key nor an Azure AD token is available.
    """
    azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
    azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
    api_version = api_version or os.getenv("OPENAI_API_VERSION")
    api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
    azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
    organization = organization or os.getenv("OPENAI_ORG_ID")
    project = project or os.getenv("OPENAI_PROJECT_ID")

    if not azure_deployment:
        azure_deployment = model

    if not azure_endpoint:
        raise ValueError("Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable")

    if not api_key and not azure_ad_token:
        raise ValueError("Either API key or Azure AD token must be provided")

    azure_client = openai.AsyncAzureOpenAI(
        max_retries=0,
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        api_version=api_version,
        api_key=api_key,
        azure_ad_token=azure_ad_token,
        organization=organization,
        project=project,
        base_url=base_url,
        timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
    )

    instance = OpenAITTS(
        # The constructor refuses to run without a key and would otherwise
        # fall back to (and require) OPENAI_API_KEY. Hand it the Azure
        # credential so Azure-only environments can construct the plugin.
        api_key=api_key or azure_ad_token,
        model=model,
        voice=voice,
        speed=speed,
        instructions=instructions,
        language=language,
        response_format=response_format,
        chunked_synthesis=chunked_synthesis,
    )
    # Replace the default client built by the constructor with the Azure one.
    instance._client = azure_client
    return instance
This automatically infers the following arguments from their corresponding environment variables if they are not provided:
- api_key from AZURE_OPENAI_API_KEY
- organization from OPENAI_ORG_ID
- project from OPENAI_PROJECT_ID
- azure_ad_token from AZURE_OPENAI_AD_TOKEN
- api_version from OPENAI_API_VERSION
- azure_endpoint from AZURE_OPENAI_ENDPOINT
- azure_deployment from AZURE_OPENAI_DEPLOYMENT (if not provided, uses model as deployment name)
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Closes the API client, then runs the base-class cleanup. The base-class
    cleanup runs even when closing the client raises; the original exception
    still propagates to the caller.
    """
    try:
        await self._client.close()
    finally:
        await super().aclose()
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None:
    """Interrupt TTS synthesis.

    Raises the interruption flag, cancels the tracked synthesis task when one
    is recorded, and calls ``interrupt()`` on the audio track when one is
    attached.
    """
    self._interrupted = True

    task = self._current_synthesis_task
    if task:
        task.cancel()

    track = self.audio_track
    if track:
        track.interrupt()
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-chunk flag ahead of the next TTS task.

    The flag is the one checked before the first-audio callback is invoked,
    so clearing it lets that callback fire again.
    """
    self._first_chunk_sent = False
async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str | dict[str, str]] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str | dict[str, str]] = None,
    **kwargs: Any,
) -> None:
    """
    Convert text to speech using OpenAI's TTS API and stream to audio track.

    A plain string is synthesized as one segment. For an async iterator the
    chunks are accumulated; with ``chunked_synthesis`` enabled each
    ``FlushMarker`` flushes the accumulated text as its own segment, otherwise
    markers are skipped and everything is synthesized as one segment at the
    end of the stream.

    Args:
        text: Text to convert to speech, or async iterator yielding ``str``
            chunks and ``FlushMarker`` segment boundaries.
        voice_id: Optional voice override
        **kwargs: Additional provider-specific arguments

    Raises:
        RuntimeError: If ``audio_track`` or ``loop`` has not been set.
        Exception: Any synthesis error is reported through an "error" event
            and then re-raised.
    """
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            raise RuntimeError("Audio track or event loop not set")

        self._interrupted = False

        if isinstance(text, str):
            await self._synthesize_segment(text, voice_id, **kwargs)
            return

        pending: list[str] = []
        async for chunk in text:
            if self._interrupted:
                break
            if isinstance(chunk, FlushMarker):
                # Boundaries only trigger a request in chunked mode;
                # otherwise they are skipped and text keeps accumulating.
                if self.chunked_synthesis and pending:
                    segment = "".join(pending)
                    pending = []
                    if segment.strip():
                        await self._synthesize_segment(segment, voice_id, **kwargs)
                continue
            # Keep whitespace-only chunks: they separate neighbouring tokens,
            # and dropping them would glue words together. Blank segments are
            # still filtered by the strip() checks on the joined text.
            if chunk:
                pending.append(chunk)

        if pending and not self._interrupted:
            remainder = "".join(pending)
            if remainder.strip():
                await self._synthesize_segment(remainder, voice_id, **kwargs)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise
Args
text- Text to convert to speech, or async iterator yielding str chunks and FlushMarker segment boundaries.
voice_id- Optional voice override
**kwargs- Additional provider-specific arguments