Package videosdk.plugins.deepgram
Sub-modules
- videosdk.plugins.deepgram.stt
- videosdk.plugins.deepgram.stt_v2
- videosdk.plugins.deepgram.tts
Classes
class DeepgramSTT (*,
api_key: str | None = None,
model: str = 'nova-2',
language: str = 'en-US',
interim_results: bool = True,
punctuate: bool = True,
smart_format: bool = True,
sample_rate: int = 48000,
endpointing: int = 50,
filler_words: bool = True,
keywords: list[str] | None = None,
keyterm: list[str] | None = None,
profanity_filter: bool = False,
numerals: bool = False,
tag: Union[str, List[str]] | None = None,
enable_diarization: bool = False,
base_url: str = 'wss://api.deepgram.com/v1/listen')-
Expand source code
class DeepgramSTT(BaseSTT):
    """Streaming speech-to-text over Deepgram's ``/v1/listen`` WebSocket API.

    The WebSocket is opened lazily on the first :meth:`process_audio` call.
    A background task reads the replies, converts them into ``STTResponse``
    objects and awaits ``self._transcript_callback`` for each one when a
    callback is set.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "nova-2",
        language: str = "en-US",
        interim_results: bool = True,
        punctuate: bool = True,
        smart_format: bool = True,
        sample_rate: int = 48000,
        endpointing: int = 50,
        filler_words: bool = True,
        keywords: list[str] | None = None,
        keyterm: list[str] | None = None,
        profanity_filter: bool = False,
        numerals: bool = False,
        tag: Union[str, List[str]] | None = None,
        enable_diarization: bool = False,
        base_url: str = "wss://api.deepgram.com/v1/listen",
    ) -> None:
        """Initialize the Deepgram STT plugin.

        Args:
            api_key (str | None, optional): Deepgram API key. Uses the
                DEEPGRAM_API_KEY environment variable if not provided.
                Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to
                "nova-2". Use "nova-3" or "nova-3-general" for Nova-3.
            language (str): The language to use for the STT plugin.
                Defaults to "en-US".
            interim_results (bool): Whether to return interim results.
                Defaults to True.
            punctuate (bool): Whether to add punctuation. Defaults to True.
            smart_format (bool): Whether to use smart formatting.
                Defaults to True.
            sample_rate (int): Sample rate to use for the STT plugin.
                Defaults to 48000.
            endpointing (int): Endpointing threshold. Defaults to 50. A value
                of 0 or less disables endpointing (sent as
                ``endpointing=false``).
            filler_words (bool): Whether to include filler words.
                Defaults to True.
            keywords (list[str] | None): Optional keywords for
                boosting/suppression. Only for Nova-2, Nova-1, Enhanced, Base.
                Each entry is a keyword or "keyword:intensifier"
                (e.g. "snuffleupagus:5", "kansas:-10"). Max 100.
                Defaults to None.
            keyterm (list[str] | None): Optional keyterms/phrases for Keyterm
                Prompting. Only for Nova-3 (e.g. model="nova-3"). Each entry
                is a keyterm or phrase (e.g. "tretinoin", "customer service").
                Max 500 tokens total. Defaults to None.
            profanity_filter (bool): Whether to filter profanity from the
                transcription. Defaults to False.
            numerals (bool): Whether to include numerals in the
                transcription. Defaults to False.
            tag (str | list[str] | None): Tag or list of tags to add to the
                requests for usage reporting. Defaults to None.
            enable_diarization (bool): Diarize recognizes speaker changes and
                assigns a speaker to each word in the transcript.
                Defaults to False.
            base_url (str): The base URL to use for the STT plugin.
                Defaults to "wss://api.deepgram.com/v1/listen".

        Raises:
            ValueError: If no API key is available, or if ``keywords`` is
                given together with a Nova-3 model.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")

        self.model = model
        if self._is_nova3_model(model) and keywords:
            raise ValueError(
                "Keywords are not supported for Nova-3. Use keyterm=... for Keyterm Prompting instead."
            )

        self.language = language
        self.sample_rate = sample_rate
        self.interim_results = interim_results
        self.punctuate = punctuate
        self.smart_format = smart_format
        self.endpointing = endpointing
        self.filler_words = filler_words
        self.keywords = keywords
        self.keyterm = keyterm
        self.profanity_filter = profanity_filter
        self.numerals = numerals
        self.tag = tag
        self.enable_diarization = enable_diarization
        self.base_url = base_url

        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        # Wall-clock times of the two most recent "SpeechStarted" messages.
        self._last_speech_event_time = 0.0
        self._previous_speech_event_time = 0.0
        self._closed = False

    @staticmethod
    def _is_nova3_model(model: str) -> bool:
        """Return True for "nova-3" and any "nova-3-*" variant."""
        return model == "nova-3" or model.startswith("nova-3-")

    @staticmethod
    def _tag_values(tag: Union[str, List[str], None]) -> list:
        """Normalize ``tag`` (None, one string, or a list) into a list of tags."""
        if tag is None:
            return []
        if isinstance(tag, str):
            return [tag]
        return list(tag)

    async def process_audio(
        self,
        audio_frames: bytes,
        language: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Deepgram's Streaming API.

        Connects and starts the listener task on first use. Does nothing once
        the plugin has been closed. ``language`` and ``kwargs`` are accepted
        but not used by this method.

        A failed send is logged and emitted as an "error" event; the socket
        is then closed and the listener cancelled so that the next call
        reconnects. An exception raised while connecting propagates.
        """
        if self._closed:
            return

        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            await self._ws.send_bytes(audio_frames)
        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses.

        TEXT frames are parsed as JSON and converted by
        :meth:`_handle_ws_message`; ERROR frames are reported and end the
        loop. The socket is always closed and cleared on exit.
        """
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    for response in self._handle_ws_message(data):
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API.

        Raises:
            Exception: Whatever ``ws_connect`` raises, after logging it.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        # The docstring contract is that 0 disables endpointing; negative
        # values keep their previous meaning of "disabled" as well.
        endpointing = "false" if self.endpointing <= 0 else self.endpointing

        params_list = [
            ("model", self.model),
            ("language", self.language),
            ("interim_results", str(self.interim_results).lower()),
            ("punctuate", str(self.punctuate).lower()),
            ("smart_format", str(self.smart_format).lower()),
            ("encoding", "linear16"),
            ("sample_rate", str(self.sample_rate)),
            ("channels", 2),
            ("endpointing", endpointing),
            ("filler_words", str(self.filler_words).lower()),
            ("vad_events", "true"),
            ("no_delay", "true"),
            ("profanity_filter", str(self.profanity_filter).lower()),
            ("numerals", str(self.numerals).lower()),
            ("diarize", str(self.enable_diarization).lower()),
        ]

        # One "tag" pair per entry: passing a list as a single value would be
        # URL-encoded as the Python repr of the list.
        params_list.extend(("tag", t) for t in self._tag_values(self.tag))

        is_nova3 = self._is_nova3_model(self.model)
        if is_nova3 and self.keyterm:
            for term in self.keyterm:
                if term.strip():
                    params_list.append(("keyterm", term.strip()))
        elif not is_nova3 and self.keywords:
            # Only the first 100 keywords are sent.
            for kw in self.keywords[:100]:
                params_list.append(("keywords", kw))

        headers = {
            "Authorization": f"Token {self.api_key}",
        }
        ws_url = f"{self.base_url}?{urlencode(params_list)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses.

        "SpeechStarted" messages update the speech-event timestamps and may
        emit a global "speech_started" event. "Results" messages with a
        non-empty first-alternative transcript yield one FINAL or INTERIM
        response. Any error is logged and the responses collected so far are
        returned.
        """
        responses: list[STTResponse] = []

        try:
            msg_type = msg["type"]

            if msg_type == "SpeechStarted":
                current_time = time.time()

                # The very first event only records its timestamp.
                if self._last_speech_event_time == 0.0:
                    self._last_speech_event_time = current_time
                    return responses

                # Emit only when this event follows the previous one within
                # one second.
                if current_time - self._last_speech_event_time < 1.0:
                    global_event_emitter.emit("speech_started")

                self._previous_speech_event_time = self._last_speech_event_time
                self._last_speech_event_time = current_time

            elif msg_type == "Results":
                alternatives = msg["channel"]["alternatives"]
                if alternatives:
                    alt = alternatives[0]
                    is_final = msg["is_final"]
                    if alt["transcript"] == "":
                        return responses

                    # A missing word list falls back to zero timings instead
                    # of discarding the transcript.
                    words = alt.get("words") or []
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.FINAL if is_final else SpeechEventType.INTERIM,
                            data=SpeechData(
                                text=alt["transcript"],
                                language=self.language,
                                confidence=alt.get("confidence", 0.0),
                                start_time=words[0]["start"] if words else 0.0,
                                end_time=words[-1]["end"] if words else 0.0,
                                duration=msg["duration"],
                            ),
                            metadata={"model": self.model},
                        )
                    )
        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    async def flush(self) -> None:
        """Send a "Finalize" message to Deepgram to trigger immediate transcription.

        Does nothing when there is no open WebSocket.
        """
        if self._ws and not self._ws.closed:
            await self._ws.send_str(json.dumps({"type": "Finalize"}))

    async def aclose(self) -> None:
        """Cleanup resources.

        Marks the plugin closed, cancels and awaits the listener task, closes
        the WebSocket and HTTP session, then runs the base-class cleanup.
        """
        self._closed = True

        if self._ws_task:
            self._ws_task.cancel()
            logger.info("DeepgramSTT WebSocket task cancelled")
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None
            logger.info("DeepgramSTT WebSocket task cleared")

        if self._ws:
            await self._ws.close()
            logger.info("DeepgramSTT WebSocket closed")
            self._ws = None

        if self._session:
            await self._session.close()
            logger.info("DeepgramSTT cleaned up")
            self._session = None

        # Call base class cleanup
        await super().aclose()
Initialize the Deepgram STT plugin
Args
api_key:str | None, optional- Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model:str- The model to use for the STT plugin. Defaults to "nova-2". Use "nova-3" or "nova-3-general" for Nova-3.
language:str- The language to use for the STT plugin. Defaults to "en-US".
interim_results:bool- Whether to return interim results. Defaults to True.
punctuate:bool- Whether to add punctuation. Defaults to True.
smart_format:bool- Whether to use smart formatting. Defaults to True.
sample_rate:int- Sample rate to use for the STT plugin. Defaults to 48000.
endpointing:int- Endpointing threshold. Defaults to 50, set 0 to make false.
filler_words:bool- Whether to include filler words. Defaults to True.
keywords:list[str] | None- Optional keywords for boosting/suppression. Only for Nova-2, Nova-1, Enhanced, Base. Each entry is a keyword or "keyword:intensifier" (e.g. "snuffleupagus:5", "kansas:-10"). Max 100. Defaults to None.
keyterm:list[str] | None- Optional keyterms/phrases for Keyterm Prompting. Only for Nova-3 (e.g. model="nova-3"). Each entry is a keyterm or phrase (e.g. "tretinoin", "customer service"). Max 500 tokens total. Defaults to None.
profanity_filter- Whether to filter profanity from the transcription. Defaults to False.
numerals- Whether to include numerals in the transcription. Defaults to False.
tag- List of tags to add to the requests for usage reporting. Defaults to None.
enable_diarization- Diarize recognizes speaker changes and assigns a speaker to each word in the transcript.
base_url:str- The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v1/listen".
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Shut the plugin down.

    Flags the instance as closed, stops the listener task, closes the
    WebSocket and the HTTP session, and finally delegates to the base class.
    """
    self._closed = True

    listener = self._ws_task
    if listener:
        listener.cancel()
        logger.info("DeepgramSTT WebSocket task cancelled")
        try:
            await listener
        except asyncio.CancelledError:
            pass
        self._ws_task = None
        logger.info("DeepgramSTT WebSocket task cleared")

    # Read the socket only now: the listener clears it when it unwinds.
    if self._ws:
        await self._ws.close()
        logger.info("DeepgramSTT WebSocket closed")
        self._ws = None

    if self._session:
        await self._session.close()
        logger.info("DeepgramSTT cleaned up")
        self._session = None

    # Base-class cleanup runs last.
    await super().aclose()
async def flush(self) ‑> None-
Expand source code
async def flush(self) -> None:
    """Send a "Finalize" message to Deepgram to trigger immediate transcription.

    Does nothing when there is no WebSocket or it is already closed.
    """
    if self._ws and not self._ws.closed:
        await self._ws.send_str(json.dumps({"type": "Finalize"}))
async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None-
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    language: Optional[str] = None,
    **kwargs: Any
) -> None:
    """Forward one buffer of audio to Deepgram's Streaming API.

    Returns immediately once the plugin is closed. On first use the
    WebSocket is connected and the response listener is started. If sending
    fails, the error is logged and emitted as an "error" event, and both the
    socket and the listener task are dropped.
    """
    if self._closed:
        return

    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        await self._ws.send_bytes(audio_frames)
    except Exception as e:
        reason = str(e)
        logger.error(f"Error in process_audio: {reason}")
        self.emit("error", reason)

        # Tear down so the next call starts from a fresh connection.
        if self._ws:
            await self._ws.close()
        self._ws = None
        if self._ws_task:
            self._ws_task.cancel()
        self._ws_task = None
