Module videosdk.plugins.nvidia.tts

Classes

class NvidiaTTS (*,
api_key: str | None = None,
server: str = 'grpc.nvcf.nvidia.com:443',
function_id: str = '877104f7-e885-42b9-8de8-f6e4c6303969',
voice_name: str = 'Magpie-Multilingual.EN-US.Aria',
language_code: str = 'en-US',
sample_rate: int = 24000,
use_ssl: bool = True)
Expand source code
class NvidiaTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        server: str = "grpc.nvcf.nvidia.com:443",
        function_id: str = "877104f7-e885-42b9-8de8-f6e4c6303969", 
        voice_name: str = "Magpie-Multilingual.EN-US.Aria",
        language_code: str = "en-US",
        sample_rate: int = RIVA_SAMPLE_RATE,
        use_ssl: bool = True,
    ) -> None:
        super().__init__(sample_rate=sample_rate, num_channels=RIVA_CHANNELS)

        if riva is None:
             raise ImportError("nvidia-riva-client is not installed.")

        self.api_key = api_key or os.getenv("NVIDIA_API_KEY")
        if not self.api_key:
            raise ValueError("NVIDIA API key must be provided")

        self.server = server
        self.function_id = function_id
        self.voice_name = voice_name
        self.language_code = language_code
        self.use_ssl = use_ssl        
        self._service = None
        self._main_loop = asyncio.get_event_loop()
        self._interrupted = False

        self._initialize_client()

    def _initialize_client(self):
        auth = riva.client.Auth(
            ssl_root_cert=None, 
            use_ssl=self.use_ssl, 
            uri=self.server, 
            metadata_args=[
                ["function-id", self.function_id],
                ["authorization", f"Bearer {self.api_key}"],
            ]
        )
        self._service = riva.client.SpeechSynthesisService(auth)

    async def synthesize(
        self, 
        text: AsyncIterator[str] | str, 
        **kwargs: Any,
    ) -> None:
        """Synthesize text to speech using NVIDIA Riva."""
        try:
            if not self.audio_track:
                self.emit("error", "Audio track not set")
                return

            self._interrupted = False

            input_text = ""
            if inspect.isasyncgen(text):
                async for chunk in text:
                    if self._interrupted:
                        break
                    # Pipeline yields a terminal FlushMarker at end of LLM
                    # stream — skip it; Riva's online API takes the full
                    # utterance text and segments server-side, so per-segment
                    # flush markers have no equivalent here.
                    if isinstance(chunk, FlushMarker):
                        continue
                    input_text += chunk
            else:
                input_text = text

            if not input_text.strip() or self._interrupted:
                return

            await asyncio.to_thread(self._worker_synthesize, input_text)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            logger.error(f"Nvidia TTS Error: {e}")

    def _worker_synthesize(self, text: str):
        """Blocking worker that streams audio back to the main loop."""
        try:
            logger.info(f"Nvidia TTS: Requesting '{text[:20]}...' at {self.sample_rate}Hz")
            
            responses = self._service.synthesize_online(
                text,
                voice_name=self.voice_name,
                language_code=self.language_code,
                sample_rate_hz=self.sample_rate, 
                encoding=AudioEncoding.LINEAR_PCM 
            )
            
            first_chunk = True
            
            for resp in responses:
                if self._interrupted:
                    break
                    
                audio_data = resp.audio
                if audio_data:
                    self._main_loop.call_soon_threadsafe(
                        self._dispatch_audio, audio_data, first_chunk
                    )
                    first_chunk = False
                    
        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            logger.error(f"Nvidia TTS Worker Error: {e}")

    def _dispatch_audio(self, audio_data: bytes, is_first: bool):
        """Executed on main loop."""
        if self._interrupted:
            return

        if is_first and self._first_audio_callback:
            asyncio.create_task(self._first_audio_callback())

        if self.audio_track:
            asyncio.create_task(self.audio_track.add_new_bytes(audio_data))

    async def interrupt(self) -> None:
        """Interrupt current synthesis and audio playback."""
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()

    async def aclose(self) -> None:
        await super().aclose()

Base class for Text-to-Speech implementations

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt current synthesis and audio playback."""
    self._interrupted = True
    if self.audio_track:
        self.audio_track.interrupt()

Interrupt current synthesis and audio playback.

async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self, 
    text: AsyncIterator[str] | str, 
    **kwargs: Any,
) -> None:
    """Synthesize text to speech using NVIDIA Riva."""
    try:
        if not self.audio_track:
            self.emit("error", "Audio track not set")
            return

        self._interrupted = False

        input_text = ""
        if inspect.isasyncgen(text):
            async for chunk in text:
                if self._interrupted:
                    break
                # Pipeline yields a terminal FlushMarker at end of LLM
                # stream — skip it; Riva's online API takes the full
                # utterance text and segments server-side, so per-segment
                # flush markers have no equivalent here.
                if isinstance(chunk, FlushMarker):
                    continue
                input_text += chunk
        else:
            input_text = text

        if not input_text.strip() or self._interrupted:
            return

        await asyncio.to_thread(self._worker_synthesize, input_text)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        logger.error(f"Nvidia TTS Error: {e}")

Synthesize text to speech using NVIDIA Riva.