Package videosdk.plugins.elevenlabs

Sub-modules

videosdk.plugins.elevenlabs.stt
videosdk.plugins.elevenlabs.tts

Classes

class ElevenLabsSTT (*,
api_key: str | None = None,
model_id: str = 'scribe_v2_realtime',
language_code: str = 'en',
sample_rate: int = 48000,
commit_strategy: str = 'vad',
vad_silence_threshold_secs: float = 0.8,
vad_threshold: float = 0.4,
min_speech_duration_ms: int = 50,
min_silence_duration_ms: int = 50,
base_url: str = 'wss://api.elevenlabs.io/v1/speech-to-text/realtime')
Expand source code
class ElevenLabsSTT(BaseSTT):
    """
    ElevenLabs Realtime Speech-to-Text (STT) client.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_id: str = "scribe_v2_realtime",
        language_code: str = "en",
        sample_rate: int = 48000,
        commit_strategy: str = "vad",
        vad_silence_threshold_secs: float = 0.8,
        vad_threshold: float = 0.4,
        min_speech_duration_ms: int = 50,
        min_silence_duration_ms: int = 50,
        base_url: str = "wss://api.elevenlabs.io/v1/speech-to-text/realtime",
    ) -> None:
        """
        Initialize the ElevenLabs STT client.

        Args:
            api_key: ElevenLabs API key for authentication. Defaults to env variable ELEVENLABS_API_KEY.
            model_id: STT model identifier.
            language_code: Language code for transcription.
            sample_rate: Sample rate of input audio in Hz.
            commit_strategy: Strategy for committing transcripts ('vad' is by default).
            vad_silence_threshold_secs: Duration of silence to detect end-of-speech.
            vad_threshold: Threshold for detecting voice activity.
            min_speech_duration_ms: Minimum duration in milliseconds for a speech segment.
            min_silence_duration_ms: Minimum duration in milliseconds of silence to consider end-of-speech.
            base_url: WebSocket endpoint for ElevenLabs STT.
        Raises:
            ValueError: If required parameters are missing or invalid.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError("ElevenLabs API key must be provided via api_key or ELEVENLABS_API_KEY env var")

        self.model_id = model_id
        self.language_code = language_code
        self.commit_strategy = commit_strategy
        self.base_url = base_url
        self.sample_rate = sample_rate

        if self.sample_rate not in SUPPORTED_SAMPLE_RATES:
            raise ValueError(f"Unsupported sample_rate: {self.sample_rate}. Supported rates: {SUPPORTED_SAMPLE_RATES}")
        
        self.vad_silence_threshold_secs = vad_silence_threshold_secs
        self.vad_threshold = vad_threshold
        self.min_speech_duration_ms = min_speech_duration_ms
        self.min_silence_duration_ms = min_silence_duration_ms

        self._last_final_text = ""
        self._last_final_time = 0.0
        self._duplicate_suppression_window = 0.75
        
        self._stream_buffer = bytearray()
        self._target_chunk_size = int(0.1 * self.sample_rate * 2) 

        self.heartbeat = 15.0
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None

    async def process_audio(
        self, 
        audio_frames: bytes, 
        **kwargs: Any
    ) -> None:
        """
        Process and send audio frames. 
        Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead.
        """

        if not self._ws or self._ws.closed:
            await self._connect_ws()
            if not self._ws_task or self._ws_task.done():
                self._ws_task = asyncio.create_task(self._listen_for_responses())

        elif self._ws_task and self._ws_task.done():
            logger.warning("WebSocket listener stopped unexpectedly, restarting")
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            mono_audio = self._convert_to_mono(audio_frames)
            if not mono_audio:
                return
            
            self._stream_buffer.extend(mono_audio)
            
            while len(self._stream_buffer) >= self._target_chunk_size:
                chunk = self._stream_buffer[:self._target_chunk_size]
                await self._send_audio(chunk)
                self._stream_buffer = self._stream_buffer[self._target_chunk_size:]

        except Exception as e:
            logger.exception("Error in process_audio: %s", e)
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        if not self._session:
            self._session = aiohttp.ClientSession()

        query_params = {
            "model_id": str(self.model_id),
            "language_code": str(self.language_code),
            "audio_format": f"pcm_{self.sample_rate}",
            "commit_strategy": str(self.commit_strategy),
            "vad_silence_threshold_secs": self.vad_silence_threshold_secs,
            "vad_threshold": self.vad_threshold,
            "min_speech_duration_ms": self.min_speech_duration_ms,
            "min_silence_duration_ms": self.min_silence_duration_ms,
        }

        ws_url = f"{self.base_url}?{urlencode(query_params)}"
        headers = {"xi-api-key": self.api_key}

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers, heartbeat=self.heartbeat)
            logger.info("Connected to ElevenLabs Realtime STT WebSocket.")
        except Exception as e:
            logger.exception("Error connecting to ElevenLabs WebSocket: %s", e)
            raise

    async def _send_audio(self, audio_bytes: bytes) -> None:
        if not self._ws:
            return

        payload = {
            "message_type": "input_audio_chunk",
            "audio_base_64": base64.b64encode(audio_bytes).decode(),
            "sample_rate": self.sample_rate,
        }

        try:
            await self._ws.send_str(json.dumps(payload))
        except Exception as e:
            logger.exception("Error sending audio chunk: %s", e)
            self.emit("error", str(e))
            await self.aclose()

    def _convert_to_mono(self, audio_bytes: bytes) -> bytes:
        """
        Convert input audio bytes to mono. 
        """
        if not audio_bytes:
            return b""
        try:
            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b""

            if raw_audio.size % 2 == 0:
                try:
                    stereo = raw_audio.reshape(-1, 2).astype(np.float32)
                    mono = stereo.mean(axis=1)
                    return mono.astype(np.int16).tobytes()
                except ValueError:
                    pass
            
            return audio_bytes
        except Exception as e:
            logger.error("Error converting to mono: %s", e)
            return b""
            
    async def _listen_for_responses(self) -> None:
        """
        Listen for incoming WebSocket messages from ElevenLabs STT.
        """
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = None
                    try:
                        data = msg.json()
                    except Exception:
                        try:
                            data = json.loads(msg.data)
                        except Exception:
                            logger.debug("Received non-json ws text message")
                            continue

                    responses = await self._handle_ws_event(data)
                    if responses:
                        for r in responses:
                            if self._transcript_callback:
                                try:
                                    await self._transcript_callback(r)
                                except Exception:
                                    logger.exception("Error in transcript callback")
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error("WebSocket error: %s", self._ws.exception())
                    self.emit("error", f"WebSocket error: {self._ws.exception()}")
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    logger.info("WebSocket closed by server.")
                    break
        except asyncio.CancelledError:
            logger.debug("WebSocket listener cancelled")
        except Exception as e:
            logger.exception("Error in WebSocket listener: %s", e)
            self.emit("error", str(e))
        finally:
            if self._ws:
                try:
                    await self._ws.close()
                except Exception:
                    pass
                self._ws = None
            self._ws_task = None

    async def _handle_ws_event(self, data: dict) -> List[STTResponse]:
        """
        Process a single WebSocket event from ElevenLabs STT.

        Args:
            data: JSON-decoded WebSocket message.

        Returns:
            List of STTResponse objects for this event.
        """
        responses: List[STTResponse] = []
        message_type = data.get("message_type")
        logger.debug("Received WS event: %s", message_type)

        if message_type in STT_ERROR_MSGS:
            logger.error("ElevenLabs STT error: %s", data)
            self.emit("error", data)
            return responses
        
        if message_type == "session_started":
            global_event_emitter.emit("speech_session_started")
            return responses

        if message_type == "committed_transcript":
            logger.info("==== Received final transcript event: %s", data)
            text = data.get("text", "")
            clean_text = text.strip()
            confidence = float(data.get("confidence", 0.0))
            now = time.time()

            if clean_text == "":
                global_event_emitter.emit("speech_stopped")
                self._last_final_text = ""
                self._last_final_time = now
                return responses

            resp = STTResponse(
                event_type=SpeechEventType.FINAL,
                data=SpeechData(
                    text=clean_text,
                    confidence=confidence,
                ),
                metadata={"model": self.model_id, "raw_event": data},
            )
            responses.append(resp)

            global_event_emitter.emit("speech_stopped")
            self._last_final_text = clean_text
            self._last_final_time = now
            return responses


        if message_type == "partial_transcript":
            text = data.get("text", "")
            clean_text = text.strip()

            if (
                self._last_final_text
                and clean_text
                and clean_text == self._last_final_text
                and (time.time() - self._last_final_time) < self._duplicate_suppression_window
            ):
                logger.debug("Dropping duplicate partial matching recent final transcript")
                return responses

            resp = STTResponse(
                event_type=SpeechEventType.INTERIM,
                data=SpeechData(
                    text=text,
                    confidence=float(data.get("confidence", 0.0)),
                ),
                metadata={"model": self.model_id, "raw_event": data},
            )
            responses.append(resp)

            if clean_text:
                global_event_emitter.emit("speech_started")

            return responses
        
        

        logger.debug("Ignoring unrecognized message_type: %s", message_type)
        return responses

    async def aclose(self) -> None:
        """
        Close the WebSocket connection and cleanup session resources.

        Cancels the listener task, closes WebSocket and HTTP session,
        and calls the parent class cleanup.
        """
        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        if self._ws:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None

        if self._session:
            try:
                await self._session.close()
            except Exception:
                pass
            finally:
                self._session = None

        await super().aclose()

