Module videosdk.plugins.elevenlabs.tts

Classes

class ElevenLabsTTS (*,
api_key: str | None = None,
model: str = 'eleven_flash_v2_5',
voice: str = 'ErXwobaYiN019PkySvjV',
speed: float = 1.0,
response_format: str = 'pcm_24000',
voice_settings: VoiceSettings | None = None,
base_url: str = 'https://api.elevenlabs.io/v1',
enable_streaming: bool = True,
inactivity_timeout: int = 300,
language: str = 'en',
enable_ssml_parsing: bool = False,
apply_text_normalization: "Literal['auto', 'on', 'off']" = 'auto',
auto_mode: bool = True,
word_timestamps: bool = False)
Expand source code
class ElevenLabsTTS(TTS):
    """Text-to-Speech plugin backed by the ElevenLabs API.

    With ``enable_streaming=True`` (the default) text is sent over a shared
    multi-context WebSocket (``/multi-stream-input``), one context per
    ``synthesize`` call; otherwise each text segment is sent to the HTTP
    ``/stream`` endpoint. Audio is pushed into ``self.audio_track``.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE_ID,
        speed: float = 1.0,
        response_format: str = "pcm_24000",
        voice_settings: VoiceSettings | None = None,
        base_url: str = API_BASE_URL,
        enable_streaming: bool = True,
        inactivity_timeout: int = WS_INACTIVITY_TIMEOUT,
        language: str = "en",
        enable_ssml_parsing: bool = False,
        apply_text_normalization: Literal["auto", "on", "off"] = "auto",
        auto_mode: bool = True,
        word_timestamps: bool = False,
    ) -> None:
        """Initialize the ElevenLabs TTS plugin.

        Args:
            api_key (Optional[str], optional): ElevenLabs API key. Uses the ELEVENLABS_API_KEY
                environment variable if not provided. Defaults to None.
            model (str): The ElevenLabs model ID. Defaults to "eleven_flash_v2_5".
            voice (str): The voice ID used when ``synthesize`` is not given one.
                Defaults to "ErXwobaYiN019PkySvjV".
            speed (float): Speaking-rate multiplier, sent as ``voice_settings.speed``
                (1.0 is normal speed). Defaults to 1.0.
            response_format (str): Output audio format requested from ElevenLabs.
                Defaults to "pcm_24000".
            voice_settings (Optional[VoiceSettings], optional): Stability / similarity /
                style / speaker-boost settings. ``VoiceSettings()`` is used when None.
                Defaults to None.
            base_url (str): API base URL; the WebSocket URL is derived from it by
                swapping the scheme. Defaults to "https://api.elevenlabs.io/v1".
            enable_streaming (bool): Use the multi-context WebSocket API (True) or the
                HTTP streaming endpoint (False). Defaults to True.
            inactivity_timeout (int): Sent as the ``inactivity_timeout`` WebSocket query
                parameter. Defaults to 300.
            language (str): Sent as ``language_code``. Defaults to "en".
            enable_ssml_parsing (bool): Whether to enable SSML parsing. Defaults to False.
            apply_text_normalization (Literal["auto", "on", "off"]): Controls text
                normalization (e.g. spelling out numbers). With "auto" the server decides
                whether to apply it, with "on" it is always applied, with "off" it is
                skipped. For "eleven_turbo_v2_5" and "eleven_flash_v2_5", text
                normalization can only be enabled on Enterprise plans. Defaults to "auto".
            auto_mode (bool): Reduces latency by disabling the server-side
                chunk_length_schedule buffer (default [120, 160, 250, 290] chars before
                first audio). Defaults to True because the upstream pipeline orchestrator
                already feeds TTS sentence-bounded chunks. Set to False only if you are
                streaming raw mid-sentence tokens directly and want ElevenLabs to buffer
                for higher prosody quality at the cost of TTFB.
            word_timestamps (bool): Forwarded to the base ``TTS``; when enabled, alignment
                data in WebSocket responses is used to emit ``word_spoken`` events timed to
                audio playback. Defaults to False.

        Raises:
            ValueError: If no API key is given and ELEVENLABS_API_KEY is unset.
        """
        super().__init__(
            sample_rate=ELEVENLABS_SAMPLE_RATE,
            num_channels=ELEVENLABS_CHANNELS,
            word_timestamps=word_timestamps,
        )

        self.model = model
        self.voice = voice
        self.speed = speed
        self.language = language
        self.enable_ssml_parsing = enable_ssml_parsing
        self.apply_text_normalization = apply_text_normalization
        self.auto_mode = auto_mode
        self.audio_track = None
        self.loop = None
        self.response_format = response_format
        self.base_url = _normalize_base_url(base_url)
        self.enable_streaming = enable_streaming
        self.voice_settings = voice_settings or VoiceSettings()
        self.inactivity_timeout = inactivity_timeout
        self._first_chunk_sent = False
        self._ws_session = None
        self._ws_connection = None
        self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
        if not self.api_key:
            raise ValueError(
                "ElevenLabs API key must be provided either through api_key parameter or ELEVENLABS_API_KEY environment variable")

        self._session = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0,
                                  write=5.0, pool=5.0),
            follow_redirects=True,
        )

        self._streams = weakref.WeakSet()
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False

        self._connection_lock = asyncio.Lock()
        self._ws_voice_id: str | None = None
        self._active_contexts: set[str] = set()
        self._context_futures: dict[str, asyncio.Future[None]] = {}

        self._audio_start_time: Optional[float] = None
        self._spoken_words: list[str] = []
        self._word_schedule_tasks: list[asyncio.Task] = []
        self._last_scheduled_start_sec: float = -1.0
        self._pending_word_chars: list[str] = []
        self._pending_word_start_sec: Optional[float] = None

        # Strong references to fire-and-forget callback tasks (see _mark_first_audio).
        self._background_tasks: set[asyncio.Task] = set()

    def reset_first_audio_tracking(self) -> None:
        """Clear per-turn audio/word-timing state so the next synthesis starts fresh.

        Any word-emit tasks still scheduled from the previous turn are cancelled.
        """
        pending = [task for task in self._word_schedule_tasks if not task.done()]
        for task in pending:
            task.cancel()
        self._word_schedule_tasks = []

        self._first_chunk_sent = False
        self._audio_start_time = None
        self._spoken_words = []
        self._last_scheduled_start_sec = -1.0
        self._pending_word_chars = []
        self._pending_word_start_sec = None

    async def prewarm(self) -> None:
        """Open the ElevenLabs WebSocket ahead of the first ``synthesize()`` call.

        Moves the TLS handshake, auth and upgrade cost out of the first turn.
        Does nothing when streaming is disabled; failures are reported through
        the ``error`` event instead of raised. Safe to call repeatedly.
        """
        if self.enable_streaming:
            try:
                await self._ensure_connection(self.voice)
            except Exception as exc:
                self.emit("error", f"ElevenLabs prewarm failed: {exc}")

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Convert ``text`` to speech and push the audio into ``self.audio_track``.

        Args:
            text: A complete string, or an async iterator of text chunks (which may
                also yield ``FlushMarker`` segment boundaries).
            voice_id: Voice to use for this call; falls back to ``self.voice``.
            **kwargs: Accepted for interface compatibility; unused.

        Raises:
            Exception: Any synthesis failure is reported via the ``error`` event and
                then re-raised.
        """
        try:
            if not (self.audio_track and self.loop):
                self.emit("error", "Audio track or event loop not set")
                return

            chosen_voice = voice_id or self.voice
            self._should_stop = False

            if self.enable_streaming:
                await self._stream_synthesis(text, chosen_voice)
            elif not isinstance(text, AsyncIterator):
                await self._chunked_synthesis(text, chosen_voice)
            else:
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, chosen_voice)
        except Exception as exc:
            self.emit("error", f"TTS synthesis failed: {str(exc)}")
            raise

    async def _chunked_synthesis(self, text: str, voice_id: str) -> None:
        """Synthesize ``text`` with the HTTP streaming endpoint (non-WebSocket mode).

        Audio bytes are forwarded to ``_stream_audio_chunks`` as they arrive;
        reading stops early once ``_should_stop`` is set.

        Raises:
            httpx.HTTPStatusError: On a non-success response (also reported via ``error``).
            Exception: Any other failure, after being reported via ``error``.
        """
        url = f"{self.base_url}/text-to-speech/{voice_id}/stream"

        # Per the ElevenLabs REST reference, ``model_id`` and
        # ``apply_text_normalization`` are JSON body fields of this endpoint;
        # sent as query parameters the configured model was not applied.
        # NOTE(review): ``language_code`` is also documented as a body field but
        # is left in the query string to avoid new "unsupported language" errors
        # on models without language enforcement — confirm and move if desired.
        params = {
            "output_format": self.response_format,
            "language_code": self.language,
            "enable_ssml_parsing": self.enable_ssml_parsing,
            "auto_mode": self.auto_mode,
        }

        headers = {
            "xi-api-key": self.api_key,
            "Content-Type": "application/json",
        }

        payload = {
            "text": text,
            "model_id": self.model,
            "apply_text_normalization": self.apply_text_normalization,
            "voice_settings": self._voice_settings_dict(),
        }

        try:
            async with self._session.stream(
                "POST",
                url,
                headers=headers,
                json=payload,
                params=params
            ) as response:
                if not response.is_success:
                    # Streamed bodies are not read automatically; without this,
                    # ``e.response.text`` below raises ResponseNotRead and masks
                    # the real HTTP error.
                    await response.aread()
                response.raise_for_status()

                async for chunk in response.aiter_bytes():
                    if self._should_stop:
                        break
                    if chunk:
                        await self._stream_audio_chunks(chunk)

        except httpx.HTTPStatusError as e:
            self.emit(
                "error", f"HTTP error {e.response.status_code}: {e.response.text}")
            raise
        except Exception as e:
            self.emit("error", f"Chunked synthesis failed: {str(e)}")
            raise

    async def _stream_synthesis(self, text: Union[AsyncIterator[str], str], voice_id: str) -> None:
        """WebSocket-based streaming synthesis using multi-context connection.

        Forwards text verbatim into a single context. The server buffers and
        normalises (currency, decimals, mixed-script numbers); client-side
        re-segmentation here would split tokens like ``₹50,000`` and break
        prosody, so we deliberately don't do it. One trailing ``flush_context``
        + ``close_context`` finalises the turn.

        Empty input completes immediately (no context is opened, so the server
        would never send ``isFinal``). The context's waiter is removed from
        ``_context_futures`` however the call ends.
        """
        try:
            await self._ensure_connection(voice_id)

            context_id = uuid.uuid4().hex[:12]
            done_future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
            self.register_context(context_id, done_future)

            flush_on_chunk = bool(self.auto_mode)

            async def _send_chunks() -> None:
                sent_any = False
                try:
                    if isinstance(text, str):
                        if text:
                            await self.send_text(
                                context_id,
                                f"{text} ",
                                voice_settings=self._voice_settings_dict(),
                                flush=flush_on_chunk,
                            )
                            sent_any = True
                    else:
                        async for chunk in text:
                            if self._should_stop:
                                break
                            if isinstance(chunk, FlushMarker):
                                # Nothing to flush until a text packet has opened the context.
                                if sent_any:
                                    await self.flush_context(context_id)
                                continue
                            if not chunk:
                                continue
                            await self.send_text(
                                context_id,
                                f"{chunk} ",
                                voice_settings=None if sent_any else self._voice_settings_dict(),
                                flush=flush_on_chunk,
                            )
                            sent_any = True

                    if self._should_stop:
                        # interrupt() normally cancels the waiter; if it ran before
                        # this context was registered nobody else would, so do it here.
                        if not done_future.done():
                            done_future.cancel()
                        return
                    if not sent_any:
                        # Empty input: the server never saw this context and will
                        # never send ``isFinal`` for it.
                        if not done_future.done():
                            done_future.set_result(None)
                        return
                    await self.flush_context(context_id)
                    await self.close_context(context_id)
                except Exception as e:
                    if not done_future.done():
                        done_future.set_exception(e)

            sender = asyncio.create_task(_send_chunks())

            try:
                await done_future
            finally:
                if not sender.done():
                    sender.cancel()
                try:
                    await sender
                except (asyncio.CancelledError, Exception):
                    pass
                # The receive loop pops the waiter on ``isFinal``; make sure it is
                # also dropped on error / cancellation paths.
                self._context_futures.pop(context_id, None)

        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")
            raise

    def _voice_settings_dict(self) -> dict[str, Any]:
        """Build the ``voice_settings`` payload shared by the HTTP and WebSocket paths."""
        return {
            "stability": self.voice_settings.stability,
            "similarity_boost": self.voice_settings.similarity_boost,
            "style": self.voice_settings.style,
            "use_speaker_boost": self.voice_settings.use_speaker_boost,
            # ``speed`` is an ElevenLabs voice setting; without this the
            # constructor's ``speed`` argument had no effect.
            "speed": self.speed,
        }

    def _mark_first_audio(self) -> None:
        """On the first audio chunk of a turn, record its time and fire ``_first_audio_callback``."""
        if self._first_chunk_sent:
            return
        self._first_chunk_sent = True
        self._audio_start_time = asyncio.get_running_loop().time()
        callback = getattr(self, "_first_audio_callback", None)
        if callback:
            # Keep a strong reference: the event loop only holds weak
            # references to tasks, so an unreferenced task may be collected.
            task = asyncio.create_task(callback())
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Forward HTTP-mode audio bytes to the audio track; dropped once ``_should_stop`` is set."""
        if not audio_bytes or self._should_stop:
            return

        self._mark_first_audio()

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def _schedule_word_emit(self, word: str, start_sec: float) -> None:
        """Emit a ``word_spoken`` event at the moment its audio begins to play."""
        while self._audio_start_time is None and not self._should_stop:
            await asyncio.sleep(0.01)
        if self._should_stop or self._audio_start_time is None:
            return

        target_time = self._audio_start_time + float(start_sec)
        now = asyncio.get_running_loop().time()
        delay = target_time - now
        if delay > 0:
            try:
                await asyncio.sleep(delay)
            except asyncio.CancelledError:
                return
        if self._should_stop:
            return

        self._spoken_words.append(word)
        cumulative = " ".join(self._spoken_words)
        try:
            self.emit("word_spoken", {"word": word, "cumulative_text": cumulative})
        except Exception:
            pass

    def _schedule_words_from_alignment(self, alignment: dict) -> None:
        """Parse an ElevenLabs alignment payload and schedule per-word emits.

        ElevenLabs returns character-level alignment (``chars`` /
        ``charStartTimesMs``). We aggregate consecutive non-whitespace chars
        into whole words, then schedule each word at its first-char start time
        so the UI renders word-by-word synced with audio playback.
        """
        if not alignment:
            return
        chars = alignment.get("chars") or alignment.get("characters") or []
        if "charStartTimesMs" in alignment:
            starts_sec = [float(t) / 1000.0 for t in alignment["charStartTimesMs"]]
        elif "character_start_times_seconds" in alignment:
            starts_sec = [float(t) for t in alignment["character_start_times_seconds"]]
        else:
            return

        for ch, start_sec in zip(chars, starts_sec):
            if ch.isspace():
                self._flush_pending_word()
            else:
                if not self._pending_word_chars:
                    self._pending_word_start_sec = start_sec
                self._pending_word_chars.append(ch)

    def _flush_pending_word(self) -> None:
        """Schedule emission of any word currently being accumulated."""
        if not self._pending_word_chars or self._pending_word_start_sec is None:
            self._pending_word_chars = []
            self._pending_word_start_sec = None
            return
        word = "".join(self._pending_word_chars)
        start_sec = self._pending_word_start_sec
        self._pending_word_chars = []
        self._pending_word_start_sec = None
        # Skip words whose start is not after the last scheduled one.
        if start_sec <= self._last_scheduled_start_sec:
            return
        self._last_scheduled_start_sec = start_sec
        self._word_schedule_tasks.append(
            asyncio.create_task(self._schedule_word_emit(word, start_sec))
        )

    async def interrupt(self) -> None:
        """Stop emitting audio for the current synthesis. Keeps the WebSocket
        open so the next turn does not pay reconnect cost; tells ElevenLabs to
        stop generating via ``close_context`` (saves server compute), and drops
        any in-flight audio chunks client-side via context_id filtering in
        ``_recv_loop``.

        Every pending context waiter is cancelled and removed from
        ``_context_futures``, so the interrupted ``synthesize`` call raises
        ``asyncio.CancelledError`` and late messages for those contexts are
        ignored instead of accumulating.
        """
        self._should_stop = True

        for task in self._word_schedule_tasks:
            if not task.done():
                task.cancel()
        self._word_schedule_tasks = []

        if self.audio_track:
            self.audio_track.interrupt()

        await self.close_all_contexts()

        pending = list(self._context_futures.values())
        self._context_futures.clear()
        for fut in pending:
            if not fut.done():
                fut.cancel()

    async def aclose(self) -> None:
        """Cleanup resources.

        Cancels background work and waits for it to stop, closes any streams,
        the WebSocket (after a ``close_socket`` message) and both HTTP sessions,
        and cancels any context waiter still pending so no ``synthesize`` call
        is left suspended.
        """
        self._should_stop = True

        for task in self._word_schedule_tasks:
            if not task.done():
                task.cancel()
        self._word_schedule_tasks = []

        current = asyncio.current_task()
        running = [
            task
            for task in (self._send_task, self._recv_task)
            if task is not None and not task.done() and task is not current
        ]
        for task in running:
            task.cancel()
        if running:
            # Let the cancellations land so the receive loop is no longer
            # reading from the socket when it is closed below.
            await asyncio.gather(*running, return_exceptions=True)

        for stream in list(self._streams):
            try:
                await stream.aclose()
            except Exception:
                pass

        self._streams.clear()

        if self._ws_connection and not self._ws_connection.closed:
            try:
                await self._ws_connection.send_str(json.dumps({"close_socket": True}))
            except Exception:
                pass
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()
        self._ws_connection = None
        self._ws_session = None

        # No socket remains that could complete these contexts.
        pending = list(self._context_futures.values())
        self._context_futures.clear()
        self._active_contexts.clear()
        for fut in pending:
            if not fut.done():
                fut.cancel()

        if self._session:
            await self._session.aclose()
        await super().aclose()

    def _fail_pending_contexts(self, reason: str) -> None:
        """Fail every pending context waiter with ``RuntimeError(reason)`` and forget all contexts."""
        pending = list(self._context_futures.values())
        self._context_futures.clear()
        self._active_contexts.clear()
        for fut in pending:
            if not fut.done():
                fut.set_exception(RuntimeError(reason))

    async def _ensure_connection(self, voice_id: str) -> None:
        """Make sure a live WebSocket for ``voice_id`` with a running receive loop exists.

        Reuses the current socket when it is open, bound to ``voice_id``, and
        its receive loop is still running; otherwise closes it and connects a
        new one. Contexts opened on the old socket can never finish, so their
        waiters are failed before the new socket is created.

        Raises:
            asyncio.TimeoutError: If the WebSocket handshake exceeds 10 seconds.
            Exception: Whatever ``aiohttp``'s ``ws_connect`` raises on failure.
        """
        from urllib.parse import urlencode

        async with self._connection_lock:
            ws = self._ws_connection
            recv_alive = self._recv_task is not None and not self._recv_task.done()
            if ws is not None and not ws.closed and recv_alive and self._ws_voice_id == voice_id:
                return

            if ws is not None and not ws.closed:
                try:
                    await ws.send_str(json.dumps({"close_socket": True}))
                except Exception:
                    pass
                await ws.close()
            if self._ws_session and not self._ws_session.closed:
                await self._ws_session.close()
            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()
            self._ws_connection = None
            self._ws_voice_id = None
            self._fail_pending_contexts("WebSocket connection replaced")

            ws_url = (
                f"{self.base_url}/text-to-speech/{voice_id}/multi-stream-input"
                .replace("https://", "wss://")
                .replace("http://", "ws://")
            )
            params = {
                "model_id": self.model,
                "output_format": self.response_format,
                "inactivity_timeout": self.inactivity_timeout,
                "language_code": self.language,
                "enable_ssml_parsing": str(self.enable_ssml_parsing).lower(),
                "apply_text_normalization": self.apply_text_normalization,
                "auto_mode": str(self.auto_mode).lower(),
            }
            # urlencode percent-escapes reserved characters in the values.
            full_ws_url = f"{ws_url}?{urlencode(params)}"
            headers = {"xi-api-key": self.api_key}

            session = aiohttp.ClientSession()
            try:
                connection = await asyncio.wait_for(
                    session.ws_connect(full_ws_url, headers=headers), timeout=10.0
                )
            except BaseException:
                # Do not leak the session when the handshake fails or times out.
                await session.close()
                raise

            self._ws_session = session
            self._ws_connection = connection
            self._ws_voice_id = voice_id
            self._recv_task = asyncio.create_task(self._recv_loop())

    def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
        """Track ``done_future`` so the receive loop can settle it for ``context_id``."""
        self._context_futures.update({context_id: done_future})

    async def send_text(
        self,
        context_id: str,
        text: str,
        *,
        voice_settings: Optional[dict[str, Any]] = None,
        flush: bool = False,
    ) -> None:
        """Send ``text`` into ``context_id`` on the shared WebSocket.

        The first message for a context is preceded by an initialisation packet
        (a single space, plus ``voice_settings`` when given), after which the
        context is tracked in ``_active_contexts``.

        Raises:
            RuntimeError: If the WebSocket is not connected.
        """
        if not self._ws_connection or self._ws_connection.closed:
            raise RuntimeError("WebSocket connection is closed")

        opening = context_id not in self._active_contexts
        if opening:
            extra = {"voice_settings": voice_settings} if voice_settings else {}
            opener = {"context_id": context_id, "text": " ", **extra}
            await self._ws_connection.send_str(json.dumps(opener))
            self._active_contexts.add(context_id)

        flag = {"flush": True} if flush else {}
        packet: dict[str, Any] = {"context_id": context_id, "text": text, **flag}
        await self._ws_connection.send_str(json.dumps(packet))

    async def flush_context(self, context_id: str) -> None:
        """Ask the server to generate audio for everything buffered in ``context_id``.

        Silently does nothing when the WebSocket is missing or closed.
        """
        ws = self._ws_connection
        if ws and not ws.closed:
            message = {"context_id": context_id, "flush": True}
            await ws.send_str(json.dumps(message))

    async def close_context(self, context_id: str) -> None:
        """Tell the server to finish and close ``context_id``.

        Silently does nothing when the WebSocket is missing or closed.
        """
        ws = self._ws_connection
        if ws and not ws.closed:
            message = {"context_id": context_id, "close_context": True}
            await ws.send_str(json.dumps(message))

    async def close_all_contexts(self) -> None:
        """Best-effort ``close_context`` for every context this client opened.

        A failure on one context does not prevent the remaining ones from being
        closed, and every context is forgotten afterwards so later calls do not
        re-close contexts the server has already been told to end.
        """
        for context_id in list(self._active_contexts):
            try:
                await self.close_context(context_id)
            except Exception:
                # Deliberately best-effort: if the send fails the socket is
                # unusable and the context cannot continue anyway.
                pass
            self._active_contexts.discard(context_id)

    async def _handle_ws_message(self, data: dict[str, Any]) -> None:
        """Route one decoded WebSocket message to its context.

        Audio and alignment data are used only while the context's waiter is
        still pending, so chunks belonging to interrupted or finished contexts
        are dropped. ``isFinal`` resolves and forgets the context.
        """
        ctx_id = data.get("contextId")
        fut = self._context_futures.get(ctx_id) if ctx_id else None

        if data.get("error"):
            if fut is not None:
                if not fut.done():
                    fut.set_exception(RuntimeError(data["error"]))
            elif not ctx_id:
                # A connection-level error has no waiter to fail; surface it
                # rather than dropping it silently.
                self.emit("error", f"ElevenLabs WebSocket error: {data['error']}")
            return

        ctx_alive = fut is not None and not fut.done()

        audio = data.get("audio")
        if audio and ctx_alive and isinstance(audio, str):
            audio_chunk = base64.b64decode(audio)
            if audio_chunk:
                self._mark_first_audio()
                if self.audio_track:
                    await self.audio_track.add_new_bytes(audio_chunk)

        if self.supports_word_timestamps and ctx_alive:
            alignment = data.get("alignment") or data.get("normalizedAlignment")
            if alignment:
                self._schedule_words_from_alignment(alignment)

        if data.get("is_final") or data.get("isFinal"):
            if self.supports_word_timestamps:
                self._flush_pending_word()
            if ctx_id:
                done = self._context_futures.pop(ctx_id, None)
                self._active_contexts.discard(ctx_id)
                if done is not None and not done.done():
                    done.set_result(None)

    async def _recv_loop(self) -> None:
        """Dispatch messages from the current WebSocket until it closes.

        The socket is captured once, so a loop left over from a replaced
        connection never reads from its successor. Undecodable messages are
        skipped. When the loop ends for any reason other than cancellation and
        its socket is still the current one, every pending context waiter is
        failed so ``synthesize`` does not wait for an ``isFinal`` that can no
        longer arrive.
        """
        ws = self._ws_connection
        reason = "WebSocket connection closed"
        try:
            while ws is not None and not ws.closed:
                msg = await ws.receive()
                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                    except ValueError:
                        continue
                    if isinstance(data, dict):
                        await self._handle_ws_message(data)
                elif msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                    aiohttp.WSMsgType.ERROR,
                ):
                    break
        except asyncio.CancelledError:
            # Whoever cancelled the loop (aclose / _ensure_connection) settles
            # the waiters itself.
            raise
        except Exception:
            reason = "WebSocket receive loop error"
        if self._ws_connection is ws:
            self._fail_pending_contexts(reason)

Text-to-Speech plugin backed by the ElevenLabs API (subclass of the base TTS class).

Initialize the ElevenLabs TTS plugin.

Args

api_key : Optional[str], optional
ElevenLabs API key. Uses ELEVENLABS_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the TTS plugin. Defaults to "eleven_flash_v2_5".
voice : str
The voice ID to use for the TTS plugin. Defaults to "ErXwobaYiN019PkySvjV".
speed : float
The speed to use for the TTS plugin. Defaults to 1.0.
response_format : str
The response format to use for the TTS plugin. Defaults to "pcm_24000".
voice_settings : Optional[VoiceSettings], optional
The voice settings to use for the TTS plugin. Defaults to None.
base_url : str
The base URL to use for the TTS plugin. Defaults to "https://api.elevenlabs.io/v1".
enable_streaming : bool
Whether to enable streaming for the TTS plugin. Defaults to True.
inactivity_timeout : int
The inactivity timeout to use for the TTS plugin. Defaults to 300.
language : str
The language to use for the TTS plugin. Defaults to "en".
enable_ssml_parsing : bool
Whether to enable SSML parsing. Defaults to False.
apply_text_normalization : Literal['auto', 'on', 'off']
Controls text normalization (e.g., spelling out numbers). With "auto", the system decides automatically whether to apply it; with "on", it is always applied; with "off", it is skipped. For the "eleven_turbo_v2_5" and "eleven_flash_v2_5" models, text normalization can only be enabled on Enterprise plans. Defaults to "auto".
auto_mode : bool
Reduces latency by disabling the server-side chunk_length_schedule buffer (default [120, 160, 250, 290] chars before first audio). Defaults to True because the upstream pipeline orchestrator already feeds TTS sentence-bounded chunks. Set to False only if you are streaming raw mid-sentence tokens directly and want ElevenLabs to buffer for higher prosody quality at the cost of TTFB.
word_timestamps : bool
Whether to use the alignment data returned over the WebSocket to emit word_spoken events timed to audio playback. Defaults to False.

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Cancels background work and waits for it to stop, closes any streams,
    the WebSocket (after a ``close_socket`` message) and both HTTP sessions,
    and cancels any context waiter still pending so no ``synthesize`` call
    is left suspended.
    """
    self._should_stop = True

    for task in self._word_schedule_tasks:
        if not task.done():
            task.cancel()
    self._word_schedule_tasks = []

    current = asyncio.current_task()
    running = [
        task
        for task in (self._send_task, self._recv_task)
        if task is not None and not task.done() and task is not current
    ]
    for task in running:
        task.cancel()
    if running:
        # Let the cancellations land so the receive loop is no longer
        # reading from the socket when it is closed below.
        await asyncio.gather(*running, return_exceptions=True)

    for stream in list(self._streams):
        try:
            await stream.aclose()
        except Exception:
            pass

    self._streams.clear()

    if self._ws_connection and not self._ws_connection.closed:
        try:
            await self._ws_connection.send_str(json.dumps({"close_socket": True}))
        except Exception:
            pass
        await self._ws_connection.close()
    if self._ws_session and not self._ws_session.closed:
        await self._ws_session.close()
    self._ws_connection = None
    self._ws_session = None

    # No socket remains that could complete these contexts.
    pending = list(self._context_futures.values())
    self._context_futures.clear()
    self._active_contexts.clear()
    for fut in pending:
        if not fut.done():
            fut.cancel()

    if self._session:
        await self._session.aclose()
    await super().aclose()

