Package videosdk.plugins.elevenlabs

Sub-modules

videosdk.plugins.elevenlabs.stt
videosdk.plugins.elevenlabs.tts

Classes

class ElevenLabsSTT (*,
api_key: str | None = None,
model_id: str = 'scribe_v2_realtime',
language_code: str = 'en',
sample_rate: int = 48000,
commit_strategy: str = 'vad',
vad_silence_threshold_secs: float = 0.8,
vad_threshold: float = 0.4,
min_speech_duration_ms: int = 50,
min_silence_duration_ms: int = 50,
base_url: str = 'wss://api.elevenlabs.io/v1/speech-to-text/realtime',
detect_language: bool = False)
Expand source code
class ElevenLabsSTT(BaseSTT):
    """
    ElevenLabs Realtime Speech-to-Text (STT) client.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_id: str = "scribe_v2_realtime",
        language_code: str = "en",
        sample_rate: int = 48000,
        commit_strategy: str = "vad",
        vad_silence_threshold_secs: float = 0.8,
        vad_threshold: float = 0.4,
        min_speech_duration_ms: int = 50,
        min_silence_duration_ms: int = 50,
        base_url: str = "wss://api.elevenlabs.io/v1/speech-to-text/realtime",
        detect_language: bool = False
    ) -> None:
        """
        Initialize the ElevenLabs STT client.

        Args:
            api_key: ElevenLabs API key for authentication. Defaults to env variable ELEVENLABS_API_KEY.
            model_id: STT model identifier.
            language_code: Language code for transcription.
            sample_rate: Sample rate of input audio in Hz.
            commit_strategy: Strategy for committing transcripts (defaults to 'vad').
            vad_silence_threshold_secs: Duration of silence to detect end-of-speech.
            vad_threshold: Threshold for detecting voice activity.
            min_speech_duration_ms: Minimum duration in milliseconds for a speech segment.
            min_silence_duration_ms: Minimum duration in milliseconds of silence to consider end-of-speech.
            base_url: WebSocket endpoint for ElevenLabs STT.
            detect_language: If True, omit the fixed language_code and let ElevenLabs detect the spoken language automatically. Defaults to False.
        Raises:
            ValueError: If required parameters are missing or invalid.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError("ElevenLabs API key must be provided via api_key or ELEVENLABS_API_KEY env var")

        self.model_id = model_id
        self.language_code = language_code
        self.commit_strategy = commit_strategy
        self.base_url = base_url
        self.sample_rate = sample_rate

        if self.sample_rate not in SUPPORTED_SAMPLE_RATES:
            raise ValueError(f"Unsupported sample_rate: {self.sample_rate}. Supported rates: {SUPPORTED_SAMPLE_RATES}")
        
        self.vad_silence_threshold_secs = vad_silence_threshold_secs
        self.vad_threshold = vad_threshold
        self.min_speech_duration_ms = min_speech_duration_ms
        self.min_silence_duration_ms = min_silence_duration_ms
        self.detect_language = detect_language

        self._last_final_text = ""
        self._last_final_time = 0.0
        self._duplicate_suppression_window = 0.75
        
        self._stream_buffer = bytearray()
        self._target_chunk_size = int(0.1 * self.sample_rate * 2) 

        self.heartbeat = 15.0
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None

    async def process_audio(
        self, 
        audio_frames: bytes, 
        **kwargs: Any
    ) -> None:
        """
        Process and send audio frames. 
        Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead.
        """

        if not self._ws or self._ws.closed:
            await self._connect_ws()
            if not self._ws_task or self._ws_task.done():
                self._ws_task = asyncio.create_task(self._listen_for_responses())

        elif self._ws_task and self._ws_task.done():
            logger.warning("WebSocket listener stopped unexpectedly, restarting")
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            mono_audio = self._convert_to_mono(audio_frames)
            if not mono_audio:
                return
            
            self._stream_buffer.extend(mono_audio)
            
            while len(self._stream_buffer) >= self._target_chunk_size:
                chunk = self._stream_buffer[:self._target_chunk_size]
                await self._send_audio(chunk)
                self._stream_buffer = self._stream_buffer[self._target_chunk_size:]

        except Exception as e:
            logger.exception("Error in process_audio: %s", e)
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        if not self._session:
            self._session = aiohttp.ClientSession()

        language_code = None if self.detect_language else str(self.language_code)
        query_params = {
            "model_id": str(self.model_id),
            "audio_format": f"pcm_{self.sample_rate}",
            "commit_strategy": str(self.commit_strategy),
            "vad_silence_threshold_secs": self.vad_silence_threshold_secs,
            "vad_threshold": self.vad_threshold,
            "min_speech_duration_ms": self.min_speech_duration_ms,
            "min_silence_duration_ms": self.min_silence_duration_ms,
            "include_timestamps": True,
            "include_language_detection":self.detect_language
        }

        if language_code is not None:
            query_params["language_code"] = language_code

        ws_url = f"{self.base_url}?{urlencode(query_params)}"
        headers = {"xi-api-key": self.api_key}

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers, heartbeat=self.heartbeat)
            logger.info("Connected to ElevenLabs Realtime STT WebSocket.")
        except Exception as e:
            logger.exception("Error connecting to ElevenLabs WebSocket: %s", e)
            raise

    async def _send_audio(self, audio_bytes: bytes) -> None:
        if not self._ws:
            return

        payload = {
            "message_type": "input_audio_chunk",
            "audio_base_64": base64.b64encode(audio_bytes).decode(),
            "sample_rate": self.sample_rate,
        }

        try:
            await self._ws.send_str(json.dumps(payload))
        except Exception as e:
            logger.exception("Error sending audio chunk: %s", e)
            self.emit("error", str(e))
            await self.aclose()

    def _convert_to_mono(self, audio_bytes: bytes) -> bytes:
        """
        Convert input audio bytes to mono. 
        """
        if not audio_bytes:
            return b""
        try:
            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b""

            if raw_audio.size % 2 == 0:
                try:
                    stereo = raw_audio.reshape(-1, 2).astype(np.float32)
                    mono = stereo.mean(axis=1)
                    return mono.astype(np.int16).tobytes()
                except ValueError:
                    pass
            
            return audio_bytes
        except Exception as e:
            logger.error("Error converting to mono: %s", e)
            return b""
            
    async def _listen_for_responses(self) -> None:
        """
        Listen for incoming WebSocket messages from ElevenLabs STT.
        """
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = None
                    try:
                        data = msg.json()
                    except Exception:
                        try:
                            data = json.loads(msg.data)
                        except Exception:
                            logger.debug("Received non-json ws text message")
                            continue

                    responses = await self._handle_ws_event(data)
                    if responses:
                        for r in responses:
                            if self._transcript_callback:
                                try:
                                    await self._transcript_callback(r)
                                except Exception:
                                    logger.exception("Error in transcript callback")
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error("WebSocket error: %s", self._ws.exception())
                    self.emit("error", f"WebSocket error: {self._ws.exception()}")
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    logger.info("WebSocket closed by server.")
                    break
        except asyncio.CancelledError:
            logger.debug("WebSocket listener cancelled")
        except Exception as e:
            logger.exception("Error in WebSocket listener: %s", e)
            self.emit("error", str(e))
        finally:
            if self._ws:
                try:
                    await self._ws.close()
                except Exception:
                    pass
                self._ws = None
            self._ws_task = None

    async def _handle_ws_event(self, data: dict) -> List[STTResponse]:
        """
        Process a single WebSocket event from ElevenLabs STT.

        Args:
            data: JSON-decoded WebSocket message.

        Returns:
            List of STTResponse objects for this event.
        """
        responses: List[STTResponse] = []
        message_type = data.get("message_type")
        logger.debug("Received WS event: %s", message_type)

        if message_type in STT_ERROR_MSGS:
            logger.error("ElevenLabs STT error: %s", data)
            self.emit("error", data)
            return responses
        
        if message_type == "session_started":
            global_event_emitter.emit("speech_session_started")
            return responses

        if message_type == "committed_transcript_with_timestamps":
            logger.debug("==== Received final transcript event: %s", data)
            text = data.get("text", "")
            clean_text = text.strip()
            if self.detect_language:
                language_code = data.get("language_code", None)
                self.language_code = language_code
            avg_conf, duration = self.get_avg_confidence_and_duration(data)
            now = time.time()

            if clean_text == "":
                global_event_emitter.emit("speech_stopped")
                self._last_final_text = ""
                self._last_final_time = now
                return responses

            resp = STTResponse(
                event_type=SpeechEventType.FINAL,
                data=SpeechData(
                    text=clean_text,
                    confidence=avg_conf,
                    duration=duration,
                ),
                metadata={"model": self.model_id, "raw_event": data},
            )
            responses.append(resp)

            global_event_emitter.emit("speech_stopped")
            self._last_final_text = clean_text
            self._last_final_time = now
            return responses

        if message_type == "partial_transcript":
            text = data.get("text", "")
            clean_text = text.strip()

            if (
                self._last_final_text
                and clean_text
                and clean_text == self._last_final_text
                and (time.time() - self._last_final_time) < self._duplicate_suppression_window
            ):
                logger.debug("Dropping duplicate partial matching recent final transcript")
                return responses

            resp = STTResponse(
                event_type=SpeechEventType.INTERIM,
                data=SpeechData(
                    text=text,
                    confidence=float(data.get("confidence", 0.0)),
                    ),
                metadata={"model": self.model_id, "raw_event": data},
            )
            responses.append(resp)

            if clean_text:
                global_event_emitter.emit("speech_started")

            return responses

        logger.debug("Ignoring unrecognized message_type: %s", message_type)
        return responses

    async def aclose(self) -> None:
        """
        Close the WebSocket connection and cleanup session resources.

        Cancels the listener task, closes WebSocket and HTTP session,
        and calls the parent class cleanup.
        """
        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None

        if self._session:
            try:
                await self._session.close()
            except Exception:
                pass
            finally:
                self._session = None

        await super().aclose()
    
    def get_avg_confidence_and_duration(self, payload: Dict) -> Tuple[float, float]:
        """
        Computes:
        - Average confidence from log probabilities
        - Total transcript duration from last word end timestamp

        Args:
            payload (dict): Transcript JSON message

        Returns:
            (avg_confidence, duration)
        """

        words = payload.get("words", [])

        total_confidence = 0.0
        word_count = 0
        last_end_time = 0.0

        for w in words:
            if w.get("type") != "word":
                continue

            logprob = w.get("logprob")

            if logprob is not None:
                total_confidence += math.exp(logprob)
                word_count += 1

            last_end_time = max(last_end_time, w.get("end", 0.0))

        avg_confidence = total_confidence / word_count if word_count > 0 else 0.0

        return avg_confidence, last_end_time

ElevenLabs Realtime Speech-to-Text (STT) client.

Initialize the ElevenLabs STT client.

Args

api_key
ElevenLabs API key for authentication. Defaults to env variable ELEVENLABS_API_KEY.
model_id
STT model identifier.
language_code
Language code for transcription.
sample_rate
Sample rate of input audio in Hz.
commit_strategy
Strategy for committing transcripts (defaults to 'vad').
vad_silence_threshold_secs
Duration of silence to detect end-of-speech.
vad_threshold
Threshold for detecting voice activity.
min_speech_duration_ms
Minimum duration in milliseconds for a speech segment.
min_silence_duration_ms
Minimum duration in milliseconds of silence to consider end-of-speech.
base_url
WebSocket endpoint for ElevenLabs STT.
detect_language
If True, omit the fixed language_code and let ElevenLabs detect the spoken language automatically. Defaults to False.

Raises

ValueError
If required parameters are missing or invalid.
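
A minimal construction sketch (values are illustrative; the framework wiring that feeds audio frames and consumes transcripts is assumed):

import asyncio

from videosdk.plugins.elevenlabs import ElevenLabsSTT

async def main() -> None:
    # Assumes ELEVENLABS_API_KEY is exported; otherwise pass api_key= explicitly.
    stt = ElevenLabsSTT(
        language_code="en",
        sample_rate=48000,           # must be one of the supported sample rates
        commit_strategy="vad",
        vad_silence_threshold_secs=0.8,
        detect_language=False,
    )
    try:
        ...  # feed 16-bit PCM frames via: await stt.process_audio(frame_bytes)
    finally:
        await stt.aclose()           # cancels the listener task and closes the WebSocket

asyncio.run(main())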

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """
    Close the WebSocket connection and cleanup session resources.

    Cancels the listener task, closes WebSocket and HTTP session,
    and calls the parent class cleanup.
    """
    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None

    if self._ws:
        try:
            await self._ws.close()
        except Exception:
            pass
        self._ws = None

    if self._session:
        try:
            await self._session.close()
        except Exception:
            pass
        finally:
            self._session = None

    await super().aclose()

Close the WebSocket connection and cleanup session resources.

Cancels the listener task, closes WebSocket and HTTP session, and calls the parent class cleanup.

def get_avg_confidence_and_duration(self, payload: Dict) ‑> Tuple[float, float]
Expand source code
def get_avg_confidence_and_duration(self, payload: Dict) -> Tuple[float, float]:
    """
    Computes:
    - Average confidence from log probabilities
    - Total transcript duration from last word end timestamp

    Args:
        payload (dict): Transcript JSON message

    Returns:
        (avg_confidence, duration)
    """

    words = payload.get("words", [])

    total_confidence = 0.0
    word_count = 0
    last_end_time = 0.0

    for w in words:
        if w.get("type") != "word":
            continue

        logprob = w.get("logprob")

        if logprob is not None:
            total_confidence += math.exp(logprob)
            word_count += 1

        last_end_time = max(last_end_time, w.get("end", 0.0))

    avg_confidence = total_confidence / word_count if word_count > 0 else 0.0

    return avg_confidence, last_end_time

Computes:

  • Average confidence from log probabilities
  • Total transcript duration from the last word's end timestamp

Args

payload : dict
Transcript JSON message

Returns

(avg_confidence, duration)
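
A small worked example with an illustrative payload (the word entries and logprob values are made up). Two words with logprobs -0.1 and -0.3 give per-word confidences exp(-0.1) ≈ 0.905 and exp(-0.3) ≈ 0.741, so the average is ≈ 0.823; the duration is the last word's end timestamp. Assuming stt is an ElevenLabsSTT instance:

payload = {
    "words": [
        {"type": "word", "text": "hello", "logprob": -0.1, "end": 0.42},
        {"type": "spacing", "text": " "},                    # skipped: type != "word"
        {"type": "word", "text": "world", "logprob": -0.3, "end": 0.90},
    ]
}

avg_conf, duration = stt.get_avg_confidence_and_duration(payload)
# avg_conf ≈ (exp(-0.1) + exp(-0.3)) / 2 ≈ 0.823
# duration == 0.90  (end timestamp of the last word)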

async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
Expand source code
async def process_audio(
    self, 
    audio_frames: bytes, 
    **kwargs: Any
) -> None:
    """
    Process and send audio frames. 
    Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead.
    """

    if not self._ws or self._ws.closed:
        await self._connect_ws()
        if not self._ws_task or self._ws_task.done():
            self._ws_task = asyncio.create_task(self._listen_for_responses())

    elif self._ws_task and self._ws_task.done():
        logger.warning("WebSocket listener stopped unexpectedly, restarting")
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        mono_audio = self._convert_to_mono(audio_frames)
        if not mono_audio:
            return
        
        self._stream_buffer.extend(mono_audio)
        
        while len(self._stream_buffer) >= self._target_chunk_size:
            chunk = self._stream_buffer[:self._target_chunk_size]
            await self._send_audio(chunk)
            self._stream_buffer = self._stream_buffer[self._target_chunk_size:]

    except Exception as e:
        logger.exception("Error in process_audio: %s", e)
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None

Process and send audio frames. Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead.
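
A rough sketch of the audio path, assuming frames() is a hypothetical source that yields 16-bit PCM chunks at the configured sample rate:

async def transcribe(stt: ElevenLabsSTT) -> None:
    # process_audio lazily opens the WebSocket on the first call and
    # restarts the listener task if it has stopped unexpectedly.
    async for frame_bytes in frames():            # hypothetical audio source
        await stt.process_audio(frame_bytes)      # buffered internally into ~100 ms chunks
    await stt.aclose()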

class ElevenLabsTTS (*,
api_key: str | None = None,
model: str = 'eleven_flash_v2_5',
voice: str = 'ErXwobaYiN019PkySvjV',
speed: float = 1.0,
response_format: str = 'pcm_24000',
voice_settings: VoiceSettings | None = None,
base_url: str = 'https://api.elevenlabs.io/v1',
enable_streaming: bool = True,
inactivity_timeout: int = 300,
language: str = 'en',
enable_ssml_parsing: bool = False,
apply_text_normalization: "Literal['auto', 'on', 'off']" = 'auto',
auto_mode: bool = False)
Expand source code
class ElevenLabsTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE_ID,
        speed: float = 1.0,
        response_format: str = "pcm_24000",
        voice_settings: VoiceSettings | None = None,
        base_url: str = API_BASE_URL,
        enable_streaming: bool = True,
        inactivity_timeout: int = WS_INACTIVITY_TIMEOUT,
        language: str = "en",
        enable_ssml_parsing: bool = False,
        apply_text_normalization: Literal["auto", "on", "off"] = "auto",
        auto_mode: bool = False
    ) -> None:
        """Initialize the ElevenLabs TTS plugin.

        Args:
            api_key (Optional[str], optional): ElevenLabs API key. Uses ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to "eleven_flash_v2_5".
            voice (str): The voice to use for the TTS plugin. Defaults to "ErXwobaYiN019PkySvjV".
            speed (float): The speed to use for the TTS plugin. Defaults to 1.0.
            response_format (str): The response format to use for the TTS plugin. Defaults to "pcm_24000".
            voice_settings (Optional[VoiceSettings], optional): The voice settings to use for the TTS plugin. Defaults to None.
            base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.elevenlabs.io/v1".
            enable_streaming (bool): Whether to enable streaming for the TTS plugin. Defaults to True.
            inactivity_timeout (int): The inactivity timeout to use for the TTS plugin. Defaults to 300.
            language (str): The language to use for the TTS plugin. Defaults to "en".
            enable_ssml_parsing (bool): Whether to enable SSML parsing. Defaults to False.
            apply_text_normalization (Literal["auto", "on", "off"]): Controls text normalization. With "auto", the system decides
                automatically whether to apply normalization (e.g., spelling out numbers); with "on", normalization is always
                applied; with "off", it is skipped. For the "eleven_turbo_v2_5" and "eleven_flash_v2_5" models, text normalization
                can only be enabled on Enterprise plans. Defaults to "auto".
            auto_mode (bool): Reduces latency by disabling chunk schedule and buffers. Recommended for full sentences/phrases.
        """
        super().__init__(
            sample_rate=ELEVENLABS_SAMPLE_RATE, num_channels=ELEVENLABS_CHANNELS
        )

        self.model = model
        self.voice = voice
        self.speed = speed
        self.language = language
        self.enable_ssml_parsing = enable_ssml_parsing
        self.apply_text_normalization = apply_text_normalization
        self.auto_mode = auto_mode
        self.audio_track = None
        self.loop = None
        self.response_format = response_format
        self.base_url = base_url
        self.enable_streaming = enable_streaming
        self.voice_settings = voice_settings or VoiceSettings()
        self.inactivity_timeout = inactivity_timeout
        self._first_chunk_sent = False
        self._ws_session = None
        self._ws_connection = None
        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError(
                "ElevenLabs API key must be provided either through api_key parameter or ELEVENLABS_API_KEY environment variable")

        self._session = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0,
                                  write=5.0, pool=5.0),
            follow_redirects=True,
        )

        self._streams = weakref.WeakSet()
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False

        self._connection_lock = asyncio.Lock()
        self._ws_voice_id: str | None = None
        self._active_contexts: set[str] = set()
        self._context_futures: dict[str, asyncio.Future[None]] = {}

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            target_voice = voice_id or self.voice
            self._should_stop = False

            if self.enable_streaming:
                await self._stream_synthesis(text, target_voice)
            else:
                if isinstance(text, AsyncIterator):
                    async for segment in segment_text(text):
                        if self._should_stop:
                            break
                        await self._chunked_synthesis(segment, target_voice)
                else:
                    await self._chunked_synthesis(text, target_voice)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise 

    async def _chunked_synthesis(self, text: str, voice_id: str) -> None:
        """Non-streaming synthesis using the standard API"""
        url = f"{self.base_url}/text-to-speech/{voice_id}/stream"

        params = {
            "model_id": self.model,
            "output_format": self.response_format,
            "language_code": self.language,
            "enable_ssml_parsing":self.enable_ssml_parsing,
            "apply_text_normalization":self.apply_text_normalization,
            "auto_mode":self.auto_mode
        }

        headers = {
            "xi-api-key": self.api_key,
            "Content-Type": "application/json",
        }

        payload = {
            "text": text,
            "voice_settings": {
                "stability": self.voice_settings.stability,
                "similarity_boost": self.voice_settings.similarity_boost,
                "style": self.voice_settings.style,
                "use_speaker_boost": self.voice_settings.use_speaker_boost,
            },
        }

        try:
            async with self._session.stream(
                "POST",
                url,
                headers=headers,
                json=payload,
                params=params
            ) as response:
                response.raise_for_status()

                async for chunk in response.aiter_bytes():
                    if self._should_stop:
                        break
                    if chunk:
                        await self._stream_audio_chunks(chunk)

        except httpx.HTTPStatusError as e:
            self.emit(
                "error", f"HTTP error {e.response.status_code}: {e.response.text}")
            raise 
        except Exception as e:
            self.emit("error", f"Chunked synthesis failed: {str(e)}")
            raise

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str], voice_id: str) -> None:
        """WebSocket-based streaming synthesis using multi-context connection"""
        try:
            await self._ensure_connection(voice_id)

            context_id = uuid.uuid4().hex[:12]
            done_future: asyncio.Future[None] = asyncio.get_event_loop().create_future()
            self.register_context(context_id, done_future)

            async def _single_chunk_gen(s: str) -> AsyncIterator[str]:
                yield s

            async def _send_chunks() -> None:
                try:
                    first_message_sent = False
                    if isinstance(text, str):
                        async for segment in segment_text(_single_chunk_gen(text)):
                            if self._should_stop:
                                break
                            await self.send_text(context_id, f"{segment} ",
                                                 voice_settings=None if first_message_sent else self._voice_settings_dict(),
                                                 flush=True)
                            first_message_sent = True
                    else:
                        async for chunk in text:
                            if self._should_stop:
                                break
                            await self.send_text(context_id, f"{chunk} ",
                                                 voice_settings=None if first_message_sent else self._voice_settings_dict())
                            first_message_sent = True

                    if not self._should_stop:
                        await self.flush_context(context_id)
                        await self.close_context(context_id)
                except Exception as e:
                    if not done_future.done():
                        done_future.set_exception(e)

            sender = asyncio.create_task(_send_chunks())

            await done_future
            await sender

        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")

            raise

    def _voice_settings_dict(self) -> dict[str, Any]:
        return {
            "stability": self.voice_settings.stability,
            "similarity_boost": self.voice_settings.similarity_boost,
            "style": self.voice_settings.style,
            "use_speaker_boost": self.voice_settings.use_speaker_boost,
        }

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        if not audio_bytes or self._should_stop:
            return

        if not self._first_chunk_sent and hasattr(self, '_first_audio_callback') and self._first_audio_callback:
            self._first_chunk_sent = True
            asyncio.create_task(self._first_audio_callback())

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Simple but effective interruption"""
        self._should_stop = True

        if self.audio_track:
            self.audio_track.interrupt()

        await self.close_all_contexts()

    async def aclose(self) -> None:
        """Cleanup resources"""
        self._should_stop = True

        for task in [self._send_task, self._recv_task]:
            if task and not task.done():
                task.cancel()

        for stream in list(self._streams):
            try:
                await stream.aclose()
            except Exception:
                pass

        self._streams.clear()

        if self._ws_connection and not self._ws_connection.closed:
            try:
                await self._ws_connection.send_str(json.dumps({"close_socket": True}))
            except Exception:
                pass
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()
        self._ws_connection = None
        self._ws_session = None
        if self._session:
            await self._session.aclose()
        await super().aclose()

    async def _ensure_connection(self, voice_id: str) -> None:
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed and self._ws_voice_id == voice_id:
                return

            if self._ws_connection and not self._ws_connection.closed:
                try:
                    await self._ws_connection.send_str(json.dumps({"close_socket": True}))
                except Exception:
                    pass
                await self._ws_connection.close()
            if self._ws_session and not self._ws_session.closed:
                await self._ws_session.close()

            self._ws_session = aiohttp.ClientSession()
            self._ws_voice_id = voice_id

            ws_url = f"{self.base_url}/text-to-speech/{voice_id}/multi-stream-input".replace("https://", "wss://").replace("http://", "ws://")
            params = {
                "model_id": self.model,
                "output_format": self.response_format,
                "inactivity_timeout": self.inactivity_timeout,
            }
            param_string = "&".join([f"{k}={v}" for k, v in params.items()])
            full_ws_url = f"{ws_url}?{param_string}"
            headers = {"xi-api-key": self.api_key}
            self._ws_connection = await asyncio.wait_for(self._ws_session.ws_connect(full_ws_url, headers=headers), timeout=10.0)

            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()
            self._recv_task = asyncio.create_task(self._recv_loop())

    def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
        self._context_futures[context_id] = done_future

    async def send_text(
        self,
        context_id: str,
        text: str,
        *,
        voice_settings: Optional[dict[str, Any]] = None,
        flush: bool = False,
    ) -> None:
        if not self._ws_connection or self._ws_connection.closed:
            raise RuntimeError("WebSocket connection is closed")

        if context_id not in self._active_contexts:
            init_msg = {
                "context_id": context_id,
                "text": " ",
            }
            if voice_settings:
                init_msg["voice_settings"] = voice_settings
            await self._ws_connection.send_str(json.dumps(init_msg))
            self._active_contexts.add(context_id)

        pkt: dict[str, Any] = {"context_id": context_id, "text": text}
        if flush:
            pkt["flush"] = True
        await self._ws_connection.send_str(json.dumps(pkt))

    async def flush_context(self, context_id: str) -> None:
        if not self._ws_connection or self._ws_connection.closed:
            return
        await self._ws_connection.send_str(json.dumps({"context_id": context_id, "flush": True}))

    async def close_context(self, context_id: str) -> None:
        if not self._ws_connection or self._ws_connection.closed:
            return
        await self._ws_connection.send_str(json.dumps({"context_id": context_id, "close_context": True}))

    async def close_all_contexts(self) -> None:
        try:
            for context_id in list(self._active_contexts):
                await self.close_context(context_id)
        except Exception:
            pass

    async def _recv_loop(self) -> None:
        try:
            while self._ws_connection and not self._ws_connection.closed:
                msg = await self._ws_connection.receive()
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)

                    if data.get("error"):
                        ctx_id = data.get("contextId")
                        fut = self._context_futures.get(ctx_id)
                        if fut and not fut.done():
                            fut.set_exception(RuntimeError(data["error"]))
                        continue

                    if data.get("audio"):
                        audio_chunk = base64.b64decode(data["audio"]) if isinstance(data["audio"], str) else None
                        if audio_chunk:
                            if not self._first_chunk_sent and hasattr(self, '_first_audio_callback') and self._first_audio_callback:
                                self._first_chunk_sent = True
                                asyncio.create_task(self._first_audio_callback())
                            if self.audio_track:
                                await self.audio_track.add_new_bytes(audio_chunk)

                    if data.get("is_final") or data.get("isFinal"):
                        ctx_id = data.get("contextId")
                        if ctx_id:
                            fut = self._context_futures.pop(ctx_id, None)
                            self._active_contexts.discard(ctx_id)
                            if fut and not fut.done():
                                fut.set_result(None)

                elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                    break
        except Exception:
            for fut in self._context_futures.values():
                if not fut.done():
                    fut.set_exception(RuntimeError("WebSocket receive loop error"))
            self._context_futures.clear()

Base class for Text-to-Speech implementations

Initialize the ElevenLabs TTS plugin.

Args

api_key : Optional[str], optional
ElevenLabs API key. Uses ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the TTS plugin. Defaults to "eleven_flash_v2_5".
voice : str
The voice to use for the TTS plugin. Defaults to "ErXwobaYiN019PkySvjV".
speed : float
The speed to use for the TTS plugin. Defaults to 1.0.
response_format : str
The response format to use for the TTS plugin. Defaults to "pcm_24000".
voice_settings : Optional[VoiceSettings], optional
The voice settings to use for the TTS plugin. Defaults to None.
base_url : str
The base URL to use for the TTS plugin. Defaults to "https://api.elevenlabs.io/v1".
enable_streaming : bool
Whether to enable streaming for the TTS plugin. Defaults to True.
inactivity_timeout : int
The inactivity timeout to use for the TTS plugin. Defaults to 300.
language : str
The language to use for the TTS plugin. Defaults to "en".
enable_ssml_parsing : bool
Whether to enable SSML parsing. Defaults to False.
apply_text_normalization : Literal["auto", "on", "off"]
Controls text normalization. With "auto", the system decides automatically whether to apply normalization (e.g., spelling out numbers); with "on", normalization is always applied; with "off", it is skipped. For the "eleven_turbo_v2_5" and "eleven_flash_v2_5" models, text normalization can only be enabled on Enterprise plans. Defaults to "auto".
auto_mode : bool
Reduces latency by disabling chunk schedule and buffers. Recommended for full sentences/phrases.
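
A minimal construction sketch (voice ID and settings are illustrative; audio_track and loop are normally assigned by the agent pipeline before synthesis starts):

from videosdk.plugins.elevenlabs import ElevenLabsTTS, VoiceSettings

tts = ElevenLabsTTS(
    # Falls back to the ELEVENLABS_API_KEY environment variable when omitted.
    model="eleven_flash_v2_5",
    voice="ErXwobaYiN019PkySvjV",
    response_format="pcm_24000",
    enable_streaming=True,               # use the WebSocket multi-context path
    apply_text_normalization="auto",
    voice_settings=VoiceSettings(stability=0.6, similarity_boost=0.7),
)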

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    self._should_stop = True

    for task in [self._send_task, self._recv_task]:
        if task and not task.done():
            task.cancel()

    for stream in list(self._streams):
        try:
            await stream.aclose()
        except Exception:
            pass

    self._streams.clear()

    if self._ws_connection and not self._ws_connection.closed:
        try:
            await self._ws_connection.send_str(json.dumps({"close_socket": True}))
        except Exception:
            pass
        await self._ws_connection.close()
    if self._ws_session and not self._ws_session.closed:
        await self._ws_session.close()
    self._ws_connection = None
    self._ws_session = None
    if self._session:
        await self._session.aclose()
    await super().aclose()

Cleanup resources

async def close_all_contexts(self) ‑> None
Expand source code
async def close_all_contexts(self) -> None:
    try:
        for context_id in list(self._active_contexts):
            await self.close_context(context_id)
    except Exception:
        pass
async def close_context(self, context_id: str) ‑> None
Expand source code
async def close_context(self, context_id: str) -> None:
    if not self._ws_connection or self._ws_connection.closed:
        return
    await self._ws_connection.send_str(json.dumps({"context_id": context_id, "close_context": True}))
async def flush_context(self, context_id: str) ‑> None
Expand source code
async def flush_context(self, context_id: str) -> None:
    if not self._ws_connection or self._ws_connection.closed:
        return
    await self._ws_connection.send_str(json.dumps({"context_id": context_id, "flush": True}))
async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Simple but effective interruption"""
    self._should_stop = True

    if self.audio_track:
        self.audio_track.interrupt()

    await self.close_all_contexts()

Simple but effective interruption
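
In a live session, a barge-in handler would typically just await this method (sketch; tts is an ElevenLabsTTS instance):

async def on_user_barge_in(tts: ElevenLabsTTS) -> None:
    # Flags synthesis to stop, interrupts the audio track, and closes open contexts.
    await tts.interrupt()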

def register_context(self, context_id: str, done_future: asyncio.Future[None]) ‑> None
Expand source code
def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
    self._context_futures[context_id] = done_future
def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def send_text(self,
context_id: str,
text: str,
*,
voice_settings: Optional[dict[str, Any]] = None,
flush: bool = False) ‑> None
Expand source code
async def send_text(
    self,
    context_id: str,
    text: str,
    *,
    voice_settings: Optional[dict[str, Any]] = None,
    flush: bool = False,
) -> None:
    if not self._ws_connection or self._ws_connection.closed:
        raise RuntimeError("WebSocket connection is closed")

    if context_id not in self._active_contexts:
        init_msg = {
            "context_id": context_id,
            "text": " ",
        }
        if voice_settings:
            init_msg["voice_settings"] = voice_settings
        await self._ws_connection.send_str(json.dumps(init_msg))
        self._active_contexts.add(context_id)

    pkt: dict[str, Any] = {"context_id": context_id, "text": text}
    if flush:
        pkt["flush"] = True
    await self._ws_connection.send_str(json.dumps(pkt))
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        target_voice = voice_id or self.voice
        self._should_stop = False

        if self.enable_streaming:
            await self._stream_synthesis(text, target_voice)
        else:
            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, target_voice)
            else:
                await self._chunked_synthesis(text, target_voice)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise 

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None
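
A usage sketch, assuming the agent framework has already assigned tts.audio_track and tts.loop (otherwise synthesize emits an error and returns). The async-iterator form is what a streaming LLM response would typically provide:

async def speak(tts: ElevenLabsTTS) -> None:
    # Plain string input.
    await tts.synthesize("Hello from ElevenLabs.")

    # Streaming input: any async iterator of text chunks works.
    async def token_stream():
        for chunk in ("Hello ", "from ", "a ", "streaming ", "response."):
            yield chunk

    await tts.synthesize(token_stream(), voice_id="ErXwobaYiN019PkySvjV")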

class VoiceSettings (stability: float = 0.71,
similarity_boost: float = 0.5,
style: float = 0.0,
use_speaker_boost: bool = True)
Expand source code
@dataclass
class VoiceSettings:
    stability: float = 0.71
    similarity_boost: float = 0.5
    style: float = 0.0
    use_speaker_boost: bool = True

VoiceSettings(stability: 'float' = 0.71, similarity_boost: 'float' = 0.5, style: 'float' = 0.0, use_speaker_boost: 'bool' = True)

Instance variables

var similarity_boost : float
var stability : float
var style : float
var use_speaker_boost : bool
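
These fields are forwarded in the voice_settings field of synthesis requests; for example (values illustrative):

settings = VoiceSettings(stability=0.5, similarity_boost=0.8, style=0.2, use_speaker_boost=True)
tts = ElevenLabsTTS(voice_settings=settings)   # api_key taken from ELEVENLABS_API_KEY if not passed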