Package videosdk.plugins.deepgram
Sub-modules
videosdk.plugins.deepgram.stt, videosdk.plugins.deepgram.stt_v2, videosdk.plugins.deepgram.tts
Classes
class DeepgramSTT (*,
api_key: str | None = None,
model: str = 'nova-2',
language: str = 'en-US',
interim_results: bool = True,
punctuate: bool = True,
smart_format: bool = True,
sample_rate: int = 48000,
endpointing: int = 50,
filler_words: bool = True,
base_url: str = 'wss://api.deepgram.com/v1/listen')-
Expand source code
class DeepgramSTT(BaseSTT):
    """Streaming speech-to-text over Deepgram's v1 ``/listen`` WebSocket API."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "nova-2",
        language: str = "en-US",
        interim_results: bool = True,
        punctuate: bool = True,
        smart_format: bool = True,
        sample_rate: int = 48000,
        endpointing: int = 50,
        filler_words: bool = True,
        base_url: str = "wss://api.deepgram.com/v1/listen",
    ) -> None:
        """Initialize the Deepgram STT plugin.

        Args:
            api_key (str | None, optional): Deepgram API key. Uses the
                DEEPGRAM_API_KEY environment variable if not provided.
            model (str): Model to use. Defaults to "nova-2".
            language (str): Transcription language. Defaults to "en-US".
            interim_results (bool): Whether to return interim results.
            punctuate (bool): Whether to add punctuation.
            smart_format (bool): Whether to use smart formatting.
            sample_rate (int): Input sample rate in Hz. Defaults to 48000.
            endpointing (int): Endpointing threshold. Defaults to 50.
            filler_words (bool): Whether to include filler words.
            base_url (str): WebSocket endpoint URL.

        Raises:
            ValueError: If no API key is available.
        """
        super().__init__()
        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")
        self.model = model
        self.language = language
        self.sample_rate = sample_rate
        self.interim_results = interim_results
        self.punctuate = punctuate
        self.smart_format = smart_format
        self.endpointing = endpointing
        self.filler_words = filler_words
        self.base_url = base_url
        # WebSocket plumbing; created lazily on the first process_audio() call.
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        # Timestamps used to debounce Deepgram "SpeechStarted" VAD events.
        self._last_speech_event_time = 0.0
        self._previous_speech_event_time = 0.0

    async def process_audio(
        self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any
    ) -> None:
        """Process audio frames and send to Deepgram's Streaming API"""
        if not self._ws:
            # Lazy connect plus a background reader for transcripts.
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())
        try:
            await self._ws.send_bytes(audio_frames)
        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            # Drop the broken connection so the next call reconnects.
            if self._ws:
                await self._ws.close()
                self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses"""
        if not self._ws:
            return
        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    for response in self._handle_ws_message(data):
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API"""
        if not self._session:
            self._session = aiohttp.ClientSession()
        query_params = {
            "model": self.model,
            "language": self.language,
            "interim_results": str(self.interim_results).lower(),
            "punctuate": str(self.punctuate).lower(),
            "smart_format": str(self.smart_format).lower(),
            "encoding": "linear16",
            "sample_rate": str(self.sample_rate),
            "channels": 2,
            "endpointing": self.endpointing,
            "filler_words": str(self.filler_words).lower(),
            "vad_events": "true",
            "no_delay": "true",
        }
        headers = {
            "Authorization": f"Token {self.api_key}",
        }
        ws_url = f"{self.base_url}?{urlencode(query_params)}"
        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses"""
        responses: list[STTResponse] = []
        try:
            if msg["type"] == "SpeechStarted":
                now = time.time()
                if self._last_speech_event_time == 0.0:
                    # First event only primes the timestamp.
                    self._last_speech_event_time = now
                    return responses
                # NOTE(review): appears to emit only when two SpeechStarted
                # events arrive within 1 s of each other — confirm intent.
                if now - self._last_speech_event_time < 1.0:
                    global_event_emitter.emit("speech_started")
                self._previous_speech_event_time = self._last_speech_event_time
                self._last_speech_event_time = now
            if msg["type"] == "Results":
                alternatives = msg["channel"]["alternatives"]
                if alternatives and len(alternatives) > 0:
                    alt = alternatives[0]
                    is_final = msg["is_final"]
                    if alt["transcript"] == "":
                        return responses
                    responses.append(STTResponse(
                        event_type=SpeechEventType.FINAL if is_final else SpeechEventType.INTERIM,
                        data=SpeechData(
                            text=alt["transcript"],
                            language=self.language,
                            confidence=alt.get("confidence", 0.0),
                            start_time=alt["words"][0]["start"] if alt["words"] else 0.0,
                            end_time=alt["words"][-1]["end"] if alt["words"] else 0.0,
                        ),
                        metadata={"model": self.model}
                    ))
        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")
        return responses

    async def aclose(self) -> None:
        """Cleanup resources"""
        if self._ws_task:
            self._ws_task.cancel()
            logger.info("DeepgramSTT WebSocket task cancelled")
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None
            logger.info("DeepgramSTT WebSocket task cleared")
        if self._ws:
            await self._ws.close()
            logger.info("DeepgramSTT WebSocket closed")
            self._ws = None
        if self._session:
            await self._session.close()
            logger.info("DeepgramSTT cleaned up")
            self._session = None
        # Call base class cleanup
        await super().aclose()
Initialize the Deepgram STT plugin
Args
api_key:str | None, optional- Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model:str- The model to use for the STT plugin. Defaults to "nova-2".
language:str- The language to use for the STT plugin. Defaults to "en-US".
interim_results:bool- Whether to return interim results. Defaults to True.
punctuate:bool- Whether to add punctuation. Defaults to True.
smart_format:bool- Whether to use smart formatting. Defaults to True.
sample_rate:int- Sample rate to use for the STT plugin. Defaults to 48000.
endpointing:int- Endpointing threshold. Defaults to 50.
filler_words:bool- Whether to include filler words. Defaults to True.
base_url:str- The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v1/listen".
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources: stop the reader task, then close socket and session."""
    task = self._ws_task
    if task:
        task.cancel()
        logger.info("DeepgramSTT WebSocket task cancelled")
        try:
            await task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
        logger.info("DeepgramSTT WebSocket task cleared")
    if self._ws:
        await self._ws.close()
        logger.info("DeepgramSTT WebSocket closed")
        self._ws = None
    if self._session:
        await self._session.close()
        logger.info("DeepgramSTT cleaned up")
        self._session = None
    # Call base class cleanup
    await super().aclose()