Cleanup resources

async def close_all_contexts(self) ‑> None
Expand source code
async def close_all_contexts(self) -> None:
    """Best-effort ``close_context`` for every context this client opened.

    A failure on one context does not prevent the remaining ones from being
    closed, and every context is forgotten afterwards so later calls do not
    re-close contexts the server has already been told to end.
    """
    for context_id in list(self._active_contexts):
        try:
            await self.close_context(context_id)
        except Exception:
            # Deliberately best-effort: if the send fails the socket is
            # unusable and the context cannot continue anyway.
            pass
        self._active_contexts.discard(context_id)
async def close_context(self, context_id: str) ‑> None
Expand source code
async def close_context(self, context_id: str) -> None:
    """Tell the server to finish and close ``context_id``.

    Silently does nothing when the WebSocket is missing or closed.
    """
    ws = self._ws_connection
    if ws and not ws.closed:
        message = {"context_id": context_id, "close_context": True}
        await ws.send_str(json.dumps(message))
async def flush_context(self, context_id: str) ‑> None
Expand source code
async def flush_context(self, context_id: str) -> None:
    """Ask the server to generate audio for everything buffered in ``context_id``.

    Silently does nothing when the WebSocket is missing or closed.
    """
    ws = self._ws_connection
    if ws and not ws.closed:
        message = {"context_id": context_id, "flush": True}
        await ws.send_str(json.dumps(message))
