Package videosdk.plugins.papla
Sub-modules
videosdk.plugins.papla.tts
Classes
class PaplaTTS (*,
api_key: str | None = None,
model_id: str = 'papla_p1',
base_url: str = 'https://api.papla.media/v1')
Expand source code
class PaplaTTS(TTS):
    """Text-to-speech plugin backed by Papla's HTTP streaming endpoint.

    Each text segment is POSTed to ``{base_url}/text-to-speech/{voice_id}/stream``.
    The returned compressed audio is decoded to 16-bit PCM and pushed to
    ``self.audio_track`` in 20 ms frames.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_id: str = DEFAULT_MODEL,
        base_url: str = API_BASE_URL,
    ) -> None:
        """Initialize the Papla TTS plugin.

        Args:
            api_key (Optional[str], optional): Papla API key. Defaults to None,
                in which case the ``PAPLA_API_KEY`` environment variable is read.
            model_id (str): The model ID to use for the TTS plugin. Defaults to "papla_p1".
            base_url (str): The base URL to use for the TTS plugin.
                Defaults to "https://api.papla.media/v1".

        Raises:
            ValueError: If no API key is given and ``PAPLA_API_KEY`` is unset.
        """
        super().__init__(sample_rate=PAPLA_SAMPLE_RATE, num_channels=PAPLA_CHANNELS)
        self.model_id = model_id
        self.audio_track = None
        self.loop = None
        self.base_url = base_url
        self._first_chunk_sent = False
        self._interrupted = False

        self.api_key = api_key or os.getenv("PAPLA_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Papla API key must be provided either through the 'api_key' "
                "parameter or the 'PAPLA_API_KEY' environment variable."
            )

        self._client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0, write=5.0, pool=5.0),
            follow_redirects=True,
            limits=httpx.Limits(
                max_connections=50,
                max_keepalive_connections=50,
                keepalive_expiry=120,
            ),
        )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Convert text to speech using Papla's streaming TTS API.
        Decodes the received MP3 audio to raw PCM before streaming.

        ``FlushMarker`` segment-boundary markers are dropped — Papla's HTTP API
        has no per-segment flush primitive.
        """
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set by the framework.")
            return

        self._interrupted = False

        try:
            if isinstance(text, str):
                await self._synthesize_segment(text, voice_id, **kwargs)
            else:
                async for segment in segment_text(text):
                    if self._interrupted:
                        break
                    await self._synthesize_segment(segment, voice_id, **kwargs)
        except Exception as e:
            self.emit("error", f"Papla TTS synthesis failed: {str(e)}")

    async def _synthesize_segment(self, text: str, voice_id: Optional[str] = None, **kwargs: Any) -> None:
        """Synthesize a single text segment and stream its audio to the track.

        Playback is awaited rather than spawned as a background task. Audio from
        consecutive segments is therefore written to the track strictly in order
        and never interleaved.
        """
        if not text.strip() or self._interrupted:
            return

        mp3_data = await self._fetch_segment_audio(text, voice_id)
        if mp3_data and not self._interrupted:
            await self._decode_and_stream_pcm(mp3_data)

    async def _fetch_segment_audio(self, text: str, voice_id: Optional[str]) -> bytes | None:
        """POST one segment to Papla and return the full compressed audio body.

        Transport errors and timeouts are retried once after a short back-off.
        Returns None on failure, on interruption, or when no audio arrived.
        Failures are reported through the "error" event.
        """
        target_voice = voice_id or DEFAULT_VOICE_ID
        url = f"{self.base_url}/text-to-speech/{target_voice}/stream"
        headers = {
            "papla-api-key": self.api_key,
            "Content-Type": "application/json",
        }
        payload = {
            "text": text,
            "model_id": self.model_id,
        }

        for attempt in range(2):
            try:
                async with self._client.stream("POST", url, headers=headers, json=payload) as response:
                    if not response.is_success:
                        # A streamed body is not loaded automatically. Read it now so
                        # the HTTPStatusError handler can include it. Otherwise
                        # accessing `.text` raises httpx.ResponseNotRead.
                        await response.aread()
                    response.raise_for_status()

                    # bytearray avoids the quadratic cost of repeated `bytes +=`.
                    buffer = bytearray()
                    async for chunk in response.aiter_bytes():
                        if self._interrupted:
                            return None
                        buffer.extend(chunk)
                    return bytes(buffer)
            except (httpx.NetworkError, httpx.TimeoutException) as e:
                if self._interrupted:
                    return None
                if attempt == 0:
                    await asyncio.sleep(0.25 * (2 ** attempt))
                    continue
                self.emit("error", f"Papla TTS network failure: {str(e)}")
                return None
            except httpx.HTTPStatusError as e:
                if not self._interrupted:
                    self.emit(
                        "error",
                        f"Papla TTS HTTP error {e.response.status_code}: {e.response.text}",
                    )
                return None
        return None

    @staticmethod
    def _decode_to_pcm(audio_bytes: bytes) -> bytes:
        """Decode an in-memory AUDIO_FORMAT payload to 16-bit PCM at the Papla rate/channel count."""
        audio = AudioSegment.from_file(io.BytesIO(audio_bytes), format=AUDIO_FORMAT)
        audio = audio.set_frame_rate(PAPLA_SAMPLE_RATE)
        audio = audio.set_channels(PAPLA_CHANNELS)
        audio = audio.set_sample_width(2)
        return audio.raw_data

    async def _decode_and_stream_pcm(self, audio_bytes: bytes) -> None:
        """Decode compressed audio (MP3) into raw PCM and stream it to the audio track in 20 ms frames."""
        if self._interrupted:
            return

        try:
            # AudioSegment.from_file is synchronous. Run it in a worker thread so
            # decoding does not block the event loop.
            pcm_data = await asyncio.to_thread(self._decode_to_pcm, audio_bytes)

            # Bytes in one 20 ms frame: rate * channels * 2 bytes/sample * 0.020 s.
            frame_size = int(PAPLA_SAMPLE_RATE * PAPLA_CHANNELS * 2 * 20 / 1000)

            for offset in range(0, len(pcm_data), frame_size):
                if self._interrupted or not self.audio_track:
                    return

                frame = pcm_data[offset:offset + frame_size]
                if len(frame) < frame_size:
                    # Zero-pad the trailing partial frame so every frame has the same size.
                    frame += b"\x00" * (frame_size - len(frame))

                if not self._first_chunk_sent and self._first_audio_callback:
                    self._first_chunk_sent = True
                    await self._first_audio_callback()

                # NOTE(review): scheduled rather than awaited, matching the original
                # pacing; confirm the track preserves enqueue order.
                asyncio.create_task(self.audio_track.add_new_bytes(frame))
                await asyncio.sleep(0.01)
        except Exception as e:
            self.emit("error", f"Failed to decode or stream Papla audio: {str(e)}")

    async def aclose(self) -> None:
        """Close the HTTP client (if still open), then run the base-class cleanup."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()
        await super().aclose()

    async def interrupt(self) -> None:
        """Stop in-flight synthesis and interrupt the attached audio track, if any."""
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()
Initialize the Papla TTS plugin.
Args
api_key (Optional[str], optional): Papla API key. Defaults to None, in which case the PAPLA_API_KEY environment variable is read.
model_id (str): The model ID to use for the TTS plugin. Defaults to "papla_p1".
base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.papla.media/v1".
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) -> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Closes the plugin's HTTP client unless it is already closed, then hands
    off to the base-class cleanup.
    """
    client = self._client
    if client and not client.is_closed:
        await client.aclose()
    await super().aclose()