ElevenLabs Realtime Speech-to-Text (STT) client.

Initialize the ElevenLabs STT client.

Args

api_key
ElevenLabs API key for authentication. Defaults to env variable ELEVENLABS_API_KEY.
model_id
STT model identifier.
language_code
Language code for transcription.
sample_rate
Sample rate of input audio in Hz.
commit_strategy
Strategy for committing transcripts ('vad' is by default).
vad_silence_threshold_secs
Duration of silence to detect end-of-speech.
vad_threshold
Threshold for detecting voice activity.
min_speech_duration_ms
Minimum duration in milliseconds for a speech segment.
min_silence_duration_ms
Minimum duration in milliseconds of silence to consider end-of-speech.
base_url
WebSocket endpoint for ElevenLabs STT.

Raises

ValueError
If required parameters are missing or invalid.

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """
    Close the WebSocket connection and cleanup session resources.

    Cancels the listener task, closes WebSocket and HTTP session,
    and calls the parent class cleanup.
    """
    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None

    if self._ws:
        try:
            await self._ws.close()
        except Exception:
            pass
        self._ws = None

    if self._session:
        try:
            await self._session.close()
        except Exception:
            pass
        finally:
            self._session = None

    await super().aclose()

Close the WebSocket connection and cleanup session resources.

