Package videosdk.plugins.deepgram

Sub-modules

videosdk.plugins.deepgram.stt
videosdk.plugins.deepgram.stt_v2
videosdk.plugins.deepgram.tts

Classes

class DeepgramSTT (*,
api_key: str | None = None,
model: str = 'nova-2',
language: str = 'en-US',
interim_results: bool = True,
punctuate: bool = True,
smart_format: bool = True,
sample_rate: int = 48000,
endpointing: int = 50,
filler_words: bool = True,
keywords: list[str] | None = None,
keyterm: list[str] | None = None,
profanity_filter: bool = False,
numerals: bool = False,
tag: Union[str, List[str]] | None = None,
enable_diarization: bool = False,
base_url: str = 'wss://api.deepgram.com/v1/listen')
Expand source code
class DeepgramSTT(BaseSTT):
    """Streaming speech-to-text over Deepgram's v1 WebSocket Listen API."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "nova-2",
        language: str = "en-US",
        interim_results: bool = True,
        punctuate: bool = True,
        smart_format: bool = True,
        sample_rate: int = 48000,
        endpointing: int = 50,
        filler_words: bool = True,
        keywords: list[str] | None = None,
        keyterm: list[str] | None = None,
        profanity_filter: bool = False,
        numerals: bool = False,
        tag: Union[str, List[str]] | None = None,
        enable_diarization: bool = False,
        base_url: str = "wss://api.deepgram.com/v1/listen",
    ) -> None:
        """Initialize the Deepgram STT plugin

        Args:
            api_key (str | None, optional): Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "nova-2". Use "nova-3" or "nova-3-general" for Nova-3.
            language (str): The language to use for the STT plugin. Defaults to "en-US".
            interim_results (bool): Whether to return interim results. Defaults to True.
            punctuate (bool): Whether to add punctuation. Defaults to True.
            smart_format (bool): Whether to use smart formatting. Defaults to True.
            sample_rate (int): Sample rate to use for the STT plugin. Defaults to 48000.
            endpointing (int): Endpointing threshold in ms. Defaults to 50; a negative value sends "false" to disable endpointing.
            filler_words (bool): Whether to include filler words. Defaults to True.
            keywords (list[str] | None): Optional keywords for boosting/suppression. Only for Nova-2, Nova-1, Enhanced, Base.
                Each entry is a keyword or "keyword:intensifier" (e.g. "snuffleupagus:5", "kansas:-10"). Max 100. Defaults to None.
            keyterm (list[str] | None): Optional keyterms/phrases for Keyterm Prompting. Only for Nova-3 (e.g. model="nova-3").
                Each entry is a keyterm or phrase (e.g. "tretinoin", "customer service"). Max 500 tokens total. Defaults to None.
            profanity_filter (bool): Whether to filter profanity from the transcription. Defaults to False.
            numerals (bool): Whether to include numerals in the transcription. Defaults to False.
            tag (Union[str, List[str]] | None): Tag(s) to add to the requests for usage reporting. Defaults to None.
            enable_diarization (bool): Diarize recognizes speaker changes and assigns a speaker to each word in the transcript. Defaults to False.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v1/listen".

        Raises:
            ValueError: If no API key is available, or if ``keywords`` is
                combined with a Nova-3 model (Nova-3 only supports ``keyterm``).
        """
        super().__init__()

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")

        self.model = model
        # Nova-3 models reject the legacy `keywords` parameter; fail fast.
        _is_nova3 = model == "nova-3" or model.startswith("nova-3-")
        if _is_nova3 and keywords:
            raise ValueError(
                "Keywords are not supported for Nova-3. Use keyterm=... for Keyterm Prompting instead."
            )
        self.language = language
        self.sample_rate = sample_rate
        self.interim_results = interim_results
        self.punctuate = punctuate
        self.smart_format = smart_format
        self.endpointing = endpointing
        self.filler_words = filler_words
        self.keywords = keywords
        self.keyterm = keyterm
        self.profanity_filter = profanity_filter
        self.numerals = numerals
        self.tag = tag
        self.enable_diarization = enable_diarization
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        # Timestamps of the two most recent SpeechStarted events, used to
        # debounce the "speech_started" emission in _handle_ws_message.
        self._last_speech_event_time = 0.0
        self._previous_speech_event_time = 0.0
        self._closed = False

    async def process_audio(
        self,
        audio_frames: bytes,
        language: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Deepgram's Streaming API.

        Lazily opens the WebSocket connection and the background listener
        task on the first call. On a send failure the connection is torn
        down so the next call reconnects.
        """
        if self._closed:
            return

        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            await self._ws.send_bytes(audio_frames)
        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
                if self._ws_task:
                    self._ws_task.cancel()
                    self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses."""
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API."""

        if not self._session:
            self._session = aiohttp.ClientSession()

        # Deepgram expects the literal string "false" to disable endpointing.
        if self.endpointing < 0:
            endpointing = "false"
        else:
            endpointing = self.endpointing

        query_params = {
            "model": self.model,
            "language": self.language,
            "interim_results": str(self.interim_results).lower(),
            "punctuate": str(self.punctuate).lower(),
            "smart_format": str(self.smart_format).lower(),
            "encoding": "linear16",
            "sample_rate": str(self.sample_rate),
            "channels": 2,
            "endpointing": endpointing,
            "filler_words": str(self.filler_words).lower(),
            "vad_events": "true",
            "no_delay": "true",
            # BUGFIX: these two previously echoed `filler_words` instead of
            # their own settings.
            "profanity_filter": str(self.profanity_filter).lower(),
            "numerals": str(self.numerals).lower(),
            "diarize": str(self.enable_diarization).lower()
        }
        # Switch to a list of pairs so repeated keys (keyterm/keywords) are
        # each encoded as separate query parameters.
        params_list = list(query_params.items())
        if self.tag is not None:
            params_list.append(("tag", self.tag))
        _is_nova3 = self.model == "nova-3" or self.model.startswith("nova-3-")
        if _is_nova3 and self.keyterm:
            for t in self.keyterm:
                if t.strip():
                    params_list.append(("keyterm", t.strip()))
        elif not _is_nova3 and self.keywords:
            # Deepgram caps keywords at 100 entries.
            for kw in self.keywords[:100]:
                params_list.append(("keywords", kw))
        headers = {
            "Authorization": f"Token {self.api_key}",
        }

        ws_url = f"{self.base_url}?{urlencode(params_list)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses."""
        responses = []
        try:
            if msg["type"] == "SpeechStarted":
                current_time = time.time()

                # First event only primes the timestamp; nothing is emitted.
                if self._last_speech_event_time == 0.0:
                    self._last_speech_event_time = current_time
                    return responses

                # Only emit when events arrive in quick succession (< 1s).
                if current_time - self._last_speech_event_time < 1.0:
                    global_event_emitter.emit("speech_started")

                self._previous_speech_event_time = self._last_speech_event_time
                self._last_speech_event_time = current_time

            if msg["type"] == "Results":
                channel = msg["channel"]
                alternatives = channel["alternatives"]

                if alternatives and len(alternatives) > 0:
                    alt = alternatives[0]
                    is_final = msg["is_final"]
                    # Skip empty transcripts (silence / no speech detected).
                    if alt["transcript"] == "":
                        return responses

                    response = STTResponse(
                        event_type=SpeechEventType.FINAL if is_final else SpeechEventType.INTERIM,
                        data=SpeechData(
                            text=alt["transcript"],
                            language=self.language,
                            confidence=alt.get("confidence", 0.0),
                            start_time=alt["words"][0]["start"] if alt["words"] else 0.0,
                            end_time=alt["words"][-1]["end"] if alt["words"] else 0.0,
                            duration=msg["duration"]
                        ),
                        metadata={"model": self.model}
                    )
                    responses.append(response)

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    async def flush(self) -> None:
        """Send flush signal to Deepgram to trigger immediate transcription."""
        # NOTE: a duplicate definition of flush() (with a docstring naming the
        # wrong vendor) was removed; this is the single canonical version.
        if self._ws and not self._ws.closed:
            flush_message = {"type": "Finalize"}
            await self._ws.send_str(json.dumps(flush_message))

    async def aclose(self) -> None:
        """Cleanup resources: listener task, WebSocket, and HTTP session."""
        self._closed = True
        if self._ws_task:
            self._ws_task.cancel()
            logger.info("DeepgramSTT WebSocket task cancelled")
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None
            logger.info("DeepgramSTT WebSocket task cleared")

        if self._ws:
            await self._ws.close()
            logger.info("DeepgramSTT WebSocket closed")
            self._ws = None

        if self._session:
            await self._session.close()
            logger.info("DeepgramSTT cleaned up")
            self._session = None

        # Call base class cleanup
        await super().aclose()

