Package videosdk.plugins.deepgram

Sub-modules

videosdk.plugins.deepgram.stt
videosdk.plugins.deepgram.stt_v2
videosdk.plugins.deepgram.tts

Classes

class DeepgramSTT (*,
api_key: str | None = None,
model: str = 'nova-2',
language: str = 'en-US',
interim_results: bool = True,
punctuate: bool = True,
smart_format: bool = True,
sample_rate: int = 48000,
endpointing: int = 50,
filler_words: bool = True,
keywords: list[str] | None = None,
keyterm: list[str] | None = None,
profanity_filter: bool = False,
numerals: bool = False,
tag: Union[str, List[str]] | None = None,
enable_diarization: bool = False,
base_url: str = 'wss://api.deepgram.com/v1/listen')
Expand source code
class DeepgramSTT(BaseSTT):
    """Deepgram streaming speech-to-text over the v1 ``/listen`` WebSocket API."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "nova-2",
        language: str = "en-US",
        interim_results: bool = True,
        punctuate: bool = True,
        smart_format: bool = True,
        sample_rate: int = 48000,
        endpointing: int = 50,
        filler_words: bool = True,
        keywords: list[str] | None = None,
        keyterm: list[str] | None = None,
        profanity_filter: bool = False,
        numerals: bool = False,
        tag: Union[str, List[str]] | None = None,
        enable_diarization: bool = False,
        base_url: str = "wss://api.deepgram.com/v1/listen",
    ) -> None:
        """Initialize the Deepgram STT plugin.

        Args:
            api_key (str | None, optional): Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "nova-2". Use "nova-3" or "nova-3-general" for Nova-3.
            language (str): The language to use for the STT plugin. Defaults to "en-US".
            interim_results (bool): Whether to return interim results. Defaults to True.
            punctuate (bool): Whether to add punctuation. Defaults to True.
            smart_format (bool): Whether to use smart formatting. Defaults to True.
            sample_rate (int): Sample rate to use for the STT plugin. Defaults to 48000.
            endpointing (int): Endpointing threshold. Defaults to 50. Set to 0 (or any negative value) to disable endpointing.
            filler_words (bool): Whether to include filler words. Defaults to True.
            keywords (list[str] | None): Optional keywords for boosting/suppression. Only for Nova-2, Nova-1, Enhanced, Base.
                Each entry is a keyword or "keyword:intensifier" (e.g. "snuffleupagus:5", "kansas:-10"). Max 100. Defaults to None.
            keyterm (list[str] | None): Optional keyterms/phrases for Keyterm Prompting. Only for Nova-3 (e.g. model="nova-3").
                Each entry is a keyterm or phrase (e.g. "tretinoin", "customer service"). Max 500 tokens total. Defaults to None.
            profanity_filter (bool): Whether to filter profanity from the transcription. Defaults to False.
            numerals (bool): Whether to include numerals in the transcription. Defaults to False.
            tag (str | list[str] | None): A tag or list of tags to add to the requests for usage reporting. Defaults to None.
            enable_diarization (bool): Whether to enable diarization, which recognizes speaker changes and assigns a speaker
                to each word in the transcript. Defaults to False.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v1/listen".

        Raises:
            ValueError: If no API key is available, or if ``keywords`` is given for a Nova-3 model.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")

        self.model = model
        if self._is_nova3_model(model) and keywords:
            raise ValueError(
                "Keywords are not supported for Nova-3. Use keyterm=... for Keyterm Prompting instead."
            )
        self.language = language
        self.sample_rate = sample_rate
        self.interim_results = interim_results
        self.punctuate = punctuate
        self.smart_format = smart_format
        self.endpointing = endpointing
        self.filler_words = filler_words
        self.keywords = keywords
        self.keyterm = keyterm
        self.profanity_filter = profanity_filter
        self.numerals = numerals
        self.tag = tag
        self.enable_diarization = enable_diarization
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._last_speech_event_time = 0.0
        self._previous_speech_event_time = 0.0
        self._closed = False

    @staticmethod
    def _is_nova3_model(model: str) -> bool:
        """Return True for "nova-3" and any "nova-3-*" model variant."""
        return model == "nova-3" or model.startswith("nova-3-")

    async def process_audio(
        self,
        audio_frames: bytes,
        language: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """Forward raw audio to Deepgram, opening the WebSocket on first use.

        Args:
            audio_frames: Audio bytes, sent to the socket unchanged.
            language: Accepted for interface compatibility; unused here.
            **kwargs: Ignored.

        Send failures are logged and reported via an ``"error"`` event. The
        socket and listener task are then torn down so the next call reconnects.
        """
        if self._closed:
            return

        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            await self._ws.send_bytes(audio_frames)
        except Exception as exc:
            reason = str(exc)
            logger.error(f"Error in process_audio: {reason}")
            self.emit("error", reason)
            if not self._ws:
                return
            await self._ws.close()
            self._ws = None
            listener, self._ws_task = self._ws_task, None
            if listener:
                listener.cancel()

    async def _listen_for_responses(self) -> None:
        """Background task: turn incoming WebSocket messages into transcript callbacks.

        Exits on a WebSocket ERROR frame or any exception (both emitted as
        ``"error"``), and always closes and clears the socket on the way out.
        """
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API.

        Raises:
            Exception: Any error from ``ws_connect`` is logged and re-raised.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        # A non-positive threshold disables endpointing, matching the documented
        # "set 0 to disable" contract (negative values keep their old meaning).
        endpointing = "false" if self.endpointing <= 0 else self.endpointing

        query_params = {
            "model": self.model,
            "language": self.language,
            "interim_results": str(self.interim_results).lower(),
            "punctuate": str(self.punctuate).lower(),
            "smart_format": str(self.smart_format).lower(),
            "encoding": "linear16",
            "sample_rate": str(self.sample_rate),
            "channels": 2,
            "endpointing": endpointing,
            "filler_words": str(self.filler_words).lower(),
            "vad_events": "true",
            "no_delay": "true",
            "profanity_filter": str(self.profanity_filter).lower(),
            "numerals": str(self.numerals).lower(),
            "diarize": str(self.enable_diarization).lower(),
        }
        params_list = list(query_params.items())
        if self.tag is not None:
            # urlencode() would stringify a list as "['a', 'b']"; emit one
            # ``tag`` query parameter per tag instead.
            tags = [self.tag] if isinstance(self.tag, str) else self.tag
            params_list.extend(("tag", t) for t in tags)
        if self._is_nova3_model(self.model):
            # Nova-3 uses Keyterm Prompting; blank entries are skipped.
            for t in self.keyterm or []:
                if t.strip():
                    params_list.append(("keyterm", t.strip()))
        elif self.keywords:
            # Older models use keyword boosting, capped at 100 entries.
            for kw in self.keywords[:100]:
                params_list.append(("keywords", kw))
        headers = {
            "Authorization": f"Token {self.api_key}",
        }

        ws_url = f"{self.base_url}?{urlencode(params_list)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses.

        ``SpeechStarted`` messages may emit a global ``speech_started`` event.
        ``Results`` messages with a non-empty first alternative yield one
        FINAL or INTERIM response. Parsing errors are logged and yield no
        responses.
        """
        responses = []
        try:
            msg_type = msg.get("type")
            if msg_type == "SpeechStarted":
                current_time = time.time()

                # The very first SpeechStarted only primes the timestamp.
                if self._last_speech_event_time == 0.0:
                    self._last_speech_event_time = current_time
                    return responses

                # Only forward when two SpeechStarted events arrive within 1 s.
                if current_time - self._last_speech_event_time < 1.0:
                    global_event_emitter.emit("speech_started")

                self._previous_speech_event_time = self._last_speech_event_time
                self._last_speech_event_time = current_time

            if msg_type == "Results":
                channel = msg["channel"]
                alternatives = channel["alternatives"]

                if alternatives:
                    alt = alternatives[0]
                    is_final = msg["is_final"]
                    if alt["transcript"] == "":
                        return responses

                    # Tolerate a missing or empty word list.
                    words = alt.get("words") or []
                    response = STTResponse(
                        event_type=SpeechEventType.FINAL if is_final else SpeechEventType.INTERIM,
                        data=SpeechData(
                            text=alt["transcript"],
                            language=self.language,
                            confidence=alt.get("confidence", 0.0),
                            start_time=words[0]["start"] if words else 0.0,
                            end_time=words[-1]["end"] if words else 0.0,
                            duration=msg["duration"]
                        ),
                        metadata={"model": self.model}
                    )
                    responses.append(response)

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    async def flush(self) -> None:
        """Send flush signal to Deepgram to trigger immediate transcription.

        Sends a ``{"type": "Finalize"}`` control message over the open
        WebSocket. Does nothing if no socket is open.
        """
        if self._ws and not self._ws.closed:
            flush_message = {"type": "Finalize"}
            await self._ws.send_str(json.dumps(flush_message))

    async def aclose(self) -> None:
        """Cleanup resources.

        Marks the instance closed (so later ``process_audio`` calls return
        early), cancels and awaits the listener task, closes the WebSocket and
        the aiohttp session, then runs the base-class cleanup.
        """
        self._closed = True
        if self._ws_task:
            # Stop the listener before tearing down the socket it reads from.
            self._ws_task.cancel()
            logger.info("DeepgramSTT WebSocket task cancelled")
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None
            logger.info("DeepgramSTT WebSocket task cleared")

        # The listener's own cleanup may already have closed and cleared the socket.
        if self._ws:
            await self._ws.close()
            logger.info("DeepgramSTT WebSocket closed")
            self._ws = None

        if self._session:
            await self._session.close()
            logger.info("DeepgramSTT cleaned up")
            self._session = None

        # Call base class cleanup
        await super().aclose()

Base class for Speech-to-Text implementations

Initialize the Deepgram STT plugin

Args

api_key : str | None, optional
Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the STT plugin. Defaults to "nova-2". Use "nova-3" or "nova-3-general" for Nova-3.
language : str
The language to use for the STT plugin. Defaults to "en-US".
interim_results : bool
Whether to return interim results. Defaults to True.
punctuate : bool
Whether to add punctuation. Defaults to True.
smart_format : bool
Whether to use smart formatting. Defaults to True.
sample_rate : int
Sample rate to use for the STT plugin. Defaults to 48000.
endpointing : int
Endpointing threshold. Defaults to 50. Set to 0 to disable endpointing.
filler_words : bool
Whether to include filler words. Defaults to True.
keywords : list[str] | None
Optional keywords for boosting/suppression. Only for Nova-2, Nova-1, Enhanced, Base. Each entry is a keyword or "keyword:intensifier" (e.g. "snuffleupagus:5", "kansas:-10"). Max 100. Defaults to None.
keyterm : list[str] | None
Optional keyterms/phrases for Keyterm Prompting. Only for Nova-3 (e.g. model="nova-3"). Each entry is a keyterm or phrase (e.g. "tretinoin", "customer service"). Max 500 tokens total. Defaults to None.
profanity_filter
Whether to filter profanity from the transcription. Defaults to False.
numerals
Whether to include numerals in the transcription. Defaults to False.
tag
A tag or list of tags to add to the requests for usage reporting. Defaults to None.
enable_diarization
Whether to enable diarization, which recognizes speaker changes and assigns a speaker to each word in the transcript. Defaults to False.
base_url : str
The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v1/listen".

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Marks the instance closed (so later ``process_audio`` calls return
    early), cancels and awaits the listener task, closes the WebSocket and
    the aiohttp session, then runs the base-class cleanup.
    """
    self._closed = True
    if self._ws_task:
        # Stop the listener before tearing down the socket it reads from.
        self._ws_task.cancel()
        logger.info("DeepgramSTT WebSocket task cancelled")
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
        logger.info("DeepgramSTT WebSocket task cleared")

    # The listener's own cleanup may already have closed and cleared the socket.
    if self._ws:
        await self._ws.close()
        logger.info("DeepgramSTT WebSocket closed")
        self._ws = None

    if self._session:
        await self._session.close()
        logger.info("DeepgramSTT cleaned up")
        self._session = None
    
    # Call base class cleanup
    await super().aclose()

Cleanup resources

async def flush(self) ‑> None
Expand source code
async def flush(self) -> None:
    """Send flush signal to Deepgram to trigger immediate transcription.

    Sends a ``{"type": "Finalize"}`` control message over the open
    WebSocket. Does nothing if no socket is open.
    """
    if self._ws and not self._ws.closed:
        flush_message = {"type": "Finalize"}
        await self._ws.send_str(json.dumps(flush_message))

Send flush signal to Deepgram to trigger immediate transcription.

async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    language: Optional[str] = None,
    **kwargs: Any
) -> None:
    """Forward raw audio to Deepgram, opening the WebSocket on first use.

    Args:
        audio_frames: Audio bytes, sent to the socket unchanged.
        language: Accepted for interface compatibility; unused here.
        **kwargs: Ignored.

    Send failures are logged and reported via an ``"error"`` event. The
    socket and listener task are then torn down so the next call reconnects.
    """
    if self._closed:
        return

    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        await self._ws.send_bytes(audio_frames)
    except Exception as exc:
        reason = str(exc)
        logger.error(f"Error in process_audio: {reason}")
        self.emit("error", reason)
        if not self._ws:
            return
        await self._ws.close()
        self._ws = None
        listener, self._ws_task = self._ws_task, None
        if listener:
            listener.cancel()

