Module videosdk.plugins.deepgram.tts

Classes

class DeepgramTTS (*,
api_key: str | None = None,
model: str = 'aura-2-thalia-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
base_url: str = 'wss://api.deepgram.com/v1/speak',
**kwargs: Any)
Expand source code
class DeepgramTTS(TTS):
    """Text-to-speech backed by Deepgram's streaming WebSocket ``speak`` API.

    Text is sent as ``Speak`` messages over one long-lived WebSocket, and the
    binary audio frames Deepgram returns are pushed into ``self.audio_track``.
    ``audio_track`` and ``loop`` start as ``None`` and must be assigned by the
    owner before :meth:`synthesize` will do anything.
    """

    def __init__(
            self,
            *,
            api_key: str | None = None,
            model: str = DEFAULT_MODEL,
            encoding: str = DEFAULT_ENCODING,
            sample_rate: int = DEEPGRAM_SAMPLE_RATE,
            base_url: str = API_BASE_URL,
            **kwargs: Any,
    ) -> None:
        """Store the connection settings and resolve the API key.

        Args:
            api_key: Deepgram API key. Falls back to the ``DEEPGRAM_API_KEY``
                environment variable when omitted.
            model: Deepgram model, sent as the ``model`` query parameter.
            encoding: Requested audio encoding (``encoding`` query parameter).
            sample_rate: Output sample rate; passed to the base ``TTS`` and
                sent to Deepgram as ``sample_rate``.
            base_url: WebSocket endpoint the query string is appended to.
            **kwargs: Accepted for interface compatibility; not used here.

        Raises:
            ValueError: If no API key is given or found in the environment.
        """
        super().__init__(sample_rate=sample_rate, num_channels=DEEPGRAM_CHANNELS)

        self.model = model
        self.encoding = encoding
        self.base_url = base_url
        self.audio_track = None
        self.loop = None
        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False
        self._first_chunk_sent = False
        self._connection_lock = asyncio.Lock()

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through the 'api_key' parameter or the DEEPGRAM_API_KEY environment variable."
            )

    def reset_first_audio_tracking(self) -> None:
        """Forget that an audio chunk was already sent, so the next TTS task starts fresh."""
        self._first_chunk_sent = False

    async def _ensure_connection(self) -> None:
        """Make sure a live WebSocket with a running receive loop exists.

        The current connection is reused only while the socket is open *and*
        its receive task is still running. Otherwise any stale socket and its
        ``ClientSession`` are closed before a fresh pair is opened, so sessions
        do not leak across reconnects.
        """
        async with self._connection_lock:
            recv_running = self._recv_task is not None and not self._recv_task.done()
            if self._ws_connection and not self._ws_connection.closed and recv_running:
                return

            # The receive loop can exit (e.g. after a Deepgram "Error" message)
            # while the socket itself stays open; nothing would read audio from
            # it any more, so that case is treated as a dead connection too.
            if recv_running:
                self._recv_task.cancel()
            stale_ws, stale_session = self._ws_connection, self._ws_session
            self._ws_connection = None
            self._ws_session = None
            try:
                if stale_ws and not stale_ws.closed:
                    await stale_ws.close()
            finally:
                # Release the old session even if the close handshake failed.
                if stale_session and not stale_session.closed:
                    await stale_session.close()

            params = {
                "model": self.model,
                "encoding": self.encoding,
                "sample_rate": self.sample_rate,
            }
            param_string = "&".join(f"{k}={v}" for k, v in params.items())
            full_ws_url = f"{self.base_url}?{param_string}"
            headers = {"Authorization": f"Token {self.api_key}"}

            session = aiohttp.ClientSession()
            try:
                connection = await asyncio.wait_for(
                    session.ws_connect(full_ws_url, headers=headers),
                    timeout=50.0,
                )
            except BaseException:
                # Handshake failed, timed out or was cancelled: close the
                # session instead of leaking it, then propagate.
                await session.close()
                raise

            self._ws_session = session
            self._ws_connection = connection
            self._recv_task = asyncio.create_task(self._receive_audio_task())

    async def synthesize(
            self,
            text: AsyncIterator[str] | str,
            **kwargs: Any,
    ) -> None:
        """Convert ``text`` to speech and stream the audio to ``audio_track``.

        Args:
            text: A complete string, or an async iterator of text chunks.
            **kwargs: Accepted for interface compatibility; not used here.

        Raises:
            Exception: Re-raised after emitting an ``error`` event. Failures
                inside the streaming step are reported by ``_stream_synthesis``
                itself and are not re-raised.
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            # Cancel a send task left over from a previous call.
            # NOTE(review): no await separates the two flag writes below, so
            # other tasks never observe the intermediate True value.
            self._should_stop = True
            if self._send_task and not self._send_task.done():
                self._send_task.cancel()

            self._should_stop = False
            await self._stream_synthesis(text)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str]) -> None:
        """Connect if needed, then send ``text`` and wait until it is sent.

        Any failure is reported as an ``error`` event and the whole instance
        is closed via :meth:`aclose`; nothing is re-raised to the caller.
        """
        try:
            await self._ensure_connection()
            self._send_task = asyncio.create_task(self._send_text_task(text))
            await self._send_task
        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")
            await self.aclose()
        finally:
            if self._send_task and not self._send_task.done():
                self._send_task.cancel()
            self._send_task = None

    async def _send_text_task(self, text: Union[AsyncIterator[str], str]) -> None:
        """Send ``text`` as ``Speak`` messages, then ``Flush`` unless stopped.

        A plain string is sent as one message. An async iterator is sent chunk
        by chunk, stopping early if the socket closes or a stop is requested.
        Cancellation is swallowed; other errors are reported through the
        ``error`` event unless a stop was requested.
        """
        # Bind the socket once so this task never switches to a socket opened
        # by a later reconnect.
        ws = self._ws_connection
        if not ws or ws.closed:
            return

        try:
            if isinstance(text, str):
                if not self._should_stop:
                    await ws.send_json({"type": "Speak", "text": text})
            else:
                async for chunk in text:
                    if ws.closed or self._should_stop:
                        break
                    await ws.send_json({"type": "Speak", "text": chunk})

            if not ws.closed and not self._should_stop:
                await ws.send_json({"type": "Flush"})
        except asyncio.CancelledError:
            pass  # cancelled by synthesize()/interrupt()/aclose(); not an error
        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Send task error: {str(e)}")

    async def _receive_audio_task(self) -> None:
        """Read frames from the socket this task was started for.

        Binary frames are forwarded to the audio track unless a stop was
        requested. The loop ends on a close frame or socket error. It also
        ends on a Deepgram ``Error`` message, but only while no stop is
        requested. Once it ends, ``_ensure_connection`` reconnects on the
        next synthesis.
        """
        ws = self._ws_connection
        if not ws:
            return

        try:
            while not ws.closed:
                msg = await ws.receive()

                if msg.type == aiohttp.WSMsgType.BINARY:
                    if not self._should_stop:
                        await self._stream_audio_chunks(msg.data)
                elif msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    if data.get('type') == 'Error' and not self._should_stop:
                        self.emit("error", f"Deepgram error: {data.get('description', 'Unknown error')}")
                        break
                elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    raise ConnectionError(f"WebSocket error: {ws.exception()}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Receive task error: {str(e)}")

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Push one chunk of audio into the audio track (skipped when stopping or empty)."""
        if not audio_bytes or self._should_stop:
            return

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Stop the current utterance immediately.

        This does four things:

        - Sets the stop flag, so in-flight sends stop and incoming audio is
          dropped.
        - Interrupts the audio track.
        - Cancels the send task.
        - Asks Deepgram to discard the text and audio it has already queued.
        """
        self._should_stop = True

        if self.audio_track:
            self.audio_track.interrupt()

        if self._send_task and not self._send_task.done():
            self._send_task.cancel()

        # Without a "Clear", Deepgram keeps streaming audio for text it already
        # received. That audio is dropped only while _should_stop is True, and
        # the next synthesize() resets the flag, so it would leak into the
        # next reply.
        ws = self._ws_connection
        if ws is not None and not ws.closed:
            try:
                await ws.send_json({"type": "Clear"})
            except Exception as e:
                self.emit("error", f"Failed to clear Deepgram buffer: {str(e)}")

    async def aclose(self) -> None:
        """Cancel background tasks, close the WebSocket and its session, then close the base TTS."""
        self._should_stop = True

        current = asyncio.current_task()
        tasks = [t for t in (self._send_task, self._recv_task) if t and not t.done()]
        for task in tasks:
            task.cancel()
        # Let the cancelled tasks unwind before their socket and session go
        # away. Never wait on the task running this coroutine; that would hang.
        await asyncio.gather(*(t for t in tasks if t is not current), return_exceptions=True)
        self._send_task = None
        self._recv_task = None

        ws, session = self._ws_connection, self._ws_session
        self._ws_connection = None
        self._ws_session = None
        try:
            if ws and not ws.closed:
                await ws.close()
        finally:
            # Close the session even if the WebSocket close handshake fails;
            # otherwise the ClientSession (and its connector) leaks.
            if session and not session.closed:
                await session.close()

        await super().aclose()

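Text-to-Speech implementation that streams synthesized audio from Deepgram's WebSocket speak API (`wss://api.deepgram.com/v1/speak` by default). It inherits its common interface from the base `TTS` class.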

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cancel background tasks, close the WebSocket and its session, then close the base TTS."""
    self._should_stop = True

    current = asyncio.current_task()
    tasks = [t for t in (self._send_task, self._recv_task) if t and not t.done()]
    for task in tasks:
        task.cancel()
    # Let the cancelled tasks unwind before their socket and session go
    # away. Never wait on the task running this coroutine; that would hang.
    await asyncio.gather(*(t for t in tasks if t is not current), return_exceptions=True)
    self._send_task = None
    self._recv_task = None

    ws, session = self._ws_connection, self._ws_session
    self._ws_connection = None
    self._ws_session = None
    try:
        if ws and not ws.closed:
            await ws.close()
    finally:
        # Close the session even if the WebSocket close handshake fails;
        # otherwise the ClientSession (and its connector) leaks.
        if session and not session.closed:
            await session.close()

    await super().aclose()

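Clean up resources. This cancels the send and receive tasks, closes the Deepgram WebSocket and its HTTP session, and then runs the base class cleanup.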

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Stop the current utterance immediately.

    This does four things:

    - Sets the stop flag, so in-flight sends stop and incoming audio is
      dropped.
    - Interrupts the audio track.
    - Cancels the send task.
    - Asks Deepgram to discard the text and audio it has already queued.
    """
    self._should_stop = True

    if self.audio_track:
        self.audio_track.interrupt()

    if self._send_task and not self._send_task.done():
        self._send_task.cancel()

    # Without a "Clear", Deepgram keeps streaming audio for text it already
    # received. That audio is dropped only while _should_stop is True, and
    # the next synthesize() resets the flag, so it would leak into the
    # next reply.
    ws = self._ws_connection
    if ws is not None and not ws.closed:
        try:
            await ws.send_json({"type": "Clear"})
        except Exception as e:
            self.emit("error", f"Failed to clear Deepgram buffer: {str(e)}")

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Forget that an audio chunk was already sent, so the next TTS task starts fresh."""
    self._first_chunk_sent = False

Reset the first-audio tracking state before the next TTS task.

async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._should_stop = True
        if self._send_task and not self._send_task.done():
            self._send_task.cancel()
            
        self._should_stop = False
        await self._stream_synthesis(text)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
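voice_id
Optional voice identifier. Only the base class documents this: `DeepgramTTS.synthesize` has no `voice_id` parameter. A value passed here is absorbed by `**kwargs` and ignored. The voice is selected by the `model` constructor argument (e.g. 'aura-2-thalia-en').
**kwargs
Additional provider-specific arguments. This implementation accepts them but does not currently use them.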

Returns

None