Package videosdk.plugins.elevenlabs
Sub-modules
videosdk.plugins.elevenlabs.stt
videosdk.plugins.elevenlabs.tts
Classes
class ElevenLabsSTT (*,
api_key: str | None = None,
model_id: str = 'scribe_v2_realtime',
language_code: str = 'en',
sample_rate: int = 48000,
commit_strategy: str = 'vad',
vad_silence_threshold_secs: float = 0.8,
vad_threshold: float = 0.4,
min_speech_duration_ms: int = 50,
min_silence_duration_ms: int = 50,
base_url: str = 'wss://api.elevenlabs.io/v1/speech-to-text/realtime',
detect_language: bool = False)-
Expand source code
class ElevenLabsSTT(BaseSTT):
    """
    ElevenLabs Realtime Speech-to-Text (STT) client.

    Streams 16-bit PCM audio to the ElevenLabs realtime WebSocket endpoint and
    forwards interim/final transcripts to the registered transcript callback.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_id: str = "scribe_v2_realtime",
        language_code: str = "en",
        sample_rate: int = 48000,
        commit_strategy: str = "vad",
        vad_silence_threshold_secs: float = 0.8,
        vad_threshold: float = 0.4,
        min_speech_duration_ms: int = 50,
        min_silence_duration_ms: int = 50,
        base_url: str = "wss://api.elevenlabs.io/v1/speech-to-text/realtime",
        detect_language: bool = False,
    ) -> None:
        """
        Initialize the ElevenLabs STT client.

        Args:
            api_key: ElevenLabs API key for authentication. Defaults to env variable ELEVENLABS_API_KEY.
            model_id: STT model identifier.
            language_code: Language code for transcription.
            sample_rate: Sample rate of input audio in Hz. Must be in SUPPORTED_SAMPLE_RATES.
            commit_strategy: Strategy for committing transcripts (defaults to 'vad').
            vad_silence_threshold_secs: Duration of silence to detect end-of-speech.
            vad_threshold: Threshold for detecting voice activity.
            min_speech_duration_ms: Minimum duration in milliseconds for a speech segment.
            min_silence_duration_ms: Minimum duration in milliseconds of silence to consider end-of-speech.
            base_url: WebSocket endpoint for ElevenLabs STT.
            detect_language: If True, ``language_code`` is not sent to ElevenLabs,
                language detection is requested instead, and ``self.language_code``
                is updated from each committed transcript.

        Raises:
            ValueError: If no API key is available or ``sample_rate`` is unsupported.
        """
        super().__init__()
        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError("ElevenLabs API key must be provided via api_key or ELEVENLABS_API_KEY env var")
        self.model_id = model_id
        self.language_code = language_code
        self.commit_strategy = commit_strategy
        self.base_url = base_url
        self.sample_rate = sample_rate
        if self.sample_rate not in SUPPORTED_SAMPLE_RATES:
            raise ValueError(f"Unsupported sample_rate: {self.sample_rate}. Supported rates: {SUPPORTED_SAMPLE_RATES}")
        self.vad_silence_threshold_secs = vad_silence_threshold_secs
        self.vad_threshold = vad_threshold
        self.min_speech_duration_ms = min_speech_duration_ms
        self.min_silence_duration_ms = min_silence_duration_ms
        self.detect_language = detect_language
        self._last_final_text = ""
        self._last_final_time = 0.0
        # Partials identical to a final received within this window (seconds) are dropped.
        self._duplicate_suppression_window = 0.75
        self._stream_buffer = bytearray()
        # 100 ms of 16-bit mono audio: sample_rate * 0.1 s * 2 bytes per sample.
        self._target_chunk_size = int(0.1 * self.sample_rate * 2)
        self.heartbeat = 15.0
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None

    async def process_audio(
        self,
        audio_frames: bytes,
        **kwargs: Any
    ) -> None:
        """
        Process and send audio frames.

        Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead.
        Opens the WebSocket on demand and (re)starts the response listener whenever it is
        missing or has finished.
        """
        if not self._ws or self._ws.closed:
            await self._connect_ws()

        # The listener must be running whenever audio is flowing, whether or not we
        # just reconnected. A finished task means it stopped on its own.
        if not self._ws_task or self._ws_task.done():
            if self._ws_task is not None:
                logger.warning("WebSocket listener stopped unexpectedly, restarting")
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            mono_audio = self._convert_to_mono(audio_frames)
            if not mono_audio:
                return

            self._stream_buffer.extend(mono_audio)
            chunk_size = self._target_chunk_size
            while len(self._stream_buffer) >= chunk_size:
                # Take the chunk out of the buffer *before* awaiting the send: the deletion
                # is in place (no full-buffer copy), and a concurrent flush() cannot cause
                # the chunk to be sent twice or the buffer to be mis-sliced.
                chunk = bytes(self._stream_buffer[:chunk_size])
                del self._stream_buffer[:chunk_size]
                await self._send_audio(chunk)
        except Exception as e:
            logger.exception("Error in process_audio: %s", e)
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def flush(self) -> None:
        """Force ElevenLabs to commit the current audio buffer immediately."""
        if not self._ws or self._ws.closed:
            return
        try:
            buf = bytes(self._stream_buffer)
            self._stream_buffer.clear()
            await self._send_audio(buf, commit=True)
        except Exception as e:
            logger.exception("Error flushing ElevenLabs STT: %s", e)

    async def _connect_ws(self) -> None:
        """Open the realtime STT WebSocket with the configured query parameters.

        Raises:
            Exception: Whatever ``ws_connect`` raised, after logging it.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        # With language detection enabled the language_code parameter is omitted.
        language_code = None if self.detect_language else str(self.language_code)

        query_params = {
            "model_id": str(self.model_id),
            "audio_format": f"pcm_{self.sample_rate}",
            "commit_strategy": str(self.commit_strategy),
            "vad_silence_threshold_secs": self.vad_silence_threshold_secs,
            "vad_threshold": self.vad_threshold,
            "min_speech_duration_ms": self.min_speech_duration_ms,
            "min_silence_duration_ms": self.min_silence_duration_ms,
            "include_timestamps": True,
            "include_language_detection": self.detect_language,
        }
        if language_code is not None:
            query_params["language_code"] = language_code

        ws_url = f"{self.base_url}?{urlencode(query_params)}"
        headers = {"xi-api-key": self.api_key}

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers, heartbeat=self.heartbeat)
            logger.info("Connected to ElevenLabs Realtime STT WebSocket.")
        except Exception as e:
            logger.exception("Error connecting to ElevenLabs WebSocket: %s", e)
            raise

    async def _send_audio(self, audio_bytes: bytes, commit: bool = False) -> None:
        """Send one base64-encoded ``input_audio_chunk`` message.

        On a send failure the error is emitted and the client is closed via ``aclose()``.
        """
        if not self._ws:
            return
        payload = {
            "message_type": "input_audio_chunk",
            "audio_base_64": base64.b64encode(audio_bytes).decode(),
            "sample_rate": self.sample_rate,
            "commit": commit,
        }
        try:
            await self._ws.send_str(json.dumps(payload))
        except Exception as e:
            logger.exception("Error sending audio chunk: %s", e)
            self.emit("error", str(e))
            await self.aclose()

    def _convert_to_mono(self, audio_bytes: bytes) -> bytes:
        """
        Convert input audio bytes to mono.

        The bytes are read as int16 samples. When the sample count is even they are
        treated as interleaved two-channel audio and each frame's channels are
        averaged. Otherwise the input is returned unchanged. Returns b"" on empty
        or unreadable input.

        NOTE(review): a genuinely mono buffer with an even sample count would also be
        averaged pairwise; this assumes upstream always delivers stereo frames.
        """
        if not audio_bytes:
            return b""
        try:
            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b""
            if raw_audio.size % 2 == 0:
                stereo = raw_audio.reshape(-1, 2).astype(np.float32)
                return stereo.mean(axis=1).astype(np.int16).tobytes()
            return audio_bytes
        except Exception as e:
            logger.error("Error converting to mono: %s", e)
            return b""

    async def _listen_for_responses(self) -> None:
        """
        Listen for incoming WebSocket messages from ElevenLabs STT.

        Each parsed event is turned into STTResponse objects that are passed to the
        transcript callback. On exit the socket is closed, and both ``_ws`` and
        ``_ws_task`` are reset so that ``process_audio`` reconnects.
        """
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = None
                    try:
                        data = msg.json()
                    except Exception:
                        try:
                            data = json.loads(msg.data)
                        except Exception:
                            logger.debug("Received non-json ws text message")
                            continue

                    responses = await self._handle_ws_event(data)
                    if responses:
                        for r in responses:
                            if self._transcript_callback:
                                try:
                                    await self._transcript_callback(r)
                                except Exception:
                                    logger.exception("Error in transcript callback")

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error("WebSocket error: %s", self._ws.exception())
                    self.emit("error", f"WebSocket error: {self._ws.exception()}")
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    logger.info("WebSocket closed by server.")
                    break
        except asyncio.CancelledError:
            logger.debug("WebSocket listener cancelled")
        except Exception as e:
            logger.exception("Error in WebSocket listener: %s", e)
            self.emit("error", str(e))
        finally:
            if self._ws:
                try:
                    await self._ws.close()
                except Exception:
                    pass
                self._ws = None
            self._ws_task = None

    async def _handle_ws_event(self, data: dict) -> List[STTResponse]:
        """
        Process a single WebSocket event from ElevenLabs STT.

        Args:
            data: JSON-decoded WebSocket message.

        Returns:
            List of STTResponse objects for this event.
        """
        responses: List[STTResponse] = []
        message_type = data.get("message_type")
        logger.debug("Received WS event: %s", message_type)

        if message_type in STT_ERROR_MSGS:
            logger.error("ElevenLabs STT error: %s", data)
            self.emit("error", data)
            return responses

        if message_type == "session_started":
            global_event_emitter.emit("speech_session_started")
            return responses

        if message_type == "committed_transcript_with_timestamps":
            logger.debug("==== Received final transcript event: %s", data)
            text = data.get("text", "")
            clean_text = text.strip()
            if self.detect_language:
                language_code = data.get("language_code", None)
                self.language_code = language_code

            avg_conf, duration = self.get_avg_confidence_and_duration(data)
            now = time.time()

            if clean_text == "":
                global_event_emitter.emit("speech_stopped")
                self._last_final_text = ""
                self._last_final_time = now
                return responses

            resp = STTResponse(
                event_type=SpeechEventType.FINAL,
                data=SpeechData(
                    text=clean_text,
                    confidence=avg_conf,
                    duration=duration,
                ),
                metadata={"model": self.model_id, "raw_event": data},
            )
            responses.append(resp)
            global_event_emitter.emit("speech_stopped")
            self._last_final_text = clean_text
            self._last_final_time = now
            return responses

        if message_type == "partial_transcript":
            text = data.get("text", "")
            clean_text = text.strip()
            # Drop a partial that merely echoes the final we just emitted.
            if (
                self._last_final_text
                and clean_text
                and clean_text == self._last_final_text
                and (time.time() - self._last_final_time) < self._duplicate_suppression_window
            ):
                logger.debug("Dropping duplicate partial matching recent final transcript")
                return responses

            resp = STTResponse(
                event_type=SpeechEventType.INTERIM,
                data=SpeechData(
                    text=text,
                    confidence=float(data.get("confidence", 0.0)),
                ),
                metadata={"model": self.model_id, "raw_event": data},
            )
            responses.append(resp)

            if clean_text:
                global_event_emitter.emit("speech_started")
            return responses

        logger.debug("Ignoring unrecognized message_type: %s", message_type)
        return responses

    async def aclose(self) -> None:
        """
        Close the WebSocket connection and cleanup session resources.

        Cancels the listener task, closes WebSocket and HTTP session, and calls the parent class cleanup.
        """
        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None

        if self._session:
            try:
                await self._session.close()
            except Exception:
                pass
            finally:
                self._session = None

        await super().aclose()

    def get_avg_confidence_and_duration(self, payload: Dict) -> Tuple[float, float]:
        """
        Computes:
        - Average confidence from log probabilities
        - Total transcript duration from last word end timestamp

        Only entries with ``type == "word"`` are considered. A word without a
        ``logprob`` does not count towards the average, and a missing or null
        ``end`` is treated as 0.0.

        Args:
            payload (dict): Transcript JSON message

        Returns:
            (avg_confidence, duration)
        """
        words = payload.get("words") or []
        total_confidence = 0.0
        word_count = 0
        last_end_time = 0.0

        for w in words:
            if w.get("type") != "word":
                continue
            logprob = w.get("logprob")
            if logprob is not None:
                total_confidence += math.exp(logprob)
                word_count += 1
            # `or 0.0` also covers an explicit JSON null, which would break max().
            last_end_time = max(last_end_time, w.get("end") or 0.0)

        avg_confidence = total_confidence / word_count if word_count > 0 else 0.0
        return avg_confidence, last_end_time
Initialize the ElevenLabs STT client.
Args
api_key- ElevenLabs API key for authentication. Defaults to env variable ELEVENLABS_API_KEY.
model_id- STT model identifier.
language_code- Language code for transcription.
sample_rate- Sample rate of input audio in Hz.
commit_strategy- Strategy for committing transcripts (defaults to 'vad').
vad_silence_threshold_secs- Duration of silence to detect end-of-speech.
vad_threshold- Threshold for detecting voice activity.
min_speech_duration_ms- Minimum duration in milliseconds for a speech segment.
min_silence_duration_ms- Minimum duration in milliseconds of silence to consider end-of-speech.
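base_url- WebSocket endpoint for ElevenLabs STT.
detect_language- If True, language_code is not sent to ElevenLabs; language detection is requested instead, and language_code is updated from each committed transcript.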
Raises
ValueError- If required parameters are missing or invalid.
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """
    Tear down every network resource held by the STT client.

    Order: the response-listener task is cancelled and awaited, then the
    WebSocket is closed, then the HTTP session. Close errors are ignored. The
    base-class ``aclose`` runs last.
    """
    listener = self._ws_task
    if listener:
        listener.cancel()
        try:
            await listener
        except asyncio.CancelledError:
            pass
        self._ws_task = None

    socket = self._ws
    if socket:
        try:
            await socket.close()
        except Exception:
            pass
        self._ws = None

    http_session = self._session
    if http_session:
        try:
            await http_session.close()
        except Exception:
            pass
        finally:
            # Dropped even if close() was interrupted, so a later call won't retry it.
            self._session = None

    await super().aclose()
Cancels the listener task, closes WebSocket and HTTP session, and calls the parent class cleanup.
async def flush(self) ‑> None-
Expand source code
async def flush(self) -> None:
    """Send any buffered audio with ``commit=True`` so ElevenLabs finalizes it now.

    Does nothing when there is no open WebSocket. The local buffer is emptied
    before the send. Any failure is logged rather than raised.
    """
    socket_open = bool(self._ws) and not self._ws.closed
    if socket_open:
        try:
            pending = bytes(self._stream_buffer)
            self._stream_buffer.clear()
            await self._send_audio(pending, commit=True)
        except Exception as exc:
            logger.exception("Error flushing ElevenLabs STT: %s", exc)
def get_avg_confidence_and_duration(self, payload: Dict) ‑> Tuple[float, float]-
Expand source code
def get_avg_confidence_and_duration(self, payload: Dict) -> Tuple[float, float]:
    """
    Computes:
    - Average confidence from log probabilities
    - Total transcript duration from last word end timestamp

    Only entries with ``type == "word"`` are considered. A word without a
    ``logprob`` does not count towards the average, and a missing or null
    ``end`` is treated as 0.0.

    Args:
        payload (dict): Transcript JSON message

    Returns:
        (avg_confidence, duration)
    """
    words = payload.get("words") or []
    total_confidence = 0.0
    word_count = 0
    last_end_time = 0.0

    for w in words:
        if w.get("type") != "word":
            continue
        logprob = w.get("logprob")
        if logprob is not None:
            # Convert the log-probability back to a probability in [0, 1].
            total_confidence += math.exp(logprob)
            word_count += 1
        # `or 0.0` also covers an explicit JSON null, which would break max().
        last_end_time = max(last_end_time, w.get("end") or 0.0)

    avg_confidence = total_confidence / word_count if word_count > 0 else 0.0
    return avg_confidence, last_end_time
Args
payload:dict- Transcript JSON message
Returns
(avg_confidence, duration)
async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None-
Expand source code
async def process_audio( self, audio_frames: bytes, **kwargs: Any ) -> None: """ Process and send audio frames. Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead. """ if not self._ws or self._ws.closed: await self._connect_ws() if not self._ws_task or self._ws_task.done(): self._ws_task = asyncio.create_task(self._listen_for_responses()) elif self._ws_task and self._ws_task.done(): logger.warning("WebSocket listener stopped unexpectedly, restarting") self._ws_task = asyncio.create_task(self._listen_for_responses()) try: mono_audio = self._convert_to_mono(audio_frames) if not mono_audio: return self._stream_buffer.extend(mono_audio) while len(self._stream_buffer) >= self._target_chunk_size: chunk = self._stream_buffer[:self._target_chunk_size] await self._send_audio(chunk) self._stream_buffer = self._stream_buffer[self._target_chunk_size:] except Exception as e: logger.exception("Error in process_audio: %s", e) self.emit("error", str(e)) if self._ws: await self._ws.close() self._ws = NoneProcess and send audio frames. Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead.
class ElevenLabsTTS (*,
api_key: str | None = None,
model: str = 'eleven_flash_v2_5',
voice: str = 'ErXwobaYiN019PkySvjV',
speed: float = 1.0,
response_format: str = 'pcm_24000',
voice_settings: VoiceSettings | None = None,
base_url: str = 'https://api.elevenlabs.io/v1',
enable_streaming: bool = True,
inactivity_timeout: int = 300,
language: str = 'en',
enable_ssml_parsing: bool = False,
apply_text_normalization: "Literal['auto', 'on', 'off']" = 'auto',
auto_mode: bool = True,
word_timestamps: bool = False)-
Expand source code
class ElevenLabsTTS(TTS):
    """ElevenLabs Text-to-Speech plugin.

    When ``enable_streaming`` is True, synthesis runs over the multi-context
    WebSocket (``multi-stream-input``). Otherwise it uses the HTTP
    ``/text-to-speech/{voice_id}/stream`` endpoint. Audio is pushed into
    ``audio_track``.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE_ID,
        speed: float = 1.0,
        response_format: str = "pcm_24000",
        voice_settings: VoiceSettings | None = None,
        base_url: str = API_BASE_URL,
        enable_streaming: bool = True,
        inactivity_timeout: int = WS_INACTIVITY_TIMEOUT,
        language: str = "en",
        enable_ssml_parsing: bool = False,
        apply_text_normalization: Literal["auto", "on", "off"] = "auto",
        auto_mode: bool = True,
        word_timestamps: bool = False,
    ) -> None:
        """Initialize the ElevenLabs TTS plugin.

        Args:
            api_key (Optional[str], optional): ElevenLabs API key. Uses ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to DEFAULT_MODEL ("eleven_flash_v2_5").
            voice (str): The voice ID to use for the TTS plugin. Defaults to DEFAULT_VOICE_ID ("ErXwobaYiN019PkySvjV").
            speed (float): The speed to use for the TTS plugin. Defaults to 1.0.
            response_format (str): The output format requested from ElevenLabs. Defaults to "pcm_24000".
            voice_settings (Optional[VoiceSettings], optional): The voice settings to use for the TTS plugin. Defaults to VoiceSettings().
            base_url (str): The base URL to use for the TTS plugin. Defaults to API_BASE_URL ("https://api.elevenlabs.io/v1").
            enable_streaming (bool): Use the WebSocket streaming API instead of HTTP chunked synthesis. Defaults to True.
            inactivity_timeout (int): Value sent as the ``inactivity_timeout`` query parameter of the WebSocket URL. Defaults to WS_INACTIVITY_TIMEOUT (300).
            language (str): The language code sent to ElevenLabs. Defaults to "en".
            enable_ssml_parsing (bool): Whether to enable SSML parsing. Defaults to False.
            apply_text_normalization: Controls text normalization with three modes: "auto", "on", and "off".
                With "auto", the system decides whether to apply text normalization (e.g., spelling out numbers).
                With "on", text normalization is always applied, and with "off" it is skipped.
                For "eleven_turbo_v2_5" and "eleven_flash_v2_5" models, text normalization can only be
                enabled with Enterprise plans. Defaults to "auto".
            auto_mode (bool): Reduces latency by disabling the server-side chunk_length_schedule buffer
                (default [120, 160, 250, 290] chars before first audio). Defaults to True because the upstream
                pipeline orchestrator already feeds TTS sentence-bounded chunks. Set to False only if you are
                streaming raw mid-sentence tokens directly and want ElevenLabs to buffer for higher prosody
                quality at the cost of TTFB.
            word_timestamps (bool): Forwarded to the base ``TTS`` class. When ``supports_word_timestamps`` is
                true, alignment data from the streaming API is used to emit ``word_spoken`` events in sync with
                playback. Defaults to False.

        Raises:
            ValueError: If no API key is provided or found in the environment.
        """
        super().__init__(
            sample_rate=ELEVENLABS_SAMPLE_RATE,
            num_channels=ELEVENLABS_CHANNELS,
            word_timestamps=word_timestamps,
        )

        self.model = model
        self.voice = voice
        self.speed = speed
        self.language = language
        self.enable_ssml_parsing = enable_ssml_parsing
        self.apply_text_normalization = apply_text_normalization
        self.auto_mode = auto_mode
        self.audio_track = None
        self.loop = None
        self.response_format = response_format
        self.base_url = _normalize_base_url(base_url)
        self.enable_streaming = enable_streaming
        self.voice_settings = voice_settings or VoiceSettings()
        self.inactivity_timeout = inactivity_timeout
        self._first_chunk_sent = False
        self._ws_session = None
        self._ws_connection = None
        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError(
                "ElevenLabs API key must be provided either through api_key parameter or ELEVENLABS_API_KEY environment variable")

        self._session = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0, write=5.0, pool=5.0),
            follow_redirects=True,
        )

        self._streams = weakref.WeakSet()
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False

        self._connection_lock = asyncio.Lock()
        self._ws_voice_id: str | None = None
        # Context ids that have received their init message on the current socket.
        self._active_contexts: set[str] = set()
        # One future per in-flight context; resolved on isFinal, failed on error/close.
        self._context_futures: dict[str, asyncio.Future[None]] = {}

        self._audio_start_time: Optional[float] = None
        self._spoken_words: list[str] = []
        self._word_schedule_tasks: list[asyncio.Task] = []
        self._last_scheduled_start_sec: float = -1.0
        self._pending_word_chars: list[str] = []
        self._pending_word_start_sec: Optional[float] = None

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False
        self._audio_start_time = None
        self._spoken_words = []
        self._cancel_word_schedule_tasks()
        self._last_scheduled_start_sec = -1.0
        self._pending_word_chars = []
        self._pending_word_start_sec = None

    def _cancel_word_schedule_tasks(self) -> None:
        """Cancel any pending ``word_spoken`` emit tasks and forget them."""
        for task in self._word_schedule_tasks:
            if not task.done():
                task.cancel()
        self._word_schedule_tasks = []

    async def prewarm(self) -> None:
        """Pre-establish the ElevenLabs WebSocket so the first ``synthesize()``
        call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly."""
        if not self.enable_streaming:
            return
        try:
            await self._ensure_connection(self.voice)
        except Exception as e:
            self.emit("error", f"ElevenLabs prewarm failed: {e}")

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Synthesize ``text`` into ``audio_track``.

        Args:
            text: A complete string, or an async iterator of text chunks.
            voice_id: Voice to use for this call. Defaults to ``self.voice``.

        Raises:
            Exception: Any synthesis failure is emitted as an "error" event, then re-raised.
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            target_voice = voice_id or self.voice
            self._should_stop = False

            if self.enable_streaming:
                await self._stream_synthesis(text, target_voice)
            else:
                if isinstance(text, AsyncIterator):
                    async for segment in segment_text(text):
                        if self._should_stop:
                            break
                        await self._chunked_synthesis(segment, target_voice)
                else:
                    await self._chunked_synthesis(text, target_voice)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise

    async def _chunked_synthesis(self, text: str, voice_id: str) -> None:
        """Non-streaming synthesis using the standard API"""
        url = f"{self.base_url}/text-to-speech/{voice_id}/stream"

        params = {
            "model_id": self.model,
            "output_format": self.response_format,
            "language_code": self.language,
            "enable_ssml_parsing": self.enable_ssml_parsing,
            "apply_text_normalization": self.apply_text_normalization,
            "auto_mode": self.auto_mode,
        }

        headers = {
            "xi-api-key": self.api_key,
            "Content-Type": "application/json",
        }

        payload = {
            "text": text,
            "voice_settings": self._voice_settings_dict(),
        }

        try:
            async with self._session.stream(
                "POST",
                url,
                headers=headers,
                json=payload,
                params=params
            ) as response:
                response.raise_for_status()

                async for chunk in response.aiter_bytes():
                    if self._should_stop:
                        break
                    if chunk:
                        await self._stream_audio_chunks(chunk)

        except httpx.HTTPStatusError as e:
            self.emit(
                "error", f"HTTP error {e.response.status_code}: {e.response.text}")
            raise
        except Exception as e:
            self.emit("error", f"Chunked synthesis failed: {str(e)}")
            raise

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str], voice_id: str) -> None:
        """WebSocket-based streaming synthesis using multi-context connection.

        Forwards text verbatim into a single context. The server buffers and
        normalises (currency, decimals, mixed-script numbers). Client-side
        re-segmentation here would split tokens like ``₹50,000`` and break
        prosody, so we deliberately don't do it. One trailing ``flush_context``
        + ``close_context`` finalises the turn.

        A turn in which no text was sent finishes immediately instead of waiting
        for an ``isFinal`` from a context the server never saw.
        """
        try:
            await self._ensure_connection(voice_id)

            context_id = uuid.uuid4().hex[:12]
            done_future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
            self.register_context(context_id, done_future)

            flush_on_chunk = bool(self.auto_mode)

            async def _send_chunks() -> None:
                try:
                    first_message_sent = False
                    if isinstance(text, str):
                        if text:
                            await self.send_text(
                                context_id,
                                f"{text} ",
                                voice_settings=self._voice_settings_dict(),
                                flush=flush_on_chunk,
                            )
                            first_message_sent = True
                    else:
                        async for chunk in text:
                            if self._should_stop:
                                break
                            if isinstance(chunk, FlushMarker):
                                # Nothing to flush before the context has been opened.
                                if first_message_sent:
                                    await self.flush_context(context_id)
                                continue
                            if not chunk:
                                continue
                            await self.send_text(
                                context_id,
                                f"{chunk} ",
                                voice_settings=None if first_message_sent else self._voice_settings_dict(),
                                flush=flush_on_chunk,
                            )
                            first_message_sent = True

                    if not first_message_sent:
                        # The server never heard of this context, so no isFinal will arrive.
                        if not done_future.done():
                            done_future.set_result(None)
                        return

                    if not self._should_stop:
                        await self.flush_context(context_id)
                    await self.close_context(context_id)
                except Exception as e:
                    if not done_future.done():
                        done_future.set_exception(e)

            sender = asyncio.create_task(_send_chunks())

            try:
                await done_future
            finally:
                # Drop the future however the turn ended (final, error, interrupt), so
                # the map does not grow. Late audio for this id is then ignored.
                self._context_futures.pop(context_id, None)
                if not sender.done():
                    sender.cancel()
                try:
                    await sender
                except (asyncio.CancelledError, Exception):
                    pass

        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")
            raise

    def _voice_settings_dict(self) -> dict[str, Any]:
        """Serialize ``self.voice_settings`` into the JSON shape the API expects."""
        return {
            "stability": self.voice_settings.stability,
            "similarity_boost": self.voice_settings.similarity_boost,
            "style": self.voice_settings.style,
            "use_speaker_boost": self.voice_settings.use_speaker_boost,
        }

    def _mark_first_audio(self) -> None:
        """On the first audio chunk of a turn, record the playback start time and fire the first-audio callback."""
        if self._first_chunk_sent:
            return
        self._first_chunk_sent = True
        self._audio_start_time = asyncio.get_running_loop().time()
        callback = getattr(self, "_first_audio_callback", None)
        if callback:
            asyncio.create_task(callback())

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Forward one HTTP audio chunk to the audio track, unless it is empty or the turn was stopped."""
        if not audio_bytes or self._should_stop:
            return

        self._mark_first_audio()

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def _schedule_word_emit(self, word: str, start_sec: float) -> None:
        """Emit a ``word_spoken`` event at the moment its audio begins to play."""
        while self._audio_start_time is None and not self._should_stop:
            await asyncio.sleep(0.01)
        if self._should_stop or self._audio_start_time is None:
            return

        target_time = self._audio_start_time + float(start_sec)
        now = asyncio.get_running_loop().time()
        delay = target_time - now
        if delay > 0:
            try:
                await asyncio.sleep(delay)
            except asyncio.CancelledError:
                return

        if self._should_stop:
            return

        self._spoken_words.append(word)
        cumulative = " ".join(self._spoken_words)
        try:
            self.emit("word_spoken", {"word": word, "cumulative_text": cumulative})
        except Exception:
            pass

    def _schedule_words_from_alignment(self, alignment: dict) -> None:
        """Parse an ElevenLabs alignment payload and schedule per-word emits.

        ElevenLabs returns character-level alignment (``chars`` /
        ``charStartTimesMs``). We aggregate consecutive non-whitespace chars
        into whole words, then schedule each word at its first-char start time
        so the UI renders word-by-word synced with audio playback.
        """
        if not alignment:
            return
        chars = alignment.get("chars") or alignment.get("characters") or []
        if "charStartTimesMs" in alignment:
            starts_sec = [float(t) / 1000.0 for t in alignment["charStartTimesMs"]]
        elif "character_start_times_seconds" in alignment:
            starts_sec = [float(t) for t in alignment["character_start_times_seconds"]]
        else:
            return

        for ch, start_sec in zip(chars, starts_sec):
            if ch.isspace():
                self._flush_pending_word()
            else:
                if not self._pending_word_chars:
                    self._pending_word_start_sec = start_sec
                self._pending_word_chars.append(ch)

    def _flush_pending_word(self) -> None:
        """Schedule emission of any word currently being accumulated."""
        if not self._pending_word_chars or self._pending_word_start_sec is None:
            self._pending_word_chars = []
            self._pending_word_start_sec = None
            return
        word = "".join(self._pending_word_chars)
        start_sec = self._pending_word_start_sec
        self._pending_word_chars = []
        self._pending_word_start_sec = None
        # Alignment may repeat across messages; only schedule strictly later words.
        if start_sec <= self._last_scheduled_start_sec:
            return
        self._last_scheduled_start_sec = start_sec
        self._word_schedule_tasks.append(
            asyncio.create_task(self._schedule_word_emit(word, start_sec))
        )

    async def interrupt(self) -> None:
        """Stop emitting audio for the current synthesis.

        Keeps the WebSocket open so the next turn does not pay reconnect cost.
        Tells ElevenLabs to stop generating via ``close_context`` (saves server
        compute), and drops any in-flight audio chunks client-side via
        context_id filtering in ``_recv_loop``."""
        self._should_stop = True
        self._cancel_word_schedule_tasks()
        if self.audio_track:
            self.audio_track.interrupt()
        await self.close_all_contexts()
        for fut in list(self._context_futures.values()):
            if not fut.done():
                fut.cancel()

    async def aclose(self) -> None:
        """Cleanup resources"""
        self._should_stop = True
        self._cancel_word_schedule_tasks()

        for task in [self._send_task, self._recv_task]:
            if task and not task.done():
                task.cancel()

        for stream in list(self._streams):
            try:
                await stream.aclose()
            except Exception:
                pass

        self._streams.clear()

        if self._ws_connection and not self._ws_connection.closed:
            try:
                await self._ws_connection.send_str(json.dumps({"close_socket": True}))
            except Exception:
                pass
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()
        self._ws_connection = None
        self._ws_session = None
        if self._session:
            await self._session.aclose()
        await super().aclose()

    async def _ensure_connection(self, voice_id: str) -> None:
        """Make sure an open WebSocket exists for ``voice_id``, reconnecting when the voice changes."""
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed and self._ws_voice_id == voice_id:
                return

            if self._ws_connection and not self._ws_connection.closed:
                try:
                    await self._ws_connection.send_str(json.dumps({"close_socket": True}))
                except Exception:
                    pass
                await self._ws_connection.close()
            if self._ws_session and not self._ws_session.closed:
                await self._ws_session.close()

            self._ws_session = aiohttp.ClientSession()
            self._ws_voice_id = voice_id

            ws_url = f"{self.base_url}/text-to-speech/{voice_id}/multi-stream-input".replace("https://", "wss://").replace("http://", "ws://")

            params = {
                "model_id": self.model,
                "output_format": self.response_format,
                "inactivity_timeout": self.inactivity_timeout,
                "language_code": self.language,
                "enable_ssml_parsing": str(self.enable_ssml_parsing).lower(),
                "apply_text_normalization": self.apply_text_normalization,
                "auto_mode": str(self.auto_mode).lower(),
            }

            param_string = "&".join([f"{k}={v}" for k, v in params.items()])
            full_ws_url = f"{ws_url}?{param_string}"

            headers = {"xi-api-key": self.api_key}
            self._ws_connection = await asyncio.wait_for(self._ws_session.ws_connect(full_ws_url, headers=headers), timeout=10.0)

            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()
            self._recv_task = asyncio.create_task(self._recv_loop())

    def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
        """Track ``done_future`` so the receive loop can resolve it when the context finishes."""
        self._context_futures[context_id] = done_future

    async def send_text(
        self,
        context_id: str,
        text: str,
        *,
        voice_settings: Optional[dict[str, Any]] = None,
        flush: bool = False,
    ) -> None:
        """Send ``text`` into ``context_id``, opening the context with an init message the first time.

        Raises:
            RuntimeError: If the WebSocket is not open.
        """
        if not self._ws_connection or self._ws_connection.closed:
            raise RuntimeError("WebSocket connection is closed")

        if context_id not in self._active_contexts:
            init_msg = {
                "context_id": context_id,
                "text": " ",
            }
            if voice_settings:
                init_msg["voice_settings"] = voice_settings
            await self._ws_connection.send_str(json.dumps(init_msg))
            self._active_contexts.add(context_id)

        pkt: dict[str, Any] = {"context_id": context_id, "text": text}
        if flush:
            pkt["flush"] = True
        await self._ws_connection.send_str(json.dumps(pkt))

    async def flush_context(self, context_id: str) -> None:
        """Ask the server to generate audio for any text buffered in ``context_id``."""
        if not self._ws_connection or self._ws_connection.closed:
            return
        await self._ws_connection.send_str(json.dumps({"context_id": context_id, "flush": True}))

    async def close_context(self, context_id: str) -> None:
        """Tell the server that ``context_id`` will receive no more text."""
        if not self._ws_connection or self._ws_connection.closed:
            return
        await self._ws_connection.send_str(json.dumps({"context_id": context_id, "close_context": True}))

    async def close_all_contexts(self) -> None:
        """Best-effort ``close_context`` for every active context; send errors are ignored."""
        try:
            for context_id in list(self._active_contexts):
                await self.close_context(context_id)
        except Exception:
            pass

    def _fail_pending_contexts(self, reason: str) -> None:
        """Fail every unresolved context future so awaiting synthesis calls return instead of hanging."""
        for ctx_id, fut in list(self._context_futures.items()):
            if not fut.done():
                fut.set_exception(RuntimeError(reason))
            self._active_contexts.discard(ctx_id)
        self._context_futures.clear()

    async def _recv_loop(self) -> None:
        """Consume server messages: route audio and alignment per context, and resolve contexts on isFinal.

        When the socket stops delivering messages (server close or receive error),
        every still-pending context is failed. Without this, ``_stream_synthesis``
        would wait on its future forever. A loop that was superseded by a newer
        ``_recv_task`` leaves the futures alone.
        """
        try:
            while self._ws_connection and not self._ws_connection.closed:
                msg = await self._ws_connection.receive()
                if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue

                try:
                    data = json.loads(msg.data)
                except ValueError:
                    # A single malformed frame must not take down the whole receive loop.
                    continue

                ctx_id = data.get("contextId")

                if data.get("error"):
                    fut = self._context_futures.get(ctx_id)
                    if fut and not fut.done():
                        fut.set_exception(RuntimeError(data["error"]))
                    continue

                fut = self._context_futures.get(ctx_id) if ctx_id else None
                # Audio for interrupted or finished contexts is dropped.
                ctx_alive = fut is not None and not fut.done()

                if data.get("audio") and ctx_alive:
                    audio_chunk = base64.b64decode(data["audio"]) if isinstance(data["audio"], str) else None
                    if audio_chunk:
                        self._mark_first_audio()
                        if self.audio_track:
                            await self.audio_track.add_new_bytes(audio_chunk)

                if self.supports_word_timestamps and ctx_alive:
                    alignment = data.get("alignment") or data.get("normalizedAlignment")
                    if alignment:
                        self._schedule_words_from_alignment(alignment)

                if data.get("is_final") or data.get("isFinal"):
                    if self.supports_word_timestamps:
                        self._flush_pending_word()
                    if ctx_id:
                        fut = self._context_futures.pop(ctx_id, None)
                        self._active_contexts.discard(ctx_id)
                        if fut and not fut.done():
                            fut.set_result(None)
        except Exception:
            self._fail_pending_contexts("WebSocket receive loop error")
            return

        # The socket closed. Nothing more will arrive for in-flight contexts.
        if self._recv_task is asyncio.current_task():
            self._fail_pending_contexts("WebSocket connection closed")
Initialize the ElevenLabs TTS plugin.
Args
api_key (Optional[str], optional): ElevenLabs API key. Uses the ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
model (str): The model to use for the TTS plugin. Defaults to "eleven_flash_v2_5".
voice (str): The voice ID to use for the TTS plugin. Defaults to "EXAVITQu4vr4xnSDxMaL".
speed (float): The speed to use for the TTS plugin. Defaults to 1.0.
response_format (str): The audio response format to use for the TTS plugin. Defaults to "pcm_24000".
voice_settings (Optional[VoiceSettings], optional): The voice settings to use for the TTS plugin. Defaults to None.
base_url (str): The base URL of the ElevenLabs API. Defaults to "https://api.elevenlabs.io/v1".
enable_streaming (bool): Whether to synthesize over the streaming WebSocket connection. Defaults to True.
inactivity_timeout (int): The inactivity timeout passed to the WebSocket endpoint. Defaults to 300.
language (str): The language code to use for the TTS plugin. Defaults to "en".
enable_ssml_parsing (bool): Whether to enable SSML parsing.
apply_text_normalization (str): Controls text normalization and takes one of three modes: "auto", "on", or "off".
- With "auto", the system automatically decides whether to apply text normalization (e.g., spelling out numbers).
- With "on", text normalization is always applied; with "off", it is skipped.
- For the "eleven_turbo_v2_5" and "eleven_flash_v2_5" models, text normalization can only be enabled on Enterprise plans. Defaults to "auto".
auto_mode (bool): Reduces latency by disabling the server-side chunk_length_schedule buffer (default [120, 160, 250, 290] chars before first audio). Defaults to True because the upstream pipeline orchestrator already feeds TTS sentence-bounded chunks. Set to False only if you are streaming raw mid-sentence tokens directly and want ElevenLabs to buffer for higher prosody quality at the cost of time to first byte (TTFB).
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Cleanup resources: set ``_should_stop``, cancel the send/receive tasks, close every tracked stream (errors ignored), send a close_socket message (errors ignored) and close the WebSocket and its aiohttp session, close ``self._session`` if set, then run the base-class ``aclose``.""" self._should_stop = True for task in [self._send_task, self._recv_task]: if task and not task.done(): task.cancel() for stream in list(self._streams): try: await stream.aclose() except Exception: pass self._streams.clear() if self._ws_connection and not self._ws_connection.closed: try: await self._ws_connection.send_str(json.dumps({"close_socket": True})) except Exception: pass await self._ws_connection.close() if self._ws_session and not self._ws_session.closed: await self._ws_session.close() self._ws_connection = None self._ws_session = None if self._session: await self._session.aclose() await super().aclose()Cleanup resources
async def close_all_contexts(self) ‑> None-
Expand source code
async def close_all_contexts(self) -> None: """Best-effort: send close_context for every id currently in ``_active_contexts``, swallowing any exception. NOTE(review): the try wraps the whole loop, so one failed send skips the remaining contexts; ids are also not removed from ``_active_contexts`` here.""" try: for context_id in list(self._active_contexts): await self.close_context(context_id) except Exception: pass async def close_context(self, context_id: str) ‑> None-
Expand source code
async def close_context(self, context_id: str) -> None: """Send a close_context message for ``context_id`` over the WebSocket. Silently returns when there is no open connection. The id is not removed from ``_active_contexts`` here; the receive loop discards it when a final message for that context arrives.""" if not self._ws_connection or self._ws_connection.closed: return await self._ws_connection.send_str(json.dumps({"context_id": context_id, "close_context": True})) async def flush_context(self, context_id: str) ‑> None-
Expand source code
async def flush_context(self, context_id: str) -> None: """Send a flush message for ``context_id`` (presumably asking the server to synthesize any text buffered so far; confirm against the ElevenLabs docs). Silently returns when there is no open connection.""" if not self._ws_connection or self._ws_connection.closed: return await self._ws_connection.send_str(json.dumps({"context_id": context_id, "flush": True})) async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None: """Stop emitting audio for the current synthesis. Keeps the WebSocket open so the next turn does not pay reconnect cost; tells ElevenLabs to stop generating via ``close_context`` (saves server compute), and drops any in-flight audio chunks client-side via context_id filtering in ``_recv_loop``. Also sets ``_should_stop``, cancels pending word-schedule tasks and interrupts the audio track. Pending context futures are cancelled but left in ``_context_futures``; a done future marks its context as dead, so ``_recv_loop`` ignores that context's later audio and alignment data.""" self._should_stop = True for task in self._word_schedule_tasks: if not task.done(): task.cancel() self._word_schedule_tasks = [] if self.audio_track: self.audio_track.interrupt() await self.close_all_contexts() for fut in list(self._context_futures.values()): if not fut.done(): fut.cancel()Stop emitting audio for the current synthesis. Keeps the WebSocket open so the next turn does not pay reconnect cost; tells ElevenLabs to stop generating via
close_context (saves server compute), and drops any in-flight audio chunks client-side via context_id filtering in _recv_loop.
async def prewarm(self) ‑> None-
Expand source code
async def prewarm(self) -> None: """Pre-establish the ElevenLabs WebSocket so the first ``synthesize()`` call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly. Does nothing when streaming is disabled; connection failures are not raised but emitted as an ``error`` event.""" if not self.enable_streaming: return try: await self._ensure_connection(self.voice) except Exception as e: self.emit("error", f"ElevenLabs prewarm failed: {e}")Pre-establish the ElevenLabs WebSocket so the first
synthesize() call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly.
def register_context(self, context_id: str, done_future: asyncio.Future[None]) ‑> None-
Expand source code
def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None: """Associate ``done_future`` with ``context_id``. The receive loop forwards audio only for contexts whose future is registered and not yet done; it resolves the future on the context's final message and fails it on a server error or receive-loop error. Re-registering an id replaces the previous future.""" self._context_futures[context_id] = done_future def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None: """Reset the first-audio and word-timing state before the next TTS task: clear the first-chunk flag and audio start time, empty the spoken-word and pending-word buffers, cancel outstanding word-schedule tasks, and reset the last scheduled start time to -1.0.""" self._first_chunk_sent = False self._audio_start_time = None self._spoken_words = [] for task in self._word_schedule_tasks: if not task.done(): task.cancel() self._word_schedule_tasks = [] self._last_scheduled_start_sec = -1.0 self._pending_word_chars = [] self._pending_word_start_sec = NoneReset the first audio tracking state for next TTS task
async def send_text(self,
context_id: str,
text: str,
*,
voice_settings: Optional[dict[str, Any]] = None,
flush: bool = False) ‑> None-
Expand source code
async def send_text( self, context_id: str, text: str, *, voice_settings: Optional[dict[str, Any]] = None, flush: bool = False, ) -> None: """Send ``text`` for ``context_id`` over the WebSocket. The first send for a context is preceded by an initialization message whose text is a single space (plus ``voice_settings`` when non-empty), after which the id is added to ``_active_contexts``. ``flush=True`` adds a flush flag to the text packet. Raises: RuntimeError: If the WebSocket is missing or closed.""" if not self._ws_connection or self._ws_connection.closed: raise RuntimeError("WebSocket connection is closed") if context_id not in self._active_contexts: init_msg: dict[str, Any] = { "context_id": context_id, "text": " ", } if voice_settings: init_msg["voice_settings"] = voice_settings await self._ws_connection.send_str(json.dumps(init_msg)) self._active_contexts.add(context_id) pkt: dict[str, Any] = {"context_id": context_id, "text": text} if flush: pkt["flush"] = True await self._ws_connection.send_str(json.dumps(pkt)) async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize( self, text: AsyncIterator[str] | str, voice_id: Optional[str] = None, **kwargs: Any, ) -> None: """Convert ``text`` to speech on the audio track using ``voice_id`` (or the default voice). Uses the WebSocket streaming path when ``enable_streaming`` is set; otherwise synthesizes chunk by chunk, iterating ``segment_text`` for async iterators and stopping early once ``_should_stop`` is set. Emits an ``error`` event and returns if no audio track or event loop is set; on any other failure emits an ``error`` event and re-raises. ``kwargs`` are accepted but not used here.""" try: if not self.audio_track or not self.loop: self.emit("error", "Audio track or event loop not set") return target_voice = voice_id or self.voice self._should_stop = False if self.enable_streaming: await self._stream_synthesis(text, target_voice) else: if isinstance(text, AsyncIterator): async for segment in segment_text(text): if self._should_stop: break await self._chunked_synthesis(segment, target_voice) else: await self._chunked_synthesis(text, target_voice) except Exception as e: self.emit("error", f"TTS synthesis failed: {str(e)}") raiseConvert text to speech
Args
text - Text to convert to speech. Either a plain string or an async iterator that may yield str chunks and FlushMarker segment-boundary markers. Plugins that don't support per-segment flushing should drop the markers with an inline isinstance check (or rely on segment_text, which already drops them).
voice_id - Optional voice identifier.
**kwargs - Additional provider-specific arguments.
Returns
None
class VoiceSettings (stability: float = 0.71,
similarity_boost: float = 0.5,
style: float = 0.0,
use_speaker_boost: bool = True)-
Expand source code
@dataclass class VoiceSettings: """Voice settings for the ElevenLabs TTS plugin (its voice_settings option). Field meanings follow the ElevenLabs voice-settings API; presumably serialized into the voice_settings dict sent with a context's first message (confirm against the caller).""" stability: float = 0.71 similarity_boost: float = 0.5 style: float = 0.0 use_speaker_boost: bool = TrueVoiceSettings(stability: 'float' = 0.71, similarity_boost: 'float' = 0.5, style: 'float' = 0.0, use_speaker_boost: 'bool' = True)
Instance variables
var similarity_boost : float
var stability : float
var style : float
var use_speaker_boost : bool