Cancels the listener task, closes WebSocket and HTTP session, and calls the parent class cleanup.

async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
Expand source code
async def process_audio(
    self, 
    audio_frames: bytes, 
    **kwargs: Any
) -> None:
    """
    Process and send audio frames. 
    Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead.
    """

    if not self._ws or self._ws.closed:
        await self._connect_ws()
        if not self._ws_task or self._ws_task.done():
            self._ws_task = asyncio.create_task(self._listen_for_responses())

    elif self._ws_task and self._ws_task.done():
        logger.warning("WebSocket listener stopped unexpectedly, restarting")
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        mono_audio = self._convert_to_mono(audio_frames)
        if not mono_audio:
            return
        
        self._stream_buffer.extend(mono_audio)
        
        while len(self._stream_buffer) >= self._target_chunk_size:
            chunk = self._stream_buffer[:self._target_chunk_size]
            await self._send_audio(chunk)
            self._stream_buffer = self._stream_buffer[self._target_chunk_size:]

    except Exception as e:
        logger.exception("Error in process_audio: %s", e)
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None

Process and send audio frames. Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead.

class ElevenLabsTTS (*,
api_key: str | None = None,
model: str = 'eleven_flash_v2_5',
voice: str = 'EXAVITQu4vr4xnSDxMaL',
speed: float = 1.0,
response_format: str = 'pcm_24000',
voice_settings: VoiceSettings | None = None,
base_url: str = 'https://api.elevenlabs.io/v1',
enable_streaming: bool = True,
inactivity_timeout: int = 300)
Expand source code
class ElevenLabsTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE_ID,
        speed: float = 1.0,
        response_format: str = "pcm_24000",
        voice_settings: VoiceSettings | None = None,
        base_url: str = API_BASE_URL,
        enable_streaming: bool = True,
        inactivity_timeout: int = WS_INACTIVITY_TIMEOUT,
    ) -> None:
        """Initialize the ElevenLabs TTS plugin.

        Args:
            api_key (Optional[str], optional): ElevenLabs API key. Uses ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to "eleven_flash_v2_5".
            voice (str): The voice to use for the TTS plugin. Defaults to "EXAVITQu4vr4xnSDxMaL".
            speed (float): The speed to use for the TTS plugin. Defaults to 1.0.
            response_format (str): The response format to use for the TTS plugin. Defaults to "pcm_24000".
            voice_settings (Optional[VoiceSettings], optional): The voice settings to use for the TTS plugin. Defaults to None.
            base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.elevenlabs.io/v1".
            enable_streaming (bool): Whether to enable streaming for the TTS plugin. Defaults to True.
            inactivity_timeout (int): The inactivity timeout to use for the TTS plugin. Defaults to 300.
        """
        super().__init__(
            sample_rate=ELEVENLABS_SAMPLE_RATE, num_channels=ELEVENLABS_CHANNELS
        )

        self.model = model
        self.voice = voice
        self.speed = speed
        self.audio_track = None
        self.loop = None
        self.response_format = response_format
        self.base_url = base_url
        self.enable_streaming = enable_streaming
        self.voice_settings = voice_settings or VoiceSettings()
        self.inactivity_timeout = inactivity_timeout
        self._first_chunk_sent = False
        self._ws_session = None
        self._ws_connection = None
        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError(
                "ElevenLabs API key must be provided either through api_key parameter or ELEVENLABS_API_KEY environment variable")

        self._session = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0,
                                  write=5.0, pool=5.0),
            follow_redirects=True,
        )

        self._streams = weakref.WeakSet()
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False

        self._connection_lock = asyncio.Lock()
        self._ws_voice_id: str | None = None
        self._active_contexts: set[str] = set()
        self._context_futures: dict[str, asyncio.Future[None]] = {}

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            target_voice = voice_id or self.voice
            self._should_stop = False

            if self.enable_streaming:
                await self._stream_synthesis(text, target_voice)
            else:
                if isinstance(text, AsyncIterator):
                    async for segment in segment_text(text):
                        if self._should_stop:
                            break
                        await self._chunked_synthesis(segment, target_voice)
                else:
                    await self._chunked_synthesis(text, target_voice)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")

    async def _chunked_synthesis(self, text: str, voice_id: str) -> None:
        """Non-streaming synthesis using the standard API"""
        url = f"{self.base_url}/text-to-speech/{voice_id}/stream"

        params = {
            "model_id": self.model,
            "output_format": self.response_format,
        }

        headers = {
            "xi-api-key": self.api_key,
            "Content-Type": "application/json",
        }

        payload = {
            "text": text,
            "voice_settings": {
                "stability": self.voice_settings.stability,
                "similarity_boost": self.voice_settings.similarity_boost,
                "style": self.voice_settings.style,
                "use_speaker_boost": self.voice_settings.use_speaker_boost,
            },
        }

        try:
            async with self._session.stream(
                "POST",
                url,
                headers=headers,
                json=payload,
                params=params
            ) as response:
                response.raise_for_status()

                async for chunk in response.aiter_bytes():
                    if self._should_stop:
                        break
                    if chunk:
                        await self._stream_audio_chunks(chunk)

        except httpx.HTTPStatusError as e:
            self.emit(
                "error", f"HTTP error {e.response.status_code}: {e.response.text}")
        except Exception as e:
            self.emit("error", f"Chunked synthesis failed: {str(e)}")

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str], voice_id: str) -> None:
        """WebSocket-based streaming synthesis using multi-context connection"""
        try:
            await self._ensure_connection(voice_id)

            context_id = uuid.uuid4().hex[:12]
            done_future: asyncio.Future[None] = asyncio.get_event_loop().create_future()
            self.register_context(context_id, done_future)

            async def _single_chunk_gen(s: str) -> AsyncIterator[str]:
                yield s

            async def _send_chunks() -> None:
                try:
                    first_message_sent = False
                    if isinstance(text, str):
                        async for segment in segment_text(_single_chunk_gen(text)):
                            if self._should_stop:
                                break
                            await self.send_text(context_id, f"{segment} ",
                                                 voice_settings=None if first_message_sent else self._voice_settings_dict(),
                                                 flush=True)
                            first_message_sent = True
                    else:
                        async for chunk in text:
                            if self._should_stop:
                                break
                            await self.send_text(context_id, f"{chunk} ",
                                                 voice_settings=None if first_message_sent else self._voice_settings_dict())
                            first_message_sent = True

                    if not self._should_stop:
                        await self.flush_context(context_id)
                        await self.close_context(context_id)
                except Exception as e:
                    if not done_future.done():
                        done_future.set_exception(e)

            sender = asyncio.create_task(_send_chunks())

            await done_future
            await sender

        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")

            if isinstance(text, str):
                await self._chunked_synthesis(text, voice_id)
            else:
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, voice_id)

    def _voice_settings_dict(self) -> dict[str, Any]:
        return {
            "stability": self.voice_settings.stability,
            "similarity_boost": self.voice_settings.similarity_boost,
            "style": self.voice_settings.style,
            "use_speaker_boost": self.voice_settings.use_speaker_boost,
        }

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        if not audio_bytes or self._should_stop:
            return

        if not self._first_chunk_sent and hasattr(self, '_first_audio_callback') and self._first_audio_callback:
            self._first_chunk_sent = True
            asyncio.create_task(self._first_audio_callback())

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Simple but effective interruption"""
        self._should_stop = True

        if self.audio_track:
            self.audio_track.interrupt()

        await self.close_all_contexts()

    async def aclose(self) -> None:
        """Cleanup resources"""
        self._should_stop = True

        for task in [self._send_task, self._recv_task]:
            if task and not task.done():
                task.cancel()

        for stream in list(self._streams):
            try:
                await stream.aclose()
            except Exception:
                pass

        self._streams.clear()

        if self._ws_connection and not self._ws_connection.closed:
            try:
                await self._ws_connection.send_str(json.dumps({"close_socket": True}))
            except Exception:
                pass
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()
        self._ws_connection = None
        self._ws_session = None
        if self._session:
            await self._session.aclose()
        await super().aclose()

    async def _ensure_connection(self, voice_id: str) -> None:
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed and self._ws_voice_id == voice_id:
                return

            if self._ws_connection and not self._ws_connection.closed:
                try:
                    await self._ws_connection.send_str(json.dumps({"close_socket": True}))
                except Exception:
                    pass
                await self._ws_connection.close()
            if self._ws_session and not self._ws_session.closed:
                await self._ws_session.close()

            self._ws_session = aiohttp.ClientSession()
            self._ws_voice_id = voice_id

            ws_url = f"{self.base_url}/text-to-speech/{voice_id}/multi-stream-input".replace("https://", "wss://").replace("http://", "ws://")
            params = {
                "model_id": self.model,
                "output_format": self.response_format,
                "inactivity_timeout": self.inactivity_timeout,
            }
            param_string = "&".join([f"{k}={v}" for k, v in params.items()])
            full_ws_url = f"{ws_url}?{param_string}"
            headers = {"xi-api-key": self.api_key}
            self._ws_connection = await asyncio.wait_for(self._ws_session.ws_connect(full_ws_url, headers=headers), timeout=10.0)

            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()
            self._recv_task = asyncio.create_task(self._recv_loop())

    def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
        self._context_futures[context_id] = done_future

    async def send_text(
        self,
        context_id: str,
        text: str,
        *,
        voice_settings: Optional[dict[str, Any]] = None,
        flush: bool = False,
    ) -> None:
        if not self._ws_connection or self._ws_connection.closed:
            raise RuntimeError("WebSocket connection is closed")

        if context_id not in self._active_contexts:
            init_msg = {
                "context_id": context_id,
                "text": " ",
            }
            if voice_settings:
                init_msg["voice_settings"] = voice_settings
            await self._ws_connection.send_str(json.dumps(init_msg))
            self._active_contexts.add(context_id)

        pkt: dict[str, Any] = {"context_id": context_id, "text": text}
        if flush:
            pkt["flush"] = True
        await self._ws_connection.send_str(json.dumps(pkt))

    async def flush_context(self, context_id: str) -> None:
        if not self._ws_connection or self._ws_connection.closed:
            return
        await self._ws_connection.send_str(json.dumps({"context_id": context_id, "flush": True}))

    async def close_context(self, context_id: str) -> None:
        if not self._ws_connection or self._ws_connection.closed:
            return
        await self._ws_connection.send_str(json.dumps({"context_id": context_id, "close_context": True}))

    async def close_all_contexts(self) -> None:
        try:
            for context_id in list(self._active_contexts):
                await self.close_context(context_id)
        except Exception:
            pass

    async def _recv_loop(self) -> None:
        try:
            while self._ws_connection and not self._ws_connection.closed:
                msg = await self._ws_connection.receive()
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)

                    if data.get("error"):
                        ctx_id = data.get("contextId")
                        fut = self._context_futures.get(ctx_id)
                        if fut and not fut.done():
                            fut.set_exception(RuntimeError(data["error"]))
                        continue

                    if data.get("audio"):
                        audio_chunk = base64.b64decode(data["audio"]) if isinstance(data["audio"], str) else None
                        if audio_chunk:
                            if not self._first_chunk_sent and hasattr(self, '_first_audio_callback') and self._first_audio_callback:
                                self._first_chunk_sent = True
                                asyncio.create_task(self._first_audio_callback())
                            if self.audio_track:
                                await self.audio_track.add_new_bytes(audio_chunk)

                    if data.get("is_final") or data.get("isFinal"):
                        ctx_id = data.get("contextId")
                        if ctx_id:
                            fut = self._context_futures.pop(ctx_id, None)
                            self._active_contexts.discard(ctx_id)
                            if fut and not fut.done():
                                fut.set_result(None)

                elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                    break
        except Exception:
            for fut in self._context_futures.values():
                if not fut.done():
                    fut.set_exception(RuntimeError("WebSocket receive loop error"))
            self._context_futures.clear()

Base class for Text-to-Speech implementations

Initialize the ElevenLabs TTS plugin.

Args

api_key : Optional[str], optional
ElevenLabs API key. Uses ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the TTS plugin. Defaults to "eleven_flash_v2_5".
voice : str
The voice to use for the TTS plugin. Defaults to "EXAVITQu4vr4xnSDxMaL".
speed : float
The speed to use for the TTS plugin. Defaults to 1.0.
response_format : str
The response format to use for the TTS plugin. Defaults to "pcm_24000".
voice_settings : Optional[VoiceSettings], optional
The voice settings to use for the TTS plugin. Defaults to None.
base_url : str
The base URL to use for the TTS plugin. Defaults to "https://api.elevenlabs.io/v1".
enable_streaming : bool
Whether to enable streaming for the TTS plugin. Defaults to True.
inactivity_timeout : int
The inactivity timeout to use for the TTS plugin. Defaults to 300.

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    self._should_stop = True

    for task in [self._send_task, self._recv_task]:
        if task and not task.done():
            task.cancel()

    for stream in list(self._streams):
        try:
            await stream.aclose()
        except Exception:
            pass

    self._streams.clear()

    if self._ws_connection and not self._ws_connection.closed:
        try:
            await self._ws_connection.send_str(json.dumps({"close_socket": True}))
        except Exception:
            pass
        await self._ws_connection.close()
    if self._ws_session and not self._ws_session.closed:
        await self._ws_session.close()
    self._ws_connection = None
    self._ws_session = None
    if self._session:
        await self._session.aclose()
    await super().aclose()

Cleanup resources

async def close_all_contexts(self) ‑> None
Expand source code
async def close_all_contexts(self) -> None:
    try:
        for context_id in list(self._active_contexts):
            await self.close_context(context_id)
    except Exception:
        pass
async def close_context(self, context_id: str) ‑> None
Expand source code
async def close_context(self, context_id: str) -> None:
    if not self._ws_connection or self._ws_connection.closed:
        return
    await self._ws_connection.send_str(json.dumps({"context_id": context_id, "close_context": True}))
async def flush_context(self, context_id: str) ‑> None
Expand source code
async def flush_context(self, context_id: str) -> None:
    if not self._ws_connection or self._ws_connection.closed:
        return
    await self._ws_connection.send_str(json.dumps({"context_id": context_id, "flush": True}))
async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Simple but effective interruption"""
    self._should_stop = True

    if self.audio_track:
        self.audio_track.interrupt()

    await self.close_all_contexts()

