Package videosdk.plugins.groq

Sub-modules

videosdk.plugins.groq.tts

Classes

class GroqTTS (*,
api_key: str | None = None,
model: str = 'canopylabs/orpheus-v1-english',
voice: str = 'hannah',
speed: float = 1.0,
response_format: "Literal['flac', 'mp3', 'mulaw', 'ogg', 'wav']" = 'wav',
sample_rate: int = 24000)
Expand source code
class GroqTTS(TTS):
    """Text-to-speech plugin that synthesizes audio through Groq's HTTP API.

    Each text segment is sent as one request; the WAV response is buffered,
    stripped of its RIFF container and pushed to ``audio_track`` in small
    PCM chunks. Only the ``wav`` response format can currently be played.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE,
        speed: float = 1.0,
        response_format: Literal["flac", "mp3", "mulaw", "ogg", "wav"] = "wav",
        sample_rate: int = 24000,
    ) -> None:
        """Initialize the Groq TTS plugin.

        Args:
            api_key (Optional[str], optional): Groq API key. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to
                ``canopylabs/orpheus-v1-english``. Use ``canopylabs/orpheus-arabic-saudi``
                for Arabic. (``playai-tts`` was decommissioned by Groq.)
            voice (str): The voice to use for the TTS plugin. Defaults to ``hannah``.
                Other Orpheus voices include ``troy``, ``austin``. See
                https://console.groq.com/docs/text-to-speech/orpheus for the full list.
            speed (float): The speed to use for the TTS plugin. Must be between 0.5 and 5.0. Defaults to 1.0.
            response_format (Literal["flac", "mp3", "mulaw", "ogg", "wav"]): The response format to use for the TTS plugin. Defaults to "wav".
            sample_rate (int): The sample rate to use for the TTS plugin. Must be one of: 8000, 16000, 22050, 24000, 32000, 44100, 48000. Defaults to 24000.

        Raises:
            ValueError: If ``sample_rate`` is not a key of ``SAMPLE_RATE_MAP``,
                if ``speed`` is outside ``[0.5, 5.0]``, or if no API key is
                available from the argument or ``GROQ_API_KEY``.
        """
        if sample_rate not in SAMPLE_RATE_MAP:
            raise ValueError(
                f"Invalid sample rate: {sample_rate}. Must be one of: {list(SAMPLE_RATE_MAP.keys())}"
            )

        if not 0.5 <= speed <= 5.0:
            raise ValueError(f"Speed must be between 0.5 and 5.0, got {speed}")

        super().__init__(sample_rate=sample_rate, num_channels=GROQ_TTS_CHANNELS)

        self.model = model
        self.voice = voice
        self.speed = speed
        self.audio_track = None
        self.loop = None
        self.response_format = response_format
        self._groq_sample_rate = sample_rate
        self._first_chunk_sent = False
        self._interrupted = False
        # Strong references to in-flight ``add_new_bytes`` tasks. The event
        # loop only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._pending_audio_tasks: set[asyncio.Task] = set()

        self.api_key = api_key or os.getenv("GROQ_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Groq API key must be provided either through api_key parameter or GROQ_API_KEY environment variable"
            )

        self._client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0,
                                  write=5.0, pool=5.0),
            follow_redirects=True,
            limits=httpx.Limits(
                max_connections=50,
                max_keepalive_connections=50,
                keepalive_expiry=120,
            ),
        )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Convert text to speech using Groq's TTS API and stream to audio track

        Failures are reported through the ``"error"`` event rather than raised.

        Args:
            text: Text to convert to speech, or an async iterator of text chunks.
                ``FlushMarker`` segment boundaries are silently dropped — Groq's
                HTTP API has no per-segment flush primitive.
            voice_id: Optional voice override
            **kwargs: Additional provider-specific arguments
        """
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        # A new synthesis request clears any interruption left over from the
        # previous one.
        self._interrupted = False
        try:
            if isinstance(text, str):
                if not self._interrupted:
                    await self._synthesize_audio(text, voice_id)
            else:
                async for segment in segment_text(text):
                    if self._interrupted:
                        break
                    await self._synthesize_audio(segment, voice_id)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")

    async def _synthesize_audio(
        self, text: str, voice_id: Optional[str] = None
    ) -> None:
        """Call Groq API to synthesize one text segment and play the result.

        Blank text and interrupted state are no-ops. Network-level failures
        are retried once after a short delay; every failure is reported via
        the ``"error"`` event instead of being raised.

        Args:
            text: The segment to synthesize.
            voice_id: Optional voice override; falls back to ``self.voice``.
        """
        if not text or not text.strip() or self._interrupted:
            return

        # Only WAV can be unpacked to PCM here. Reject other formats before
        # issuing the request so no API call is spent on audio that would be
        # discarded anyway.
        if self.response_format != "wav":
            self.emit(
                "error",
                f"Format {self.response_format} requires decoding, which is not implemented yet",
            )
            return

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        payload = {
            "model": self.model,
            "input": text,
            "voice": voice_id or self.voice,
            "response_format": self.response_format,
            "sample_rate": self._groq_sample_rate,
            "speed": self.speed,
        }

        for attempt in range(2):
            try:
                async with self._client.stream(
                    "POST",
                    GROQ_TTS_ENDPOINT,
                    headers=headers,
                    json=payload,
                ) as response:
                    if response.status_code >= 400:
                        # Body must be read before .text/.json() are usable
                        # on a streaming response.
                        await response.aread()
                    response.raise_for_status()

                    # bytearray grows in place; repeated ``bytes +=`` would
                    # copy the whole buffer on every network chunk.
                    audio_buffer = bytearray()
                    async for chunk in response.aiter_bytes():
                        if self._interrupted:
                            break
                        audio_buffer += chunk

                    if not self._interrupted:
                        pcm_data = self._extract_pcm_from_wav(bytes(audio_buffer))
                        await self._stream_audio_chunks(pcm_data)
                return

            except (httpx.NetworkError, httpx.ConnectError, httpx.ReadTimeout) as e:
                # Retry once on the first attempt unless playback was
                # interrupted in the meantime.
                if attempt == 0 and not self._interrupted:
                    await asyncio.sleep(0.25 * (2 ** attempt))
                    continue
                self.emit("error", f"Groq TTS network failure: {str(e)}")
                return
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 401:
                    self.emit(
                        "error",
                        "Groq API authentication failed. Please check your API key.",
                    )
                elif e.response.status_code == 400:
                    try:
                        error_data = e.response.json()
                        error_msg = error_data.get("error", {}).get(
                            "message", "Bad request"
                        )
                        self.emit("error", f"Groq TTS request error: {error_msg}")
                    except Exception:
                        # Body was not the expected JSON shape; report it raw.
                        self.emit(
                            "error", f"Groq TTS bad request: {e.response.text}")
                elif e.response.status_code == 429:
                    self.emit(
                        "error", "Groq TTS rate limit exceeded. Please try again later."
                    )
                else:
                    self.emit(
                        "error",
                        f"Groq TTS HTTP error {e.response.status_code}: {e.response.text}",
                    )
                return
            except Exception as e:
                if not self._interrupted:
                    self.emit("error", f"Groq TTS API call failed: {str(e)}")
                return

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Push PCM bytes to the audio track in fixed-size chunks.

        The chunk size corresponds to 20 ms of 16-bit audio at 24 kHz. The
        final chunk is zero-padded to full size. Stops early when interrupted.

        Args:
            audio_bytes: Raw PCM payload to play.
        """
        # NOTE(review): the size is computed for 24 kHz even when a different
        # ``sample_rate`` was configured — confirm what the audio track expects
        # before deriving it from ``self._groq_sample_rate``.
        chunk_size = int(24000 * GROQ_TTS_CHANNELS * 2 * 20 / 1000)

        for i in range(0, len(audio_bytes), chunk_size):
            if self._interrupted:
                return
            chunk = audio_bytes[i: i + chunk_size]

            if len(chunk) < chunk_size and len(chunk) > 0:
                padding_needed = chunk_size - len(chunk)
                chunk += b"\x00" * padding_needed

            if chunk:
                if not self._first_chunk_sent and self._first_audio_callback:
                    self._first_chunk_sent = True
                    await self._first_audio_callback()

                # Keep a reference until the task completes so it cannot be
                # garbage-collected while still pending.
                task = asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                self._pending_audio_tasks.add(task)
                task.add_done_callback(self._pending_audio_tasks.discard)
                await asyncio.sleep(0.001)

    def _extract_pcm_from_wav(self, wav_data: bytes) -> bytes:
        """Extract PCM data from WAV file format.

        Walks the RIFF chunk list to find the ``data`` chunk and returns its
        payload, honouring the declared chunk size so trailing chunks are not
        played as audio. Input that is too short or does not start with
        ``RIFF`` is returned unchanged.

        Args:
            wav_data: Complete response body.

        Returns:
            The PCM payload, or ``wav_data`` unchanged when no WAV structure
            is recognised.
        """
        if len(wav_data) < 44:
            return wav_data

        if wav_data[:4] != b"RIFF":
            return wav_data

        total = len(wav_data)
        # Skip the 12-byte RIFF header: "RIFF", overall size, form type.
        offset = 12
        while offset + 8 <= total:
            chunk_id = wav_data[offset: offset + 4]
            chunk_size = int.from_bytes(wav_data[offset + 4: offset + 8], "little")
            body_start = offset + 8
            if chunk_id == b"data":
                body_end = body_start + chunk_size
                # Streamed WAVs may declare a placeholder size (0 or larger
                # than what was received); take everything that follows.
                if chunk_size == 0 or body_end > total:
                    body_end = total
                return wav_data[body_start:body_end]
            # Chunk bodies are padded to an even number of bytes.
            offset = body_start + chunk_size + (chunk_size & 1)

        # Chunk list was not parseable; fall back to a plain byte search.
        data_pos = wav_data.find(b"data")
        if data_pos == -1:
            return wav_data

        return wav_data[data_pos + 8:]

    async def aclose(self) -> None:
        """Cleanup resources"""
        await self._client.aclose()
        await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt the TTS process"""
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the Groq TTS plugin.

