Package videosdk.plugins.groq
Sub-modules
videosdk.plugins.groq.tts
Classes
class GroqTTS (*,
api_key: str | None = None,
model: str = 'playai-tts',
voice: str = 'Fritz-PlayAI',
speed: float = 1.0,
response_format: "Literal['flac', 'mp3', 'mulaw', 'ogg', 'wav']" = 'wav',
sample_rate: int = 24000)-
Expand source code
class GroqTTS(TTS): def __init__( self, *, api_key: str | None = None, model: str = DEFAULT_MODEL, voice: str = DEFAULT_VOICE, speed: float = 1.0, response_format: Literal["flac", "mp3", "mulaw", "ogg", "wav"] = "wav", sample_rate: int = 24000, ) -> None: """Initialize the Groq TTS plugin. Args: api_key (Optional[str], optional): Groq API key. Defaults to None. model (str): The model to use for the TTS plugin. Defaults to "playai-tts". voice (str): The voice to use for the TTS plugin. Defaults to "Fritz-PlayAI". speed (float): The speed to use for the TTS plugin. Must be between 0.5 and 5.0. Defaults to 1.0. response_format (Literal["flac", "mp3", "mulaw", "ogg", "wav"]): The response format to use for the TTS plugin. Defaults to "wav". sample_rate (int): The sample rate to use for the TTS plugin. Must be one of: 8000, 16000, 22050, 24000, 32000, 44100, 48000. Defaults to 24000. """ if sample_rate not in SAMPLE_RATE_MAP: raise ValueError( f"Invalid sample rate: {sample_rate}. Must be one of: {list(SAMPLE_RATE_MAP.keys())}" ) if not 0.5 <= speed <= 5.0: raise ValueError(f"Speed must be between 0.5 and 5.0, got {speed}") super().__init__(sample_rate=sample_rate, num_channels=GROQ_TTS_CHANNELS) self.model = model self.voice = voice self.speed = speed self.audio_track = None self.loop = None self.response_format = response_format self._groq_sample_rate = sample_rate self._first_chunk_sent = False self.api_key = api_key or os.getenv("GROQ_API_KEY") if not self.api_key: raise ValueError( "Groq API key must be provided either through api_key parameter or GROQ_API_KEY environment variable" ) self._client = httpx.AsyncClient( timeout=httpx.Timeout(connect=15.0, read=30.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120, ), ) def reset_first_audio_tracking(self) -> None: """Reset the first audio tracking state for next TTS task""" self._first_chunk_sent = False async def synthesize( self, text: AsyncIterator[str] | str, voice_id: Optional[str] = None, **kwargs: Any, ) -> None: """ Convert text to speech using Groq's TTS API and stream to audio track Args: text: Text to convert to speech voice_id: Optional voice override **kwargs: Additional provider-specific arguments """ try: if isinstance(text, AsyncIterator): async for segment in segment_text(text): await self._synthesize_audio(segment, voice_id) else: await self._synthesize_audio(text, voice_id) if not self.audio_track or not self.loop: self.emit("error", "Audio track or event loop not set") return except Exception as e: self.emit("error", f"TTS synthesis failed: {str(e)}") async def _synthesize_audio( self, text: str, voice_id: Optional[str] = None ) -> None: """Call Groq API to synthesize audio""" try: headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", } payload = { "model": self.model, "input": text, "voice": voice_id or self.voice, "response_format": self.response_format, "sample_rate": self._groq_sample_rate, "speed": self.speed, } async with self._client.stream( "POST", GROQ_TTS_ENDPOINT, headers=headers, json=payload, ) as response: response.raise_for_status() if self.response_format == "wav": audio_data = b"" async for chunk in response.aiter_bytes(): audio_data += chunk pcm_data = self._extract_pcm_from_wav(audio_data) await self._stream_audio_chunks(pcm_data) else: self.emit( "error", f"Format {self.response_format} requires decoding, which is not implemented yet", ) except httpx.HTTPStatusError as e: if e.response.status_code == 401: self.emit( "error", "Groq API authentication failed. Please check your API key.", ) elif e.response.status_code == 400: try: error_data = e.response.json() error_msg = error_data.get("error", {}).get( "message", "Bad request" ) self.emit("error", f"Groq TTS request error: {error_msg}") except: self.emit( "error", f"Groq TTS bad request: {e.response.text}") elif e.response.status_code == 429: self.emit( "error", "Groq TTS rate limit exceeded. Please try again later." ) else: self.emit( "error", f"Groq TTS HTTP error {e.response.status_code}: {e.response.text}", ) except Exception as e: self.emit("error", f"Groq TTS API call failed: {str(e)}") async def _stream_audio_chunks(self, audio_bytes: bytes) -> None: """Stream audio data in chunks at 24kHz""" chunk_size = int(24000 * GROQ_TTS_CHANNELS * 2 * 20 / 1000) for i in range(0, len(audio_bytes), chunk_size): chunk = audio_bytes[i: i + chunk_size] if len(chunk) < chunk_size and len(chunk) > 0: padding_needed = chunk_size - len(chunk) chunk += b"\x00" * padding_needed if chunk: if not self._first_chunk_sent and self._first_audio_callback: self._first_chunk_sent = True await self._first_audio_callback() asyncio.create_task(self.audio_track.add_new_bytes(chunk)) await asyncio.sleep(0.001) def _extract_pcm_from_wav(self, wav_data: bytes) -> bytes: """Extract PCM data from WAV file format""" if len(wav_data) < 44: return wav_data if wav_data[:4] != b"RIFF": return wav_data data_pos = wav_data.find(b"data") if data_pos == -1: return wav_data return wav_data[data_pos + 8:] async def aclose(self) -> None: """Cleanup resources""" await self._client.aclose() await super().aclose() async def interrupt(self) -> None: """Interrupt the TTS process""" if self.audio_track: self.audio_track.interrupt()
Base class for Text-to-Speech implementations
Initialize the Groq TTS plugin.
Args
api_key
:Optional[str]
, optional- Groq API key. Defaults to None.
model
:str
- The model to use for the TTS plugin. Defaults to "playai-tts".
voice
:str
- The voice to use for the TTS plugin. Defaults to "Fritz-PlayAI".
speed
:float
- The speed to use for the TTS plugin. Must be between 0.5 and 5.0. Defaults to 1.0.
- response_format (Literal["flac", "mp3", "mulaw", "ogg", "wav"]): The response format to use for the TTS plugin. Defaults to "wav".
sample_rate
:int
- The sample rate to use for the TTS plugin. Must be one of: 8000, 16000, 22050, 24000, 32000, 44100, 48000. Defaults to 24000.
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None: """Cleanup resources""" await self._client.aclose() await super().aclose()
Cleanup resources
async def interrupt(self) ‑> None
-
Expand source code
async def interrupt(self) -> None: """Interrupt the TTS process""" if self.audio_track: self.audio_track.interrupt()
Interrupt the TTS process
def reset_first_audio_tracking(self) ‑> None
-
Expand source code
def reset_first_audio_tracking(self) -> None: """Reset the first audio tracking state for next TTS task""" self._first_chunk_sent = False
Reset the first audio tracking state for next TTS task
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize( self, text: AsyncIterator[str] | str, voice_id: Optional[str] = None, **kwargs: Any, ) -> None: """ Convert text to speech using Groq's TTS API and stream to audio track Args: text: Text to convert to speech voice_id: Optional voice override **kwargs: Additional provider-specific arguments """ try: if isinstance(text, AsyncIterator): async for segment in segment_text(text): await self._synthesize_audio(segment, voice_id) else: await self._synthesize_audio(text, voice_id) if not self.audio_track or not self.loop: self.emit("error", "Audio track or event loop not set") return except Exception as e: self.emit("error", f"TTS synthesis failed: {str(e)}")
Convert text to speech using Groq's TTS API and stream to audio track
Args
text
- Text to convert to speech
voice_id
- Optional voice override
**kwargs
- Additional provider-specific arguments