Module videosdk.plugins.resemble.tts

Classes

class ResembleTTS (*,
api_key: str | None = None,
voice_uuid: str = '55592656',
sample_rate: int = 22050,
precision: str = 'PCM_16',
model: Optional[str] = None)
Expand source code
class ResembleTTS(TTS):
    """Text-to-speech plugin backed by Resemble's HTTP streaming endpoint.

    Each text segment is POSTed to ``RESEMBLE_HTTP_STREAMING_URL``. The
    response body is buffered and its WAV header is stripped. The remaining
    PCM bytes are pushed to ``self.audio_track`` in 20 ms frames.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        voice_uuid: str = DEFAULT_VOICE_UUID,
        sample_rate: int = DEFAULT_SAMPLE_RATE,
        precision: str = DEFAULT_PRECISION,
        model: Optional[str] = None,
    ) -> None:
        """Initialize the Resemble TTS plugin.

        Args:
            api_key (Optional[str], optional): Resemble API key. Defaults to None,
                in which case the ``RESEMBLE_API_KEY`` environment variable is used.
            voice_uuid (str): The voice UUID to use for the TTS plugin. Defaults to "55592656".
            sample_rate (int): The sample rate to use for the TTS plugin. Defaults to 22050.
            precision (str): The precision to use for the TTS plugin. Defaults to "PCM_16".
            model (Optional[str], optional): Optional Resemble model name to override the
                account default (e.g. ``"chatterbox"``, ``"chatterbox-turbo"``). When
                ``None``, the server applies the account-level default model.

        Raises:
            ValueError: If no API key is given and ``RESEMBLE_API_KEY`` is unset.
        """
        super().__init__(sample_rate=sample_rate, num_channels=1)

        self.api_key = api_key or os.getenv("RESEMBLE_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Resemble API key is required. Provide either `api_key` or set `RESEMBLE_API_KEY` environment variable.")

        self.voice_uuid = voice_uuid
        self.precision = precision
        self.model = model

        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False
        self._interrupted = False
        self._current_synthesis_task: asyncio.Task | None = None
        # Strong references to the fire-and-forget audio-track writes. The
        # event loop keeps only weak references to tasks, so an unreferenced
        # task may be garbage-collected before it finishes.
        self._audio_tasks: set[asyncio.Task] = set()
        self._http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0,
                                  write=5.0, pool=5.0),
            follow_redirects=True,
            limits=httpx.Limits(
                max_connections=50,
                max_keepalive_connections=50,
                keepalive_expiry=120,
            ),
        )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        **kwargs: Any,
    ) -> None:
        """Convert ``text`` to speech and play it on ``self.audio_track``.

        ``text`` may be a plain string or an async iterator of text chunks;
        iterators are split into segments by ``segment_text``. Failures are
        reported through the ``"error"`` event rather than raised.
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            # A new synthesis request clears any previous interruption.
            self._interrupted = False

            if isinstance(text, str):
                await self._synthesize_segment(text, **kwargs)
            else:
                async for segment in segment_text(text):
                    if self._interrupted:
                        break
                    await self._synthesize_segment(segment, **kwargs)

        except Exception as e:
            self.emit("error", f"Resemble TTS synthesis failed: {str(e)}")

    async def _synthesize_segment(self, text: str, **kwargs: Any) -> None:
        """Synthesize a single text segment (blank segments are skipped).

        ``kwargs`` are accepted for interface compatibility but are not
        forwarded to the Resemble request.
        """
        if not text.strip() or self._interrupted:
            return

        try:
            await self._http_stream_synthesis(text)
        except Exception as e:
            if not self._interrupted:
                self.emit("error", f"Segment synthesis failed: {str(e)}")

    @staticmethod
    def _find_pcm_offset(buffer: bytes | bytearray) -> int | None:
        """Return the offset where PCM samples begin in a (partial) WAV stream.

        Returns ``None`` while more bytes are needed. For RIFF/WAVE input, the
        chunk list is walked so that the bytes ``b"data"`` inside another
        chunk's payload (e.g. a LIST/INFO chunk) are not taken for the data
        chunk header. Otherwise the first ``b"data"`` tag is used. In both
        cases the offset is returned only once the tag's 4-byte size field
        has also been received.
        """
        if buffer[:4] == b"RIFF":
            if len(buffer) < 12:
                return None
            if buffer[8:12] == b"WAVE":
                pos = 12
                while pos + 8 <= len(buffer):
                    if buffer[pos:pos + 4] == b"data":
                        return pos + 8
                    size = int.from_bytes(buffer[pos + 4:pos + 8], "little")
                    # RIFF chunks are padded to an even number of bytes.
                    pos += 8 + size + (size & 1)
                return None
        data_pos = buffer.find(b"data")
        if data_pos != -1 and len(buffer) >= data_pos + 8:
            return data_pos + 8
        return None

    async def _http_stream_synthesis(self, text: str) -> None:
        """POST ``text`` to Resemble and play the returned audio.

        Network errors and read timeouts are retried once after a 0.25 s
        back-off. Every other failure, including HTTP status errors, is
        reported through the ``"error"`` event unless synthesis was
        interrupted. The whole response is buffered before playback, so a
        retry never replays partial audio.
        """
        headers = {
            "Authorization": f"Token {self.api_key}",
            "Content-Type": "application/json",
        }

        payload: dict[str, Any] = {
            "voice_uuid": self.voice_uuid,
            "data": text,
            "precision": self.precision,
            "sample_rate": self.sample_rate,
        }
        if self.model:
            payload["model"] = self.model

        for attempt in range(2):
            try:
                async with self._http_client.stream(
                    "POST",
                    RESEMBLE_HTTP_STREAMING_URL,
                    headers=headers,
                    json=payload
                ) as response:
                    if response.status_code >= 400:
                        # In streaming mode, body must be read before
                        # .text/.json() are usable in the error handler.
                        await response.aread()
                    response.raise_for_status()

                    # bytearray keeps accumulation linear (bytes += is quadratic).
                    audio = bytearray()
                    header_processed = False

                    async for chunk in response.aiter_bytes():
                        if self._interrupted:
                            break
                        if not chunk:
                            continue
                        audio += chunk
                        if not header_processed:
                            pcm_start = self._find_pcm_offset(audio)
                            if pcm_start is not None:
                                del audio[:pcm_start]
                                header_processed = True

                    # If no data-chunk header was ever found, the buffer is
                    # played unchanged (same fallback as before).
                    if audio and not self._interrupted:
                        await self._stream_audio_chunks(bytes(audio))
                return

            except (httpx.NetworkError, httpx.ConnectError, httpx.ReadTimeout) as e:
                if attempt == 0 and not self._interrupted:
                    await asyncio.sleep(0.25 * (2 ** attempt))
                    continue
                if not self._interrupted:
                    self.emit("error", f"Resemble TTS network failure: {str(e)}")
                return
            except httpx.HTTPStatusError as e:
                if not self._interrupted:
                    self.emit(
                        "error", f"HTTP error {e.response.status_code}: {e.response.text}")
                return
            except Exception as e:
                if not self._interrupted:
                    self.emit(
                        "error", f"HTTP streaming synthesis failed: {str(e)}")
                return

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Push ``audio_bytes`` to the audio track in 20 ms frames.

        The final partial frame is zero-padded to a full frame. The first
        frame triggers ``_first_audio_callback`` (when set) once per TTS task.
        """
        # 20 ms of mono audio at 2 bytes per sample.
        # NOTE(review): this assumes 16-bit samples (the default PCM_16
        # precision); other precisions would need a different frame size.
        chunk_size = int(self.sample_rate * 1 * 2 * 20 / 1000)

        for i in range(0, len(audio_bytes), chunk_size):
            if self._interrupted:
                break

            chunk = audio_bytes[i:i + chunk_size]
            # Slices from this range are never empty; only the last may be short.
            if len(chunk) < chunk_size:
                chunk += b'\x00' * (chunk_size - len(chunk))

            if not self._first_chunk_sent and self._first_audio_callback:
                self._first_chunk_sent = True
                await self._first_audio_callback()

            task = asyncio.create_task(self.audio_track.add_new_bytes(chunk))
            self._audio_tasks.add(task)
            task.add_done_callback(self._audio_tasks.discard)
            # Yield to the event loop so the queued write can start.
            await asyncio.sleep(0.001)

    async def aclose(self) -> None:
        """Close the HTTP client, then run the base-class cleanup."""
        if self._http_client:
            await self._http_client.aclose()
        await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt TTS synthesis"""
        self._interrupted = True
        # NOTE(review): nothing in this class assigns _current_synthesis_task,
        # so this cancel only fires if a caller or base class sets it.
        if self._current_synthesis_task and not self._current_synthesis_task.done():
            self._current_synthesis_task.cancel()
        if self.audio_track:
            self.audio_track.interrupt()

Text-to-Speech implementation for Resemble AI, built on the base class for Text-to-Speech implementations (videosdk.agents.tts.tts.TTS).

Initialize the Resemble TTS plugin.

Args

api_key : Optional[str], optional
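Resemble API key. Defaults to None, in which case the RESEMBLE_API_KEY environment variable is used; a ValueError is raised if neither is provided.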
voice_uuid : str
The voice UUID to use for the TTS plugin. Defaults to "55592656".
sample_rate : int
The sample rate to use for the TTS plugin. Defaults to 22050.
precision : str
The precision to use for the TTS plugin. Defaults to "PCM_16".
model : Optional[str], optional
Optional Resemble model name to override the account default (e.g. "chatterbox", "chatterbox-turbo"). When None, the server applies the account-level default model.

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the HTTP client (if one exists), then run the base-class cleanup."""
    client = self._http_client
    if client:
        await client.aclose()
    await super().aclose()

Clean up resources: close the underlying HTTP client, then run the base-class cleanup.

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt TTS synthesis.

    Sets ``self._interrupted``, cancels the tracked synthesis task if it is
    still pending, and asks the audio track (when present) to interrupt.
    """
    self._interrupted = True

    pending = self._current_synthesis_task
    if pending and not pending.done():
        pending.cancel()

    track = self.audio_track
    if track:
        track.interrupt()

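Interrupt TTS synthesis: mark the instance as interrupted so in-progress loops stop, cancel the tracked synthesis task if it is still running, and interrupt the audio track.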

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-chunk flag so the next TTS task is treated as not having sent audio yet."""
    self._first_chunk_sent = False

Reset the first-audio tracking state before the next TTS task.

async def synthesize(self, text: AsyncIterator[Union[str, FlushMarker]] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    **kwargs: Any,
) -> None:
    """Convert ``text`` to speech, reporting failures via the ``"error"`` event.

    A plain string is synthesized as one segment. An async iterator is split
    into segments by ``segment_text``, stopping early once interrupted.
    """
    try:
        ready = self.audio_track and self.loop
        if not ready:
            self.emit("error", "Audio track or event loop not set")
            return

        # Starting a new request clears any earlier interruption.
        self._interrupted = False

        if isinstance(text, str):
            await self._synthesize_segment(text, **kwargs)
            return

        async for piece in segment_text(text):
            if self._interrupted:
                break
            await self._synthesize_segment(piece, **kwargs)

    except Exception as exc:
        self.emit("error", f"Resemble TTS synthesis failed: {str(exc)}")

Convert text to speech

Args

text
Text to convert to speech. Either a plain string or an async iterator that may yield str chunks and FlushMarker segment-boundary markers. Plugins that don't support per-segment flushing should drop the markers with an inline isinstance check (or rely on segment_text which already drops them).
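voice_id
Optional voice identifier (documented by the base class; ResembleTTS.synthesize has no such parameter and selects the voice through the voice_uuid constructor argument)
**kwargs
Additional provider-specific arguments (accepted and passed to each segment, but not currently used in the Resemble request)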

Returns

None