Simple but effective interruption

def register_context(self, context_id: str, done_future: asyncio.Future[None]) ‑> None
Expand source code
def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
    self._context_futures[context_id] = done_future
def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def send_text(self,
context_id: str,
text: str,
*,
voice_settings: Optional[dict[str, Any]] = None,
flush: bool = False) ‑> None
Expand source code
async def send_text(
    self,
    context_id: str,
    text: str,
    *,
    voice_settings: Optional[dict[str, Any]] = None,
    flush: bool = False,
) -> None:
    if not self._ws_connection or self._ws_connection.closed:
        raise RuntimeError("WebSocket connection is closed")

    if context_id not in self._active_contexts:
        init_msg = {
            "context_id": context_id,
            "text": " ",
        }
        if voice_settings:
            init_msg["voice_settings"] = voice_settings
        await self._ws_connection.send_str(json.dumps(init_msg))
        self._active_contexts.add(context_id)

    pkt: dict[str, Any] = {"context_id": context_id, "text": text}
    if flush:
        pkt["flush"] = True
    await self._ws_connection.send_str(json.dumps(pkt))
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        target_voice = voice_id or self.voice
        self._should_stop = False

        if self.enable_streaming:
            await self._stream_synthesis(text, target_voice)
        else:
            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, target_voice)
            else:
                await self._chunked_synthesis(text, target_voice)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None

class VoiceSettings (stability: float = 0.71,
similarity_boost: float = 0.5,
style: float = 0.0,
use_speaker_boost: bool = True)
Expand source code
@dataclass
class VoiceSettings:
    stability: float = 0.71
    similarity_boost: float = 0.5
    style: float = 0.0
    use_speaker_boost: bool = True

VoiceSettings(stability: 'float' = 0.71, similarity_boost: 'float' = 0.5, style: 'float' = 0.0, use_speaker_boost: 'bool' = True)

Instance variables

var similarity_boost : float
var stability : float
var style : float
var use_speaker_boost : bool