Process audio frames and send to Deepgram's Streaming API

class DeepgramSTTV2 (*,
api_key: str | None = None,
model: str = 'flux-general-en',
input_sample_rate: int = 48000,
target_sample_rate: int = 16000,
eager_eot_threshold: float = 0.6,
eot_threshold: float = 0.8,
eot_timeout_ms: int = 7000,
keyterm: list[str] | None = None,
language_hint: list[str] | None = None,
tag: Union[str, List[str]] | None = None,
base_url: str = 'wss://api.deepgram.com/v2/listen',
enable_preemptive_generation: bool = False)
Expand source code
class DeepgramSTTV2(BaseSTT):
    """Deepgram Flux streaming speech-to-text over the v2 ``/listen`` WebSocket API."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "flux-general-en",
        input_sample_rate: int = 48000,
        target_sample_rate: int = 16000,
        eager_eot_threshold: float = 0.6,
        eot_threshold: float = 0.8,
        eot_timeout_ms: int = 7000,
        keyterm: list[str] | None = None,
        language_hint: list[str] | None = None,
        tag: Union[str, List[str]] | None = None,
        base_url: str = "wss://api.deepgram.com/v2/listen",
        enable_preemptive_generation: bool = False,
    ) -> None:
        """Initialize the Deepgram STT plugin (Flux / v2 API).

        Args:
            api_key (str | None, optional): Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "flux-general-en".
            input_sample_rate (int): The input sample rate to use for the STT plugin. Defaults to 48000.
            target_sample_rate (int): The target sample rate to use for the STT plugin. Defaults to 16000.
            eager_eot_threshold (float): Eager end-of-turn threshold. Defaults to 0.6.
            eot_threshold (float): End-of-turn threshold. Defaults to 0.8.
            eot_timeout_ms (int): End-of-turn timeout in milliseconds. Defaults to 7000.
            keyterm (list[str] | None): Optional list of keyterms/phrases to improve recognition (Keyterm Prompting).
                Each entry is a keyterm or multi-word phrase (e.g. "tretinoin", "customer service").
                Formatting is preserved (e.g. "Deepgram", "iPhone"). Max 500 tokens total across all keyterms. Defaults to None.
            language_hint (list[str] | None): Optional list of language hints to bias the model for multilingual workloads. Defaults to None.
            tag (str | list[str] | None): A tag or list of tags to add to the requests for usage reporting. Defaults to None.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v2/listen".
            enable_preemptive_generation (bool): Enable preemptive generation based on EagerEndOfTurn events. Defaults to False.

        Raises:
            ValueError: If no API key is available.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")

        self.model = model
        self.input_sample_rate = input_sample_rate
        self.target_sample_rate = target_sample_rate
        self.eager_eot_threshold = eager_eot_threshold
        self.eot_threshold = eot_threshold
        self.eot_timeout_ms = eot_timeout_ms
        self.keyterm = keyterm
        self.language_hint = language_hint
        self.tag = tag
        self.base_url = base_url
        self.enable_preemptive_generation = enable_preemptive_generation

        self._stream_buffer = bytearray()
        # 100 ms / 50 ms worth of 16-bit (2-byte) mono samples at the target rate,
        # the format produced by _resample_audio.
        self._target_chunk_size = int(0.1 * self.target_sample_rate * 2)
        self._min_chunk_size = int(0.05 * self.target_sample_rate * 2)

        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._last_transcript: str = ""

    async def process_audio(
        self,
        audio_frames: bytes,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Deepgram's Flux API.

        Frames are resampled/downmixed in a worker thread and appended to a
        buffer. The buffer is drained in chunks of ``_target_chunk_size``
        bytes (100 ms); any remainder stays buffered for the next call. On
        error the socket and listener are torn down so the next call
        reconnects.
        """
        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            resampled_audio = await asyncio.to_thread(self._resample_audio, audio_frames)
            if not resampled_audio:
                return

            self._stream_buffer.extend(resampled_audio)
            chunk_size = self._target_chunk_size
            while len(self._stream_buffer) >= chunk_size:
                chunk_to_send = bytes(self._stream_buffer[:chunk_size])
                # Trim the sent bytes in place rather than re-slicing into a new buffer.
                del self._stream_buffer[:chunk_size]
                await self._ws.send_bytes(chunk_to_send)

        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
                if self._ws_task:
                    self._ws_task.cancel()
                    self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task: turn incoming WebSocket messages into transcript callbacks.

        Exits on a WebSocket ERROR frame or any exception (both emitted as
        ``"error"``), and always closes and clears the socket on the way out.
        """
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API.

        Raises:
            Exception: Any error from ``ws_connect`` is logged and re-raised.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        query_params = {
            "model": self.model,
            "encoding": "linear16",
            "sample_rate": self.target_sample_rate,
            "eot_threshold": self.eot_threshold,
            "eot_timeout_ms": self.eot_timeout_ms,
            "eager_eot_threshold": self.eager_eot_threshold,
        }
        params_list = list(query_params.items())
        if self.tag is not None:
            # urlencode() would stringify a list as "['a', 'b']"; emit one
            # ``tag`` query parameter per tag instead.
            tags = [self.tag] if isinstance(self.tag, str) else self.tag
            params_list.extend(("tag", t) for t in tags)
        if self.keyterm:
            for t in self.keyterm:
                if t.strip():
                    params_list.append(("keyterm", t.strip()))
        if self.language_hint:
            for t in self.language_hint:
                if t.strip():
                    params_list.append(("language_hint", t.strip()))
        headers = {"Authorization": f"Token {self.api_key}"}
        ws_url = f"{self.base_url}?{urlencode(params_list)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
            logger.info("Connected to Deepgram V2 WebSocket.")
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses.

        Only ``TurnInfo`` messages are considered:

        - ``StartOfTurn`` emits a global ``speech_started``.
        - ``EagerEndOfTurn`` yields a PREFLIGHT response (when preemptive
          generation is enabled).
        - ``EndOfTurn`` emits ``speech_stopped`` and yields a FINAL response.
        - ``TurnResumed`` yields an INTERIM response (when preemptive
          generation is enabled).

        Parsing errors are logged and yield no responses.
        """
        responses = []

        try:
            if msg.get("type") != "TurnInfo":
                return responses

            event = msg.get("event")
            transcript = msg.get("transcript", "")
            start_time = msg.get("audio_window_start", 0.0)
            end_time = msg.get("audio_window_end", 0.0)
            confidence = msg.get("end_of_turn_confidence", 0.0)
            duration = end_time - start_time

            self._last_transcript = transcript
            # Emit turn-related events
            if event == "StartOfTurn":
                global_event_emitter.emit("speech_started")
            elif event == "EagerEndOfTurn":
                # Handle EagerEndOfTurn for preemptive generation
                if self.enable_preemptive_generation and transcript and self._transcript_callback:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.PREFLIGHT,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                                duration=duration,
                            ),
                            metadata={"model": self.model},
                        )
                    )
            elif event == "EndOfTurn":
                logger.info(f"EndOfTurn (FINAL) Transcript: {transcript} and Confidence: {confidence}")
                global_event_emitter.emit("speech_stopped")
                if transcript and self._transcript_callback:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.FINAL,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                                duration=duration,
                            ),
                            metadata={"model": self.model},
                        )
                    )
            elif event == "TurnResumed":
                # Send interim to signal user continued speaking
                if self.enable_preemptive_generation and transcript:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.INTERIM,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                                duration=duration,
                            ),
                            metadata={"model": self.model, "turn_resumed": True},
                        )
                    )

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    def _resample_audio(self, audio_bytes: bytes) -> bytes:
        """Resample audio from input sample rate to target sample rate and convert to mono.

        Returns int16 PCM bytes, or ``b''`` for empty input or on error.
        """
        try:
            if not audio_bytes:
                return b''

            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b''

            # Heuristic: an even sample count is treated as interleaved stereo and
            # averaged to mono; an odd count is treated as mono already.
            if raw_audio.size % 2 == 0:
                stereo_audio = raw_audio.reshape(-1, 2)
                mono_audio = stereo_audio.astype(np.float32).mean(axis=1)
            else:
                mono_audio = raw_audio.astype(np.float32)

            if self.input_sample_rate != self.target_sample_rate:
                target_length = int(len(mono_audio) * self.target_sample_rate / self.input_sample_rate)
                resampled_data = signal.resample(mono_audio, target_length)
            else:
                resampled_data = mono_audio

            # Clip before the int16 cast so out-of-range values saturate instead of wrapping.
            resampled_data = np.clip(resampled_data, -32767, 32767)
            return resampled_data.astype(np.int16).tobytes()

        except Exception as e:
            logger.error(f"Error resampling audio: {e}")
            return b''

    async def flush(self) -> None:
        """No-op: Flux detects turn ends on its own.

        Deepgram Flux (v2 API) emits EndOfTurn events automatically. The only
        way to force out pending audio is ``CloseStream``, which terminates the
        connection (see ``aclose``). This method exists solely to satisfy the
        BaseSTT interface.
        """

    async def aclose(self) -> None:
        """Cleanup resources.

        Sends any buffered audio of at least ``_min_chunk_size`` bytes, asks
        Deepgram to end the stream with ``CloseStream``, and waits 0.5 s
        (presumably so final responses can arrive). It then cancels the
        listener, closes the WebSocket and session, and runs the base-class
        cleanup.
        """
        # Flush the tail of the buffer; shorter remainders are dropped.
        if len(self._stream_buffer) >= self._min_chunk_size and self._ws:
            try:
                final_chunk = bytes(self._stream_buffer)
                await self._ws.send_bytes(final_chunk)
            except Exception as e:
                logger.error(f"Error sending final audio: {e}")

        if self._ws:
            try:
                await self._ws.send_str(json.dumps({"type": "CloseStream"}))
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"Error sending termination: {e}")

        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        # The listener's own cleanup may already have closed and cleared the socket.
        if self._ws:
            await self._ws.close()
            self._ws = None

        if self._session:
            await self._session.close()
            self._session = None
        await super().aclose()

Base class for Speech-to-Text implementations

Initialize the Deepgram STT plugin (Flux / v2 API).

Args

api_key : str | None, optional
Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the STT plugin. Defaults to "flux-general-en".
input_sample_rate : int
The input sample rate to use for the STT plugin. Defaults to 48000.
target_sample_rate : int
The target sample rate to use for the STT plugin. Defaults to 16000.
eager_eot_threshold : float
Eager end-of-turn threshold. Defaults to 0.6.
eot_threshold : float
End-of-turn threshold. Defaults to 0.8.
eot_timeout_ms : int
End-of-turn timeout in milliseconds. Defaults to 7000.
keyterm : list[str] | None
Optional list of keyterms/phrases to improve recognition (Keyterm Prompting). Each entry is a keyterm or multi-word phrase (e.g. "tretinoin", "customer service"). Formatting is preserved (e.g. "Deepgram", "iPhone"). Max 500 tokens total across all keyterms. Defaults to None.
language_hint : list[str] | None
Optional list of language hints to bias the model for multilingual workloads. Defaults to None.
tag
A tag or list of tags to add to the requests for usage reporting. Defaults to None.
base_url : str
The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v2/listen".
enable_preemptive_generation : bool
Enable preemptive generation based on EagerEndOfTurn events. Defaults to False.

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Sends any buffered audio of at least ``_min_chunk_size`` bytes, asks
    Deepgram to end the stream with ``CloseStream``, and waits 0.5 s
    (presumably so final responses can arrive). It then cancels the
    listener, closes the WebSocket and session, and runs the base-class
    cleanup.
    """
    
    # Flush the tail of the buffer; shorter remainders are dropped.
    if len(self._stream_buffer) >= self._min_chunk_size and self._ws:
        try:
            final_chunk = bytes(self._stream_buffer)
            await self._ws.send_bytes(final_chunk)
        except Exception as e:
            logger.error(f"Error sending final audio: {e}")
    
    if self._ws:
        try:
            await self._ws.send_str(json.dumps({"type": "CloseStream"}))
            await asyncio.sleep(0.5)  
        except Exception as e:
            logger.error(f"Error sending termination: {e}")

    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
        
    # The listener's own cleanup may already have closed and cleared the socket.
    if self._ws:
        await self._ws.close()
        self._ws = None
        
    if self._session:
        await self._session.close()
        self._session = None
    await super().aclose()

Cleanup resources

async def flush(self) ‑> None
Expand source code
async def flush(self) -> None:
    """No-op flush kept only to satisfy the ``BaseSTT`` interface.

    Deepgram Flux (the v2 listen API) detects turn boundaries on the server
    and emits ``EndOfTurn`` events by itself. It offers no Finalize command
    short of ``CloseStream``, which also terminates the connection, so there
    is nothing for this method to do.
    """
    return None

Deepgram Flux (v2 API) handles turn detection internally and automatically emits EndOfTurn events. There is no explicit Finalize command without terminating the connection via CloseStream. This method is a no-op to satisfy the BaseSTT interface requirements.

async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    **kwargs: Any
) -> None:
    """Resample incoming audio and stream it to Deepgram's Flux API.

    Opens the WebSocket (and starts the response listener) lazily on first
    use. Resampled audio is appended to ``self._stream_buffer`` and sent in
    chunks of exactly ``self._target_chunk_size`` bytes. Any remainder stays
    buffered for the next call.

    On failure the error is logged and emitted as an ``"error"`` event. The
    WebSocket reference is then cleared and the socket closed (best effort),
    and the listener task is cancelled, so the next call reconnects.

    Args:
        audio_frames: Raw audio received from the pipeline.
        **kwargs: Accepted for interface compatibility; unused.
    """
    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        resampled_audio = await asyncio.to_thread(self._resample_audio, audio_frames)
        if not resampled_audio:
            return

        self._stream_buffer.extend(resampled_audio)
        chunk_size = self._target_chunk_size  # intended to be ~100 ms of audio
        while len(self._stream_buffer) >= chunk_size:
            chunk = bytes(self._stream_buffer[:chunk_size])
            # Trim in place instead of rebuilding the buffer for every chunk.
            del self._stream_buffer[:chunk_size]
            await self._ws.send_bytes(chunk)

    except Exception as e:
        logger.error(f"Error in process_audio: {str(e)}")
        self.emit("error", str(e))
        # Detach first so a failing close() cannot leave a broken socket in
        # place or raise out of this handler.
        ws, self._ws = self._ws, None
        if ws:
            try:
                await ws.close()
            except Exception as close_err:
                logger.error(f"Error closing WebSocket after failure: {close_err}")
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

