Package videosdk.plugins.elevenlabs
Sub-modules
videosdk.plugins.elevenlabs.stt
videosdk.plugins.elevenlabs.tts
Classes
class ElevenLabsSTT (*,
api_key: str | None = None,
model_id: str = 'scribe_v2_realtime',
language_code: str = 'en',
sample_rate: int = 48000,
commit_strategy: str = 'vad',
vad_silence_threshold_secs: float = 0.8,
vad_threshold: float = 0.4,
min_speech_duration_ms: int = 50,
min_silence_duration_ms: int = 50,
base_url: str = 'wss://api.elevenlabs.io/v1/speech-to-text/realtime')
Expand source code
class ElevenLabsSTT(BaseSTT):
    """
    ElevenLabs Realtime Speech-to-Text (STT) client.

    Streams 16-bit PCM audio to the ElevenLabs realtime STT WebSocket and turns
    ``partial_transcript`` / ``committed_transcript`` events into interim / final
    ``STTResponse`` objects delivered through the transcript callback.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_id: str = "scribe_v2_realtime",
        language_code: str = "en",
        sample_rate: int = 48000,
        commit_strategy: str = "vad",
        vad_silence_threshold_secs: float = 0.8,
        vad_threshold: float = 0.4,
        min_speech_duration_ms: int = 50,
        min_silence_duration_ms: int = 50,
        base_url: str = "wss://api.elevenlabs.io/v1/speech-to-text/realtime",
    ) -> None:
        """
        Initialize the ElevenLabs STT client.

        Args:
            api_key: ElevenLabs API key for authentication. Defaults to env variable ELEVENLABS_API_KEY.
            model_id: STT model identifier.
            language_code: Language code for transcription.
            sample_rate: Sample rate of input audio in Hz; must be in SUPPORTED_SAMPLE_RATES.
            commit_strategy: Strategy for committing transcripts ('vad' is by default).
            vad_silence_threshold_secs: Duration of silence to detect end-of-speech.
            vad_threshold: Threshold for detecting voice activity.
            min_speech_duration_ms: Minimum duration in milliseconds for a speech segment.
            min_silence_duration_ms: Minimum duration in milliseconds of silence to consider end-of-speech.
            base_url: WebSocket endpoint for ElevenLabs STT.

        Raises:
            ValueError: If no API key is available or sample_rate is not supported.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError("ElevenLabs API key must be provided via api_key or ELEVENLABS_API_KEY env var")

        self.model_id = model_id
        self.language_code = language_code
        self.commit_strategy = commit_strategy
        self.base_url = base_url
        self.sample_rate = sample_rate
        if self.sample_rate not in SUPPORTED_SAMPLE_RATES:
            raise ValueError(
                f"Unsupported sample_rate: {self.sample_rate}. "
                f"Supported rates: {SUPPORTED_SAMPLE_RATES}"
            )

        self.vad_silence_threshold_secs = vad_silence_threshold_secs
        self.vad_threshold = vad_threshold
        self.min_speech_duration_ms = min_speech_duration_ms
        self.min_silence_duration_ms = min_silence_duration_ms

        # Used to drop partials that merely repeat the final committed moments ago.
        self._last_final_text = ""
        self._last_final_time = 0.0
        self._duplicate_suppression_window = 0.75

        # Outgoing audio is accumulated and sent in 100 ms chunks of 16-bit mono
        # PCM (2 bytes per sample).
        self._stream_buffer = bytearray()
        self._target_chunk_size = int(0.1 * self.sample_rate * 2)

        self.heartbeat = 15.0
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None

    async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
        """
        Process and send audio frames.

        Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce
        overhead. The WebSocket and its listener task are (re)created lazily.
        """
        if not self._ws or self._ws.closed:
            await self._connect_ws()
            # Every new socket needs its own listener. A listener still bound to the
            # previous socket exits on its own once that socket is closed.
            self._ws_task = asyncio.create_task(self._listen_for_responses())
        elif self._ws_task is None or self._ws_task.done():
            # Socket still open but nobody is reading from it.
            logger.warning("WebSocket listener stopped unexpectedly, restarting")
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            mono_audio = self._convert_to_mono(audio_frames)
            if not mono_audio:
                return
            self._stream_buffer.extend(mono_audio)
            chunk_size = self._target_chunk_size
            while len(self._stream_buffer) >= chunk_size:
                chunk = bytes(self._stream_buffer[:chunk_size])
                # Trim in place before awaiting so a concurrent call cannot resend it.
                del self._stream_buffer[:chunk_size]
                await self._send_audio(chunk)
        except Exception as e:
            logger.exception("Error in process_audio: %s", e)
            self.emit("error", str(e))
            if self._ws:
                try:
                    await self._ws.close()
                except Exception:
                    pass
                self._ws = None

    async def _connect_ws(self) -> None:
        """Open the realtime STT WebSocket with the configured model and VAD settings."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()

        query_params = {
            "model_id": str(self.model_id),
            "language_code": str(self.language_code),
            "audio_format": f"pcm_{self.sample_rate}",
            "commit_strategy": str(self.commit_strategy),
            "vad_silence_threshold_secs": self.vad_silence_threshold_secs,
            "vad_threshold": self.vad_threshold,
            "min_speech_duration_ms": self.min_speech_duration_ms,
            "min_silence_duration_ms": self.min_silence_duration_ms,
        }
        ws_url = f"{self.base_url}?{urlencode(query_params)}"
        headers = {"xi-api-key": self.api_key}
        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers, heartbeat=self.heartbeat)
            logger.info("Connected to ElevenLabs Realtime STT WebSocket.")
        except Exception as e:
            logger.exception("Error connecting to ElevenLabs WebSocket: %s", e)
            raise

    async def _send_audio(self, audio_bytes: bytes) -> None:
        """
        Send one PCM chunk as a base64-encoded ``input_audio_chunk`` message.

        On failure the error is emitted and only the broken socket is dropped, so
        the next ``process_audio`` call reconnects. The whole plugin is not torn
        down via ``aclose()``.
        """
        ws = self._ws
        if not ws:
            return
        payload = {
            "message_type": "input_audio_chunk",
            "audio_base_64": base64.b64encode(audio_bytes).decode(),
            "sample_rate": self.sample_rate,
        }
        try:
            await ws.send_str(json.dumps(payload))
        except Exception as e:
            logger.exception("Error sending audio chunk: %s", e)
            self.emit("error", str(e))
            try:
                await ws.close()
            except Exception:
                pass
            if self._ws is ws:
                self._ws = None

    def _convert_to_mono(self, audio_bytes: bytes) -> bytes:
        """
        Convert input audio bytes to mono.

        Input is read as 16-bit PCM. An even sample count is treated as interleaved
        stereo and the two channels are averaged. An odd count cannot be stereo, so
        those bytes are returned unchanged. Returns b"" for empty input or when
        decoding fails.
        """
        # NOTE(review): even-length genuinely-mono input would also be downmixed
        # here; confirm callers always deliver interleaved stereo frames.
        if not audio_bytes:
            return b""
        try:
            samples = np.frombuffer(audio_bytes, dtype=np.int16)
            if samples.size == 0:
                return b""
            if samples.size % 2 != 0:
                return audio_bytes
            stereo = samples.reshape(-1, 2).astype(np.float32)
            return stereo.mean(axis=1).astype(np.int16).tobytes()
        except Exception as e:
            logger.error("Error converting to mono: %s", e)
            return b""

    async def _listen_for_responses(self) -> None:
        """
        Listen for incoming WebSocket messages from ElevenLabs STT.

        The task is bound to the socket that is current when it starts. On exit it
        clears ``self._ws`` / ``self._ws_task`` only if they still refer to that
        socket / this task, so a listener for an old connection never tears down a
        newer one.
        """
        ws = self._ws
        if not ws:
            return
        try:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                    except Exception:
                        logger.debug("Received non-json ws text message")
                        continue
                    if not isinstance(data, dict):
                        logger.debug("Received non-json ws text message")
                        continue
                    responses = await self._handle_ws_event(data)
                    callback = self._transcript_callback
                    if responses and callback:
                        for response in responses:
                            try:
                                await callback(response)
                            except Exception:
                                logger.exception("Error in transcript callback")
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error("WebSocket error: %s", ws.exception())
                    self.emit("error", f"WebSocket error: {ws.exception()}")
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    logger.info("WebSocket closed by server.")
                    break
        except asyncio.CancelledError:
            logger.debug("WebSocket listener cancelled")
            raise
        except Exception as e:
            logger.exception("Error in WebSocket listener: %s", e)
            self.emit("error", str(e))
        finally:
            try:
                await ws.close()
            except Exception:
                pass
            if self._ws is ws:
                self._ws = None
            if self._ws_task is asyncio.current_task():
                self._ws_task = None

    async def _handle_ws_event(self, data: dict) -> List[STTResponse]:
        """
        Process a single WebSocket event from ElevenLabs STT.

        Args:
            data: JSON-decoded WebSocket message.

        Returns:
            List of STTResponse objects for this event. The list is empty for errors,
            session start, empty commits, suppressed duplicates and unknown types.
        """
        responses: List[STTResponse] = []
        message_type = data.get("message_type")
        logger.debug("Received WS event: %s", message_type)

        if message_type in STT_ERROR_MSGS:
            logger.error("ElevenLabs STT error: %s", data)
            self.emit("error", data)
            return responses

        if message_type == "session_started":
            global_event_emitter.emit("speech_session_started")
            return responses

        if message_type == "committed_transcript":
            # The raw event contains user speech; keep it out of info-level logs.
            logger.debug("==== Received final transcript event: %s", data)
            clean_text = (data.get("text") or "").strip()
            confidence = float(data.get("confidence") or 0.0)
            now = time.time()

            if clean_text == "":
                global_event_emitter.emit("speech_stopped")
                self._last_final_text = ""
                self._last_final_time = now
                return responses

            responses.append(
                STTResponse(
                    event_type=SpeechEventType.FINAL,
                    data=SpeechData(text=clean_text, confidence=confidence),
                    metadata={"model": self.model_id, "raw_event": data},
                )
            )
            global_event_emitter.emit("speech_stopped")
            self._last_final_text = clean_text
            self._last_final_time = now
            return responses

        if message_type == "partial_transcript":
            text = data.get("text") or ""
            clean_text = text.strip()
            # A partial identical to a final committed moments ago is an echo.
            if (
                self._last_final_text
                and clean_text == self._last_final_text
                and (time.time() - self._last_final_time) < self._duplicate_suppression_window
            ):
                logger.debug("Dropping duplicate partial matching recent final transcript")
                return responses

            responses.append(
                STTResponse(
                    event_type=SpeechEventType.INTERIM,
                    data=SpeechData(text=text, confidence=float(data.get("confidence") or 0.0)),
                    metadata={"model": self.model_id, "raw_event": data},
                )
            )
            if clean_text:
                global_event_emitter.emit("speech_started")
            return responses

        logger.debug("Ignoring unrecognized message_type: %s", message_type)
        return responses

    async def aclose(self) -> None:
        """
        Close the WebSocket connection and cleanup session resources.

        Cancels the listener task, closes the WebSocket and HTTP session, discards
        audio still buffered for sending, and calls the parent class cleanup.
        """
        task, self._ws_task = self._ws_task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None

        if self._session:
            try:
                await self._session.close()
            except Exception:
                pass
            finally:
                self._session = None

        # Audio buffered for the old connection must not be sent after a reconnect.
        self._stream_buffer.clear()
        await super().aclose()
Initialize the ElevenLabs STT client.
Args
api_key: ElevenLabs API key for authentication. Defaults to the ELEVENLABS_API_KEY environment variable.
model_id: STT model identifier.
language_code: Language code for transcription.
sample_rate: Sample rate of the input audio in Hz.
commit_strategy: Strategy for committing transcripts (defaults to 'vad').
vad_silence_threshold_secs: Duration of silence, in seconds, that marks the end of speech.
vad_threshold: Threshold for detecting voice activity.
min_speech_duration_ms: Minimum duration in milliseconds for a speech segment.
min_silence_duration_ms: Minimum duration in milliseconds of silence to consider end-of-speech.
base_url: WebSocket endpoint for ElevenLabs STT.
Raises
ValueError: If no API key is available or the sample rate is not supported.
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) -> None
Expand source code
async def aclose(self) -> None:
    """
    Close the WebSocket connection and cleanup session resources.

    Cancels the listener task, closes the WebSocket and HTTP session, discards
    audio still buffered for sending, and calls the parent class cleanup.
    """
    task, self._ws_task = self._ws_task, None
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    if self._ws:
        try:
            await self._ws.close()
        except Exception:
            pass
        self._ws = None

    if self._session:
        try:
            await self._session.close()
        except Exception:
            pass
        finally:
            self._session = None

    # Audio buffered for the old connection must not be sent after a reconnect.
    self._stream_buffer.clear()
    await super().aclose()
Cancels the listener task, closes WebSocket and HTTP session, and calls the parent class cleanup.
async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None
Expand source code
async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
    """
    Process and send audio frames.

    Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce
    overhead. The WebSocket and its listener task are (re)created lazily.
    """
    if not self._ws or self._ws.closed:
        await self._connect_ws()
        # Every new socket needs its own listener. A listener still bound to the
        # previous socket exits on its own once that socket is closed.
        self._ws_task = asyncio.create_task(self._listen_for_responses())
    elif self._ws_task is None or self._ws_task.done():
        # Socket still open but nobody is reading from it. The original `elif`
        # here was unreachable.
        logger.warning("WebSocket listener stopped unexpectedly, restarting")
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        mono_audio = self._convert_to_mono(audio_frames)
        if not mono_audio:
            return
        self._stream_buffer.extend(mono_audio)
        chunk_size = self._target_chunk_size
        while len(self._stream_buffer) >= chunk_size:
            chunk = bytes(self._stream_buffer[:chunk_size])
            # Trim in place before awaiting so a concurrent call cannot resend it.
            del self._stream_buffer[:chunk_size]
            await self._send_audio(chunk)
    except Exception as e:
        logger.exception("Error in process_audio: %s", e)
        self.emit("error", str(e))
        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None
class ElevenLabsTTS (*,
api_key: str | None = None,
model: str = 'eleven_flash_v2_5',
voice: str = 'EXAVITQu4vr4xnSDxMaL',
speed: float = 1.0,
response_format: str = 'pcm_24000',
voice_settings: VoiceSettings | None = None,
base_url: str = 'https://api.elevenlabs.io/v1',
enable_streaming: bool = True,
inactivity_timeout: int = 300)
Expand source code
class ElevenLabsTTS(TTS):
    """ElevenLabs text-to-speech plugin.

    By default text is synthesized over ElevenLabs' multi-context WebSocket API
    (``multi-stream-input``). When ``enable_streaming`` is False, or the WebSocket
    path fails, the HTTP ``/stream`` endpoint is used instead. Audio bytes are
    pushed into ``self.audio_track``.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE_ID,
        speed: float = 1.0,
        response_format: str = "pcm_24000",
        voice_settings: VoiceSettings | None = None,
        base_url: str = API_BASE_URL,
        enable_streaming: bool = True,
        inactivity_timeout: int = WS_INACTIVITY_TIMEOUT,
    ) -> None:
        """Initialize the ElevenLabs TTS plugin.

        Args:
            api_key (Optional[str], optional): ElevenLabs API key. Uses ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to "eleven_flash_v2_5".
            voice (str): The voice ID to use for the TTS plugin. Defaults to "EXAVITQu4vr4xnSDxMaL".
            speed (float): The speed to use for the TTS plugin. Defaults to 1.0. NOTE: currently stored but not sent in any request.
            response_format (str): The audio output format requested from the API. Defaults to "pcm_24000".
            voice_settings (Optional[VoiceSettings], optional): Voice settings; a default VoiceSettings() is used when None. Defaults to None.
            base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.elevenlabs.io/v1".
            enable_streaming (bool): Use the WebSocket streaming API (True) or the HTTP streaming endpoint (False). Defaults to True.
            inactivity_timeout (int): Value sent as the WebSocket ``inactivity_timeout`` query parameter. Defaults to 300.

        Raises:
            ValueError: If no API key is given and ELEVENLABS_API_KEY is unset.
        """
        super().__init__(sample_rate=ELEVENLABS_SAMPLE_RATE, num_channels=ELEVENLABS_CHANNELS)

        self.model = model
        self.voice = voice
        self.speed = speed
        self.audio_track = None
        self.loop = None
        self.response_format = response_format
        self.base_url = base_url
        self.enable_streaming = enable_streaming
        self.voice_settings = voice_settings or VoiceSettings()
        self.inactivity_timeout = inactivity_timeout
        self._first_chunk_sent = False
        self._ws_session = None
        self._ws_connection = None

        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError(
                "ElevenLabs API key must be provided either through api_key parameter or ELEVENLABS_API_KEY environment variable")

        self._session = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0, write=5.0, pool=5.0),
            follow_redirects=True,
        )

        self._streams = weakref.WeakSet()
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False

        self._connection_lock = asyncio.Lock()
        self._ws_voice_id: str | None = None
        self._active_contexts: set[str] = set()
        self._context_futures: dict[str, asyncio.Future[None]] = {}

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Convert text to speech; failures are reported via the ``error`` event."""
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            target_voice = voice_id or self.voice
            self._should_stop = False

            if self.enable_streaming:
                await self._stream_synthesis(text, target_voice)
            elif isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, target_voice)
            else:
                await self._chunked_synthesis(text, target_voice)
        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")

    async def _chunked_synthesis(self, text: str, voice_id: str) -> None:
        """Non-streaming synthesis using the standard API.

        POSTs ``text`` to the HTTP ``/stream`` endpoint and forwards each received
        byte chunk to the audio track until the body ends or a stop is requested.
        Errors are emitted as ``error`` events rather than raised.
        """
        url = f"{self.base_url}/text-to-speech/{voice_id}/stream"
        params = {"model_id": self.model, "output_format": self.response_format}
        headers = {"xi-api-key": self.api_key, "Content-Type": "application/json"}
        payload = {"text": text, "voice_settings": self._voice_settings_dict()}

        try:
            async with self._session.stream(
                "POST", url, headers=headers, json=payload, params=params
            ) as response:
                if response.is_error:
                    # A streamed body is not loaded automatically. Read it so the
                    # error text below is available; otherwise httpx raises
                    # ResponseNotRead and the HTTP error message is lost.
                    await response.aread()
                response.raise_for_status()
                async for chunk in response.aiter_bytes():
                    if self._should_stop:
                        break
                    if chunk:
                        await self._stream_audio_chunks(chunk)
        except httpx.HTTPStatusError as e:
            self.emit("error", f"HTTP error {e.response.status_code}: {e.response.text}")
        except Exception as e:
            self.emit("error", f"Chunked synthesis failed: {str(e)}")

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str], voice_id: str) -> None:
        """WebSocket-based streaming synthesis using multi-context connection.

        Sends ``text`` under a fresh context id and waits until the receive loop
        settles that context. On failure the sender is stopped before falling back
        to ``_chunked_synthesis``, so the two paths never consume ``text`` at once.
        The context registration is always removed on exit.
        """
        context_id = uuid.uuid4().hex[:12]
        sender: asyncio.Task | None = None
        try:
            await self._ensure_connection(voice_id)
            done_future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
            self.register_context(context_id, done_future)

            async def _single_chunk_gen(s: str) -> AsyncIterator[str]:
                yield s

            async def _send_chunks() -> None:
                try:
                    first_message_sent = False
                    if isinstance(text, str):
                        async for segment in segment_text(_single_chunk_gen(text)):
                            if self._should_stop:
                                break
                            await self.send_text(
                                context_id,
                                f"{segment} ",
                                voice_settings=None if first_message_sent else self._voice_settings_dict(),
                                flush=True,
                            )
                            first_message_sent = True
                    else:
                        async for chunk in text:
                            if self._should_stop:
                                break
                            await self.send_text(
                                context_id,
                                f"{chunk} ",
                                voice_settings=None if first_message_sent else self._voice_settings_dict(),
                            )
                            first_message_sent = True

                    if not self._should_stop:
                        await self.flush_context(context_id)
                        await self.close_context(context_id)
                except Exception as e:
                    if not done_future.done():
                        done_future.set_exception(e)

            sender = asyncio.create_task(_send_chunks())
            await done_future
            await sender
        except Exception as e:
            if sender is not None and not sender.done():
                sender.cancel()
                await asyncio.wait({sender})
            self.emit("error", f"Streaming synthesis failed: {str(e)}")
            if self._should_stop:
                # Interrupted or closing: do not restart the utterance over HTTP.
                return
            if isinstance(text, str):
                await self._chunked_synthesis(text, voice_id)
            else:
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, voice_id)
        finally:
            if sender is not None and not sender.done():
                sender.cancel()
            # A late message must not settle a future nobody is waiting on.
            self._context_futures.pop(context_id, None)

    def _voice_settings_dict(self) -> dict[str, Any]:
        """Return ``self.voice_settings`` in the JSON shape the API expects."""
        return {
            "stability": self.voice_settings.stability,
            "similarity_boost": self.voice_settings.similarity_boost,
            "style": self.voice_settings.style,
            "use_speaker_boost": self.voice_settings.use_speaker_boost,
        }

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Push audio to the track, firing the first-audio callback once per task."""
        if not audio_bytes or self._should_stop:
            return

        if not self._first_chunk_sent and hasattr(self, '_first_audio_callback') and self._first_audio_callback:
            self._first_chunk_sent = True
            asyncio.create_task(self._first_audio_callback())

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Simple but effective interruption.

        Flags running synthesis to stop, interrupts the audio track and closes all
        active contexts. Still-pending context futures are then resolved, so an
        in-flight ``synthesize`` returns instead of waiting for a final message.
        """
        self._should_stop = True
        if self.audio_track:
            self.audio_track.interrupt()
        await self.close_all_contexts()
        for fut in self._context_futures.values():
            if not fut.done():
                fut.set_result(None)
        self._context_futures.clear()

    async def aclose(self) -> None:
        """Cleanup resources.

        Stops synthesis, cancels the background tasks and waits for them, closes the
        WebSocket and both HTTP clients, and releases any caller still waiting on a
        synthesis context.
        """
        self._should_stop = True
        tasks = [t for t in (self._send_task, self._recv_task) if t and not t.done()]
        for task in tasks:
            task.cancel()
        if tasks:
            # Let the receive loop run its cleanup before the socket is torn down.
            await asyncio.wait(tasks)

        for stream in list(self._streams):
            try:
                await stream.aclose()
            except Exception:
                pass
        self._streams.clear()

        if self._ws_connection and not self._ws_connection.closed:
            try:
                await self._ws_connection.send_str(json.dumps({"close_socket": True}))
            except Exception:
                pass
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()
        self._ws_connection = None
        self._ws_session = None

        for fut in self._context_futures.values():
            if not fut.done():
                fut.set_result(None)
        self._context_futures.clear()
        self._active_contexts.clear()

        if self._session:
            await self._session.aclose()
        await super().aclose()

    async def _ensure_connection(self, voice_id: str) -> None:
        """Ensure an open multi-stream WebSocket exists for ``voice_id``.

        Reuses the current socket when it is open and bound to the same voice.
        Otherwise it stops and awaits the old receive loop, closes the old
        socket/session, connects anew (10 s timeout) and starts a new receive loop.
        """
        from urllib.parse import urlencode

        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed and self._ws_voice_id == voice_id:
                return

            # Wait for the old loop to finish so its cleanup cannot touch contexts
            # registered on the new connection.
            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()
                await asyncio.wait({self._recv_task})
            self._recv_task = None

            if self._ws_connection and not self._ws_connection.closed:
                try:
                    await self._ws_connection.send_str(json.dumps({"close_socket": True}))
                except Exception:
                    pass
                await self._ws_connection.close()
            if self._ws_session and not self._ws_session.closed:
                await self._ws_session.close()
            self._ws_connection = None
            self._ws_voice_id = None
            # Contexts were opened on the old socket; none carry over.
            self._active_contexts.clear()

            self._ws_session = aiohttp.ClientSession()
            ws_url = (
                f"{self.base_url}/text-to-speech/{voice_id}/multi-stream-input"
                .replace("https://", "wss://")
                .replace("http://", "ws://")
            )
            query = urlencode(
                {
                    "model_id": self.model,
                    "output_format": self.response_format,
                    "inactivity_timeout": self.inactivity_timeout,
                }
            )
            headers = {"xi-api-key": self.api_key}
            self._ws_connection = await asyncio.wait_for(
                self._ws_session.ws_connect(f"{ws_url}?{query}", headers=headers),
                timeout=10.0,
            )
            self._ws_voice_id = voice_id
            self._recv_task = asyncio.create_task(self._recv_loop())

    def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
        """Track ``done_future`` so the receive loop can settle it for ``context_id``."""
        self._context_futures[context_id] = done_future

    async def send_text(
        self,
        context_id: str,
        text: str,
        *,
        voice_settings: Optional[dict[str, Any]] = None,
        flush: bool = False,
    ) -> None:
        """Send ``text`` on ``context_id``, opening the context first if needed.

        Raises:
            RuntimeError: If the WebSocket is not open.
        """
        if not self._ws_connection or self._ws_connection.closed:
            raise RuntimeError("WebSocket connection is closed")

        if context_id not in self._active_contexts:
            init_msg = {"context_id": context_id, "text": " "}
            if voice_settings:
                init_msg["voice_settings"] = voice_settings
            await self._ws_connection.send_str(json.dumps(init_msg))
            self._active_contexts.add(context_id)

        pkt: dict[str, Any] = {"context_id": context_id, "text": text}
        if flush:
            pkt["flush"] = True
        await self._ws_connection.send_str(json.dumps(pkt))

    async def flush_context(self, context_id: str) -> None:
        """Ask the server to flush ``context_id``; no-op without an open socket."""
        if not self._ws_connection or self._ws_connection.closed:
            return
        await self._ws_connection.send_str(json.dumps({"context_id": context_id, "flush": True}))

    async def close_context(self, context_id: str) -> None:
        """Ask the server to close ``context_id``; no-op without an open socket."""
        if not self._ws_connection or self._ws_connection.closed:
            return
        await self._ws_connection.send_str(json.dumps({"context_id": context_id, "close_context": True}))

    async def close_all_contexts(self) -> None:
        """Best-effort close of every active context; all are forgotten afterwards."""
        for context_id in list(self._active_contexts):
            try:
                await self.close_context(context_id)
            except Exception:
                # One failed close (e.g. a dead socket) must not leave the rest open.
                pass
            self._active_contexts.discard(context_id)

    async def _recv_loop(self) -> None:
        """Read messages from the socket that was current when the loop started.

        Audio goes to the track. A context's future is resolved on its final
        message and failed on an error message. However the loop ends (close,
        error or cancellation), every still-pending future is settled so no
        ``synthesize`` call waits forever: with ``None`` when a stop was requested,
        otherwise with a ``RuntimeError``.
        """
        ws = self._ws_connection
        failure = RuntimeError("WebSocket connection closed")
        try:
            while ws is not None and not ws.closed:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                    aiohttp.WSMsgType.ERROR,
                ):
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue
                try:
                    data = json.loads(msg.data)
                except ValueError:
                    continue  # skip a malformed frame instead of killing the loop
                if not isinstance(data, dict):
                    continue

                ctx_id = data.get("contextId")
                if data.get("error"):
                    fut = self._context_futures.pop(ctx_id, None)
                    self._active_contexts.discard(ctx_id)
                    if fut and not fut.done():
                        fut.set_exception(RuntimeError(data["error"]))
                    continue

                audio = data.get("audio")
                if audio and isinstance(audio, str):
                    # Shares first-chunk tracking and the stop check with the HTTP path.
                    await self._stream_audio_chunks(base64.b64decode(audio))

                if (data.get("is_final") or data.get("isFinal")) and ctx_id:
                    fut = self._context_futures.pop(ctx_id, None)
                    self._active_contexts.discard(ctx_id)
                    if fut and not fut.done():
                        fut.set_result(None)
        except Exception:
            failure = RuntimeError("WebSocket receive loop error")
        finally:
            for fut in self._context_futures.values():
                if fut.done():
                    continue
                if self._should_stop:
                    fut.set_result(None)
                else:
                    fut.set_exception(failure)
            self._context_futures.clear()
Initialize the ElevenLabs TTS plugin.
Args
api_key (Optional[str], optional): ElevenLabs API key. Uses the ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
model (str): The TTS model to use. Defaults to "eleven_flash_v2_5".
voice (str): The voice ID to use. Defaults to "EXAVITQu4vr4xnSDxMaL".
speed (float): The speaking speed. Defaults to 1.0. Note: currently stored but not sent in any request.
response_format (str): The audio output format requested from the API. Defaults to "pcm_24000".
voice_settings (Optional[VoiceSettings], optional): Voice settings to apply; a default VoiceSettings() is used when None. Defaults to None.
base_url (str): The base URL of the ElevenLabs API. Defaults to "https://api.elevenlabs.io/v1".
enable_streaming (bool): Use the WebSocket streaming API (True) or the HTTP streaming endpoint (False). Defaults to True.
inactivity_timeout (int): Value sent as the WebSocket inactivity_timeout query parameter. Defaults to 300.
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) -> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Stops synthesis, cancels the background tasks and waits for them, closes the
    WebSocket and both HTTP clients, and releases any caller still waiting on a
    synthesis context.
    """
    self._should_stop = True
    tasks = [t for t in (self._send_task, self._recv_task) if t and not t.done()]
    for task in tasks:
        task.cancel()
    if tasks:
        # Let the receive loop run its cleanup before the socket is torn down.
        await asyncio.wait(tasks)

    for stream in list(self._streams):
        try:
            await stream.aclose()
        except Exception:
            pass
    self._streams.clear()

    if self._ws_connection and not self._ws_connection.closed:
        try:
            await self._ws_connection.send_str(json.dumps({"close_socket": True}))
        except Exception:
            pass
        await self._ws_connection.close()
    if self._ws_session and not self._ws_session.closed:
        await self._ws_session.close()
    self._ws_connection = None
    self._ws_session = None

    # Without this, a synthesize() awaiting a context future would hang forever.
    for fut in self._context_futures.values():
        if not fut.done():
            fut.set_result(None)
    self._context_futures.clear()
    self._active_contexts.clear()

    if self._session:
        await self._session.aclose()
    await super().aclose()