class DeepgramSTTV2 (*,
api_key: str | None = None,
model: str = 'flux-general-en',
input_sample_rate: int = 48000,
target_sample_rate: int = 16000,
eager_eot_threshold: float = 0.6,
eot_threshold: float = 0.8,
eot_timeout_ms: int = 7000,
keyterm: list[str] | None = None,
language_hint: list[str] | None = None,
tag: Union[str, List[str]] | None = None,
base_url: str = 'wss://api.deepgram.com/v2/listen',
enable_preemptive_generation: bool = False)-
Expand source code
class DeepgramSTTV2(BaseSTT):
    """Streaming speech-to-text over Deepgram's Flux (``/v2/listen``) WebSocket API.

    Incoming audio is down-mixed and resampled, buffered, and sent in
    fixed-size chunks. "TurnInfo" messages from the server are converted into
    ``STTResponse`` objects and global speech events.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "flux-general-en",
        input_sample_rate: int = 48000,
        target_sample_rate: int = 16000,
        eager_eot_threshold: float = 0.6,
        eot_threshold: float = 0.8,
        eot_timeout_ms: int = 7000,
        keyterm: list[str] | None = None,
        language_hint: list[str] | None = None,
        tag: Union[str, List[str]] | None = None,
        base_url: str = "wss://api.deepgram.com/v2/listen",
        enable_preemptive_generation: bool = False,
    ) -> None:
        """Initialize the Deepgram STT plugin (Flux / v2 API).

        Args:
            api_key (str | None, optional): Deepgram API key. Uses the
                DEEPGRAM_API_KEY environment variable if not provided.
                Defaults to None.
            model (str): The model to use for the STT plugin.
                Defaults to "flux-general-en".
            input_sample_rate (int): The input sample rate to use for the STT
                plugin. Defaults to 48000.
            target_sample_rate (int): The target sample rate to use for the
                STT plugin. Defaults to 16000.
            eager_eot_threshold (float): Eager end-of-turn threshold.
                Defaults to 0.6.
            eot_threshold (float): End-of-turn threshold. Defaults to 0.8.
            eot_timeout_ms (int): End-of-turn timeout in milliseconds.
                Defaults to 7000.
            keyterm (list[str] | None): Optional list of keyterms/phrases to
                improve recognition (Keyterm Prompting). Each entry is a
                keyterm or multi-word phrase (e.g. "tretinoin",
                "customer service"). Formatting is preserved
                (e.g. "Deepgram", "iPhone"). Max 500 tokens total across all
                keyterms. Defaults to None.
            language_hint (list[str] | None): Optional list of language hints
                to bias the model for multilingual workloads.
                Defaults to None.
            tag (str | list[str] | None): Tag or list of tags to add to the
                requests for usage reporting. Defaults to None.
            base_url (str): The base URL to use for the STT plugin.
                Defaults to "wss://api.deepgram.com/v2/listen".
            enable_preemptive_generation (bool): Enable preemptive generation
                based on EagerEndOfTurn events. Defaults to False.

        Raises:
            ValueError: If no API key is available.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")

        self.model = model
        self.input_sample_rate = input_sample_rate
        self.target_sample_rate = target_sample_rate
        self.eager_eot_threshold = eager_eot_threshold
        self.eot_threshold = eot_threshold
        self.eot_timeout_ms = eot_timeout_ms
        self.keyterm = keyterm
        self.language_hint = language_hint
        self.tag = tag
        self.base_url = base_url
        self.enable_preemptive_generation = enable_preemptive_generation

        self._stream_buffer = bytearray()
        # Chunk sizes in bytes at 2 bytes per 16-bit mono sample:
        # 100 ms per chunk sent, 50 ms minimum for the final partial chunk.
        self._target_chunk_size = int(0.1 * self.target_sample_rate * 2)
        self._min_chunk_size = int(0.05 * self.target_sample_rate * 2)

        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._last_transcript: str = ""
        # Set by aclose() so that late audio does not reopen a connection.
        self._closed = False

    @staticmethod
    def _tag_values(tag: Union[str, List[str], None]) -> list:
        """Normalize ``tag`` (None, one string, or a list) into a list of tags."""
        if tag is None:
            return []
        if isinstance(tag, str):
            return [tag]
        return list(tag)

    async def process_audio(
        self,
        audio_frames: bytes,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Deepgram's Flux API.

        Connects and starts the listener on first use, resamples the frames
        in a worker thread, and sends every complete chunk of
        ``_target_chunk_size`` bytes; any remainder stays buffered. Does
        nothing once the plugin has been closed.

        Failures after connecting are logged and emitted as an "error" event;
        the socket is then closed and the listener cancelled. An exception
        raised while connecting propagates.
        """
        if self._closed:
            return

        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            resampled_audio = await asyncio.to_thread(self._resample_audio, audio_frames)
            if not resampled_audio:
                return

            self._stream_buffer.extend(resampled_audio)
            chunk_size = self._target_chunk_size
            # chunk size 100ms
            while len(self._stream_buffer) >= chunk_size:
                chunk_to_send = bytes(self._stream_buffer[:chunk_size])
                # Drop the sent bytes in place rather than copying the tail.
                del self._stream_buffer[:chunk_size]
                await self._ws.send_bytes(chunk_to_send)
        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses.

        TEXT frames are parsed as JSON and converted by
        :meth:`_handle_ws_message`; ERROR frames are reported and end the
        loop. The socket is always closed and cleared on exit.
        """
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    for response in self._handle_ws_message(data):
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API.

        Raises:
            Exception: Whatever ``ws_connect`` raises, after logging it.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        params_list = [
            ("model", self.model),
            ("encoding", "linear16"),
            ("sample_rate", self.target_sample_rate),
            ("eot_threshold", self.eot_threshold),
            ("eot_timeout_ms", self.eot_timeout_ms),
            ("eager_eot_threshold", self.eager_eot_threshold),
        ]

        # One "tag" pair per entry: passing a list as a single value would be
        # URL-encoded as the Python repr of the list.
        params_list.extend(("tag", t) for t in self._tag_values(self.tag))

        # Blank entries are skipped; the rest are sent stripped.
        for term in self.keyterm or []:
            if term.strip():
                params_list.append(("keyterm", term.strip()))

        for hint in self.language_hint or []:
            if hint.strip():
                params_list.append(("language_hint", hint.strip()))

        headers = {"Authorization": f"Token {self.api_key}"}
        ws_url = f"{self.base_url}?{urlencode(params_list)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
            logger.info("Connected to Deepgram V2 WebSocket.")
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _build_response(
        self,
        event_type,
        transcript: str,
        confidence: float,
        start_time: float,
        end_time: float,
        metadata: dict,
    ) -> STTResponse:
        """Build one STTResponse; duration is ``end_time - start_time``."""
        return STTResponse(
            event_type=event_type,
            data=SpeechData(
                text=transcript,
                confidence=confidence,
                start_time=start_time,
                end_time=end_time,
                duration=end_time - start_time,
            ),
            metadata=metadata,
        )

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses.

        Only "TurnInfo" messages are processed:

        * StartOfTurn emits a global "speech_started" event.
        * EagerEndOfTurn yields a PREFLIGHT response when preemptive
          generation is enabled, the transcript is non-empty and a transcript
          callback is set.
        * EndOfTurn emits "speech_stopped" and yields a FINAL response when
          the transcript is non-empty and a transcript callback is set.
        * TurnResumed yields an INTERIM response flagged ``turn_resumed``
          when preemptive generation is enabled and the transcript is
          non-empty.

        Any error is logged and the responses collected so far are returned.
        """
        responses: list[STTResponse] = []

        try:
            if msg.get("type") != "TurnInfo":
                return responses

            event = msg.get("event")
            transcript = msg.get("transcript", "")
            start_time = msg.get("audio_window_start", 0.0)
            end_time = msg.get("audio_window_end", 0.0)
            confidence = msg.get("end_of_turn_confidence", 0.0)

            self._last_transcript = transcript

            # Emit turn-related events
            if event == "StartOfTurn":
                global_event_emitter.emit("speech_started")

            elif event == "EagerEndOfTurn":
                # Handle EagerEndOfTurn for preemptive generation
                if self.enable_preemptive_generation and transcript and self._transcript_callback:
                    responses.append(
                        self._build_response(
                            SpeechEventType.PREFLIGHT, transcript, confidence,
                            start_time, end_time, {"model": self.model},
                        )
                    )

            elif event == "EndOfTurn":
                logger.info(f"EndOfTurn (FINAL) Transcript: {transcript} and Confidence: {confidence}")
                global_event_emitter.emit("speech_stopped")
                if transcript and self._transcript_callback:
                    responses.append(
                        self._build_response(
                            SpeechEventType.FINAL, transcript, confidence,
                            start_time, end_time, {"model": self.model},
                        )
                    )

            elif event == "TurnResumed":
                # Send interim to signal user continued speaking
                if self.enable_preemptive_generation and transcript:
                    responses.append(
                        self._build_response(
                            SpeechEventType.INTERIM, transcript, confidence,
                            start_time, end_time,
                            {"model": self.model, "turn_resumed": True},
                        )
                    )
        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    def _resample_audio(self, audio_bytes: bytes) -> bytes:
        """Resample audio from input sample rate to target sample rate and convert to mono.

        Returns 16-bit PCM bytes, or ``b''`` for empty input or on any error
        (which is logged).
        """
        try:
            if not audio_bytes:
                return b''

            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b''

            # NOTE(review): an even sample count is treated as interleaved
            # 2-channel audio and averaged to mono; an odd count is passed
            # through as mono. Confirm callers always deliver stereo frames.
            if raw_audio.size % 2 == 0:
                stereo_audio = raw_audio.reshape(-1, 2)
                mono_audio = stereo_audio.astype(np.float32).mean(axis=1)
            else:
                mono_audio = raw_audio.astype(np.float32)

            if self.input_sample_rate != self.target_sample_rate:
                target_length = int(len(mono_audio) * self.target_sample_rate / self.input_sample_rate)
                # Too little input to produce a single output sample.
                if target_length <= 0:
                    return b''
                resampled_data = signal.resample(mono_audio, target_length)
            else:
                resampled_data = mono_audio

            # Keep values inside the int16 range before the cast.
            resampled_data = np.clip(resampled_data, -32767, 32767)
            return resampled_data.astype(np.int16).tobytes()
        except Exception as e:
            logger.error(f"Error resampling audio: {e}")
            return b''

    async def flush(self) -> None:
        """No-op kept to satisfy the BaseSTT interface.

        Deepgram Flux (v2 API) handles turn detection internally and
        automatically emits EndOfTurn events. There is no explicit Finalize
        command without terminating the connection via CloseStream.
        """
        return None

    async def aclose(self) -> None:
        """Cleanup resources.

        Sends any buffered audio of at least ``_min_chunk_size`` bytes, sends
        "CloseStream" and waits briefly, then cancels the listener, closes
        the WebSocket and HTTP session, and runs the base-class cleanup.
        Errors while sending are logged and do not stop the cleanup.
        """
        self._closed = True

        if len(self._stream_buffer) >= self._min_chunk_size and self._ws:
            try:
                await self._ws.send_bytes(bytes(self._stream_buffer))
            except Exception as e:
                logger.error(f"Error sending final audio: {e}")
        # Whatever is left was either just sent or is too short to send.
        self._stream_buffer.clear()

        if self._ws:
            try:
                await self._ws.send_str(json.dumps({"type": "CloseStream"}))
                # Short pause before the listener is cancelled.
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"Error sending termination: {e}")

        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        if self._ws:
            await self._ws.close()
            self._ws = None

        if self._session:
            await self._session.close()
            self._session = None

        await super().aclose()
Initialize the Deepgram STT plugin (Flux / v2 API).
Args
api_key:str | None, optional- Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model:str- The model to use for the STT plugin. Defaults to "flux-general-en".
input_sample_rate:int- The input sample rate to use for the STT plugin. Defaults to 48000.
target_sample_rate:int- The target sample rate to use for the STT plugin. Defaults to 16000.
eager_eot_threshold:float- Eager end-of-turn threshold. Defaults to 0.6.
eot_threshold:float- End-of-turn threshold. Defaults to 0.8.
eot_timeout_ms:int- End-of-turn timeout in milliseconds. Defaults to 7000.
keyterm:list[str] | None- Optional list of keyterms/phrases to improve recognition (Keyterm Prompting). Each entry is a keyterm or multi-word phrase (e.g. "tretinoin", "customer service"). Formatting is preserved (e.g. "Deepgram", "iPhone"). Max 500 tokens total across all keyterms. Defaults to None.
language_hint:list[str] | None- Optional list of language hints to bias the model for multilingual workloads. Defaults to None.
tag- List of tags to add to the requests for usage reporting. Defaults to None.
base_url:str- The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v2/listen".
enable_preemptive_generation:bool- Enable preemptive generation based on EagerEndOfTurn events. Defaults to False.
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Shut the plugin down.

    Sends buffered audio when at least ``_min_chunk_size`` bytes are pending,
    sends "CloseStream" and pauses briefly, then stops the listener task,
    closes the WebSocket and the HTTP session, and delegates to the base
    class. Send failures are logged and do not interrupt the cleanup.
    """
    has_tail = len(self._stream_buffer) >= self._min_chunk_size
    if has_tail and self._ws:
        try:
            tail = bytes(self._stream_buffer)
            await self._ws.send_bytes(tail)
        except Exception as e:
            logger.error(f"Error sending final audio: {e}")

    if self._ws:
        try:
            close_message = json.dumps({"type": "CloseStream"})
            await self._ws.send_str(close_message)
            await asyncio.sleep(0.5)
        except Exception as e:
            logger.error(f"Error sending termination: {e}")

    listener = self._ws_task
    if listener:
        listener.cancel()
        try:
            await listener
        except asyncio.CancelledError:
            pass
        self._ws_task = None

    # Read the socket only now: the listener clears it when it unwinds.
    if self._ws:
        await self._ws.close()
        self._ws = None

    if self._session:
        await self._session.close()
        self._session = None

    await super().aclose()
async def flush(self) ‑> None-
Expand source code
async def flush(self) -> None:
    """Do nothing; present only to satisfy the BaseSTT interface.

    Deepgram Flux (v2 API) handles turn detection internally and
    automatically emits EndOfTurn events. There is no explicit Finalize
    command without terminating the connection via CloseStream.
    """
    return None
async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None-
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    **kwargs: Any
) -> None:
    """Process audio frames and send to Deepgram's Flux API.

    Connects and starts the listener on first use, resamples the frames in a
    worker thread, and sends every complete chunk of ``_target_chunk_size``
    bytes; any remainder stays in ``_stream_buffer`` for the next call.

    Failures after connecting are logged and emitted as an "error" event; the
    socket is then closed and the listener cancelled. An exception raised
    while connecting propagates.
    """
    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        resampled_audio = await asyncio.to_thread(self._resample_audio, audio_frames)
        if not resampled_audio:
            return

        self._stream_buffer.extend(resampled_audio)
        chunk_size = self._target_chunk_size
        # chunk size 100ms
        while len(self._stream_buffer) >= chunk_size:
            chunk_to_send = bytes(self._stream_buffer[:chunk_size])
            # Drop the sent bytes in place rather than copying the tail.
            del self._stream_buffer[:chunk_size]
            await self._ws.send_bytes(chunk_to_send)
    except Exception as e:
        logger.error(f"Error in process_audio: {str(e)}")
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None
        if self._ws_task:
            self._ws_task.cancel()
            self._ws_task = None
