Package videosdk.plugins.deepgram

Sub-modules

videosdk.plugins.deepgram.stt
videosdk.plugins.deepgram.stt_v2
videosdk.plugins.deepgram.tts

Classes

class DeepgramSTT (*,
api_key: str | None = None,
model: str = 'nova-2',
language: str = 'en-US',
interim_results: bool = True,
punctuate: bool = True,
smart_format: bool = True,
sample_rate: int = 48000,
endpointing: int = 50,
filler_words: bool = True,
base_url: str = 'wss://api.deepgram.com/v1/listen')
Expand source code
class DeepgramSTT(BaseSTT):
    """Streaming speech-to-text backed by Deepgram's v1 /listen WebSocket API."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "nova-2",
        language: str = "en-US",
        interim_results: bool = True,
        punctuate: bool = True,
        smart_format: bool = True,
        sample_rate: int = 48000,
        endpointing: int = 50,
        filler_words: bool = True,
        base_url: str = "wss://api.deepgram.com/v1/listen",
    ) -> None:
        """Initialize the Deepgram STT plugin

        Args:
            api_key (str | None, optional): Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "nova-2".
            language (str): The language to use for the STT plugin. Defaults to "en-US".
            interim_results (bool): Whether to return interim results. Defaults to True.
            punctuate (bool): Whether to add punctuation. Defaults to True.
            smart_format (bool): Whether to use smart formatting. Defaults to True.
            sample_rate (int): Sample rate to use for the STT plugin. Defaults to 48000.
            endpointing (int): Endpointing threshold. Defaults to 50.
            filler_words (bool): Whether to include filler words. Defaults to True.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v1/listen".

        Raises:
            ValueError: If no API key is given and DEEPGRAM_API_KEY is unset.
        """
        super().__init__()

        # Explicit api_key argument takes precedence over the environment variable.
        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")

        self.model = model
        self.language = language
        self.sample_rate = sample_rate
        self.interim_results = interim_results
        self.punctuate = punctuate
        self.smart_format = smart_format
        self.endpointing = endpointing
        self.filler_words = filler_words
        self.base_url = base_url
        # Connection state is created lazily on the first process_audio() call.
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        # Timestamps of the two most recent "SpeechStarted" events; used to
        # debounce the global speech_started notification (see _handle_ws_message).
        self._last_speech_event_time = 0.0
        self._previous_speech_event_time = 0.0

    async def process_audio(
        self,
        audio_frames: bytes,
        language: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Deepgram's Streaming API

        Args:
            audio_frames: Raw PCM (linear16) bytes to stream.
            language: Unused here; the connection language is fixed at init time.
            **kwargs: Ignored provider-specific arguments.
        """

        # Open the WebSocket lazily on the first chunk and start the reader task.
        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            await self._ws.send_bytes(audio_frames)
        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            # Tear down the broken connection so a later call can reconnect.
            if self._ws:
                await self._ws.close()
                self._ws = None
                if self._ws_task:
                    self._ws_task.cancel()
                    self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses

        Parses each text frame via _handle_ws_message and forwards the
        resulting STTResponse objects to the registered transcript callback.
        """
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            # Release the socket even if the receive loop exits abnormally.
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API"""

        if not self._session:
            self._session = aiohttp.ClientSession()

        # Deepgram expects lowercase string booleans in the query string.
        query_params = {
            "model": self.model,
            "language": self.language,
            "interim_results": str(self.interim_results).lower(),
            "punctuate": str(self.punctuate).lower(),
            "smart_format": str(self.smart_format).lower(),
            "encoding": "linear16",
            "sample_rate": str(self.sample_rate),
            "channels": 2,  # assumes interleaved stereo input — TODO confirm against callers
            "endpointing": self.endpointing,
            "filler_words": str(self.filler_words).lower(),
            "vad_events": "true",
            "no_delay": "true",
        }
        headers = {
            "Authorization": f"Token {self.api_key}",
        }

        ws_url = f"{self.base_url}?{urlencode(query_params)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses

        Returns a (possibly empty) list of STTResponse objects; errors while
        parsing are logged and swallowed so the listener loop keeps running.
        """
        responses = []
        try:
            if msg["type"] == "SpeechStarted":
                current_time = time.time()

                # First event ever: just record the time, do not emit yet.
                if self._last_speech_event_time == 0.0:
                    self._last_speech_event_time = current_time
                    return responses

                # Only emit when two SpeechStarted events arrive within 1s of
                # each other — presumably to filter one-off VAD blips; TODO confirm.
                if current_time - self._last_speech_event_time < 1.0:
                    global_event_emitter.emit("speech_started")

                self._previous_speech_event_time = self._last_speech_event_time
                self._last_speech_event_time = current_time

            if msg["type"] == "Results":
                channel = msg["channel"]
                alternatives = channel["alternatives"]

                if alternatives and len(alternatives) > 0:
                    alt = alternatives[0]
                    is_final = msg["is_final"]
                    # Skip empty transcripts (e.g. silence-only windows).
                    if alt["transcript"] == "":
                        return responses

                    response = STTResponse(
                        event_type=SpeechEventType.FINAL if is_final else SpeechEventType.INTERIM,
                        data=SpeechData(
                            text=alt["transcript"],
                            language=self.language,
                            confidence=alt.get("confidence", 0.0),
                            start_time=alt["words"][0]["start"] if alt["words"] else 0.0,
                            end_time=alt["words"][-1]["end"] if alt["words"] else 0.0,
                        ),
                        metadata={"model": self.model}
                    )
                    responses.append(response)

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    async def aclose(self) -> None:
        """Cleanup resources"""
        # Cancel the reader task first so it does not race the socket close below.
        if self._ws_task:
            self._ws_task.cancel()
            logger.info("DeepgramSTT WebSocket task cancelled")
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None
            logger.info("DeepgramSTT WebSocket task cleared")

        if self._ws:
            await self._ws.close()
            logger.info("DeepgramSTT WebSocket closed")
            self._ws = None

        if self._session:
            await self._session.close()
            logger.info("DeepgramSTT cleaned up")
            self._session = None

        # Call base class cleanup
        await super().aclose()

Base class for Speech-to-Text implementations

Initialize the Deepgram STT plugin

Args

api_key : str | None, optional
Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the STT plugin. Defaults to "nova-2".
language : str
The language to use for the STT plugin. Defaults to "en-US".
interim_results : bool
Whether to return interim results. Defaults to True.
punctuate : bool
Whether to add punctuation. Defaults to True.
smart_format : bool
Whether to use smart formatting. Defaults to True.
sample_rate : int
Sample rate to use for the STT plugin. Defaults to 48000.
endpointing : int
Endpointing threshold. Defaults to 50.
filler_words : bool
Whether to include filler words. Defaults to True.
base_url : str
The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v1/listen".

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources

    Shutdown order matters: cancel the reader task first, then close the
    WebSocket it iterates, then the HTTP session that owns the socket.
    """
    if self._ws_task:
        self._ws_task.cancel()
        logger.info("DeepgramSTT WebSocket task cancelled")
        try:
            # Await the cancelled task so its cleanup finishes before we proceed.
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
        logger.info("DeepgramSTT WebSocket task cleared")

    if self._ws:
        await self._ws.close()
        logger.info("DeepgramSTT WebSocket closed")
        self._ws = None

    if self._session:
        await self._session.close()
        logger.info("DeepgramSTT cleaned up")
        self._session = None

    # Call base class cleanup
    await super().aclose()

