Package videosdk.plugins.papla

Sub-modules

videosdk.plugins.papla.tts

Classes

class PaplaTTS (*,
api_key: str | None = None,
model_id: str = 'papla_p1',
base_url: str = 'https://api.papla.media/v1')
Expand source code
class PaplaTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_id: str = DEFAULT_MODEL,
        base_url: str = API_BASE_URL,
    ) -> None:
        """Initialize the Papla TTS plugin.

        Args:
            api_key (Optional[str], optional): Papla API key. Defaults to None.
            model_id (str): The model ID to use for the TTS plugin. Defaults to "papla_p1".
            base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.papla.media/v1".
        """
        super().__init__(sample_rate=PAPLA_SAMPLE_RATE, num_channels=PAPLA_CHANNELS)

        self.model_id = model_id
        self.audio_track = None
        self.loop = None
        self.base_url = base_url
        self._first_chunk_sent = False
        self._interrupted = False

        self.api_key = api_key or os.getenv("PAPLA_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Papla API key must be provided either through the 'api_key' "
                "parameter or the 'PAPLA_API_KEY' environment variable."
            )

        self._client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0,
                                  write=5.0, pool=5.0),
            follow_redirects=True,
            limits=httpx.Limits(
                max_connections=50,
                max_keepalive_connections=50,
                keepalive_expiry=120,
            ),
        )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Convert text to speech using Papla's streaming TTS API.
        Decodes the received MP3 audio to raw PCM before streaming.
        ``FlushMarker`` segment-boundary markers are dropped — Papla's HTTP API
        has no per-segment flush primitive.
        """
        if not self.audio_track or not self.loop:
            self.emit(
                "error", "Audio track or event loop not set by the framework.")
            return

        self._interrupted = False
        try:
            if isinstance(text, str):
                if not self._interrupted:
                    await self._synthesize_segment(text, voice_id, **kwargs)
            else:
                async for segment in segment_text(text):
                    if self._interrupted:
                        break
                    await self._synthesize_segment(segment, voice_id, **kwargs)

        except Exception as e:
            self.emit("error", f"Papla TTS synthesis failed: {str(e)}")

    async def _synthesize_segment(self, text: str, voice_id: Optional[str] = None, **kwargs: Any) -> None:
        """Synthesize a single text segment"""
        if not text.strip() or self._interrupted:
            return

        target_voice = voice_id or DEFAULT_VOICE_ID
        url = f"{self.base_url}/text-to-speech/{target_voice}/stream"

        headers = {
            "papla-api-key": self.api_key,
            "Content-Type": "application/json",
        }

        payload = {
            "text": text,
            "model_id": self.model_id,
        }

        for attempt in range(2):
            try:
                async with self._client.stream("POST", url, headers=headers, json=payload) as response:
                    response.raise_for_status()

                    mp3_data = b""
                    async for chunk in response.aiter_bytes():
                        if self._interrupted:
                            return
                        if chunk:
                            mp3_data += chunk

                    if mp3_data and not self._interrupted:
                        asyncio.create_task(self._decode_and_stream_pcm(mp3_data))
                return

            except (httpx.NetworkError, httpx.ConnectError, httpx.ReadTimeout) as e:
                if attempt == 0 and not self._interrupted:
                    await asyncio.sleep(0.25 * (2 ** attempt))
                    continue
                self.emit("error", f"Papla TTS network failure: {str(e)}")
                return
            except httpx.HTTPStatusError as e:
                if not self._interrupted:
                    self.emit(
                        "error",
                        f"Papla TTS HTTP error {e.response.status_code}: {e.response.text}",
                    )
                return

    async def _decode_and_stream_pcm(self, audio_bytes: bytes) -> None:
        """Decodes compressed audio (MP3) into raw PCM and streams it to the audio track."""
        if self._interrupted:
            return
        try:
            audio = AudioSegment.from_file(
                io.BytesIO(audio_bytes), format=AUDIO_FORMAT)

            audio = audio.set_frame_rate(PAPLA_SAMPLE_RATE)
            audio = audio.set_channels(PAPLA_CHANNELS)
            audio = audio.set_sample_width(2)

            pcm_data = audio.raw_data

            chunk_size = int(PAPLA_SAMPLE_RATE *
                             PAPLA_CHANNELS * 2 * 20 / 1000)

            for i in range(0, len(pcm_data), chunk_size):
                if self._interrupted:
                    return
                chunk = pcm_data[i:i + chunk_size]

                if 0 < len(chunk) < chunk_size:
                    padding = b"\x00" * (chunk_size - len(chunk))
                    chunk += padding

                if len(chunk) == chunk_size and self.audio_track:
                    if not self._first_chunk_sent and self._first_audio_callback:
                        self._first_chunk_sent = True
                        await self._first_audio_callback()

                    asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                    await asyncio.sleep(0.01)

        except Exception as e:
            self.emit(
                "error", f"Failed to decode or stream Papla audio: {str(e)}")

    async def aclose(self) -> None:
        if self._client and not self._client.is_closed:
            await self._client.aclose()
        await super().aclose()

    async def interrupt(self) -> None:
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the Papla TTS plugin.

Args

api_key : Optional[str], optional
Papla API key. Defaults to None.
model_id : str
The model ID to use for the TTS plugin. Defaults to "papla_p1".
base_url : str
The base URL to use for the TTS plugin. Defaults to "https://api.papla.media/v1".

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    if self._client and not self._client.is_closed:
        await self._client.aclose()
    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    self._interrupted = True
    if self.audio_track:
        self.audio_track.interrupt()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """
    Convert text to speech using Papla's streaming TTS API.
    Decodes the received MP3 audio to raw PCM before streaming.
    ``FlushMarker`` segment-boundary markers are dropped — Papla's HTTP API
    has no per-segment flush primitive.
    """
    if not self.audio_track or not self.loop:
        self.emit(
            "error", "Audio track or event loop not set by the framework.")
        return

    self._interrupted = False
    try:
        if isinstance(text, str):
            if not self._interrupted:
                await self._synthesize_segment(text, voice_id, **kwargs)
        else:
            async for segment in segment_text(text):
                if self._interrupted:
                    break
                await self._synthesize_segment(segment, voice_id, **kwargs)

    except Exception as e:
        self.emit("error", f"Papla TTS synthesis failed: {str(e)}")

Convert text to speech using Papla's streaming TTS API. Decodes the received MP3 audio to raw PCM before streaming. FlushMarker segment-boundary markers are dropped — Papla's HTTP API has no per-segment flush primitive.