Base class for Speech-to-Text implementations

Initialize the Deepgram STT plugin

Args

api_key : str | None, optional
Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the STT plugin. Defaults to "nova-2". Use "nova-3" or "nova-3-general" for Nova-3.
language : str
The language to use for the STT plugin. Defaults to "en-US".
interim_results : bool
Whether to return interim results. Defaults to True.
punctuate : bool
Whether to add punctuation. Defaults to True.
smart_format : bool
Whether to use smart formatting. Defaults to True.
sample_rate : int
Sample rate to use for the STT plugin. Defaults to 48000.
endpointing : int
Endpointing threshold. Defaults to 50, set 0 to make false.
filler_words : bool
Whether to include filler words. Defaults to True.
keywords : list[str] | None
Optional keywords for boosting/suppression. Only for Nova-2, Nova-1, Enhanced, Base. Each entry is a keyword or "keyword:intensifier" (e.g. "snuffleupagus:5", "kansas:-10"). Max 100. Defaults to None.
keyterm : list[str] | None
Optional keyterms/phrases for Keyterm Prompting. Only for Nova-3 (e.g. model="nova-3"). Each entry is a keyterm or phrase (e.g. "tretinoin", "customer service"). Max 500 tokens total. Defaults to None.
profanity_filter
Whether to filter profanity from the transcription. Defaults to False.
numerals
Whether to include numerals in the transcription. Defaults to False.
tag
List of tags to add to the requests for usage reporting. Defaults to None.
enable_diarization
Diarize recognizes speaker changes and assigns a speaker to each word in the transcript.
base_url : str
The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v1/listen".

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    self._closed = True
    if self._ws_task:
        self._ws_task.cancel()
        logger.info("DeepgramSTT WebSocket task cancelled")
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
        logger.info("DeepgramSTT WebSocket task cleared")

    if self._ws:
        await self._ws.close()
        logger.info("DeepgramSTT WebSocket closed")
        self._ws = None

    if self._session:
        await self._session.close()
        logger.info("DeepgramSTT cleaned up")
        self._session = None
    
    # Call base class cleanup
    await super().aclose()