async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None-
Expand source code
async def process_audio(
    self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any
) -> None:
    """Process audio frames and send to Deepgram's Streaming API"""
    if self._ws is None:
        # Connect lazily and spawn the background response reader.
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())
    try:
        await self._ws.send_bytes(audio_frames)
    except Exception as e:
        logger.error(f"Error in process_audio: {str(e)}")
        self.emit("error", str(e))
        # Tear down the connection; a later call will reconnect.
        if self._ws:
            await self._ws.close()
            self._ws = None
        if self._ws_task:
            self._ws_task.cancel()
            self._ws_task = None
class DeepgramSTTV2 (*,
api_key: str | None = None,
model: str = 'flux-general-en',
input_sample_rate: int = 48000,
target_sample_rate: int = 16000,
eager_eot_threshold: float = 0.6,
eot_threshold: float = 0.8,
eot_timeout_ms: int = 7000,
base_url: str = 'wss://api.deepgram.com/v2/listen',
enable_preemptive_generation: bool = False)-
Expand source code
class DeepgramSTTV2(BaseSTT):
    """Streaming speech-to-text over Deepgram's v2 (Flux) WebSocket API."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "flux-general-en",
        input_sample_rate: int = 48000,
        target_sample_rate: int = 16000,
        eager_eot_threshold: float = 0.6,
        eot_threshold: float = 0.8,
        eot_timeout_ms: int = 7000,
        base_url: str = "wss://api.deepgram.com/v2/listen",
        enable_preemptive_generation: bool = False,
    ) -> None:
        """Initialize the Deepgram STT plugin.

        Args:
            api_key (str | None, optional): Deepgram API key. Uses the
                DEEPGRAM_API_KEY environment variable if not provided.
            model (str): Model to use. Defaults to "flux-general-en".
            input_sample_rate (int): Sample rate of incoming audio. Defaults to 48000.
            target_sample_rate (int): Sample rate sent to Deepgram. Defaults to 16000.
            eager_eot_threshold (float): Eager end-of-turn threshold. Defaults to 0.6.
            eot_threshold (float): End-of-turn threshold. Defaults to 0.8.
            eot_timeout_ms (int): End-of-turn timeout in milliseconds. Defaults to 7000.
            base_url (str): WebSocket endpoint URL.
            enable_preemptive_generation (bool): Enable preemptive generation
                based on EagerEndOfTurn events. Defaults to False.

        Raises:
            ValueError: If no API key is available.
        """
        super().__init__()
        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through api_key parameter or DEEPGRAM_API_KEY environment variable")
        self.model = model
        self.input_sample_rate = input_sample_rate
        self.target_sample_rate = target_sample_rate
        self.eager_eot_threshold = eager_eot_threshold
        self.eot_threshold = eot_threshold
        self.eot_timeout_ms = eot_timeout_ms
        self.base_url = base_url
        self.enable_preemptive_generation = enable_preemptive_generation
        self._stream_buffer = bytearray()
        # 100 ms / 50 ms of 16-bit mono audio at the target sample rate.
        self._target_chunk_size = int(0.1 * self.target_sample_rate * 2)
        self._min_chunk_size = int(0.05 * self.target_sample_rate * 2)
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        # FIX: the original assigned _ws_task twice; once is enough.
        self._ws_task: Optional[asyncio.Task] = None
        self._last_transcript: str = ""

    async def process_audio(
        self, audio_frames: bytes, **kwargs: Any
    ) -> None:
        """Process audio frames and send to Deepgram's Flux API"""
        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())
        try:
            resampled_audio = self._resample_audio(audio_frames)
            if not resampled_audio:
                return
            self._stream_buffer.extend(resampled_audio)
            # Forward fixed-size 100 ms chunks.
            while len(self._stream_buffer) >= self._target_chunk_size:
                # FIX: chunk is already bytes; the original wrapped it in
                # bytes() a second time.
                chunk_to_send = bytes(self._stream_buffer[:self._target_chunk_size])
                self._stream_buffer = self._stream_buffer[self._target_chunk_size:]
                await self._ws.send_bytes(chunk_to_send)
        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses"""
        if not self._ws:
            return
        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {self._ws.exception()}")
                    self.emit(
                        "error", f"WebSocket error: {self._ws.exception()}")
                    break
        except Exception as e:
            logger.error(f"Error in WebSocket listener: {str(e)}")
            self.emit("error", f"Error in WebSocket listener: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Deepgram's Streaming API"""
        if not self._session:
            self._session = aiohttp.ClientSession()
        query_params = {
            "model": self.model,
            "encoding": "linear16",
            "sample_rate": self.target_sample_rate,
            "eot_threshold": self.eot_threshold,
            "eot_timeout_ms": self.eot_timeout_ms,
            "eager_eot_threshold": self.eager_eot_threshold,
        }
        headers = {"Authorization": f"Token {self.api_key}"}
        ws_url = f"{self.base_url}?{urlencode(query_params)}"
        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
            logger.info("Connected to Deepgram V2 WebSocket.")
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses"""
        responses: list[STTResponse] = []
        try:
            if msg.get("type") != "TurnInfo":
                return responses
            event = msg.get("event")
            transcript = msg.get("transcript", "")
            start_time = msg.get("audio_window_start", 0.0)
            end_time = msg.get("audio_window_end", 0.0)
            confidence = msg.get("end_of_turn_confidence", 0.0)
            self._last_transcript = transcript
            # Emit turn-related events
            if event == "StartOfTurn":
                global_event_emitter.emit("speech_started")
            elif event == "EagerEndOfTurn":
                # Handle EagerEndOfTurn for preemptive generation
                if self.enable_preemptive_generation and transcript and self._transcript_callback:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.PREFLIGHT,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                            ),
                            metadata={"model": self.model},
                        )
                    )
            elif event == "EndOfTurn":
                logger.info(f"EndOfTurn (FINAL) Transcript: {transcript} and Confidence: {confidence}")
                global_event_emitter.emit("speech_stopped")
                if transcript and self._transcript_callback:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.FINAL,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                            ),
                            metadata={"model": self.model},
                        )
                    )
            elif event == "TurnResumed":
                # Send interim to signal user continued speaking
                if self.enable_preemptive_generation and transcript:
                    responses.append(
                        STTResponse(
                            event_type=SpeechEventType.INTERIM,
                            data=SpeechData(
                                text=transcript,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                            ),
                            metadata={"model": self.model, "turn_resumed": True},
                        )
                    )
        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")
        return responses

    def _resample_audio(self, audio_bytes: bytes) -> bytes:
        """Resample audio from input sample rate to target sample rate and convert to mono."""
        try:
            if not audio_bytes:
                return b''
            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b''
            # Even sample counts are assumed to be interleaved stereo —
            # TODO confirm against the capture format used by callers.
            if raw_audio.size % 2 == 0:
                stereo_audio = raw_audio.reshape(-1, 2)
                mono_audio = stereo_audio.astype(np.float32).mean(axis=1)
            else:
                mono_audio = raw_audio.astype(np.float32)
            if self.input_sample_rate != self.target_sample_rate:
                target_length = int(
                    len(mono_audio) * self.target_sample_rate / self.input_sample_rate)
                resampled_data = signal.resample(mono_audio, target_length)
            else:
                resampled_data = mono_audio
            resampled_data = np.clip(resampled_data, -32767, 32767)
            return resampled_data.astype(np.int16).tobytes()
        except Exception as e:
            logger.error(f"Error resampling audio: {e}")
            return b''

    async def aclose(self) -> None:
        """Cleanup resources"""
        # Flush any remaining buffered audio before terminating.
        if len(self._stream_buffer) >= self._min_chunk_size and self._ws:
            try:
                final_chunk = bytes(self._stream_buffer)
                await self._ws.send_bytes(final_chunk)
            except Exception as e:
                logger.error(f"Error sending final audio: {e}")
            finally:
                # FIX: clear so a repeated aclose() cannot resend the tail.
                self._stream_buffer.clear()
        if self._ws:
            try:
                await self._ws.send_str(json.dumps({"type": "Terminate"}))
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"Error sending termination: {e}")
        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None
        if self._ws:
            await self._ws.close()
            self._ws = None
        if self._session:
            await self._session.close()
            self._session = None
        await super().aclose()
Initialize the Deepgram STT plugin
Args
api_key:str | None, optional- Deepgram API key. Uses DEEPGRAM_API_KEY environment variable if not provided. Defaults to None.
model:str- The model to use for the STT plugin. Defaults to "flux-general-en".
input_sample_rate:int- The input sample rate to use for the STT plugin. Defaults to 48000.
target_sample_rate:int- The target sample rate to use for the STT plugin. Defaults to 16000.
eager_eot_threshold:float- Eager end-of-turn threshold. Defaults to 0.6.
eot_threshold:float- End-of-turn threshold. Defaults to 0.8.
eot_timeout_ms:int- End-of-turn timeout in milliseconds. Defaults to 7000.
base_url:str- The base URL to use for the STT plugin. Defaults to "wss://api.deepgram.com/v2/listen".
enable_preemptive_generation:bool- Enable preemptive generation based on EagerEndOfTurn events. Defaults to False.
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources: flush buffered audio, terminate, then close everything."""
    pending = len(self._stream_buffer)
    if pending >= self._min_chunk_size and self._ws:
        try:
            await self._ws.send_bytes(bytes(self._stream_buffer))
        except Exception as e:
            logger.error(f"Error sending final audio: {e}")
    if self._ws:
        try:
            await self._ws.send_str(json.dumps({"type": "Terminate"}))
            await asyncio.sleep(0.5)
        except Exception as e:
            logger.error(f"Error sending termination: {e}")
    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None
    if self._ws:
        await self._ws.close()
        self._ws = None
    if self._session:
        await self._session.close()
        self._session = None
    await super().aclose()