class DeepgramTTS (*,
api_key: str | None = None,
model: str = 'aura-2-andromeda-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
base_url: str = 'wss://api.deepgram.com/v1/speak',
max_connection_age_sec: float = 300.0,
**kwargs: Any)-
Expand source code
class DeepgramTTS(TTS):
    """Deepgram text-to-speech plugin that streams audio over a reusable WebSocket."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        encoding: str = DEFAULT_ENCODING,
        sample_rate: int = DEEPGRAM_SAMPLE_RATE,
        base_url: str = API_BASE_URL,
        max_connection_age_sec: float = DEFAULT_CONNECTION_MAX_AGE_SEC,
        **kwargs: Any,
    ) -> None:
        """Initialize the Deepgram TTS plugin.

        Args:
            api_key: Deepgram API key. Falls back to ``DEEPGRAM_API_KEY`` env var.
            model: Deepgram TTS voice/model id. Defaults to ``aura-2-andromeda-en``.
            encoding: Output audio encoding. Defaults to ``linear16``.
            sample_rate: Output sample rate.
            base_url: WebSocket base URL.
            max_connection_age_sec: Refresh the WebSocket after this many
                seconds to avoid hitting Deepgram's idle/session limits.
            **kwargs: Accepted for call-site compatibility; not used here.

        Raises:
            ValueError: If no API key is passed and ``DEEPGRAM_API_KEY`` is
                not set.
        """
        super().__init__(sample_rate=sample_rate, num_channels=DEEPGRAM_CHANNELS)

        self.model = model
        self.encoding = encoding
        self.base_url = base_url
        self.audio_track = None
        self.loop = None
        self._max_connection_age_sec = max_connection_age_sec

        # Connection state; mutated only while holding ``_connection_lock``.
        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._ws_connect_time: float = 0.0
        self._connection_lock = asyncio.Lock()
        self._receive_task: asyncio.Task | None = None

        # Per-synthesis state shared between synthesize(), the send task and
        # the receive loop.
        self._active_future: asyncio.Future[None] | None = None
        self._active_send_task: asyncio.Task | None = None
        self._interrupted = False
        self._first_chunk_sent = False

        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through the 'api_key' parameter or the DEEPGRAM_API_KEY environment variable."
            )

    def reset_first_audio_tracking(self) -> None:
        """Clear the flag that records whether the first audio chunk was already reported."""
        self._first_chunk_sent = False

    async def prewarm(self) -> None:
        """Pre-establish the Deepgram WebSocket so the first ``synthesize()``
        call does not pay the TLS + auth + upgrade cost.

        Safe to call repeatedly. Failures are logged as warnings and swallowed.
        """
        try:
            await self._ensure_connection()
        except Exception as e:
            logger.warning(f"Deepgram TTS prewarm failed (non-fatal): {e}")

    async def _ensure_connection(self) -> None:
        """Make sure a usable WebSocket and a running receive task exist.

        An open connection is reused only while it is younger than
        ``_max_connection_age_sec`` and its receive task is still running;
        otherwise it is torn down and replaced.

        Raises:
            asyncio.TimeoutError: If the handshake takes longer than 10 seconds.
            Exception: Whatever ``ws_connect`` raises; the freshly created
                session is closed before the error propagates.
        """
        # Function-scope import so this block does not depend on the file's
        # import header.
        from urllib.parse import urlencode

        async with self._connection_lock:
            now = asyncio.get_running_loop().time()

            if self._ws_connection and not self._ws_connection.closed:
                age = now - self._ws_connect_time
                # An open socket without a receiver can never deliver audio
                # or a Flushed ack, so it must not be reused.
                receiver_alive = (
                    self._receive_task is not None and not self._receive_task.done()
                )
                if receiver_alive and age < self._max_connection_age_sec:
                    return
                if receiver_alive:
                    logger.info(f"Refreshing Deepgram WebSocket (age={age:.1f}s)")
                else:
                    logger.info("Refreshing Deepgram WebSocket (receive task stopped)")
                await self._close_connection_locked()
            elif self._ws_connection or self._ws_session:
                # Leftovers from a closed or half-initialised connection.
                await self._close_connection_locked()

            # urlencode escapes values that would otherwise break the query.
            query = urlencode(
                {
                    "model": self.model,
                    "encoding": self.encoding,
                    "sample_rate": self.sample_rate,
                }
            )
            full_ws_url = f"{self.base_url}?{query}"
            headers = {"Authorization": f"Token {self.api_key}"}

            self._ws_session = aiohttp.ClientSession()
            try:
                self._ws_connection = await asyncio.wait_for(
                    self._ws_session.ws_connect(
                        full_ws_url, headers=headers, heartbeat=30.0
                    ),
                    timeout=10.0,
                )
            except BaseException:
                # Do not leave the just-created session open when the
                # handshake fails, times out, or is cancelled.
                await self._close_connection_locked()
                raise
            self._ws_connect_time = now
            self._receive_task = asyncio.create_task(self._receive_audio_task())

    async def _close_connection_locked(self) -> None:
        """Cancel the receive task and close the socket and session.

        Must be called with ``_connection_lock`` held. Best-effort: errors
        raised while closing are ignored.
        """
        if self._receive_task and not self._receive_task.done():
            self._receive_task.cancel()
            try:
                await self._receive_task
            except (asyncio.CancelledError, Exception):
                pass
        self._receive_task = None

        if self._ws_connection and not self._ws_connection.closed:
            try:
                await self._ws_connection.close()
            except Exception:
                pass
        self._ws_connection = None

        if self._ws_session and not self._ws_session.closed:
            try:
                await self._ws_session.close()
            except Exception:
                pass
        self._ws_session = None

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Synthesize text via Deepgram's streaming WebSocket API.

        Each ``FlushMarker`` in the input stream is forwarded to Deepgram as a
        ``{"type": "Flush"}`` message, letting the server emit audio for the
        completed sentence without waiting for end-of-stream.

        Args:
            text: A complete string, or an async stream of text chunks and
                ``FlushMarker`` items.
            voice_id: Not used by this implementation.
            **kwargs: Not used by this implementation.

        Raises:
            Exception: Any synthesis failure is re-raised after an ``"error"``
                event has been emitted. If the audio track or event loop is
                not set, only the event is emitted and the call returns.
        """
        # Bound up front so the ``finally`` block is safe on every exit path,
        # including the early return and a failed connection attempt.
        done_future: asyncio.Future[None] | None = None
        send_task: asyncio.Task | None = None
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            self._interrupted = False
            await self._ensure_connection()
            if not self._ws_connection:
                raise RuntimeError("WebSocket connection is not available.")

            done_future = asyncio.get_running_loop().create_future()
            self._active_future = done_future
            send_task = asyncio.create_task(self._send_text_task(text, done_future))
            self._active_send_task = send_task

            try:
                await done_future
            except asyncio.CancelledError:
                # interrupt() cancels the future; not an error here.
                pass
            await send_task
        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise
        finally:
            # Clear only the state this call installed; a newer call may
            # already have replaced it.
            if done_future is not None and self._active_future is done_future:
                self._active_future = None
            if send_task is not None and self._active_send_task is send_task:
                self._active_send_task = None

    async def _send_text_task(
        self,
        text: Union[AsyncIterator[Union[str, FlushMarker]], str],
        done_future: asyncio.Future[None],
    ) -> None:
        """Send the text to Deepgram, then wait for the final ``Flushed`` ack.

        ``done_future`` is completed here on timeout, on failure, on
        cancellation, or when nothing was left to flush; otherwise the
        receive loop completes it.
        """
        if not self._ws_connection or self._ws_connection.closed:
            if not done_future.done():
                done_future.set_exception(RuntimeError("WebSocket closed"))
            return

        has_sent = False
        final_flush_sent = False
        try:
            if isinstance(text, str):
                if text and not self._interrupted:
                    await self._ws_connection.send_json({"type": "Speak", "text": text})
                    has_sent = True
            else:
                async for chunk in text:
                    if self._interrupted or self._ws_connection.closed:
                        break
                    if isinstance(chunk, FlushMarker):
                        # Flushing before anything was spoken is pointless.
                        if has_sent:
                            await self._ws_connection.send_json({"type": "Flush"})
                        continue
                    if not chunk:
                        continue
                    await self._ws_connection.send_json({"type": "Speak", "text": chunk})
                    has_sent = True

            if has_sent and not self._interrupted and not self._ws_connection.closed:
                await self._ws_connection.send_json({"type": "Flush"})
                final_flush_sent = True

            if not done_future.done():
                if not final_flush_sent:
                    # No final Flush went out (empty input, or the socket
                    # closed mid-stream), so no Flushed ack will arrive;
                    # finish now instead of waiting out the timeout.
                    done_future.set_result(None)
                else:
                    try:
                        # shield(): the timeout must not cancel the future
                        # that synthesize() is awaiting.
                        await asyncio.wait_for(asyncio.shield(done_future), timeout=30.0)
                    except asyncio.TimeoutError:
                        if not done_future.done():
                            done_future.set_result(None)
                    except asyncio.CancelledError:
                        pass
        except asyncio.CancelledError:
            if not done_future.done():
                done_future.cancel()
            raise
        except Exception as e:
            if not self._interrupted:
                self.emit("error", f"Send task error: {str(e)}")
            if not done_future.done():
                done_future.set_exception(e)

    async def _receive_audio_task(self) -> None:
        """Read WebSocket messages until the connection closes.

        Binary frames are forwarded as audio while a synthesis is active and
        not interrupted. ``Error`` and ``Flushed`` text messages complete the
        active future.
        """
        if not self._ws_connection:
            return

        try:
            while self._ws_connection and not self._ws_connection.closed:
                msg = await self._ws_connection.receive()

                if msg.type == aiohttp.WSMsgType.BINARY:
                    fut = self._active_future
                    # Frames from an interrupted or finished synthesis are dropped.
                    if not self._interrupted and fut is not None and not fut.done():
                        await self._stream_audio_chunks(msg.data)

                elif msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    msg_type = data.get("type")

                    if msg_type == "Error":
                        err = data.get("description", "Unknown error")
                        if not self._interrupted:
                            self.emit("error", f"Deepgram error: {err}")
                        fut = self._active_future
                        if fut is not None and not fut.done():
                            fut.set_exception(RuntimeError(err))

                    elif msg_type == "Flushed":
                        # NOTE(review): the first Flushed ack completes the
                        # whole synthesis. If the server acks every Flush,
                        # an input with several FlushMarkers would finish
                        # after the first sentence and later audio would be
                        # dropped by the BINARY filter above - confirm
                        # against the Deepgram protocol.
                        fut = self._active_future
                        if fut is not None and not fut.done():
                            fut.set_result(None)

                elif msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    break

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    raise ConnectionError(f"WebSocket error: {self._ws_connection.exception()}")

        except asyncio.CancelledError:
            raise
        except Exception as e:
            if not self._interrupted:
                self.emit("error", f"Receive task error: {str(e)}")
            fut = self._active_future
            if fut is not None and not fut.done():
                fut.set_exception(e)

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Forward one audio payload to the audio track.

        Invokes ``_first_audio_callback`` once, before the first chunk
        (presumably provided by the base class - confirm).
        """
        if not audio_bytes or self._interrupted:
            return

        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Stop emitting audio for the current synthesis.

        Keeps the WebSocket open so the next turn does not pay reconnect
        cost; in-flight audio frames are dropped via the ``_active_future``
        filter in the receive loop.
        """
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()
        if self._active_send_task and not self._active_send_task.done():
            self._active_send_task.cancel()
        if self._active_future is not None and not self._active_future.done():
            self._active_future.cancel()

    async def aclose(self) -> None:
        """Cancel any active send task, close the connection, then run the base cleanup."""
        self._interrupted = True
        if self._active_send_task and not self._active_send_task.done():
            self._active_send_task.cancel()
        async with self._connection_lock:
            await self._close_connection_locked()
        await super().aclose()