Args

api_key : Optional[str], optional
Groq API key. Defaults to None.
model : str
The model to use for the TTS plugin. Defaults to canopylabs/orpheus-v1-english. Use canopylabs/orpheus-arabic-saudi for Arabic. (playai-tts was decommissioned by Groq.)
voice : str
The voice to use for the TTS plugin. Defaults to hannah. Other Orpheus voices include troy, austin. See https://console.groq.com/docs/text-to-speech/orpheus for the full list.
speed : float
The speed to use for the TTS plugin. Must be between 0.5 and 5.0. Defaults to 1.0.
response_format : Literal["flac", "mp3", "mulaw", "ogg", "wav"]
The response format to use for the TTS plugin. Defaults to "wav".
sample_rate : int
The sample rate to use for the TTS plugin. Must be one of: 8000, 16000, 22050, 24000, 32000, 44100, 48000. Defaults to 24000.

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the HTTP client, then run the parent class's cleanup."""
    http_client = self._client
    await http_client.aclose()
    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Mark synthesis as interrupted and interrupt the audio track, if one is set."""
    self._interrupted = True
    track = self.audio_track
    if not track:
        return
    track.interrupt()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-chunk flag so the next TTS task starts untracked."""
    first_chunk_sent = False
    self._first_chunk_sent = first_chunk_sent