async def interrupt(self) -> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt the TTS process.

    Raises the interruption flag checked by in-flight synthesis and, when an
    audio track is attached, interrupts that track as well.
    """
    self._interrupted = True
    track = self.audio_track
    if not track:
        return
    track.interrupt()
def reset_first_audio_tracking(self) -> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task.

    Clearing the flag lets the first-audio callback fire again when the next
    PCM frame is streamed.
    """
    first_chunk_already_sent = False
    self._first_chunk_sent = first_chunk_already_sent
async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) -> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Convert text to speech using Papla's streaming TTS API.

    Decodes the received MP3 audio to raw PCM before streaming.
    ``FlushMarker`` segment-boundary markers are dropped — Papla's HTTP API
    has no per-segment flush primitive.

    Args:
        text: A complete string, or an async stream of text (segmented via
            ``segment_text`` before synthesis).
        voice_id: Papla voice to use; forwarded to each segment request.
        **kwargs: Forwarded unchanged to ``_synthesize_segment``.
    """
    # Both the track and the loop are supplied by the framework; bail out early without them.
    if not (self.audio_track and self.loop):
        self.emit("error", "Audio track or event loop not set by the framework.")
        return

    self._interrupted = False

    try:
        if not isinstance(text, str):
            async for piece in segment_text(text):
                if self._interrupted:
                    break
                await self._synthesize_segment(piece, voice_id, **kwargs)
        elif not self._interrupted:
            await self._synthesize_segment(text, voice_id, **kwargs)
    except Exception as exc:
        self.emit("error", f"Papla TTS synthesis failed: {str(exc)}")
`FlushMarker` segment-boundary markers are dropped — Papla's HTTP API has no per-segment flush primitive.