async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None-
Expand source code
async def process_audio(
    self, audio_frames: bytes, **kwargs: Any
) -> None:
    """Process audio frames and send to Deepgram's Flux API."""
    if self._ws is None:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())
    try:
        resampled = self._resample_audio(audio_frames)
        if not resampled:
            return
        self._stream_buffer.extend(resampled)
        # Drain the buffer in fixed 100 ms chunks.
        while len(self._stream_buffer) >= self._target_chunk_size:
            head = bytes(self._stream_buffer[:self._target_chunk_size])
            self._stream_buffer = self._stream_buffer[self._target_chunk_size:]
            await self._ws.send_bytes(bytes(head))
    except Exception as e:
        logger.error(f"Error in process_audio: {str(e)}")
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None
        if self._ws_task:
            self._ws_task.cancel()
            self._ws_task = None
class DeepgramTTS (*,
api_key: str | None = None,
model: str = 'aura-2-thalia-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
base_url: str = 'wss://api.deepgram.com/v1/speak',
**kwargs: Any)-
Expand source code
class DeepgramTTS(TTS):
    """Streaming text-to-speech backed by Deepgram's ``/speak`` WebSocket API."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        encoding: str = DEFAULT_ENCODING,
        sample_rate: int = DEEPGRAM_SAMPLE_RATE,
        base_url: str = API_BASE_URL,
        **kwargs: Any,
    ) -> None:
        """Initialize the Deepgram TTS plugin.

        Args:
            api_key: Deepgram API key; falls back to the DEEPGRAM_API_KEY
                environment variable.
            model: Voice model name.
            encoding: Output audio encoding.
            sample_rate: Output sample rate in Hz.
            base_url: WebSocket endpoint URL.
            **kwargs: Accepted for forward compatibility; not used here.

        Raises:
            ValueError: If no API key is available.
        """
        super().__init__(sample_rate=sample_rate, num_channels=DEEPGRAM_CHANNELS)
        self.model = model
        self.encoding = encoding
        self.base_url = base_url
        # Set externally before synthesize() is called.
        self.audio_track = None
        self.loop = None
        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False
        self._first_chunk_sent = False
        self._connection_lock = asyncio.Lock()
        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key must be provided either through the 'api_key' parameter or the DEEPGRAM_API_KEY environment variable."
            )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first-audio tracking state for the next TTS task."""
        self._first_chunk_sent = False

    async def _ensure_connection(self) -> None:
        """Open the /speak WebSocket if it is not already connected."""
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed:
                return
            params = {
                "model": self.model,
                "encoding": self.encoding,
                "sample_rate": self.sample_rate,
            }
            param_string = "&".join([f"{k}={v}" for k, v in params.items()])
            full_ws_url = f"{self.base_url}?{param_string}"
            headers = {"Authorization": f"Token {self.api_key}"}
            # FIX: the original created a fresh ClientSession on every
            # reconnect without closing the previous one, leaking sessions.
            if self._ws_session and not self._ws_session.closed:
                await self._ws_session.close()
            self._ws_session = aiohttp.ClientSession()
            self._ws_connection = await asyncio.wait_for(
                self._ws_session.ws_connect(full_ws_url, headers=headers), timeout=50.0
            )
            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()
            self._recv_task = asyncio.create_task(self._receive_audio_task())

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        **kwargs: Any,
    ) -> None:
        """Convert text (string or async iterator of strings) to speech."""
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return
            # Stop any in-flight synthesis before starting a new one.
            self._should_stop = True
            if self._send_task and not self._send_task.done():
                self._send_task.cancel()
            self._should_stop = False
            await self._stream_synthesis(text)
        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str]) -> None:
        """Drive one synthesis round: connect, send the text, await completion."""
        try:
            await self._ensure_connection()
            self._send_task = asyncio.create_task(self._send_text_task(text))
            await self._send_task
        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")
            await self.aclose()
        finally:
            if self._send_task and not self._send_task.done():
                self._send_task.cancel()
            self._send_task = None

    async def _send_text_task(self, text: Union[AsyncIterator[str], str]) -> None:
        """Send Speak payloads (and a final Flush) over the WebSocket."""
        if not self._ws_connection or self._ws_connection.closed:
            return
        try:
            if isinstance(text, str):
                if not self._should_stop:
                    payload = {"type": "Speak", "text": text}
                    await self._ws_connection.send_json(payload)
            else:
                async for chunk in text:
                    if self._ws_connection.closed or self._should_stop:
                        break
                    payload = {"type": "Speak", "text": chunk}
                    await self._ws_connection.send_json(payload)
            if not self._ws_connection.closed and not self._should_stop:
                await self._ws_connection.send_json({"type": "Flush"})
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Send task error: {str(e)}")

    async def _receive_audio_task(self) -> None:
        """Receive audio/control frames and forward audio to the track."""
        if not self._ws_connection:
            return
        try:
            while not self._ws_connection.closed:
                msg = await self._ws_connection.receive()
                if msg.type == aiohttp.WSMsgType.BINARY:
                    if not self._should_stop:
                        await self._stream_audio_chunks(msg.data)
                elif msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    if data.get('type') == 'Error' and not self._should_stop:
                        self.emit("error", f"Deepgram error: {data.get('description', 'Unknown error')}")
                        break
                elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    raise ConnectionError(f"WebSocket error: {self._ws_connection.exception()}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Receive task error: {str(e)}")

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Push one binary audio chunk to the output track."""
        if not audio_bytes or self._should_stop:
            return
        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Interrupt the TTS process."""
        self._should_stop = True
        if self.audio_track:
            self.audio_track.interrupt()
        if self._send_task and not self._send_task.done():
            self._send_task.cancel()

    async def aclose(self) -> None:
        """Cleanup resources."""
        self._should_stop = True
        for task in [self._send_task, self._recv_task]:
            if task and not task.done():
                task.cancel()
        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()
        self._ws_connection = None
        self._ws_session = None
        await super().aclose()
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources: cancel worker tasks and close connection/session."""
    self._should_stop = True
    for task in (self._send_task, self._recv_task):
        if task is not None and not task.done():
            task.cancel()
    conn, sess = self._ws_connection, self._ws_session
    if conn and not conn.closed:
        await conn.close()
    if sess and not sess.closed:
        await sess.close()
    self._ws_connection = None
    self._ws_session = None
    await super().aclose()
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None:
    """Interrupt the TTS process: stop output and cancel in-flight sending."""
    self._should_stop = True
    track = self.audio_track
    if track:
        track.interrupt()
    send_task = self._send_task
    if send_task is not None and not send_task.done():
        send_task.cancel()
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first-audio tracking state for the next TTS task."""
    # Next synthesized chunk will again count as the "first" one.
    self._first_chunk_sent = False
async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None-
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    **kwargs: Any,
) -> None:
    """Convert text to speech.

    Args:
        text: Text to convert (either a string or an async iterator of strings).
        **kwargs: Additional provider-specific arguments.
    """
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return
        # Cancel any previous synthesis before starting this one.
        self._should_stop = True
        pending = self._send_task
        if pending is not None and not pending.done():
            pending.cancel()
        self._should_stop = False
        await self._stream_synthesis(text)
    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise
Args
text- Text to convert to speech (either string or async iterator of strings)
voice_id- Optional voice identifier
**kwargs- Additional provider-specific arguments
Returns
None