Process audio frames and send them to Deepgram's Flux API.

class DeepgramTTS (*,
api_key: str | None = None,
model: str = 'aura-2-andromeda-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
base_url: str = 'wss://api.deepgram.com/v1/speak',
max_connection_age_sec: float = 300.0,
**kwargs: Any)
Expand source code
class DeepgramTTS(TTS):
    """Text-to-Speech backed by Deepgram's streaming WebSocket ``speak`` API.

    One WebSocket is reused across turns and refreshed once it is older than
    ``max_connection_age_sec``. Each ``synthesize()`` call streams ``Speak``
    messages and forwards every ``FlushMarker`` as a ``Flush`` message. The
    call finishes once Deepgram has acknowledged (``Flushed``) every Flush
    sent for that call.
    """

    def __init__(
            self,
            *,
            api_key: str | None = None,
            model: str = DEFAULT_MODEL,
            encoding: str = DEFAULT_ENCODING,
            sample_rate: int = DEEPGRAM_SAMPLE_RATE,
            base_url: str = API_BASE_URL,
            max_connection_age_sec: float = DEFAULT_CONNECTION_MAX_AGE_SEC,
            **kwargs: Any,
    ) -> None:
        """Initialize the Deepgram TTS plugin.

        Args:
            api_key: Deepgram API key. Falls back to ``DEEPGRAM_API_KEY`` env var.
            model: Deepgram TTS voice/model id. Defaults to ``aura-2-andromeda-en``.
            encoding: Output audio encoding. Defaults to ``linear16``.
            sample_rate: Output sample rate.
            base_url: WebSocket base URL.
            max_connection_age_sec: Refresh the WebSocket after this many seconds
                to avoid hitting Deepgram's idle/session limits.
            **kwargs: Accepted for interface compatibility; currently unused.

        Raises:
            ValueError: If no API key is given and ``DEEPGRAM_API_KEY`` is unset.
        """
        super().__init__(sample_rate=sample_rate, num_channels=DEEPGRAM_CHANNELS)

        self.model = model
        self.encoding = encoding
        self.base_url = base_url
        self.audio_track = None
        self.loop = None
        self._max_connection_age_sec = max_connection_age_sec

        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._ws_connect_time: float = 0.0
        self._connection_lock = asyncio.Lock()
        self._receive_task: asyncio.Task | None = None

        self._active_future: asyncio.Future[None] | None = None
        self._active_send_task: asyncio.Task | None = None

        self._interrupted = False
        self._first_chunk_sent = False

        # Per-turn flush bookkeeping, (re)initialised at the start of every
        # _send_text_task(). It tracks the future the counters belong to, the
        # Flush messages sent, the Flushed acks received, and whether the
        # sender has finished sending for that turn.
        self._flush_owner: Optional[asyncio.Future] = None
        self._flushes_sent = 0
        self._flushes_acked = 0
        self._send_complete = False

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through the 'api_key' parameter or the DEEPGRAM_API_KEY environment variable."
            )

    def reset_first_audio_tracking(self) -> None:
        """Re-arm the first-audio callback for the next TTS task."""
        self._first_chunk_sent = False

    async def prewarm(self) -> None:
        """Pre-establish the Deepgram WebSocket so the first ``synthesize()`` call
        does not pay the TLS + auth + upgrade cost. Safe to call repeatedly."""
        try:
            await self._ensure_connection()
        except Exception as e:
            logger.warning(f"Deepgram TTS prewarm failed (non-fatal): {e}")

    async def _ensure_connection(self) -> None:
        """Make sure an open, not-too-old WebSocket exists (under the connection lock).

        Reuses the current connection when it is open and younger than
        ``_max_connection_age_sec``. Otherwise any stale connection is torn down
        and a new one is opened, with a 10 s connect timeout, and a fresh
        receive task is started. If connecting fails, the newly created HTTP
        session is closed before the error propagates.
        """
        async with self._connection_lock:
            loop = asyncio.get_running_loop()

            if self._ws_connection and not self._ws_connection.closed:
                age = loop.time() - self._ws_connect_time
                if age < self._max_connection_age_sec:
                    return
                logger.info(f"Refreshing Deepgram WebSocket (age={age:.1f}s)")
                await self._close_connection_locked()
            elif self._ws_connection or self._ws_session:
                await self._close_connection_locked()

            params = {
                "model": self.model,
                "encoding": self.encoding,
                "sample_rate": self.sample_rate,
            }
            param_string = "&".join([f"{k}={v}" for k, v in params.items()])
            full_ws_url = f"{self.base_url}?{param_string}"
            headers = {"Authorization": f"Token {self.api_key}"}

            session = aiohttp.ClientSession()
            try:
                ws = await asyncio.wait_for(
                    session.ws_connect(
                        full_ws_url, headers=headers, heartbeat=30.0
                    ),
                    timeout=10.0,
                )
            except BaseException:
                # Do not leak the session when the handshake fails or times out.
                await session.close()
                raise

            self._ws_session = session
            self._ws_connection = ws
            self._ws_connect_time = loop.time()
            self._receive_task = asyncio.create_task(self._receive_audio_task())

    async def _close_connection_locked(self) -> None:
        """Cancel the receive task and close the WebSocket and session.

        The caller must hold ``_connection_lock``. Errors during teardown are
        ignored so that cleanup always completes.
        """
        if self._receive_task and not self._receive_task.done():
            self._receive_task.cancel()
            try:
                await self._receive_task
            except (asyncio.CancelledError, Exception):
                pass
        self._receive_task = None
        if self._ws_connection and not self._ws_connection.closed:
            try:
                await self._ws_connection.close()
            except Exception:
                pass
        self._ws_connection = None
        if self._ws_session and not self._ws_session.closed:
            try:
                await self._ws_session.close()
            except Exception:
                pass
        self._ws_session = None

    async def synthesize(
            self,
            text: AsyncIterator[Union[str, FlushMarker]] | str,
            voice_id: Optional[str] = None,
            **kwargs: Any,
    ) -> None:
        """Synthesize text via Deepgram's streaming WebSocket API.

        Each ``FlushMarker`` in the input stream is forwarded to Deepgram as a
        ``{"type": "Flush"}`` message, letting the server emit audio for the
        completed sentence without waiting for end-of-stream.

        Args:
            text: A complete string, or an async iterator of text chunks and
                ``FlushMarker`` items.
            voice_id: Accepted for interface compatibility; unused here.
            **kwargs: Accepted for interface compatibility; unused.

        Raises:
            Exception: Failures while connecting or streaming are emitted as an
                ``"error"`` event and re-raised.
        """
        # Bound up front so the ``finally`` clause is safe on the early-return
        # and connection-failure paths (previously an UnboundLocalError).
        done_future: Optional[asyncio.Future] = None
        send_task: Optional[asyncio.Task] = None
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            self._interrupted = False
            await self._ensure_connection()
            if not self._ws_connection:
                raise RuntimeError("WebSocket connection is not available.")

            done_future = asyncio.get_running_loop().create_future()
            self._active_future = done_future

            send_task = asyncio.create_task(self._send_text_task(text, done_future))
            self._active_send_task = send_task

            try:
                await done_future
            except asyncio.CancelledError:
                # interrupt()/aclose() cancel the active future: a normal stop.
                pass

            await send_task

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise
        finally:
            # Never leave the sender running (and consuming ``text``) after a failure.
            if send_task is not None and not send_task.done():
                send_task.cancel()
            if done_future is not None and self._active_future is done_future:
                self._active_future = None
            if send_task is not None and self._active_send_task is send_task:
                self._active_send_task = None

    def _resolve_if_drained(self) -> None:
        """Resolve the current turn's future once all of its Flushes are acknowledged.

        Completion requires the sender to have finished (``_send_complete``)
        and one ``Flushed`` ack for every ``Flush`` sent. A turn that sent no
        text (zero Flushes) completes as soon as sending finishes.
        """
        fut = self._flush_owner
        if fut is None or fut.done():
            return
        if self._send_complete and self._flushes_acked >= self._flushes_sent:
            fut.set_result(None)

    async def _send_text_task(
        self,
        text: Union[AsyncIterator[Union[str, FlushMarker]], str],
        done_future: asyncio.Future[None],
    ) -> None:
        """Stream text to Deepgram for one ``synthesize()`` call, then await completion.

        Sends one ``Speak`` message per non-empty text chunk. It sends a
        ``Flush`` for each ``FlushMarker`` that follows unflushed text, and one
        final ``Flush`` for trailing text. ``done_future`` resolves in one of
        three ways:

        * once every Flush has been acknowledged (see ``_receive_audio_task``);
        * immediately, if no text was sent;
        * after a 30 s safety timeout.
        """
        ws = self._ws_connection
        if ws is None or ws.closed:
            if not done_future.done():
                done_future.set_exception(RuntimeError("WebSocket closed"))
            return

        # Claim the flush bookkeeping for this turn (no await in between, so the
        # receive task never sees a half-reset state).
        self._flush_owner = done_future
        self._flushes_sent = 0
        self._flushes_acked = 0
        self._send_complete = False
        has_unflushed_text = False

        async def send_flush() -> None:
            nonlocal has_unflushed_text
            # Count before sending so an ack can never be observed before its Flush.
            self._flushes_sent += 1
            has_unflushed_text = False
            await ws.send_json({"type": "Flush"})

        try:
            if isinstance(text, str):
                if text and not self._interrupted:
                    await ws.send_json({"type": "Speak", "text": text})
                    has_unflushed_text = True
            else:
                async for chunk in text:
                    if self._interrupted or ws.closed:
                        break
                    if isinstance(chunk, FlushMarker):
                        # Only flush when there is new text; an empty Flush would
                        # need an ack that may never come.
                        if has_unflushed_text:
                            await send_flush()
                        continue
                    if not chunk:
                        continue
                    await ws.send_json({"type": "Speak", "text": chunk})
                    has_unflushed_text = True

            if has_unflushed_text and not self._interrupted and not ws.closed:
                await send_flush()

            if not self._interrupted and self._flush_owner is done_future:
                self._send_complete = True
                self._resolve_if_drained()

            if not done_future.done():
                try:
                    await asyncio.wait_for(asyncio.shield(done_future), timeout=30.0)
                except asyncio.TimeoutError:
                    if not done_future.done():
                        done_future.set_result(None)
                except asyncio.CancelledError:
                    # Make sure synthesize() is not left awaiting a future nobody
                    # will resolve (e.g. aclose() cancelled us during this wait).
                    if not done_future.done():
                        done_future.cancel()
                except Exception:
                    # done_future itself failed (e.g. a Deepgram "Error" message);
                    # that was already reported and synthesize() re-raises it.
                    pass

        except asyncio.CancelledError:
            if not done_future.done():
                done_future.cancel()
            raise
        except Exception as e:
            if not self._interrupted:
                self.emit("error", f"Send task error: {str(e)}")
            if not done_future.done():
                done_future.set_exception(e)
        finally:
            if self._flush_owner is done_future:
                self._flush_owner = None

    async def _receive_audio_task(self) -> None:
        """Read messages from the current WebSocket until it closes or this task is cancelled.

        Message handling:

        * Binary frames go to the audio track while a synthesis is active and
          not interrupted.
        * ``Flushed`` acks advance the current turn's flush count.
        * ``Error`` messages are emitted and fail the active synthesis.
        """
        ws = self._ws_connection
        if not ws:
            return

        try:
            while not ws.closed:
                msg = await ws.receive()

                if msg.type == aiohttp.WSMsgType.BINARY:
                    fut = self._active_future
                    if not self._interrupted and fut is not None and not fut.done():
                        await self._stream_audio_chunks(msg.data)
                elif msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    msg_type = data.get("type")
                    if msg_type == "Error":
                        err = data.get("description", "Unknown error")
                        if not self._interrupted:
                            self.emit("error", f"Deepgram error: {err}")
                        fut = self._active_future
                        if fut is not None and not fut.done():
                            fut.set_exception(RuntimeError(err))
                    elif msg_type == "Flushed":
                        # Acks beyond what this turn has sent cannot belong to it
                        # (Flushes are counted before sending), so they are
                        # presumably left over from an earlier turn; ignore them.
                        # NOTE(review): a stale ack arriving after this turn has
                        # sent a Flush is indistinguishable; consider matching
                        # Deepgram's sequence_id or sending "Clear" on interrupt.
                        if self._flushes_acked < self._flushes_sent:
                            self._flushes_acked += 1
                        self._resolve_if_drained()
                elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    raise ConnectionError(f"WebSocket error: {ws.exception()}")
        except asyncio.CancelledError:
            raise
        except Exception as e:
            if not self._interrupted:
                self.emit("error", f"Receive task error: {str(e)}")
            fut = self._active_future
            if fut is not None and not fut.done():
                fut.set_exception(e)

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Forward one audio frame to the track, firing the first-audio callback once."""
        if not audio_bytes or self._interrupted:
            return

        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Stop emitting audio for the current synthesis. Keeps the WebSocket
        open so the next turn does not pay reconnect cost; in-flight audio
        frames are dropped via the ``_active_future`` filter in the receive
        loop."""
        self._interrupted = True

        if self.audio_track:
            self.audio_track.interrupt()

        if self._active_send_task and not self._active_send_task.done():
            self._active_send_task.cancel()

        if self._active_future is not None and not self._active_future.done():
            self._active_future.cancel()

    async def aclose(self) -> None:
        """Stop any in-flight synthesis and close the WebSocket and HTTP session."""
        self._interrupted = True

        if self._active_send_task and not self._active_send_task.done():
            self._active_send_task.cancel()

        # Unblock a synthesize() call waiting on the active future: once the
        # connection is closed nothing else will resolve it.
        if self._active_future is not None and not self._active_future.done():
            self._active_future.cancel()

        async with self._connection_lock:
            await self._close_connection_locked()

        await super().aclose()