async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Stop emitting audio for the current synthesis. Keeps the WebSocket
    open so the next turn does not pay reconnect cost; tells ElevenLabs to
    stop generating via ``close_context`` (saves server compute), and drops
    any in-flight audio chunks client-side via context_id filtering in
    ``_recv_loop``.

    Every pending context waiter is cancelled and removed from
    ``_context_futures``, so the interrupted ``synthesize`` call raises
    ``asyncio.CancelledError`` and late messages for those contexts are
    ignored instead of accumulating.
    """
    self._should_stop = True

    for task in self._word_schedule_tasks:
        if not task.done():
            task.cancel()
    self._word_schedule_tasks = []

    if self.audio_track:
        self.audio_track.interrupt()

    await self.close_all_contexts()

    pending = list(self._context_futures.values())
    self._context_futures.clear()
    for fut in pending:
        if not fut.done():
            fut.cancel()

Stop emitting audio for the current synthesis. Keeps the WebSocket open so the next turn does not pay reconnect cost; tells ElevenLabs to stop generating via close_context (saves server compute), and drops any in-flight audio chunks client-side via context_id filtering in _recv_loop.

async def prewarm(self) ‑> None
Expand source code
async def prewarm(self) -> None:
    """Open the ElevenLabs WebSocket ahead of the first ``synthesize()`` call.

    Moves the TLS handshake, auth and upgrade cost out of the first turn.
    Does nothing when streaming is disabled; failures are reported through
    the ``error`` event instead of raised. Safe to call repeatedly.
    """
    if self.enable_streaming:
        try:
            await self._ensure_connection(self.voice)
        except Exception as exc:
            self.emit("error", f"ElevenLabs prewarm failed: {exc}")

Pre-establish the ElevenLabs WebSocket so the first synthesize() call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly.

def register_context(self, context_id: str, done_future: asyncio.Future[None]) ‑> None
Expand source code
def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
    """Track ``done_future`` so the receive loop can settle it for ``context_id``."""
    self._context_futures.update({context_id: done_future})
def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear per-turn audio/word-timing state so the next synthesis starts fresh.

    Any word-emit tasks still scheduled from the previous turn are cancelled.
    """
    pending = [task for task in self._word_schedule_tasks if not task.done()]
    for task in pending:
        task.cancel()
    self._word_schedule_tasks = []

    self._first_chunk_sent = False
    self._audio_start_time = None
    self._spoken_words = []
    self._last_scheduled_start_sec = -1.0
    self._pending_word_chars = []
    self._pending_word_start_sec = None

