Package videosdk.plugins.elevenlabs

Sub-modules

videosdk.plugins.elevenlabs.tts

Classes

class ElevenLabsTTS (*,
api_key: str | None = None,
model: str = 'eleven_flash_v2_5',
voice: str = 'EXAVITQu4vr4xnSDxMaL',
speed: float = 1.0,
response_format: str = 'pcm_24000',
voice_settings: VoiceSettings | None = None,
base_url: str = 'https://api.elevenlabs.io/v1',
enable_streaming: bool = True,
inactivity_timeout: int = 300)
class ElevenLabsTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE_ID,
        speed: float = 1.0,
        response_format: str = "pcm_24000",
        voice_settings: VoiceSettings | None = None,
        base_url: str = API_BASE_URL,
        enable_streaming: bool = True,
        inactivity_timeout: int = WS_INACTIVITY_TIMEOUT,
    ) -> None:
        """Initialize the ElevenLabs TTS plugin.

        Args:
            api_key (Optional[str], optional): ElevenLabs API key. Reads the ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
            model (str): ElevenLabs model ID used for synthesis. Defaults to "eleven_flash_v2_5".
            voice (str): ElevenLabs voice ID used for synthesis. Defaults to "EXAVITQu4vr4xnSDxMaL".
            speed (float): Speech speed multiplier. Defaults to 1.0.
            response_format (str): Audio output format requested from the API. Defaults to "pcm_24000" (24 kHz PCM).
            voice_settings (Optional[VoiceSettings], optional): Voice settings (stability, similarity boost, style, speaker boost). Defaults to None, in which case default VoiceSettings are used.
            base_url (str): Base URL of the ElevenLabs API. Defaults to "https://api.elevenlabs.io/v1".
            enable_streaming (bool): Whether to use WebSocket streaming synthesis instead of chunked HTTP synthesis. Defaults to True.
            inactivity_timeout (int): Inactivity timeout, in seconds, for the streaming WebSocket connection. Defaults to 300.
        """
        super().__init__(
            sample_rate=ELEVENLABS_SAMPLE_RATE, num_channels=ELEVENLABS_CHANNELS
        )

        self.model = model
        self.voice = voice
        self.speed = speed
        self.audio_track = None
        self.loop = None
        self.response_format = response_format
        self.base_url = base_url
        self.enable_streaming = enable_streaming
        self.voice_settings = voice_settings or VoiceSettings()
        self.inactivity_timeout = inactivity_timeout
        self._first_chunk_sent = False
        self._ws_session = None
        self._ws_connection = None
        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError(
                "ElevenLabs API key must be provided either through api_key parameter or ELEVENLABS_API_KEY environment variable")

        self._session = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0,
                                  write=5.0, pool=5.0),
            follow_redirects=True,
        )

        self._streams = weakref.WeakSet()
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            target_voice = voice_id or self.voice
            self._should_stop = False

            if self.enable_streaming:
                await self._stream_synthesis(text, target_voice)
            else:
                if isinstance(text, AsyncIterator):
                    async for segment in segment_text(text):
                        if self._should_stop:
                            break
                        await self._chunked_synthesis(segment, target_voice)
                else:
                    await self._chunked_synthesis(text, target_voice)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")

    async def _chunked_synthesis(self, text: str, voice_id: str) -> None:
        """Non-streaming synthesis using the standard API"""
        url = f"{self.base_url}/text-to-speech/{voice_id}/stream"

        params = {
            "model_id": self.model,
            "output_format": self.response_format,
        }

        headers = {
            "xi-api-key": self.api_key,
            "Content-Type": "application/json",
        }

        payload = {
            "text": text,
            "voice_settings": {
                "stability": self.voice_settings.stability,
                "similarity_boost": self.voice_settings.similarity_boost,
                "style": self.voice_settings.style,
                "use_speaker_boost": self.voice_settings.use_speaker_boost,
            },
        }

        try:
            async with self._session.stream(
                "POST",
                url,
                headers=headers,
                json=payload,
                params=params
            ) as response:
                response.raise_for_status()

                async for chunk in response.aiter_bytes():
                    if self._should_stop:
                        break
                    if chunk:
                        await self._stream_audio_chunks(chunk)

        except httpx.HTTPStatusError as e:
            self.emit(
                "error", f"HTTP error {e.response.status_code}: {e.response.text}")
        except Exception as e:
            self.emit("error", f"Chunked synthesis failed: {str(e)}")

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str], voice_id: str) -> None:
        """WebSocket-based streaming synthesis"""

        ws_session = None
        ws_connection = None

        try:
            ws_url = f"wss://api.elevenlabs.io/v1/text-to-speech/{voice_id}/stream-input"
            params = {
                "model_id": self.model,
                "output_format": self.response_format,
                "inactivity_timeout": self.inactivity_timeout,
            }
            param_string = "&".join([f"{k}={v}" for k, v in params.items()])
            full_ws_url = f"{ws_url}?{param_string}"

            headers = {"xi-api-key": self.api_key}

            ws_session = aiohttp.ClientSession()
            ws_connection = await asyncio.wait_for(
                ws_session.ws_connect(full_ws_url, headers=headers),
                timeout=10.0
            )

            init_message = {
                "text": " ",
                "voice_settings": {
                    "stability": self.voice_settings.stability,
                    "similarity_boost": self.voice_settings.similarity_boost,
                    "style": self.voice_settings.style,
                    "use_speaker_boost": self.voice_settings.use_speaker_boost,
                },
            }
            await ws_connection.send_str(json.dumps(init_message))

            self._send_task = asyncio.create_task(
                self._send_text_task(ws_connection, text))
            self._recv_task = asyncio.create_task(
                self._receive_audio_task(ws_connection))

            await asyncio.gather(self._send_task, self._recv_task)

        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")

            if isinstance(text, str):
                await self._chunked_synthesis(text, voice_id)
            else:
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, voice_id)

        finally:
            for task in [self._send_task, self._recv_task]:
                if task and not task.done():
                    task.cancel()

            try:
                await asyncio.wait_for(
                    asyncio.gather(
                        *(t for t in [self._send_task, self._recv_task] if t),
                        return_exceptions=True
                    ),
                    timeout=0.3
                )
            except asyncio.TimeoutError:
                pass

            self._send_task = None
            self._recv_task = None

            if ws_connection and not ws_connection.closed:
                await ws_connection.close()
            if ws_session and not ws_session.closed:
                await ws_session.close()

    async def _send_text_task(self, ws_connection: aiohttp.ClientWebSocketResponse, text: Union[AsyncIterator[str], str]) -> None:
        """Task for sending text to WebSocket"""
        try:
            if isinstance(text, str):
                if not self._should_stop:
                    text_message = {"text": f"{text} "}
                    await ws_connection.send_str(json.dumps(text_message))
            else:
                async for chunk in text:
                    if ws_connection.closed or self._should_stop:
                        break

                    chunk_message = {"text": f"{chunk} "}
                    await ws_connection.send_str(json.dumps(chunk_message))

            if not ws_connection.closed and not self._should_stop:
                eos_message = {"text": ""}
                await ws_connection.send_str(json.dumps(eos_message))

        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Send task error: {str(e)}")
            raise

    async def _receive_audio_task(self, ws_connection: aiohttp.ClientWebSocketResponse) -> None:
        """Task for receiving audio from WebSocket"""
        try:
            while not ws_connection.closed and not self._should_stop:
                try:
                    msg = await ws_connection.receive()

                    if msg.type == aiohttp.WSMsgType.TEXT:
                        data = json.loads(msg.data)

                        if data.get("audio"):
                            import base64
                            audio_chunk = base64.b64decode(data["audio"])
                            if not self._should_stop:
                                await self._stream_audio_chunks(audio_chunk)

                        elif data.get("isFinal"):
                            break

                        elif data.get("error"):
                            self.emit(
                                "error", f"ElevenLabs error: {data['error']}")
                            raise ValueError(
                                f"ElevenLabs error: {data['error']}")

                    elif msg.type == aiohttp.WSMsgType.ERROR:
                        raise ConnectionError(
                            f"WebSocket error: {ws_connection.exception()}")

                    elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                        break

                except asyncio.TimeoutError:
                    if not self._should_stop:
                        self.emit("error", "WebSocket receive timeout")
                    break

        except Exception as e:
            if not self._should_stop:
                self.emit("error", f"Receive task error: {str(e)}")
            raise

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        if not audio_bytes or self._should_stop:
            return

        if not self._first_chunk_sent and hasattr(self, '_first_audio_callback') and self._first_audio_callback:
            self._first_chunk_sent = True
            asyncio.create_task(self._first_audio_callback())

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Simple but effective interruption"""
        self._should_stop = True

        if self.audio_track:
            self.audio_track.interrupt()

        for task in [self._send_task, self._recv_task]:
            if task and not task.done():
                task.cancel()

        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()

    async def aclose(self) -> None:
        """Cleanup resources"""
        self._should_stop = True

        for task in [self._send_task, self._recv_task]:
            if task and not task.done():
                task.cancel()

        for stream in list(self._streams):
            try:
                await stream.aclose()
            except Exception:
                pass

        self._streams.clear()

        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        if self._ws_session:
            await self._ws_session.close()
        if self._session:
            await self._session.aclose()
        await super().aclose()

Base class for Text-to-Speech implementations

Initialize the ElevenLabs TTS plugin.

Args

api_key : Optional[str], optional
ElevenLabs API key. Reads the ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
model : str
ElevenLabs model ID used for synthesis. Defaults to "eleven_flash_v2_5".
voice : str
ElevenLabs voice ID used for synthesis. Defaults to "EXAVITQu4vr4xnSDxMaL".
speed : float
Speech speed multiplier. Defaults to 1.0.
response_format : str
Audio output format requested from the API. Defaults to "pcm_24000" (24 kHz PCM).
voice_settings : Optional[VoiceSettings], optional
Voice settings (stability, similarity boost, style, speaker boost). Defaults to None, in which case default VoiceSettings are used.
base_url : str
Base URL of the ElevenLabs API. Defaults to "https://api.elevenlabs.io/v1".
enable_streaming : bool
Whether to use WebSocket streaming synthesis instead of chunked HTTP synthesis. Defaults to True.
inactivity_timeout : int
Inactivity timeout, in seconds, for the streaming WebSocket connection. Defaults to 300.
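
A minimal construction sketch (assumes ELEVENLABS_API_KEY is set in the environment; the values shown are simply the defaults documented above, plus an illustrative VoiceSettings override):

from videosdk.plugins.elevenlabs import ElevenLabsTTS, VoiceSettings

# api_key is omitted, so the plugin reads ELEVENLABS_API_KEY from the environment.
tts = ElevenLabsTTS(
    model="eleven_flash_v2_5",
    voice="EXAVITQu4vr4xnSDxMaL",
    enable_streaming=True,
    voice_settings=VoiceSettings(stability=0.5, style=0.2),
)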

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Cleanup resources"""
    self._should_stop = True

    for task in [self._send_task, self._recv_task]:
        if task and not task.done():
            task.cancel()

    for stream in list(self._streams):
        try:
            await stream.aclose()
        except Exception:
            pass

    self._streams.clear()

    if self._ws_connection and not self._ws_connection.closed:
        await self._ws_connection.close()
    if self._ws_session:
        await self._ws_session.close()
    if self._session:
        await self._session.aclose()
    await super().aclose()

Cleanup resources
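
A shutdown sketch; the surrounding session logic is illustrative, and only aclose() comes from this class:

async def shutdown_example() -> None:
    tts = ElevenLabsTTS()
    try:
        ...  # run the agent session / synthesis here
    finally:
        # Cancels pending tasks and closes open streams, the WebSocket, and the HTTP client.
        await tts.aclose()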

async def interrupt(self) ‑> None
async def interrupt(self) -> None:
    """Simple but effective interruption"""
    self._should_stop = True

    if self.audio_track:
        self.audio_track.interrupt()

    for task in [self._send_task, self._recv_task]:
        if task and not task.done():
            task.cancel()

    if self._ws_connection and not self._ws_connection.closed:
        await self._ws_connection.close()

Interrupt in-progress synthesis: interrupt the audio track, cancel the send/receive tasks, and close the WebSocket
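
A barge-in sketch; the handle_user_barge_in name is hypothetical, and only interrupt() and reset_first_audio_tracking() are part of this class:

async def handle_user_barge_in(tts: ElevenLabsTTS) -> None:
    # Stop the current synthesis: interrupt the audio track and close the WebSocket.
    await tts.interrupt()
    # Re-arm first-audio tracking before the next synthesize() call.
    tts.reset_first_audio_tracking()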

def reset_first_audio_tracking(self) ‑> None
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        target_voice = voice_id or self.voice
        self._should_stop = False

        if self.enable_streaming:
            await self._stream_synthesis(text, target_voice)
        else:
            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, target_voice)
            else:
                await self._chunked_synthesis(text, target_voice)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")

Convert text to speech

Args

text
Text to convert to speech (either a string or an async iterator of strings)
voice_id
Optional ElevenLabs voice ID; defaults to the voice configured on this instance
**kwargs
Additional provider-specific arguments

Returns

None
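
A usage sketch, assuming the agent pipeline has already provided an audio track; audio_track is a placeholder here, since synthesize() emits an error and returns if audio_track or loop is unset:

import asyncio

from videosdk.plugins.elevenlabs import ElevenLabsTTS

async def speak(tts: ElevenLabsTTS, audio_track) -> None:
    tts.loop = asyncio.get_running_loop()
    tts.audio_track = audio_track  # placeholder for the pipeline-provided track

    # Plain string input.
    await tts.synthesize("Hello from ElevenLabs!")

    # Async iterator input, e.g. token chunks streamed from an LLM.
    async def chunks():
        for piece in ("Hello ", "from ", "a ", "streaming ", "source."):
            yield piece

    await tts.synthesize(chunks(), voice_id="EXAVITQu4vr4xnSDxMaL")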

class VoiceSettings (stability: float = 0.71,
similarity_boost: float = 0.5,
style: float = 0.0,
use_speaker_boost: bool = True)
@dataclass
class VoiceSettings:
    stability: float = 0.71
    similarity_boost: float = 0.5
    style: float = 0.0
    use_speaker_boost: bool = True

Voice configuration passed to ElevenLabs with each synthesis request: stability, similarity boost, style, and speaker boost.

Instance variables

var similarity_boost : float
var stability : float
var style : float
var use_speaker_boost : bool
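
An illustrative override (field values are examples only; valid ranges are defined by the ElevenLabs API):

settings = VoiceSettings(
    stability=0.4,          # more variation between generations
    similarity_boost=0.8,   # stay closer to the reference voice
    style=0.3,
    use_speaker_boost=True,
)
tts = ElevenLabsTTS(voice_settings=settings)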