Text-to-Speech implementation backed by Deepgram's streaming WebSocket API (derived from the base TTS class).

Initialize the Deepgram TTS plugin.

Args

api_key
Deepgram API key. Falls back to DEEPGRAM_API_KEY env var.
model
Deepgram TTS voice/model id. Defaults to aura-2-andromeda-en.
encoding
Output audio encoding. Defaults to linear16.
sample_rate
Output sample rate.
base_url
WebSocket base URL.
max_connection_age_sec
Refresh the WebSocket after this many seconds to avoid hitting Deepgram's idle/session limits.

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Stop any in-flight synthesis and close the WebSocket and HTTP session.

    Marks the plugin as interrupted and cancels the active send task. It also
    cancels the active future, so that a ``synthesize()`` call awaiting it
    returns instead of hanging once the connection is gone. It then tears the
    connection down under the connection lock and runs the base-class cleanup.
    """
    self._interrupted = True

    if self._active_send_task and not self._active_send_task.done():
        self._active_send_task.cancel()

    # Nothing else will resolve this future once the connection is closed.
    if self._active_future is not None and not self._active_future.done():
        self._active_future.cancel()

    async with self._connection_lock:
        await self._close_connection_locked()

    await super().aclose()

Clean up resources: stop any in-flight synthesis and close the Deepgram WebSocket and HTTP session.

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Stop audio output for the synthesis currently in progress.

    The Deepgram WebSocket is intentionally left open so the next turn can
    reuse it without reconnecting. Audio frames still in flight are discarded
    by the receive loop, which only forwards audio while the active future is
    pending.
    """
    self._interrupted = True

    track = self.audio_track
    if track:
        track.interrupt()

    sender = self._active_send_task
    if sender and not sender.done():
        sender.cancel()

    pending = self._active_future
    if pending is not None and not pending.done():
        pending.cancel()