Cleanup resources

async def flush(self) ‑> None
Expand source code
async def flush(self) -> None:
    """Send flush signal to Deepgram to trigger immediate transcription."""
    if self._ws and not self._ws.closed:
        flush_message = {"type": "Finalize"}
        await self._ws.send_str(json.dumps(flush_message))

Send flush signal to Deepgram to trigger immediate transcription.

async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    language: Optional[str] = None,
    **kwargs: Any
) -> None:
    """Process audio frames and send to Deepgram's Streaming API"""
    if self._closed:
        return

    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        await self._ws.send_bytes(audio_frames)
    except Exception as e:
        logger.error(f"Error in process_audio: {str(e)}")
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

Process audio frames and send to Deepgram's Streaming API

class DeepgramSTTV2 (*,
api_key: str | None = None,
model: str = 'flux-general-en',
input_sample_rate: int = 48000,
target_sample_rate: int = 16000,
eager_eot_threshold: float = 0.6,
eot_threshold: float = 0.8,
eot_timeout_ms: int = 7000,
keyterm: list[str] | None = None,
tag: Union[str, List[str]] | None = None,
base_url: str = 'wss://api.deepgram.com/v2/listen',
enable_preemptive_generation: bool = False)
Expand source code
class DeepgramSTTV2(BaseSTT):
    """Streaming speech-to-text over Deepgram's v2 (Flux) WebSocket API."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "flux-general-en",
        input_sample_rate: int = 48000,
        target_sample_rate: int = 16000,
        eager_eot_threshold: float = 0.6,
        eot_threshold: float = 0.8,
        eot_timeout_ms: int = 7000,
        keyterm: list[str] | None = None,
        tag: Union[str, List[str]] | None = None,
        base_url: str = "wss://api.deepgram.com/v2/listen",
        enable_preemptive_generation: bool = False,
    ) -> None:
        """Initialize the Deepgram STT plugin (Flux / v2 API).

        Args:
            api_key (str | None, optional): Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "flux-general-en".
            input_sample_rate (int): The input sample rate to use for the STT plugin. Defaults to 48000.
            target_sample_rate (int): The target sample rate to use for the STT plugin. Defaults to 16000.
            eager_eot_threshold (float): Eager end-of-turn threshold. Defaults to 0.6.
            eot_threshold (float): End-of-turn threshold. Defaults to 0.8.
            eot_timeout_ms (int): End-of-turn timeout in milliseconds. Defaults to 7000.
            keyterm (list[str] | None): Optional list of keyterms/phrases to improve recognition (Keyterm Prompting).
                Each entry is a keyterm or multi-word phrase (e.g. "tretinoin", "customer service").
                Formatting is preserved (e.g. "Deepgram", "iPhone"). Max 500 tokens total across all keyterms. Defaults to None.
            tag (Union[str, List[str]] | None): Tag(s) to add to the requests for usage reporting. Defaults to None.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v2/listen".
            enable_preemptive_generation (bool): Enable preemptive generation based on EagerEndOfTurn events. Defaults to False.

        Raises:
            ValueError: If no API key is available.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")

        self.model = model
        self.input_sample_rate = input_sample_rate
        self.target_sample_rate = target_sample_rate
        self.eager_eot_threshold = eager_eot_threshold
        self.eot_threshold = eot_threshold
        self.eot_timeout_ms = eot_timeout_ms
        self.keyterm = keyterm
        self.tag = tag
        self.base_url = base_url
        self.enable_preemptive_generation = enable_preemptive_generation

        # Outgoing audio is buffered and sent in 100ms chunks of 16-bit mono
        # PCM at the target sample rate (2 bytes per sample).
        self._stream_buffer = bytearray()
        self._target_chunk_size = int(0.1 * self.target_sample_rate * 2)
        self._min_chunk_size = int(0.05 * self.target_sample_rate * 2)

        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        # BUGFIX: _ws_task was previously assigned None twice; single init now.
        self._ws_task: Optional[asyncio.Task] = None
        self._last_transcript: str = ""

    async def process_audio(
        self,
        audio_frames: bytes,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Deepgram's Flux API.

        Resamples incoming audio to the target rate, buffers it, and sends
        complete 100ms chunks over the WebSocket. Connects lazily on first
        call; tears down the connection on a send failure.
        """
        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            resampled_audio = self._resample_audio(audio_frames)
            if not resampled_audio:
                return

            self._stream_buffer.extend(resampled_audio)
            # Drain the buffer in fixed 100ms chunks.
            while len(self._stream_buffer) >= self._target_chunk_size:
                chunk_to_send = bytes(self._stream_buffer[:self._target_chunk_size])
                self._stream_buffer = self._stream_buffer[self._target_chunk_size:]

                await self._ws.send_bytes(chunk_to_send)

        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
                if self._ws_task:
                    self._ws_task.cancel()
                    self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses."""
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API."""
        if not self._session:
            self._session = aiohttp.ClientSession()

        query_params = {
            "model": self.model,
            "encoding": "linear16",
            "sample_rate": self.target_sample_rate,
            "eot_threshold": self.eot_threshold,
            "eot_timeout_ms": self.eot_timeout_ms,
            "eager_eot_threshold": self.eager_eot_threshold,
        }
        # List of pairs so repeated "keyterm" keys encode as separate params.
        params_list = list(query_params.items())
        if self.tag is not None:
            params_list.append(("tag", self.tag))
        if self.keyterm:
            for t in self.keyterm:
                if t.strip():
                    params_list.append(("keyterm", t.strip()))
        headers = {"Authorization": f"Token {self.api_key}"}
        ws_url = f"{self.base_url}?{urlencode(params_list)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
            logger.info("Connected to Deepgram V2 WebSocket.")
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses.

        Only "TurnInfo" messages are processed; their "event" field drives
        the turn lifecycle (StartOfTurn / EagerEndOfTurn / EndOfTurn /
        TurnResumed).
        """
        responses = []

        try:
            if msg.get("type") != "TurnInfo":
                return responses

            event = msg.get("event")
            transcript = msg.get("transcript", "")
            start_time = msg.get("audio_window_start", 0.0)
            end_time = msg.get("audio_window_end", 0.0)
            confidence = msg.get("end_of_turn_confidence", 0.0)
            duration = end_time - start_time

            self._last_transcript = transcript
            # Emit turn-related events
            if event == "StartOfTurn":
                global_event_emitter.emit("speech_started")
            elif event == "EagerEndOfTurn":
                # Handle EagerEndOfTurn for preemptive generation
                if self.enable_preemptive_generation and transcript and self._transcript_callback:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.PREFLIGHT,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                                duration=duration,
                            ),
                            metadata={"model": self.model},
                        )
                    )
            elif event == "EndOfTurn":
                logger.info(f"EndOfTurn (FINAL) Transcript: {transcript} and Confidence: {confidence}")
                global_event_emitter.emit("speech_stopped")
                if transcript and self._transcript_callback:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.FINAL,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                                duration=duration,
                            ),
                            metadata={"model": self.model},
                        )
                    )
            elif event == "TurnResumed":
                # Send interim to signal user continued speaking
                if self.enable_preemptive_generation and transcript:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.INTERIM,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                                duration=duration,
                            ),
                            metadata={"model": self.model, "turn_resumed": True},
                        )
                    )

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    def _resample_audio(self, audio_bytes: bytes) -> bytes:
        """Resample audio from input sample rate to target sample rate and convert to mono.

        Assumes 16-bit PCM input; even-length sample counts are treated as
        interleaved stereo and averaged to mono (TODO confirm with caller).
        """
        try:
            if not audio_bytes:
                return b''

            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b''

            if raw_audio.size % 2 == 0:
                stereo_audio = raw_audio.reshape(-1, 2)
                mono_audio = stereo_audio.astype(np.float32).mean(axis=1)
            else:
                mono_audio = raw_audio.astype(np.float32)

            if self.input_sample_rate != self.target_sample_rate:
                target_length = int(len(mono_audio) * self.target_sample_rate / self.input_sample_rate)
                resampled_data = signal.resample(mono_audio, target_length)
            else:
                resampled_data = mono_audio

            # Clip to the int16 range before narrowing to avoid wraparound.
            resampled_data = np.clip(resampled_data, -32767, 32767)
            return resampled_data.astype(np.int16).tobytes()

        except Exception as e:
            logger.error(f"Error resampling audio: {e}")
            return b''

    async def aclose(self) -> None:
        """Cleanup resources: flush remaining audio, terminate, and close."""

        # Send any leftover buffered audio that is large enough to matter.
        if len(self._stream_buffer) >= self._min_chunk_size and self._ws:
            try:
                final_chunk = bytes(self._stream_buffer)
                await self._ws.send_bytes(final_chunk)
            except Exception as e:
                logger.error(f"Error sending final audio: {e}")

        if self._ws:
            try:
                await self._ws.send_str(json.dumps({"type": "Terminate"}))
                # Give the server a moment to deliver final transcripts.
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"Error sending termination: {e}")

        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        if self._ws:
            await self._ws.close()
            self._ws = None

        if self._session:
            await self._session.close()
            self._session = None
        await super().aclose()

