Package videosdk.plugins.rime

Sub-modules

videosdk.plugins.rime.tts

Classes

class RimeTTS (*,
api_key: str | None = None,
speaker: str = 'river',
model_id: str = 'mist',
lang: str = 'eng',
sampling_rate: int = 24000,
speed_alpha: float = 1.0,
reduce_latency: bool = True,
pause_between_brackets: bool = False,
phonemize_between_brackets: bool = False,
inline_speed_alpha: str | None = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
repetition_penalty: Optional[float] = None,
max_tokens: Optional[int] = None)
Expand source code
class RimeTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        speaker: str = DEFAULT_SPEAKER,
        model_id: str = DEFAULT_MODEL,
        lang: str = DEFAULT_LANGUAGE,
        sampling_rate: int = RIME_SAMPLE_RATE,
        speed_alpha: float = 1.0,
        reduce_latency: bool = True,
        pause_between_brackets: bool = False,
        phonemize_between_brackets: bool = False,
        inline_speed_alpha: str | None = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        repetition_penalty: Optional[float] = None,
        max_tokens: Optional[int] = None,
    ) -> None:
        """Initialize the Rime TTS plugin.

        Args:
            api_key (Optional[str], optional): Rime AI API key. Defaults to None.
            speaker (str): The speaker to use for the TTS plugin. Defaults to "river".
            model_id (str): The model ID to use for the TTS plugin. Defaults to "mist".
            lang (str): The language to use for the TTS plugin. Defaults to "eng".
            sampling_rate (int): The sampling rate to use for the TTS plugin. Defaults to 24000.
            speed_alpha (float): The speed alpha to use for the TTS plugin. Defaults to 1.0.
            reduce_latency (bool): Whether to reduce latency for the TTS plugin. Defaults to True.
            pause_between_brackets (bool): Whether to pause between brackets for the TTS plugin. Defaults to False.
            phonemize_between_brackets (bool): Whether to phonemize between brackets for the TTS plugin. Defaults to False.
            inline_speed_alpha (Optional[str], optional): The inline speed alpha to use for the TTS plugin. Defaults to None.
            temperature (Optional[float], optional): Sampling temperature (Arcana model only). Server applies its own
                default when omitted.
            top_p (Optional[float], optional): Nucleus sampling cutoff (Arcana model only).
            repetition_penalty (Optional[float], optional): Repetition penalty (Arcana model only).
            max_tokens (Optional[int], optional): Token limit for output (Arcana model only).
        """
        actual_sample_rate = sampling_rate
        super().__init__(sample_rate=actual_sample_rate, num_channels=RIME_CHANNELS)

        self.speaker = speaker
        self.model_id = model_id
        self.lang = lang
        self.sampling_rate = sampling_rate
        self.speed_alpha = speed_alpha
        self.reduce_latency = reduce_latency
        self.pause_between_brackets = pause_between_brackets
        self.phonemize_between_brackets = phonemize_between_brackets
        self.inline_speed_alpha = inline_speed_alpha
        self.temperature = temperature
        self.top_p = top_p
        self.repetition_penalty = repetition_penalty
        self.max_tokens = max_tokens
        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False
        self._interrupted = False

        self.api_key = api_key or os.getenv("RIME_API_KEY")

        if not self.api_key:
            raise ValueError(
                "Rime AI API key required. Provide either:\n"
                "1. api_key parameter, OR\n"
                "2. RIME_API_KEY environment variable"
            )

        if model_id in KNOWN_SPEAKERS and speaker not in KNOWN_SPEAKERS[model_id]:
            available = ", ".join(KNOWN_SPEAKERS[model_id])
            print(
                f" Warning: Speaker '{speaker}' may not be available for model '{model_id}'. "
                f"Known speakers: {available}"
            )

        # Read timeout adapts to model: Arcana long-form synthesis can take
        # >60s, while Mist family typically returns within ~5s.
        read_timeout = MODEL_READ_TIMEOUT_SEC.get(model_id, 30.0)
        self._http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(
                connect=15.0, read=read_timeout, write=5.0, pool=5.0
            ),
            follow_redirects=True,
            limits=httpx.Limits(
                max_connections=50,
                max_keepalive_connections=50,
                keepalive_expiry=120,
            ),
        )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        **kwargs: Any,
    ) -> None:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or loop not initialized")
            return

        self._interrupted = False
        try:
            if isinstance(text, str):
                if not self._interrupted:
                    await self._synthesize_audio(text)
            else:
                async for segment in segment_text(text):
                    if self._interrupted:
                        break
                    await self._synthesize_audio(segment)

        except Exception as e:
            self.emit("error", f"Rime TTS synthesis failed: {str(e)}")

    async def _synthesize_audio(self, text: str) -> None:
        """Synthesize text to speech using Rime AI streaming API"""
        if not text or not text.strip() or self._interrupted:
            return
        if len(text) > 500:
            self.emit(
                "error", f"Text exceeds 500 character limit. Got {len(text)} characters.")
            return

        payload: dict[str, Any] = {
            "speaker": self.speaker,
            "text": text,
            "modelId": self.model_id,
            "lang": self.lang,
            "samplingRate": self.sampling_rate,
            "speedAlpha": self.speed_alpha,
            "reduceLatency": self.reduce_latency,
            "pauseBetweenBrackets": self.pause_between_brackets,
            "phonemizeBetweenBrackets": self.phonemize_between_brackets,
        }

        if self.inline_speed_alpha:
            payload["inlineSpeedAlpha"] = self.inline_speed_alpha
        # Arcana-only sampling controls are sent only when the caller set them
        # — letting Rime apply its model defaults on Mist requests, which would
        # otherwise reject the unknown fields.
        if self.temperature is not None:
            payload["temperature"] = self.temperature
        if self.top_p is not None:
            payload["top_p"] = self.top_p
        if self.repetition_penalty is not None:
            payload["repetition_penalty"] = self.repetition_penalty
        if self.max_tokens is not None:
            payload["max_tokens"] = self.max_tokens

        headers = {
            "Accept": "audio/pcm",
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        for attempt in range(2):
            try:
                async with self._http_client.stream(
                    "POST", RIME_TTS_ENDPOINT, headers=headers, json=payload
                ) as response:
                    response.raise_for_status()

                    async for chunk in response.aiter_bytes():
                        if self._interrupted:
                            return
                        if chunk:
                            await self._process_audio_chunk(chunk)
                return

            except (httpx.NetworkError, httpx.ConnectError, httpx.ReadTimeout) as e:
                if attempt == 0 and not self._interrupted:
                    await asyncio.sleep(0.25 * (2 ** attempt))
                    continue
                self.emit("error", f"Rime TTS network failure: {str(e)}")
                return
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 401:
                    self.emit(
                        "error", "Rime TTS authentication failed. Please check your API key.")
                elif e.response.status_code == 400:
                    error_text = e.response.text
                    if "doesn't match list" in error_text:
                        available = ", ".join(
                            KNOWN_SPEAKERS.get(self.model_id, []))
                        self.emit("error", f"Speaker '{self.speaker}' not available for model '{self.model_id}'. "
                                  f"Try one of: {available}")
                    else:
                        self.emit("error", f"Rime TTS bad request: {error_text}")
                else:
                    self.emit(
                        "error", f"Rime TTS HTTP error: {e.response.status_code} - {e.response.text}")
                return
            except Exception as e:
                if not self._interrupted:
                    self.emit("error", f"Rime TTS request failed: {str(e)}")
                return

    async def _process_audio_chunk(self, audio_chunk: bytes) -> None:
        """Process individual audio chunks in real-time for minimal latency"""
        if not audio_chunk or self._interrupted:
            return

        processed_chunk = self._remove_wav_header(audio_chunk)

        if not processed_chunk:
            return

        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()

        if self.audio_track and self.loop:
            asyncio.create_task(
                self.audio_track.add_new_bytes(processed_chunk))

    def _remove_wav_header(self, audio_bytes: bytes) -> bytes:
        """Remove WAV header if present to get raw PCM data"""
        if audio_bytes.startswith(b"RIFF"):
            data_pos = audio_bytes.find(b"data")
            if data_pos != -1:
                return audio_bytes[data_pos + 8:]

        return audio_bytes

    async def aclose(self) -> None:
        """Cleanup HTTP client resources"""
        if self._http_client:
            await self._http_client.aclose()
        await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt the TTS audio stream"""
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the Rime TTS plugin.

Args

api_key : Optional[str], optional
Rime AI API key. Defaults to None.
speaker : str
The speaker to use for the TTS plugin. Defaults to "river".
model_id : str
The model ID to use for the TTS plugin. Defaults to "mist".
lang : str
The language to use for the TTS plugin. Defaults to "eng".
sampling_rate : int
The sampling rate to use for the TTS plugin. Defaults to 24000.
speed_alpha : float
The speed alpha to use for the TTS plugin. Defaults to 1.0.
reduce_latency : bool
Whether to reduce latency for the TTS plugin. Defaults to True.
pause_between_brackets : bool
Whether to pause between brackets for the TTS plugin. Defaults to False.
phonemize_between_brackets : bool
Whether to phonemize between brackets for the TTS plugin. Defaults to False.
inline_speed_alpha : Optional[str], optional
The inline speed alpha to use for the TTS plugin. Defaults to None.
temperature : Optional[float], optional
Sampling temperature (Arcana model only). Server applies its own default when omitted.
top_p : Optional[float], optional
Nucleus sampling cutoff (Arcana model only).
repetition_penalty : Optional[float], optional
Repetition penalty (Arcana model only).
max_tokens : Optional[int], optional
Token limit for output (Arcana model only).

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup HTTP client resources"""
    if self._http_client:
        await self._http_client.aclose()
    await super().aclose()

Cleanup HTTP client resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt the TTS audio stream"""
    self._interrupted = True
    if self.audio_track:
        self.audio_track.interrupt()

Interrupt the TTS audio stream

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self, text: AsyncIterator[Union[str, FlushMarker]] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    **kwargs: Any,
) -> None:
    if not self.audio_track or not self.loop:
        self.emit("error", "Audio track or loop not initialized")
        return

    self._interrupted = False
    try:
        if isinstance(text, str):
            if not self._interrupted:
                await self._synthesize_audio(text)
        else:
            async for segment in segment_text(text):
                if self._interrupted:
                    break
                await self._synthesize_audio(segment)

    except Exception as e:
        self.emit("error", f"Rime TTS synthesis failed: {str(e)}")

Convert text to speech

Args

text
Text to convert to speech. Either a plain string or an async iterator that may yield str chunks and FlushMarker segment-boundary markers. Plugins that don't support per-segment flushing should drop the markers with an inline isinstance check (or rely on segment_text which already drops them).
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None