Initialize the Deepgram TTS plugin.
Args
api_key - Deepgram API key. Falls back to DEEPGRAM_API_KEY env var.
model - Deepgram TTS voice/model id. Defaults to aura-2-andromeda-en.
encoding - Output audio encoding. Defaults to linear16.
sample_rate - Output sample rate.
base_url- WebSocket base URL.
max_connection_age_sec- Refresh the WebSocket after this many seconds to avoid hitting Deepgram's idle/session limits.
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Marks the plugin as interrupted, cancels a send task that is still
    running, closes the connection while holding the connection lock, and
    finally runs the base class cleanup.
    """
    self._interrupted = True

    pending_send = self._active_send_task
    if pending_send and not pending_send.done():
        pending_send.cancel()

    async with self._connection_lock:
        await self._close_connection_locked()

    await super().aclose()
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None:
    """Stop emitting audio for the current synthesis.

    Keeps the WebSocket open so the next turn does not pay reconnect cost;
    in-flight audio frames are dropped via the ``_active_future`` filter in
    the receive loop.
    """
    self._interrupted = True

    track = self.audio_track
    if track:
        track.interrupt()

    sender = self._active_send_task
    if sender and not sender.done():
        sender.cancel()

    pending = self._active_future
    if pending is not None and not pending.done():
        pending.cancel()
_active_future filter in the receive loop.
async def prewarm(self) ‑> None-
Expand source code
async def prewarm(self) -> None:
    """Pre-establish the Deepgram WebSocket so the first ``synthesize()``
    call does not pay the TLS + auth + upgrade cost.

    Safe to call repeatedly. A failed attempt is logged as a warning and
    otherwise ignored.
    """
    try:
        await self._ensure_connection()
    except Exception as exc:
        logger.warning(f"Deepgram TTS prewarm failed (non-fatal): {exc}")
    return None
synthesize() call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly.
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for the next TTS task.

    Clears ``_first_chunk_sent``, the flag recording that a first audio
    chunk was already seen.
    """
    self._first_chunk_sent = False
    return None