Cleanup resources

async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    language: Optional[str] = None,
    **kwargs: Any
) -> None:
    """Stream a chunk of raw audio bytes to Deepgram's live-transcription socket.

    Lazily opens the WebSocket (and its background reader task) on first use.
    On a send failure the connection is torn down so a later call reconnects.
    """
    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        await self._ws.send_bytes(audio_frames)
    except Exception as exc:
        detail = str(exc)
        logger.error(f"Error in process_audio: {detail}")
        self.emit("error", detail)
        if self._ws:
            await self._ws.close()
            self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

Process audio frames and send to Deepgram's Streaming API

class DeepgramSTTV2 (*,
api_key: str | None = None,
model: str = 'flux-general-en',
input_sample_rate: int = 48000,
target_sample_rate: int = 16000,
eager_eot_threshold: float = 0.6,
eot_threshold: float = 0.8,
eot_timeout_ms: int = 7000,
base_url: str = 'wss://api.deepgram.com/v2/listen',
enable_preemptive_generation: bool = False)
Expand source code
class DeepgramSTTV2(BaseSTT):
    """Streaming speech-to-text backed by Deepgram's v2 (Flux) WebSocket API,
    which reports turn-based events (StartOfTurn / EagerEndOfTurn / EndOfTurn)."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "flux-general-en",
        input_sample_rate: int = 48000,
        target_sample_rate: int = 16000,
        eager_eot_threshold:float=0.6,
        eot_threshold:float=0.8,
        eot_timeout_ms:int=7000,
        base_url: str = "wss://api.deepgram.com/v2/listen",
        enable_preemptive_generation: bool = False,
    ) -> None:
        """Initialize the Deepgram STT plugin

        Args:
            api_key (str | None, optional): Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "flux-general-en".
            input_sample_rate (int): The input sample rate to use for the STT plugin. Defaults to 48000.
            target_sample_rate (int): The target sample rate to use for the STT plugin. Defaults to 16000.
            eager_eot_threshold (float): Eager end-of-turn threshold. Defaults to 0.6.
            eot_threshold (float): End-of-turn threshold. Defaults to 0.8.
            eot_timeout_ms (int): End-of-turn timeout in milliseconds. Defaults to 7000.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v2/listen".
            enable_preemptive_generation (bool): Enable preemptive generation based on EagerEndOfTurn events. Defaults to False.

        Raises:
            ValueError: If no API key is given and DEEPGRAM_API_KEY is unset.
        """
        super().__init__()

        # Explicit api_key argument takes precedence over the environment variable.
        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")

        self.model = model
        self.input_sample_rate = input_sample_rate
        self.target_sample_rate = target_sample_rate
        self.eager_eot_threshold = eager_eot_threshold
        self.eot_threshold=eot_threshold
        self.eot_timeout_ms = eot_timeout_ms
        self.base_url = base_url
        self.enable_preemptive_generation = enable_preemptive_generation

        # Outgoing audio accumulates here and is flushed in fixed-size chunks.
        # Chunk sizes are 100 ms / 50 ms of 16-bit (2 bytes/sample) mono audio
        # at the target rate.
        self._stream_buffer = bytearray()
        self._target_chunk_size = int(0.1 * self.target_sample_rate * 2)
        self._min_chunk_size = int(0.05 * self.target_sample_rate * 2)

        # Connection state is created lazily on the first process_audio() call.
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._last_transcript: str = ""
        # NOTE(review): redundant — _ws_task is already initialized just above.
        self._ws_task = None
    

    async def process_audio(
        self,
        audio_frames: bytes,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Deepgram's Flux API"""
        
        # Open the WebSocket lazily on the first chunk and start the reader task.
        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())
            
        try:
            resampled_audio = self._resample_audio(audio_frames)
            if not resampled_audio:
                return
                
            self._stream_buffer.extend(resampled_audio)
             # chunk size 100ms
            while len(self._stream_buffer) >= self._target_chunk_size:
                chunk_to_send = bytes(self._stream_buffer[:self._target_chunk_size])
                self._stream_buffer = self._stream_buffer[self._target_chunk_size:]
                
                await self._ws.send_bytes(bytes(chunk_to_send))
                
        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            # Tear down the broken connection so a later call can reconnect.
            if self._ws:
                await self._ws.close()
                self._ws = None
                if self._ws_task:
                    self._ws_task.cancel()
                    self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses

        Parses each text frame via _handle_ws_message and forwards the
        resulting STTResponse objects to the registered transcript callback.
        """
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            # Release the socket even if the receive loop exits abnormally.
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API"""
        if not self._session:
            self._session = aiohttp.ClientSession()

        query_params = {
            "model": self.model,
            "encoding": "linear16",
            "sample_rate": self.target_sample_rate,
            "eot_threshold": self.eot_threshold,
            "eot_timeout_ms": self.eot_timeout_ms,
            "eager_eot_threshold": self.eager_eot_threshold,
        }
        headers = {"Authorization": f"Token {self.api_key}"}
        ws_url = f"{self.base_url}?{urlencode(query_params)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
            logger.info("Connected to Deepgram V2 WebSocket.")
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses

        Only "TurnInfo" messages carry transcripts; the Flux event field
        determines which response type (if any) is produced.
        """
        responses = []

        try:
            if msg.get("type") != "TurnInfo":
                return responses

            event = msg.get("event")
            transcript = msg.get("transcript", "")
            # logger.info(f"{event} and {transcript}")
            start_time = msg.get("audio_window_start", 0.0)
            end_time = msg.get("audio_window_end", 0.0)
            confidence = msg.get("end_of_turn_confidence", 0.0)

            self._last_transcript = transcript
            # Emit turn-related events
            if event == "StartOfTurn":
                global_event_emitter.emit("speech_started")
            elif event == "EagerEndOfTurn":
                # Handle EagerEndOfTurn for preemptive generation
                if self.enable_preemptive_generation and transcript and self._transcript_callback:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.PREFLIGHT,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                            ),
                            metadata={"model": self.model},
                        )
                    )
            elif event == "EndOfTurn":
                logger.info(f"EndOfTurn (FINAL) Transcript: {transcript} and Confidence: {confidence}")
                global_event_emitter.emit("speech_stopped")
                if transcript and self._transcript_callback:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.FINAL,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                            ),
                            metadata={"model": self.model},
                        )
                    )
            elif event == "TurnResumed":
                # Send interim to signal user continued speaking
                if self.enable_preemptive_generation and transcript:
                    responses.append(
                            STTResponse(
                                event_type=SpeechEventType.INTERIM,
                                data=SpeechData(
                                    text=transcript,
                                    confidence=confidence,
                                    start_time=start_time,
                                    end_time=end_time,
                                ),
                                metadata={"model": self.model, "turn_resumed": True},
                            )
                    )

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses
    
    def _resample_audio(self, audio_bytes: bytes) -> bytes:
        """Resample audio from input sample rate to target sample rate and convert to mono.

        Returns b'' on empty input or on any processing error (logged).
        """
        try:
            if not audio_bytes:
                return b''

            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b''

            # Heuristic: an even sample count is treated as interleaved stereo
            # and averaged down to mono — assumes 2-channel input; TODO confirm.
            if raw_audio.size % 2 == 0: 
                stereo_audio = raw_audio.reshape(-1, 2)
                mono_audio = stereo_audio.astype(np.float32).mean(axis=1)
            else:
                mono_audio = raw_audio.astype(np.float32)

            if self.input_sample_rate != self.target_sample_rate:
                target_length = int(len(mono_audio) * self.target_sample_rate / self.input_sample_rate)
                resampled_data = signal.resample(mono_audio, target_length)
            else:
                resampled_data = mono_audio

            # Clamp to the int16 range before converting back to PCM bytes.
            resampled_data = np.clip(resampled_data, -32767, 32767)
            return resampled_data.astype(np.int16).tobytes()

        except Exception as e:
            logger.error(f"Error resampling audio: {e}")
            return b''


    async def aclose(self) -> None:
        """Cleanup resources"""
        
        # Flush any buffered audio that is at least one minimum chunk long.
        if len(self._stream_buffer) >= self._min_chunk_size and self._ws:
            try:
                final_chunk = bytes(self._stream_buffer)
                await self._ws.send_bytes(final_chunk)
            except Exception as e:
                logger.error(f"Error sending final audio: {e}")
        
        # Ask Deepgram to finalize; the brief sleep lets final results arrive.
        if self._ws:
            try:
                await self._ws.send_str(json.dumps({"type": "Terminate"}))
                await asyncio.sleep(0.5)  
            except Exception as e:
                logger.error(f"Error sending termination: {e}")

        # Cancel the reader task before closing the socket it iterates.
        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None
            
        if self._ws:
            await self._ws.close()
            self._ws = None
            
        if self._session:
            await self._session.close()
            self._session = None
        await super().aclose()

Base class for Speech-to-Text implementations

Initialize the Deepgram STT plugin

Args

api_key : str | None, optional
Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the STT plugin. Defaults to "flux-general-en".
input_sample_rate : int
The input sample rate to use for the STT plugin. Defaults to 48000.
target_sample_rate : int
The target sample rate to use for the STT plugin. Defaults to 16000.
eager_eot_threshold : float
Eager end-of-turn threshold. Defaults to 0.6.
eot_threshold : float
End-of-turn threshold. Defaults to 0.8.
eot_timeout_ms : int
End-of-turn timeout in milliseconds. Defaults to 7000.
base_url : str
The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v2/listen".
enable_preemptive_generation : bool
Enable preemptive generation based on EagerEndOfTurn events. Defaults to False.

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources

    Flushes remaining buffered audio, sends a Terminate message so Deepgram
    finalizes the turn, then tears down task, socket, and session in order.
    """
    
    # Flush any buffered audio that is at least one minimum chunk long.
    if len(self._stream_buffer) >= self._min_chunk_size and self._ws:
        try:
            final_chunk = bytes(self._stream_buffer)
            await self._ws.send_bytes(final_chunk)
        except Exception as e:
            logger.error(f"Error sending final audio: {e}")
    
    # Ask Deepgram to finalize; the brief sleep lets final results arrive.
    if self._ws:
        try:
            await self._ws.send_str(json.dumps({"type": "Terminate"}))
            await asyncio.sleep(0.5)  
        except Exception as e:
            logger.error(f"Error sending termination: {e}")

    # Cancel the reader task before closing the socket it iterates.
    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
        
    if self._ws:
        await self._ws.close()
        self._ws = None
        
    if self._session:
        await self._session.close()
        self._session = None
    await super().aclose()