Reset the first audio tracking state for next TTS task

async def send_text(self,
context_id: str,
text: str,
*,
voice_settings: Optional[dict[str, Any]] = None,
flush: bool = False) ‑> None
Expand source code
async def send_text(
    self,
    context_id: str,
    text: str,
    *,
    voice_settings: Optional[dict[str, Any]] = None,
    flush: bool = False,
) -> None:
    """Send ``text`` into ``context_id`` on the shared WebSocket.

    The first message for a context is preceded by an initialisation packet
    (a single space, plus ``voice_settings`` when given), after which the
    context is tracked in ``_active_contexts``.

    Raises:
        RuntimeError: If the WebSocket is not connected.
    """
    if not self._ws_connection or self._ws_connection.closed:
        raise RuntimeError("WebSocket connection is closed")

    opening = context_id not in self._active_contexts
    if opening:
        extra = {"voice_settings": voice_settings} if voice_settings else {}
        opener = {"context_id": context_id, "text": " ", **extra}
        await self._ws_connection.send_str(json.dumps(opener))
        self._active_contexts.add(context_id)

    flag = {"flush": True} if flush else {}
    packet: dict[str, Any] = {"context_id": context_id, "text": text, **flag}
    await self._ws_connection.send_str(json.dumps(packet))
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        target_voice = voice_id or self.voice
        self._should_stop = False

        if self.enable_streaming:
            await self._stream_synthesis(text, target_voice)
        else:
            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, target_voice)
            else:
                await self._chunked_synthesis(text, target_voice)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise 

Convert text to speech

Args

text
Text to convert to speech. Either a plain string or an async iterator that may yield str chunks and FlushMarker segment-boundary markers. Plugins that don't support per-segment flushing should drop the markers with an inline isinstance check (or rely on segment_text which already drops them).
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None

class VoiceSettings (stability: float = 0.71,
similarity_boost: float = 0.5,
style: float = 0.0,
use_speaker_boost: bool = True)
Expand source code
@dataclass()
class VoiceSettings:
    """Per-request ElevenLabs voice parameters sent with every synthesis."""

    # How stable the voice output should be.
    stability: float = 0.71
    # How closely output should match the original voice.
    similarity_boost: float = 0.5
    # Amount of style exaggeration.
    style: float = 0.0
    # Whether ElevenLabs' speaker boost is enabled.
    use_speaker_boost: bool = True

VoiceSettings(stability: 'float' = 0.71, similarity_boost: 'float' = 0.5, style: 'float' = 0.0, use_speaker_boost: 'bool' = True)

Instance variables

var similarity_boost : float
var stability : float
var style : float
var use_speaker_boost : bool