async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Synthesize text via Deepgram's streaming WebSocket API.

    Each ``FlushMarker`` in the input stream is forwarded to Deepgram as a
    ``{"type": "Flush"}`` message, letting the server emit audio for the
    completed sentence without waiting for end-of-stream.

    Args:
        text: A complete string, or an async stream of text chunks and
            ``FlushMarker`` items.
        voice_id: Not used by this implementation.
        **kwargs: Not used by this implementation.

    Raises:
        Exception: Any synthesis failure is re-raised after an ``"error"``
            event has been emitted. If the audio track or event loop is not
            set, only the event is emitted and the call returns.
    """
    # Bound up front so the ``finally`` block is safe on every exit path.
    # Previously the early return and a failed connection attempt reached
    # ``finally`` with these names unassigned and raised UnboundLocalError.
    done_future: asyncio.Future[None] | None = None
    send_task: asyncio.Task | None = None
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._interrupted = False
        await self._ensure_connection()
        if not self._ws_connection:
            raise RuntimeError("WebSocket connection is not available.")

        done_future = asyncio.get_running_loop().create_future()
        self._active_future = done_future
        send_task = asyncio.create_task(self._send_text_task(text, done_future))
        self._active_send_task = send_task

        try:
            await done_future
        except asyncio.CancelledError:
            # A cancelled future is the interrupt path, not an error.
            pass
        await send_task
    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise
    finally:
        # Clear only the state this call installed; a newer call may
        # already have replaced it.
        if done_future is not None and self._active_future is done_future:
            self._active_future = None
        if send_task is not None and self._active_send_task is send_task:
            self._active_send_task = None
Each FlushMarker in the input stream is forwarded to Deepgram as a {"type": "Flush"} message, letting the server emit audio for the completed sentence without waiting for end-of-stream.