Module videosdk.plugins.deepgram.tts
Classes
class DeepgramTTS (*,
api_key: str | None = None,
model: str = 'aura-2-andromeda-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
base_url: str = 'wss://api.deepgram.com/v1/speak',
max_connection_age_sec: float = 300.0,
**kwargs: Any)-
Expand source code
class DeepgramTTS(TTS): def __init__( self, *, api_key: str | None = None, model: str = DEFAULT_MODEL, encoding: str = DEFAULT_ENCODING, sample_rate: int = DEEPGRAM_SAMPLE_RATE, base_url: str = API_BASE_URL, max_connection_age_sec: float = DEFAULT_CONNECTION_MAX_AGE_SEC, **kwargs: Any, ) -> None: """Initialize the Deepgram TTS plugin. Args: api_key: Deepgram API key. Falls back to ``DEEPGRAM_API_KEY`` env var. model: Deepgram TTS voice/model id. Defaults to ``aura-2-andromeda-en``. encoding: Output audio encoding. Defaults to ``linear16``. sample_rate: Output sample rate. base_url: WebSocket base URL. max_connection_age_sec: Refresh the WebSocket after this many seconds to avoid hitting Deepgram's idle/session limits. """ super().__init__(sample_rate=sample_rate, num_channels=DEEPGRAM_CHANNELS) self.model = model self.encoding = encoding self.base_url = base_url self.audio_track = None self.loop = None self._max_connection_age_sec = max_connection_age_sec self._ws_session: aiohttp.ClientSession | None = None self._ws_connection: aiohttp.ClientWebSocketResponse | None = None self._ws_connect_time: float = 0.0 self._connection_lock = asyncio.Lock() self._receive_task: asyncio.Task | None = None self._active_future: asyncio.Future[None] | None = None self._active_send_task: asyncio.Task | None = None self._interrupted = False self._first_chunk_sent = False self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY") if not self.api_key: raise ValueError( "Deepgram API key must be provided either through the 'api_key' parameter or the DEEPGRAM_API_KEY environment variable." ) def reset_first_audio_tracking(self) -> None: self._first_chunk_sent = False async def prewarm(self) -> None: """Pre-establish the Deepgram WebSocket so the first ``synthesize()`` call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly.""" try: await self._ensure_connection() except Exception as e: logger.warning(f"Deepgram TTS prewarm failed (non-fatal): {e}") async def _ensure_connection(self) -> None: async with self._connection_lock: now = asyncio.get_event_loop().time() if self._ws_connection and not self._ws_connection.closed: age = now - self._ws_connect_time if age < self._max_connection_age_sec: return logger.info(f"Refreshing Deepgram WebSocket (age={age:.1f}s)") await self._close_connection_locked() elif self._ws_connection or self._ws_session: await self._close_connection_locked() params = { "model": self.model, "encoding": self.encoding, "sample_rate": self.sample_rate, } param_string = "&".join([f"{k}={v}" for k, v in params.items()]) full_ws_url = f"{self.base_url}?{param_string}" headers = {"Authorization": f"Token {self.api_key}"} self._ws_session = aiohttp.ClientSession() self._ws_connection = await asyncio.wait_for( self._ws_session.ws_connect( full_ws_url, headers=headers, heartbeat=30.0 ), timeout=10.0, ) self._ws_connect_time = now self._receive_task = asyncio.create_task(self._receive_audio_task()) async def _close_connection_locked(self) -> None: if self._receive_task and not self._receive_task.done(): self._receive_task.cancel() try: await self._receive_task except (asyncio.CancelledError, Exception): pass self._receive_task = None if self._ws_connection and not self._ws_connection.closed: try: await self._ws_connection.close() except Exception: pass self._ws_connection = None if self._ws_session and not self._ws_session.closed: try: await self._ws_session.close() except Exception: pass self._ws_session = None async def synthesize( self, text: AsyncIterator[Union[str, FlushMarker]] | str, voice_id: Optional[str] = None, **kwargs: Any, ) -> None: """Synthesize text via Deepgram's streaming WebSocket API. Each ``FlushMarker`` in the input stream is forwarded to Deepgram as a ``{"type": "Flush"}`` message, letting the server emit audio for the completed sentence without waiting for end-of-stream. """ try: if not self.audio_track or not self.loop: self.emit("error", "Audio track or event loop not set") return self._interrupted = False await self._ensure_connection() if not self._ws_connection: raise RuntimeError("WebSocket connection is not available.") done_future: asyncio.Future[None] = asyncio.get_event_loop().create_future() self._active_future = done_future send_task = asyncio.create_task(self._send_text_task(text, done_future)) self._active_send_task = send_task try: await done_future except asyncio.CancelledError: pass await send_task except Exception as e: self.emit("error", f"TTS synthesis failed: {str(e)}") raise finally: if self._active_future is done_future: self._active_future = None if self._active_send_task is send_task: self._active_send_task = None async def _send_text_task( self, text: Union[AsyncIterator[Union[str, FlushMarker]], str], done_future: asyncio.Future[None], ) -> None: if not self._ws_connection or self._ws_connection.closed: if not done_future.done(): done_future.set_exception(RuntimeError("WebSocket closed")) return has_sent = False try: if isinstance(text, str): if text and not self._interrupted: await self._ws_connection.send_json({"type": "Speak", "text": text}) has_sent = True else: async for chunk in text: if self._interrupted or self._ws_connection.closed: break if isinstance(chunk, FlushMarker): if has_sent: await self._ws_connection.send_json({"type": "Flush"}) continue if not chunk: continue await self._ws_connection.send_json({"type": "Speak", "text": chunk}) has_sent = True if has_sent and not self._interrupted and not self._ws_connection.closed: await self._ws_connection.send_json({"type": "Flush"}) if not done_future.done(): try: await asyncio.wait_for(asyncio.shield(done_future), timeout=30.0) except asyncio.TimeoutError: if not done_future.done(): done_future.set_result(None) except asyncio.CancelledError: pass except asyncio.CancelledError: if not done_future.done(): done_future.cancel() raise except Exception as e: if not self._interrupted: self.emit("error", f"Send task error: {str(e)}") if not done_future.done(): done_future.set_exception(e) async def _receive_audio_task(self) -> None: if not self._ws_connection: return try: while self._ws_connection and not self._ws_connection.closed: msg = await self._ws_connection.receive() if msg.type == aiohttp.WSMsgType.BINARY: fut = self._active_future if not self._interrupted and fut is not None and not fut.done(): await self._stream_audio_chunks(msg.data) elif msg.type == aiohttp.WSMsgType.TEXT: data = json.loads(msg.data) msg_type = data.get("type") if msg_type == "Error": err = data.get("description", "Unknown error") if not self._interrupted: self.emit("error", f"Deepgram error: {err}") fut = self._active_future if fut is not None and not fut.done(): fut.set_exception(RuntimeError(err)) elif msg_type == "Flushed": fut = self._active_future if fut is not None and not fut.done(): fut.set_result(None) elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING): break elif msg.type == aiohttp.WSMsgType.ERROR: raise ConnectionError(f"WebSocket error: {self._ws_connection.exception()}") except asyncio.CancelledError: raise except Exception as e: if not self._interrupted: self.emit("error", f"Receive task error: {str(e)}") fut = self._active_future if fut is not None and not fut.done(): fut.set_exception(e) async def _stream_audio_chunks(self, audio_bytes: bytes) -> None: if not audio_bytes or self._interrupted: return if not self._first_chunk_sent and self._first_audio_callback: self._first_chunk_sent = True await self._first_audio_callback() if self.audio_track and self.loop: await self.audio_track.add_new_bytes(audio_bytes) async def interrupt(self) -> None: """Stop emitting audio for the current synthesis. Keeps the WebSocket open so the next turn does not pay reconnect cost; in-flight audio frames are dropped via the ``_active_future`` filter in the receive loop.""" self._interrupted = True if self.audio_track: self.audio_track.interrupt() if self._active_send_task and not self._active_send_task.done(): self._active_send_task.cancel() if self._active_future is not None and not self._active_future.done(): self._active_future.cancel() async def aclose(self) -> None: self._interrupted = True if self._active_send_task and not self._active_send_task.done(): self._active_send_task.cancel() async with self._connection_lock: await self._close_connection_locked() await super().aclose()Base class for Text-to-Speech implementations
Initialize the Deepgram TTS plugin.
Args
api_key- Deepgram API key. Falls back to
DEEPGRAM_API_KEYenv var. model- Deepgram TTS voice/model id. Defaults to
aura-2-andromeda-en. encoding- Output audio encoding. Defaults to
linear16. sample_rate- Output sample rate.
base_url- WebSocket base URL.
max_connection_age_sec- Refresh the WebSocket after this many seconds to avoid hitting Deepgram's idle/session limits.
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: self._interrupted = True if self._active_send_task and not self._active_send_task.done(): self._active_send_task.cancel() async with self._connection_lock: await self._close_connection_locked() await super().aclose()Cleanup resources
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None: """Stop emitting audio for the current synthesis. Keeps the WebSocket open so the next turn does not pay reconnect cost; in-flight audio frames are dropped via the ``_active_future`` filter in the receive loop.""" self._interrupted = True if self.audio_track: self.audio_track.interrupt() if self._active_send_task and not self._active_send_task.done(): self._active_send_task.cancel() if self._active_future is not None and not self._active_future.done(): self._active_future.cancel()Stop emitting audio for the current synthesis. Keeps the WebSocket open so the next turn does not pay reconnect cost; in-flight audio frames are dropped via the
_active_futurefilter in the receive loop. async def prewarm(self) ‑> None-
Expand source code
async def prewarm(self) -> None: """Pre-establish the Deepgram WebSocket so the first ``synthesize()`` call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly.""" try: await self._ensure_connection() except Exception as e: logger.warning(f"Deepgram TTS prewarm failed (non-fatal): {e}")Pre-establish the Deepgram WebSocket so the first
synthesize()call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly. def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None: self._first_chunk_sent = FalseReset the first audio tracking state for next TTS task
async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize( self, text: AsyncIterator[Union[str, FlushMarker]] | str, voice_id: Optional[str] = None, **kwargs: Any, ) -> None: """Synthesize text via Deepgram's streaming WebSocket API. Each ``FlushMarker`` in the input stream is forwarded to Deepgram as a ``{"type": "Flush"}`` message, letting the server emit audio for the completed sentence without waiting for end-of-stream. """ try: if not self.audio_track or not self.loop: self.emit("error", "Audio track or event loop not set") return self._interrupted = False await self._ensure_connection() if not self._ws_connection: raise RuntimeError("WebSocket connection is not available.") done_future: asyncio.Future[None] = asyncio.get_event_loop().create_future() self._active_future = done_future send_task = asyncio.create_task(self._send_text_task(text, done_future)) self._active_send_task = send_task try: await done_future except asyncio.CancelledError: pass await send_task except Exception as e: self.emit("error", f"TTS synthesis failed: {str(e)}") raise finally: if self._active_future is done_future: self._active_future = None if self._active_send_task is send_task: self._active_send_task = NoneSynthesize text via Deepgram's streaming WebSocket API.
Each
FlushMarkerin the input stream is forwarded to Deepgram as a{"type": "Flush"}message, letting the server emit audio for the completed sentence without waiting for end-of-stream.