async def close_all_contexts(self) -> None
Expand source code
async def close_all_contexts(self) -> None:
    """Best-effort close of every active context; all are forgotten afterwards."""
    for context_id in list(self._active_contexts):
        try:
            await self.close_context(context_id)
        except Exception:
            # One failed close (e.g. a dead socket) must not leave the rest open.
            pass
        self._active_contexts.discard(context_id)
Expand source code
async def close_context(self, context_id: str) -> None:
    """Ask the server to close ``context_id``.

    Silently does nothing when there is no WebSocket or it is already closed.
    """
    connection = self._ws_connection
    if connection and not connection.closed:
        message = {"context_id": context_id, "close_context": True}
        await connection.send_str(json.dumps(message))
Expand source code
async def flush_context(self, context_id: str) -> None:
    """Ask the server to flush the text buffered for ``context_id``.

    Silently does nothing when there is no WebSocket or it is already closed.
    """
    connection = self._ws_connection
    if connection and not connection.closed:
        message = {"context_id": context_id, "flush": True}
        await connection.send_str(json.dumps(message))
Expand source code
async def interrupt(self) -> None:
    """Simple but effective interruption.

    Flags running synthesis to stop, interrupts the audio track and closes all
    active contexts. Still-pending context futures are then resolved, so an
    in-flight ``synthesize`` returns instead of waiting for a final message.
    """
    self._should_stop = True
    if self.audio_track:
        self.audio_track.interrupt()
    await self.close_all_contexts()
    for fut in self._context_futures.values():
        if not fut.done():
            fut.set_result(None)
    self._context_futures.clear()