Reset the first audio tracking state for next TTS task

async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """
    Synthesize speech for ``text`` through Groq and play it on the audio track

    Errors are emitted as ``"error"`` events instead of being raised.

    Args:
        text: A single string, or an async iterator of text chunks that is
            split into segments before synthesis. ``FlushMarker`` segment
            boundaries are silently dropped — Groq's HTTP API has no
            per-segment flush primitive.
        voice_id: Optional voice override
        **kwargs: Additional provider-specific arguments
    """
    ready = self.audio_track and self.loop
    if not ready:
        self.emit("error", "Audio track or event loop not set")
        return

    # Starting a new request clears any earlier interruption.
    self._interrupted = False
    try:
        if not isinstance(text, str):
            async for piece in segment_text(text):
                if self._interrupted:
                    break
                await self._synthesize_audio(piece, voice_id)
        elif not self._interrupted:
            await self._synthesize_audio(text, voice_id)

    except Exception as exc:
        self.emit("error", f"TTS synthesis failed: {str(exc)}")

Convert text to speech using Groq's TTS API and stream to audio track

Args

text
Text to convert to speech, or an async iterator of text chunks. FlushMarker segment boundaries are silently dropped — Groq's HTTP API has no per-segment flush primitive.
voice_id
Optional voice override
**kwargs
Additional provider-specific arguments