Base class for Speech-to-Text implementations

Initialize the Deepgram STT plugin (Flux / v2 API).

Args

api_key : str | None, optional
Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the STT plugin. Defaults to "flux-general-en".
input_sample_rate : int
The input sample rate to use for the STT plugin. Defaults to 48000.
target_sample_rate : int
The target sample rate to use for the STT plugin. Defaults to 16000.
eager_eot_threshold : float
Eager end-of-turn threshold. Defaults to 0.6.
eot_threshold : float
End-of-turn threshold. Defaults to 0.8.
eot_timeout_ms : int
End-of-turn timeout in milliseconds. Defaults to 7000.
keyterm : list[str] | None
Optional list of keyterms/phrases to improve recognition (Keyterm Prompting). Each entry is a keyterm or multi-word phrase (e.g. "tretinoin", "customer service"). Formatting is preserved (e.g. "Deepgram", "iPhone"). Max 500 tokens total across all keyterms. Defaults to None.
tag
List of tags to add to the requests for usage reporting. Defaults to None.
base_url : str
The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v2/listen".
enable_preemptive_generation : bool
Enable preemptive generation based on EagerEndOfTurn events. Defaults to False.

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    
    if len(self._stream_buffer) >= self._min_chunk_size and self._ws:
        try:
            final_chunk = bytes(self._stream_buffer)
            await self._ws.send_bytes(final_chunk)
        except Exception as e:
            logger.error(f"Error sending final audio: {e}")
    
    if self._ws:
        try:
            await self._ws.send_str(json.dumps({"type": "Terminate"}))
            await asyncio.sleep(0.5)  
        except Exception as e:
            logger.error(f"Error sending termination: {e}")

    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
        
    if self._ws:
        await self._ws.close()
        self._ws = None
        
    if self._session:
        await self._session.close()
        self._session = None
    await super().aclose()