Cleanup resources

async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    **kwargs: Any
) -> None:
    """Resample incoming audio and stream it to Deepgram's Flux API in
    fixed-size (100 ms) chunks, buffering any remainder for the next call.

    Lazily opens the WebSocket (and its reader task) on first use; on a
    send failure the connection is torn down so a later call reconnects.
    """
    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        converted = self._resample_audio(audio_frames)
        if not converted:
            return

        self._stream_buffer.extend(converted)
        chunk_size = self._target_chunk_size
        # Drain the buffer one full chunk at a time; leftovers stay queued.
        while len(self._stream_buffer) >= chunk_size:
            head = self._stream_buffer[:chunk_size]
            self._stream_buffer = self._stream_buffer[chunk_size:]
            await self._ws.send_bytes(bytes(head))

    except Exception as exc:
        detail = str(exc)
        logger.error(f"Error in process_audio: {detail}")
        self.emit("error", detail)
        if self._ws:
            await self._ws.close()
            self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

Process audio frames and send to Deepgram's Flux API

class DeepgramTTS (*,
api_key: str | None = None,
model: str = 'aura-2-thalia-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
base_url: str = 'wss://api.deepgram.com/v1/speak',
**kwargs: Any)
Expand source code
class DeepgramTTS(TTS):
    """Streaming text-to-speech over Deepgram's speak WebSocket API."""

    def __init__(
            self,
            *,
            api_key: str | None = None,
            model: str = DEFAULT_MODEL,
            encoding: str = DEFAULT_ENCODING,
            sample_rate: int = DEEPGRAM_SAMPLE_RATE,
            base_url: str = API_BASE_URL,
            **kwargs: Any,
    ) -> None:
        """Initialize the Deepgram TTS plugin.

        Args:
            api_key (str | None, optional): Deepgram API key. Uses the DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
            model (str): Voice model to use. Defaults to DEFAULT_MODEL.
            encoding (str): Audio encoding of the output stream. Defaults to DEFAULT_ENCODING.
            sample_rate (int): Output sample rate. Defaults to DEEPGRAM_SAMPLE_RATE.
            base_url (str): WebSocket endpoint for the speak API. Defaults to API_BASE_URL.
            **kwargs: Additional provider-specific arguments (unused here).

        Raises:
            ValueError: If no API key is given and DEEPGRAM_API_KEY is unset.
        """
        super().__init__(sample_rate=sample_rate, num_channels=DEEPGRAM_CHANNELS)

        self.model = model
        self.encoding = encoding
        self.base_url = base_url
        # Set externally by the agent runtime before synthesize() is called.
        self.audio_track = None
        self.loop = None
        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        # Cooperative stop flag checked by the send/receive tasks.
        self._should_stop = False
        self._first_chunk_sent = False
        # Serializes connection setup across concurrent synthesize() calls.
        self._connection_lock = asyncio.Lock()

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through the 'api_key' parameter or the DEEPGRAM_API_KEY environment variable."
            )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first-audio-chunk tracking state for the next TTS task."""
        self._first_chunk_sent = False

    async def _ensure_connection(self) -> None:
        """Open the speak WebSocket if not already connected (idempotent)."""
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed:
                return

            params = {
                "model": self.model,
                "encoding": self.encoding,
                "sample_rate": self.sample_rate,
            }
            # NOTE(review): values are joined without URL-encoding — fine for the
            # current model/encoding/rate values, but confirm for arbitrary input.
            param_string = "&".join([f"{k}={v}" for k, v in params.items()])
            full_ws_url = f"{self.base_url}?{param_string}"
            headers = {"Authorization": f"Token {self.api_key}"}

            self._ws_session = aiohttp.ClientSession()
            self._ws_connection = await asyncio.wait_for(
                self._ws_session.ws_connect(full_ws_url, headers=headers),
                timeout=50.0
            )
            # Replace any stale receiver with a fresh one for this connection.
            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()
            self._recv_task = asyncio.create_task(self._receive_audio_task())

    async def synthesize(
            self,
            text: AsyncIterator[str] | str,
            **kwargs: Any,
    ) -> None:
        """Convert text (a string or an async stream of strings) to speech.

        Cancels any in-flight send task first; emits an "error" event and
        re-raises if synthesis fails.
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            # Halt a previous utterance before starting this one.
            self._should_stop = True
            if self._send_task and not self._send_task.done():
                self._send_task.cancel()

            self._should_stop = False
            await self._stream_synthesis(text)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str]) -> None:
        """Run one synthesis pass: connect, send the text, await completion."""
        try:
            await self._ensure_connection()
            self._send_task = asyncio.create_task(self._send_text_task(text))
            await self._send_task
        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")
            await self.aclose()
        finally:
            if self._send_task and not self._send_task.done():
                self._send_task.cancel()
            self._send_task = None

    async def _send_text_task(self, text: Union[AsyncIterator[str], str]) -> None:
        """Send "Speak" messages for the text, then a "Flush" to force synthesis."""
        if not self._ws_connection or self._ws_connection.closed:
            return

        try:
            if isinstance(text, str):
                if not self._should_stop:
                    payload = {"type": "Speak", "text": text}
                    await self._ws_connection.send_json(payload)
            else:
                async for chunk in text:
                    # Stop early if the connection dropped or we were interrupted.
                    if self._ws_connection.closed or self._should_stop:
                        break
                    payload = {"type": "Speak", "text": chunk}
                    await self._ws_connection.send_json(payload)

            if not self._ws_connection.closed and not self._should_stop:
                await self._ws_connection.send_json({"type": "Flush"})
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Send task error: {str(e)}")

    async def _receive_audio_task(self) -> None:
        """Background task: route binary frames to the audio track, surface errors."""
        if not self._ws_connection:
            return

        try:
            while not self._ws_connection.closed:
                msg = await self._ws_connection.receive()

                if msg.type == aiohttp.WSMsgType.BINARY:
                    if not self._should_stop:
                        await self._stream_audio_chunks(msg.data)
                elif msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    if data.get('type') == 'Error' and not self._should_stop:
                        self.emit("error", f"Deepgram error: {data.get('description', 'Unknown error')}")
                        break
                elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    raise ConnectionError(f"WebSocket error: {self._ws_connection.exception()}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Receive task error: {str(e)}")

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Forward one chunk of synthesized audio to the output track."""
        if not audio_bytes or self._should_stop:
            return

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Interrupt the TTS process: flag stop, flush the track, cancel the sender."""
        self._should_stop = True

        if self.audio_track:
            self.audio_track.interrupt()

        if self._send_task and not self._send_task.done():
            self._send_task.cancel()


    async def aclose(self) -> None:
        """Release tasks, WebSocket connection, and HTTP session."""
        self._should_stop = True

        # Cancel sender and receiver before tearing down the connection they use.
        for task in [self._send_task, self._recv_task]:
            if task and not task.done():
                task.cancel()

        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()

        self._ws_connection = None
        self._ws_session = None

        await super().aclose()

Base class for Text-to-Speech implementations

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Release tasks, WebSocket connection, and HTTP session."""
    self._should_stop = True

    # Cancel sender and receiver before tearing down the connection they use.
    for task in [self._send_task, self._recv_task]:
        if task and not task.done():
            task.cancel()

    if self._ws_connection and not self._ws_connection.closed:
        await self._ws_connection.close()
    if self._ws_session and not self._ws_session.closed:
        await self._ws_session.close()

    self._ws_connection = None
    self._ws_session = None

    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Abort the current utterance: set the stop flag, flush queued audio,
    and cancel the in-flight send task if one is running."""
    self._should_stop = True

    track = self.audio_track
    if track:
        track.interrupt()

    sender = self._send_task
    if sender and not sender.done():
        sender.cancel()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Mark that no audio chunk has been sent yet for the upcoming TTS task."""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._should_stop = True
        if self._send_task and not self._send_task.done():
            self._send_task.cancel()
            
        self._should_stop = False
        await self._stream_synthesis(text)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None