Package videosdk.plugins.papla

Sub-modules

videosdk.plugins.papla.tts

Classes

class PaplaTTS (*,
api_key: str | None = None,
model_id: str = 'papla_p1',
base_url: str = 'https://api.papla.media/v1')
Expand source code
class PaplaTTS(TTS):
    """Text-to-speech plugin that calls Papla's streaming HTTP endpoint.

    Every text segment is POSTed to Papla, the compressed response is decoded
    to 16-bit PCM via ``AudioSegment`` and pushed to ``audio_track`` in 20 ms
    frames. ``audio_track`` and ``loop`` start as ``None`` and must be assigned
    before ``synthesize`` is called.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_id: str = DEFAULT_MODEL,
        base_url: str = API_BASE_URL,
    ) -> None:
        """Initialize the Papla TTS plugin.

        Args:
            api_key (Optional[str], optional): Papla API key. Defaults to None,
                in which case the ``PAPLA_API_KEY`` environment variable is used.
            model_id (str): The model ID to use for the TTS plugin. Defaults to "papla_p1".
            base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.papla.media/v1".

        Raises:
            ValueError: If no API key is given and ``PAPLA_API_KEY`` is unset or empty.
        """
        super().__init__(sample_rate=PAPLA_SAMPLE_RATE, num_channels=PAPLA_CHANNELS)

        self.model_id = model_id
        self.audio_track = None
        self.loop = None
        self.base_url = base_url
        self._first_chunk_sent = False

        # Strong references to fire-and-forget tasks: the event loop itself only
        # keeps weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set[asyncio.Task[Any]] = set()
        # Created lazily inside a running loop (see _decode_and_stream_pcm).
        self._stream_lock = None

        self.api_key = api_key or os.getenv("PAPLA_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Papla API key must be provided either through the 'api_key' "
                "parameter or the 'PAPLA_API_KEY' environment variable."
            )

        self._client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0,
                                  write=5.0, pool=5.0),
            follow_redirects=True,
        )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    def _spawn(self, coro: Any) -> "asyncio.Task[Any]":
        """Schedule *coro* as a task and hold a reference until it completes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Convert text to speech using Papla's streaming TTS API.
        This now includes decoding the received MP3 audio to raw PCM.

        Args:
            text: A complete string, or an async iterator of text pieces that
                is split into segments with ``segment_text``.
            voice_id: Voice to use; ``DEFAULT_VOICE_ID`` is used when omitted.
            **kwargs: Forwarded to ``_synthesize_segment`` (currently unused there).

        Failures are reported through an ``"error"`` event instead of being raised.
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit(
                    "error", "Audio track or event loop not set by the framework.")
                return

            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    await self._synthesize_segment(segment, voice_id, **kwargs)
            else:
                await self._synthesize_segment(text, voice_id, **kwargs)

        except Exception as e:
            self.emit("error", f"Papla TTS synthesis failed: {str(e)}")

    async def _synthesize_segment(self, text: str, voice_id: Optional[str] = None, **kwargs: Any) -> None:
        """Synthesize a single text segment.

        Downloads the full compressed audio for *text* and schedules a
        background task that decodes and streams it. Whitespace-only text is
        skipped. HTTP error statuses raise via ``raise_for_status``.
        """
        if not text.strip():
            return

        target_voice = voice_id or DEFAULT_VOICE_ID
        url = f"{self.base_url}/text-to-speech/{target_voice}/stream"

        headers = {
            "papla-api-key": self.api_key,
            "Content-Type": "application/json",
        }

        payload = {
            "text": text,
            "model_id": self.model_id,
        }

        async with self._client.stream("POST", url, headers=headers, json=payload) as response:
            response.raise_for_status()
            # Join once instead of repeated ``bytes +=`` (quadratic copying).
            mp3_data = b"".join(
                [chunk async for chunk in response.aiter_bytes()])

        if mp3_data:
            self._spawn(self._decode_and_stream_pcm(mp3_data))

    @staticmethod
    def _decode_to_pcm(audio_bytes: bytes) -> bytes:
        """Decode compressed audio into raw 16-bit PCM at the plugin's rate and channel count."""
        audio = AudioSegment.from_file(
            io.BytesIO(audio_bytes), format=AUDIO_FORMAT)
        audio = audio.set_frame_rate(PAPLA_SAMPLE_RATE)
        audio = audio.set_channels(PAPLA_CHANNELS)
        audio = audio.set_sample_width(2)
        return audio.raw_data

    async def _decode_and_stream_pcm(self, audio_bytes: bytes) -> None:
        """Decodes compressed audio (MP3) into raw PCM and streams it to the audio track.

        Segments are processed one at a time, in the order their tasks were
        created, so frames from consecutive segments never interleave. Errors
        are reported through an ``"error"`` event.
        """
        if self._stream_lock is None:
            # No await between the check and the assignment, so this is race-free.
            self._stream_lock = asyncio.Lock()

        try:
            async with self._stream_lock:
                # Decoding is blocking work; run it off the event loop thread.
                pcm_data = await asyncio.get_running_loop().run_in_executor(
                    None, self._decode_to_pcm, audio_bytes)

                # Bytes in one 20 ms frame of 16-bit samples.
                chunk_size = int(PAPLA_SAMPLE_RATE *
                                 PAPLA_CHANNELS * 2 * 20 / 1000)

                for i in range(0, len(pcm_data), chunk_size):
                    if not self.audio_track:
                        continue

                    # Zero-pad the final partial frame to a full frame.
                    chunk = pcm_data[i:i + chunk_size].ljust(
                        chunk_size, b"\x00")

                    if not self._first_chunk_sent and self._first_audio_callback:
                        self._first_chunk_sent = True
                        await self._first_audio_callback()

                    self._spawn(self.audio_track.add_new_bytes(chunk))
                    await asyncio.sleep(0.01)

        except Exception as e:
            self.emit(
                "error", f"Failed to decode or stream Papla audio: {str(e)}")

    async def aclose(self) -> None:
        """Close the HTTP client, then run the base-class cleanup.

        The base-class ``aclose`` runs even if closing the client raises.
        """
        try:
            if self._client and not self._client.is_closed:
                await self._client.aclose()
        finally:
            await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt playback on the audio track, if one is set."""
        if self.audio_track:
            self.audio_track.interrupt()

Papla Text-to-Speech plugin, built on the base class for Text-to-Speech implementations.

Initialize the Papla TTS plugin.

Args

api_key : Optional[str], optional
Papla API key. Defaults to None.
model_id : str
The model ID to use for the TTS plugin. Defaults to "papla_p1".
base_url : str
The base URL to use for the TTS plugin. Defaults to "https://api.papla.media/v1".

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the HTTP client, then run the base-class cleanup.

    The base-class ``aclose`` runs even if closing the client raises, so
    parent resources are not leaked by a failed client shutdown.
    """
    try:
        if self._client and not self._client.is_closed:
            await self._client.aclose()
    finally:
        await super().aclose()

Clean up resources.

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Forward an interrupt request to the audio track when one is attached."""
    if not self.audio_track:
        # Nothing to interrupt until an audio track has been assigned.
        return
    self.audio_track.interrupt()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-chunk flag so the next TTS task starts with it unset."""
    self._first_chunk_sent = False

Reset the first-audio tracking state for the next TTS task.

async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """
    Convert text to speech using Papla's streaming TTS API.
    This now includes decoding the received MP3 audio to raw PCM.
    """
    try:
        if not self.audio_track or not self.loop:
            self.emit(
                "error", "Audio track or event loop not set by the framework.")
            return

        if isinstance(text, AsyncIterator):
            async for segment in segment_text(text):
                await self._synthesize_segment(segment, voice_id, **kwargs)
        else:
            await self._synthesize_segment(text, voice_id, **kwargs)

    except Exception as e:
        self.emit("error", f"Papla TTS synthesis failed: {str(e)}")

Convert text to speech using Papla's streaming TTS API. This now includes decoding the received MP3 audio to raw PCM.