Cleanup resources

async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    **kwargs: Any
) -> None:
    """Process audio frames and send to Deeepgram's Flux API"""
    
    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())
        
    try:
        resampled_audio = self._resample_audio(audio_frames)
        if not resampled_audio:
            return
            
        self._stream_buffer.extend(resampled_audio)
         # chunk size 100ms
        while len(self._stream_buffer) >= self._target_chunk_size:
            chunk_to_send = bytes(self._stream_buffer[:self._target_chunk_size])
            self._stream_buffer = self._stream_buffer[self._target_chunk_size:]
            
            await self._ws.send_bytes(bytes(chunk_to_send))
            
    except Exception as e:
        logger.error(f"Error in process_audio: {str(e)}")
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

Process audio frames and send to Deepgram's Flux API

class DeepgramTTS (*,
api_key: str | None = None,
model: str = 'aura-2-thalia-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
base_url: str = 'wss://api.deepgram.com/v1/speak',
**kwargs: Any)
Expand source code
class DeepgramTTS(TTS):
    """Streaming text-to-speech backed by Deepgram's WebSocket Speak API."""

    def __init__(
            self,
            *,
            api_key: str | None = None,
            model: str = DEFAULT_MODEL,
            encoding: str = DEFAULT_ENCODING,
            sample_rate: int = DEEPGRAM_SAMPLE_RATE,
            base_url: str = API_BASE_URL,
            **kwargs: Any,
    ) -> None:
        """Initialize the Deepgram TTS plugin.

        Args:
            api_key: Deepgram API key. Falls back to the DEEPGRAM_API_KEY
                environment variable when not provided.
            model: Deepgram voice model name.
            encoding: Audio encoding to request (e.g. "linear16").
            sample_rate: Output sample rate in Hz.
            base_url: WebSocket endpoint of Deepgram's Speak API.
            **kwargs: Ignored; accepted for forward compatibility.

        Raises:
            ValueError: If no API key can be resolved.
        """
        super().__init__(sample_rate=sample_rate, num_channels=DEEPGRAM_CHANNELS)

        self.model = model
        self.encoding = encoding
        self.base_url = base_url
        self.audio_track = None  # assigned externally before synthesize() runs
        self.loop = None         # event loop, assigned externally
        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False
        self._first_chunk_sent = False
        self._connection_lock = asyncio.Lock()

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through the 'api_key' parameter or the DEEPGRAM_API_KEY environment variable."
            )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first-audio tracking state for the next TTS task."""
        self._first_chunk_sent = False

    async def _ensure_connection(self) -> None:
        """Open the WebSocket to Deepgram, reusing a live connection if any.

        Serialized with a lock so concurrent callers cannot race to create
        duplicate connections.
        """
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed:
                return

            params = {
                "model": self.model,
                "encoding": self.encoding,
                "sample_rate": self.sample_rate,
            }
            param_string = "&".join(f"{k}={v}" for k, v in params.items())
            full_ws_url = f"{self.base_url}?{param_string}"
            headers = {"Authorization": f"Token {self.api_key}"}

            # Close any stale session before replacing it; previously each
            # reconnect leaked the old aiohttp.ClientSession.
            if self._ws_session and not self._ws_session.closed:
                await self._ws_session.close()

            self._ws_session = aiohttp.ClientSession()
            self._ws_connection = await asyncio.wait_for(
                self._ws_session.ws_connect(full_ws_url, headers=headers),
                timeout=50.0
            )
            # Restart the receiver so exactly one task reads this connection.
            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()
            self._recv_task = asyncio.create_task(self._receive_audio_task())

    async def synthesize(
            self,
            text: AsyncIterator[str] | str,
            **kwargs: Any,
    ) -> None:
        """Convert text to speech and stream audio onto the audio track.

        Args:
            text: Plain string or async iterator of text chunks to speak.
            **kwargs: Additional provider-specific arguments (unused).

        Raises:
            Exception: Re-raises any synthesis failure after emitting it.
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            # Cancel any in-flight send before starting a new synthesis.
            self._should_stop = True
            if self._send_task and not self._send_task.done():
                self._send_task.cancel()

            self._should_stop = False
            await self._stream_synthesis(text)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str]) -> None:
        """Run one streaming synthesis pass: connect, send text, await send."""
        try:
            await self._ensure_connection()
            self._send_task = asyncio.create_task(self._send_text_task(text))
            await self._send_task
        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")
            await self.aclose()
        finally:
            if self._send_task and not self._send_task.done():
                self._send_task.cancel()
            self._send_task = None

    async def _send_text_task(self, text: Union[AsyncIterator[str], str]) -> None:
        """Send text (whole or chunked) as Speak messages, then Flush."""
        if not self._ws_connection or self._ws_connection.closed:
            return

        try:
            if isinstance(text, str):
                if not self._should_stop:
                    payload = {"type": "Speak", "text": text}
                    await self._ws_connection.send_json(payload)
            else:
                async for chunk in text:
                    if self._ws_connection.closed or self._should_stop:
                        break
                    payload = {"type": "Speak", "text": chunk}
                    await self._ws_connection.send_json(payload)

            # Flush tells Deepgram to render any buffered text immediately.
            if not self._ws_connection.closed and not self._should_stop:
                await self._ws_connection.send_json({"type": "Flush"})
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Send task error: {str(e)}")

    async def _receive_audio_task(self) -> None:
        """Pump WebSocket messages: binary frames become audio, text frames
        are inspected for Deepgram error payloads."""
        if not self._ws_connection:
            return

        try:
            while not self._ws_connection.closed:
                msg = await self._ws_connection.receive()

                if msg.type == aiohttp.WSMsgType.BINARY:
                    if not self._should_stop:
                        await self._stream_audio_chunks(msg.data)
                elif msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    if data.get('type') == 'Error' and not self._should_stop:
                        self.emit("error", f"Deepgram error: {data.get('description', 'Unknown error')}")
                        break
                elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    raise ConnectionError(f"WebSocket error: {self._ws_connection.exception()}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Receive task error: {str(e)}")

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Forward a received audio chunk to the output track, if active."""
        if not audio_bytes or self._should_stop:
            return

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Interrupt the TTS process: flag stop, flush track, cancel sender."""
        self._should_stop = True

        if self.audio_track:
            self.audio_track.interrupt()

        if self._send_task and not self._send_task.done():
            self._send_task.cancel()

    async def aclose(self) -> None:
        """Cleanup resources: cancel tasks and close the socket and session."""
        self._should_stop = True

        for task in [self._send_task, self._recv_task]:
            if task and not task.done():
                task.cancel()

        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()

        self._ws_connection = None
        self._ws_session = None

        await super().aclose()

Base class for Text-to-Speech implementations

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources: stop background tasks, then close socket/session."""
    self._should_stop = True

    # Cancel the sender and receiver before tearing down the transport.
    for pending in (self._send_task, self._recv_task):
        if pending is not None and not pending.done():
            pending.cancel()

    ws = self._ws_connection
    if ws is not None and not ws.closed:
        await ws.close()

    session = self._ws_session
    if session is not None and not session.closed:
        await session.close()

    self._ws_connection = None
    self._ws_session = None

    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt the TTS process: flag stop, flush the track, cancel send."""
    self._should_stop = True

    track = self.audio_track
    if track:
        track.interrupt()

    sender = self._send_task
    if sender and not sender.done():
        sender.cancel()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-audio-chunk flag so the next TTS task starts fresh."""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._should_stop = True
        if self._send_task and not self._send_task.done():
            self._send_task.cancel()
            
        self._should_stop = False
        await self._stream_synthesis(text)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None