Stop emitting audio for the current synthesis. Keeps the WebSocket open so the next turn does not pay reconnect cost; in-flight audio frames are dropped via the _active_future filter in the receive loop.

async def prewarm(self) ‑> None
Expand source code
async def prewarm(self) -> None:
    """Open the Deepgram WebSocket ahead of the first ``synthesize()`` call.

    Doing the TLS handshake, authentication and WebSocket upgrade up front
    takes that latency off the first utterance. A failure is only logged as a
    warning, so this may be called at any time, and repeatedly.
    """
    try:
        await self._ensure_connection()
    except Exception as exc:
        logger.warning(f"Deepgram TTS prewarm failed (non-fatal): {exc}")

Pre-establish the Deepgram WebSocket so the first synthesize() call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly.

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-audio flag so the next TTS task re-arms first-audio tracking."""
    self._first_chunk_sent = False

Reset the first-audio tracking state for the next TTS task.

async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
) -> None:
    """Synthesize text via Deepgram's streaming WebSocket API.

    Each ``FlushMarker`` in the input stream is forwarded to Deepgram as a
    ``{"type": "Flush"}`` message, letting the server emit audio for the
    completed sentence without waiting for end-of-stream.

    Args:
        text: A complete string, or an async iterator of text chunks and
            ``FlushMarker`` items.
        voice_id: Accepted for interface compatibility; unused here.
        **kwargs: Accepted for interface compatibility; unused.

    Raises:
        Exception: Failures while connecting or streaming are emitted as an
            ``"error"`` event and re-raised.
    """
    # Bound up front so the ``finally`` clause is safe on the early-return
    # and connection-failure paths (previously an UnboundLocalError).
    done_future: Optional[asyncio.Future] = None
    send_task: Optional[asyncio.Task] = None
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._interrupted = False
        await self._ensure_connection()
        if not self._ws_connection:
            raise RuntimeError("WebSocket connection is not available.")

        done_future = asyncio.get_running_loop().create_future()
        self._active_future = done_future

        send_task = asyncio.create_task(self._send_text_task(text, done_future))
        self._active_send_task = send_task

        try:
            await done_future
        except asyncio.CancelledError:
            # interrupt()/aclose() cancel the active future: a normal stop.
            pass

        await send_task

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise
    finally:
        # Never leave the sender running (and consuming ``text``) after a failure.
        if send_task is not None and not send_task.done():
            send_task.cancel()
        if done_future is not None and self._active_future is done_future:
            self._active_future = None
        if send_task is not None and self._active_send_task is send_task:
            self._active_send_task = None

Synthesize text via Deepgram's streaming WebSocket API.

Each FlushMarker in the input stream is forwarded to Deepgram as a {"type": "Flush"} message, letting the server emit audio for the completed sentence without waiting for end-of-stream.