def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None
Expand source code
def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
    """Remember ``done_future`` under ``context_id``.

    The receive loop looks futures up by context id in ``_context_futures``.
    """
    futures = self._context_futures
    futures[context_id] = done_future
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Re-arm first-audio detection for the next TTS task.

    Clearing the flag lets the first-audio callback fire again on the next chunk.
    """
    first_chunk_already_sent = False
    self._first_chunk_sent = first_chunk_already_sent
async def send_text(self,
context_id: str,
text: str,
*,
voice_settings: Optional[dict[str, Any]] = None,
flush: bool = False) -> None
Expand source code
async def send_text(
    self,
    context_id: str,
    text: str,
    *,
    voice_settings: Optional[dict[str, Any]] = None,
    flush: bool = False,
) -> None:
    """Send ``text`` on ``context_id``.

    The first time a context is used, an opening message (a single space, plus
    ``voice_settings`` when given) is sent before the text itself.

    Raises:
        RuntimeError: If the WebSocket is missing or closed.
    """
    if not self._ws_connection or self._ws_connection.closed:
        raise RuntimeError("WebSocket connection is closed")

    is_new_context = context_id not in self._active_contexts
    if is_new_context:
        extra_settings = {"voice_settings": voice_settings} if voice_settings else {}
        opening: dict[str, Any] = {"context_id": context_id, "text": " ", **extra_settings}
        await self._ws_connection.send_str(json.dumps(opening))
        self._active_contexts.add(context_id)

    extra_flush = {"flush": True} if flush else {}
    message: dict[str, Any] = {"context_id": context_id, "text": text, **extra_flush}
    await self._ws_connection.send_str(json.dumps(message))
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) -> None
Expand source code
async def synthesize( self, text: AsyncIterator[str] | str, voice_id: Optional[str] = None, **kwargs: Any, ) -> None: try: if not self.audio_track or not self.loop: self.emit("error", "Audio track or event loop not set") return target_voice = voice_id or self.voice self._should_stop = False if self.enable_streaming: await self._stream_synthesis(text, target_voice) else: if isinstance(text, AsyncIterator): async for segment in segment_text(text): if self._should_stop: break await self._chunked_synthesis(segment, target_voice) else: await self._chunked_synthesis(text, target_voice) except Exception as e: self.emit("error", f"TTS synthesis failed: {str(e)}")Convert text to speech
Args
text: Text to convert to speech (either a string or an async iterator of strings)
voice_id: Optional voice identifier; defaults to the instance's configured voice
**kwargs: Additional provider-specific arguments
Returns
None
class VoiceSettings (stability: float = 0.71,
similarity_boost: float = 0.5,
style: float = 0.0,
use_speaker_boost: bool = True)
Expand source code
@dataclass
class VoiceSettings:
    """Voice tuning values sent as ``voice_settings`` in ElevenLabs TTS requests.

    Defaults: stability=0.71, similarity_boost=0.5, style=0.0,
    use_speaker_boost=True.
    """

    # Field order is part of the positional constructor signature; keep it.
    stability: float = 0.71
    similarity_boost: float = 0.5
    style: float = 0.0
    use_speaker_boost: bool = True
Instance variables
var similarity_boost : float
var stability : float
var style : float
var use_speaker_boost : bool