Module videosdk.plugins.google.tts

Classes

class GoogleTTS (*,
api_key: str | None = None,
speed: float = 1.0,
pitch: float = 0.0,
response_format: "Literal['pcm']" = 'pcm',
voice_config: GoogleVoiceConfig | None = None,
custom_pronunciations: list[dict] | dict | None = None,
vertexai: bool = False,
vertexai_config: VertexAIConfig | None = None,
streaming: bool = True,
model: str | None = None,
prompt: str | None = None)
Expand source code
class GoogleTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        speed: float = 1.0,
        pitch: float = 0.0,
        response_format: Literal["pcm"] = "pcm",
        voice_config: GoogleVoiceConfig | None = None,
        custom_pronunciations: list[dict] | dict | None = None,
        vertexai: bool = False,
        vertexai_config: VertexAIConfig | None = None,
        streaming: bool = True,
        model: str | None = None,
        prompt: str | None = None,
    ) -> None:
        """Initialize the Google TTS plugin.

        Args:
            api_key: Google API key. Only consulted when ``vertexai`` is False;
                when omitted, the ``GOOGLE_API_KEY`` environment variable is used.
            speed: Speaking rate sent with every synthesis request. Defaults to 1.0.
            pitch: Pitch sent with non-streaming requests. Defaults to 0.0.
            response_format: Stored on the instance. Defaults to "pcm".
            voice_config: Voice selection settings. Defaults to ``GoogleVoiceConfig()``.
            custom_pronunciations: Pronunciation overrides. Either a
                ``{phrase: pronunciation}`` dict, or a list of dicts such as
                ``[{"tomato": "təˈmeɪtoʊ"}]`` or
                ``[{"phrase": ..., "pronunciation": ..., "encoding": "ipa"}]``.
            vertexai: Use Vertex AI TTS endpoint with ADC authentication.
            vertexai_config: Project / location settings for Vertex AI.
                Defaults to ``VertexAIConfig()``.
            streaming: Use gRPC StreamingSynthesize for lower latency. This
                constructor currently rejects ``streaming=True`` together with
                ``vertexai=True``; because ``streaming`` defaults to True, pass
                ``streaming=False`` when enabling Vertex AI.
            model: Optional Gemini-TTS engine, e.g. "gemini-3.1-flash-tts-preview",
                "gemini-2.5-flash-tts", "gemini-2.5-flash-lite-preview-tts",
                or "gemini-2.5-pro-tts". When set, voice_config.name is the bare
                Gemini voice (e.g. "Kore", "Charon") — not a Chirp 3 HD
                locale-prefixed name. When None, the plugin uses standard
                Cloud TTS (Chirp 3 HD via voice name).
            prompt: Natural-language style instruction for Gemini-TTS
                (e.g. "Speak in a warm, professional tone"). Only valid when
                ``model`` is set.

        Raises:
            ImportError: ``google-cloud-texttospeech`` is not installed.
            ValueError: ``streaming`` and ``vertexai`` are both true; ``prompt``
                is given without ``model``; streaming is requested without a
                model for a voice whose name lacks the Chirp 3 HD marker; or
                the client cannot be built (missing API key / project ID).

        Requires: pip install google-cloud-texttospeech
        """
        super().__init__(sample_rate=GOOGLE_SAMPLE_RATE, num_channels=GOOGLE_CHANNELS)

        # Imported lazily so a missing optional dependency yields an
        # actionable install hint.
        try:
            from google.cloud import texttospeech_v1
        except ImportError as exc:
            raise ImportError(
                "google-cloud-texttospeech is required. "
                "Install it with: pip install google-cloud-texttospeech"
            ) from exc

        self._tts = texttospeech_v1

        self.speed = speed
        self.pitch = pitch
        self.response_format = response_format
        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False
        self._interrupted = False
        self.voice_config = voice_config or GoogleVoiceConfig()
        self.custom_pronunciations = custom_pronunciations
        self.vertexai = vertexai
        self.vertexai_config = vertexai_config or VertexAIConfig()
        self.streaming = streaming
        self.model = model
        self.prompt = prompt

        # NOTE(review): earlier documentation described streaming as compatible
        # with vertexai=True, but this guard rejects the combination. Confirm
        # which is intended before relaxing it.
        if self.streaming and self.vertexai:
            raise ValueError("Streaming and vertexai cannot be used together.")
        if self.prompt is not None and self.model is None:
            raise ValueError(
                "prompt is only supported with Gemini-TTS models. "
                "Set model='gemini-3.1-flash-tts-preview' (or another gemini-*-tts model) "
                "to use prompt-based style control."
            )

        # Reuse the already-resolved config instead of building a second default.
        resolved_voice = self.voice_config.name
        if streaming and self.model is None and not self._is_chirp3_hd_voice(resolved_voice):
            raise ValueError(
                "Streaming synthesis without a Gemini-TTS model only supports Chirp 3 HD voices "
                "(e.g. 'en-US-Chirp3-HD-Aoede'). "
                f"Got: '{resolved_voice}'. "
                "For Gemini-TTS, pass model='gemini-3.1-flash-tts-preview' (or similar). "
                "See https://cloud.google.com/text-to-speech/docs/chirp3-hd or "
                "https://cloud.google.com/text-to-speech/docs/gemini-tts."
            )

        self._client = self._build_client(api_key)

    @staticmethod
    def _is_chirp3_hd_voice(name: str) -> bool:
        return "chirp3-hd" in name.lower()

    def _build_client(self, api_key: str | None) -> Any:
        """Construct a TextToSpeechAsyncClient.

        With ``self.vertexai`` false, the client is authenticated with an API
        key (``api_key`` argument, else ``GOOGLE_API_KEY``).

        With ``self.vertexai`` true, no API key is passed; the client targets
        ``{location}-texttospeech.googleapis.com``. The project ID is resolved
        from ``vertexai_config.project_id``, then ``GOOGLE_CLOUD_PROJECT``,
        then ``GCLOUD_PROJECT``, then the service-account file named by
        ``GOOGLE_APPLICATION_CREDENTIALS``. The resolved project and location
        are written back onto ``self.vertexai_config``.

        Args:
            api_key: Explicit API key, or None to fall back to the environment.

        Returns:
            The constructed ``TextToSpeechAsyncClient``.

        Raises:
            ValueError: No API key (non-Vertex) or no project ID (Vertex)
                could be resolved.
        """
        from google.api_core.client_options import ClientOptions

        if not self.vertexai:
            resolved_key = api_key or os.getenv("GOOGLE_API_KEY")
            if not resolved_key:
                raise ValueError(
                    "Google TTS API key required. Provide either:\n"
                    "1. api_key parameter, OR\n"
                    "2. GOOGLE_API_KEY environment variable"
                )
            return self._tts.TextToSpeechAsyncClient(
                client_options=ClientOptions(api_key=resolved_key)
            )

        project_id = (
            self.vertexai_config.project_id
            or os.getenv("GOOGLE_CLOUD_PROJECT")
            or os.getenv("GCLOUD_PROJECT")
        )

        if project_id is None:
            service_account_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
            if service_account_path:
                # Best-effort lookup: a failure here must not mask the clearer
                # ValueError below, but it is logged so a bad credentials path
                # or malformed file can be diagnosed.
                try:
                    from google.oauth2 import service_account
                    creds = service_account.Credentials.from_service_account_file(
                        service_account_path
                    )
                    project_id = creds.project_id
                except Exception as exc:
                    logger.warning(
                        "Could not read project_id from GOOGLE_APPLICATION_CREDENTIALS "
                        f"('{service_account_path}'): {exc}"
                    )

        if project_id is None:
            raise ValueError(
                "Vertex AI TTS requires a GCP project ID. Provide one of:\n"
                "1. vertexai_config=VertexAIConfig(project_id='my-project')\n"
                "2. GOOGLE_CLOUD_PROJECT environment variable\n"
                "3. GOOGLE_APPLICATION_CREDENTIALS pointing to a service-account file"
            )

        # NOTE(review): if VertexAIConfig.location keeps a non-empty default,
        # the GOOGLE_CLOUD_LOCATION fallback is only reached when the caller
        # explicitly passes an empty location — confirm that is intended.
        location = (
            self.vertexai_config.location
            or os.getenv("GOOGLE_CLOUD_LOCATION")
            or "us-central1"
        )
        self.vertexai_config.project_id = project_id
        self.vertexai_config.location = location

        return self._tts.TextToSpeechAsyncClient(
            client_options=ClientOptions(
                api_endpoint=f"{location}-texttospeech.googleapis.com"
            )
        )
    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def prewarm(self) -> None:
        """Pre-warm the gRPC channel so the first ``synthesize()`` call doesn't
        pay the TLS + HTTP/2 SETTINGS + auth handshake (~150–400ms). Safe to
        call repeatedly."""
        try:
            channel = self._client.transport.grpc_channel
            if hasattr(channel, "channel_ready"):
                await channel.channel_ready()
        except Exception as e:
            logger.warning(f"Google TTS prewarm failed (non-fatal): {e}")

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        **kwargs: Any,
    ) -> None:
        """Convert ``text`` to speech and push the audio to the audio track.

        When the audio track or loop is unset, an "error" event is emitted and
        the call returns without synthesizing. Otherwise the interrupted flag
        is cleared and the work is routed to:

        * ``_synthesize_streaming`` when ``self.streaming`` is true;
        * a single ``_synthesize_audio`` call for a plain string;
        * one ``_synthesize_audio`` call per segment from ``segment_text`` for
          an async iterator, stopping early once interrupted.

        Args:
            text: A string, or an async iterator of str chunks / FlushMarker.
            **kwargs: Accepted but not read by this method.

        Raises:
            Exception: Any synthesis failure is re-raised after an "error"
                event has been emitted.
        """
        ready = self.audio_track and self.loop
        if not ready:
            self.emit("error", "Audio track or loop not initialized")
            return

        self._interrupted = False
        try:
            if self.streaming:
                await self._synthesize_streaming(text)
                return
            if isinstance(text, str):
                await self._synthesize_audio(text)
                return
            async for piece in segment_text(text):
                if self._interrupted:
                    break
                await self._synthesize_audio(piece)
        except Exception as exc:
            self.emit("error", f"Google TTS synthesis failed: {str(exc)}")
            raise

    async def _synthesize_audio(self, text: str) -> None:
        """Single-request synthesis via SynthesizeSpeech."""
        tts = self._tts

        synthesis_input_kwargs: dict = {"text": text}
        if self.prompt:
            synthesis_input_kwargs["prompt"] = self.prompt
        if self.custom_pronunciations:
            synthesis_input_kwargs["custom_pronunciations"] = (
                self._build_custom_pronunciations_proto()
            )
        synthesis_input = tts.SynthesisInput(**synthesis_input_kwargs)
        is_studio = self.voice_config.name.startswith("en-US-Studio")

        voice_kwargs: dict = {
            "language_code": self.voice_config.languageCode,
            "name": self.voice_config.name,
        }
        if self.model:
            voice_kwargs["model_name"] = self.model
        voice_params = tts.VoiceSelectionParams(**voice_kwargs)
        if not is_studio and not self.model:
            voice_params.ssml_gender = tts.SsmlVoiceGender[self.voice_config.ssmlGender]
        response = await self._client.synthesize_speech(
            input=synthesis_input,
            voice=voice_params,
            audio_config=tts.AudioConfig(
                audio_encoding=tts.AudioEncoding.LINEAR16,
                speaking_rate=self.speed,
                pitch=self.pitch,
                sample_rate_hertz=GOOGLE_SAMPLE_RATE,
            ),
        )

        if not response.audio_content:
            self.emit("error", "No audio content received from Google TTS")
            return

        await self._stream_audio_chunks(response.audio_content)

    async def _synthesize_streaming(self, text: AsyncIterator[str] | str) -> None:
        """Bidirectional gRPC streaming via StreamingSynthesize."""
        tts = self._tts

        voice_kwargs: dict = {
            "language_code": self.voice_config.languageCode,
            "name": self.voice_config.name,
        }
        if self.model:
            voice_kwargs["model_name"] = self.model

        streaming_config_kwargs: dict = dict(
            voice=tts.VoiceSelectionParams(**voice_kwargs),
            streaming_audio_config=tts.StreamingAudioConfig(
                audio_encoding=tts.AudioEncoding.PCM,
                sample_rate_hertz=GOOGLE_SAMPLE_RATE,
                speaking_rate=self.speed,
            ),
        )
        if self.custom_pronunciations:
            streaming_config_kwargs["custom_pronunciations"] = (
                self._build_custom_pronunciations_proto()
            )

        streaming_config = tts.StreamingSynthesizeConfig(**streaming_config_kwargs)

        def _make_input(chunk: str, include_prompt: bool) -> Any:
            kwargs: dict = {"text": chunk}
            if include_prompt and self.prompt:
                kwargs["prompt"] = self.prompt
            return tts.StreamingSynthesisInput(**kwargs)

        async def request_generator() -> AsyncIterator[Any]:
            yield tts.StreamingSynthesizeRequest(streaming_config=streaming_config)
            if isinstance(text, str):
                yield tts.StreamingSynthesizeRequest(
                    input=_make_input(text, include_prompt=True)
                )
            else:
                is_first = True
                async for chunk in text:
                    if self._interrupted:
                        break
                    # Drop FlushMarker — gRPC StreamingSynthesize has no flush
                    # primitive, and the server segments naturally as text
                    # arrives.
                    if isinstance(chunk, FlushMarker):
                        continue
                    if chunk:
                        yield tts.StreamingSynthesizeRequest(
                            input=_make_input(chunk, include_prompt=is_first)
                        )
                        is_first = False

        try:
            async for response in await self._client.streaming_synthesize(
                request_generator()
            ):
                if self._interrupted:
                    break
                if response.audio_content:
                    await self._stream_audio_chunks(response.audio_content, has_wav_header=False)
        except Exception as e:
            if not self._interrupted:
                self.emit("error", f"Google TTS streaming error: {str(e)}")
                raise

    def _build_custom_pronunciations_proto(self) -> Any:
        """Convert self.custom_pronunciations to a CustomPronunciations proto."""
        tts = self._tts
        params = []
        try:
            from google.cloud.texttospeech_v1.types import CustomPronunciationParams as _CPP
            PE = _CPP.PhoneticEncoding
            ENCODING_MAP = {
                "ipa":    PE.PHONETIC_ENCODING_IPA,
                "x-sampa": PE.PHONETIC_ENCODING_X_SAMPA,
            }
        except (ImportError, AttributeError):
            ENCODING_MAP = {"ipa": 1, "x-sampa": 2}

        if not self.custom_pronunciations:
            return tts.CustomPronunciations(pronunciations=[])

        raw = self.custom_pronunciations
        entries: list[tuple[str, str, Any]] = []

        if isinstance(raw, dict):
            for phrase, pronunciation in raw.items():
                entries.append((phrase, pronunciation, ENCODING_MAP["ipa"]))
        else:
            for item in raw:
                if not isinstance(item, dict):
                    continue
                if "phrase" in item and "pronunciation" in item:
                    enc_key = item.get("encoding", "ipa").lower()
                    enc = ENCODING_MAP.get(enc_key, ENCODING_MAP["ipa"])
                    if enc_key not in ENCODING_MAP:
                        logger.warning(
                            f"Unknown encoding '{enc_key}' for phrase '{item['phrase']}'. "
                            f"Supported: {list(ENCODING_MAP.keys())}. Falling back to IPA.",
                            UserWarning, stacklevel=3,
                        )
                    entries.append((item["phrase"], item["pronunciation"], enc))
                else:
                    for phrase, pronunciation in item.items():
                        entries.append((phrase, pronunciation, ENCODING_MAP["ipa"]))


        if self.voice_config.languageCode.lower() != "en-us":
            logger.warning(
                f"custom_pronunciations is only supported for en-US. "
                f"Got '{self.voice_config.languageCode}' — pronunciations will be ignored.",
                UserWarning,
                stacklevel=3,
            )

        for phrase, pronunciation, encoding in entries:
            if not phrase or not pronunciation:
                continue
            try:
                params.append(
                    tts.CustomPronunciationParams(
                        phrase=phrase,
                        pronunciation=pronunciation,
                        phonetic_encoding=encoding,
                    )
                )
            except Exception as e:
                logger.warning(
                    f"Skipping custom pronunciation for '{phrase}': {e}",
                    UserWarning,
                    stacklevel=3,
                )

        if not params:
            logger.warning(
                "custom_pronunciations was set but no valid entries were built. "
                "Check your phrase/pronunciation format.",
                UserWarning,
                stacklevel=3,
            )

        return tts.CustomPronunciations(pronunciations=params)


    async def _stream_audio_chunks(
        self, audio_bytes: bytes, has_wav_header: bool = True
    ) -> None:
        """Chunk raw PCM and forward to the audio track."""
        if self._interrupted:
            return
        chunk_size = 960
        audio_data = self._remove_wav_header(audio_bytes) if has_wav_header else audio_bytes

        for i in range(0, len(audio_data), chunk_size):
            if self._interrupted:
                return
            chunk = audio_data[i:i + chunk_size]

            if len(chunk) < chunk_size and len(chunk) > 0:
                padding_needed = chunk_size - len(chunk)
                chunk += b'\x00' * padding_needed

            if len(chunk) == chunk_size:
                if not self._first_chunk_sent and self._first_audio_callback:
                    self._first_chunk_sent = True
                    await self._first_audio_callback()

                asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                await asyncio.sleep(0.001)

    def _remove_wav_header(self, audio_bytes: bytes) -> bytes:
        """Remove WAV header if present to get raw PCM data"""
        if audio_bytes.startswith(b"RIFF"):
            data_pos = audio_bytes.find(b"data")
            if data_pos != -1:
                return audio_bytes[data_pos + 8:]

        return audio_bytes

    async def aclose(self) -> None:
        """Close the client transport, then run the base-class cleanup.

        The base-class ``aclose`` runs even when closing the transport raises,
        so a transport error cannot skip it; the error still propagates.
        """
        try:
            if self._client:
                await self._client.transport.close()
        finally:
            await super().aclose()

    async def interrupt(self) -> None:
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the Google TTS plugin.

Args

api_key : Optional[str], optional
Google API key. Defaults to None.
speed : float
The speed to use for the TTS plugin. Defaults to 1.0.
pitch : float
The pitch to use for the TTS plugin. Defaults to 0.0.
response_format : Literal["pcm"]
The response format to use for the TTS plugin. Defaults to "pcm".
voice_config : GoogleVoiceConfig | None
The voice configuration to use for the TTS plugin. Defaults to None.
custom_pronunciations
IPA pronunciation overrides, e.g. [{"tomato": "təˈmeɪtoʊ"}].
vertexai
Use Vertex AI TTS endpoint with ADC authentication.
vertexai_config
Project / location settings for Vertex AI.
streaming
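Use gRPC StreamingSynthesize for lower latency. Note: the constructor currently raises ValueError ("Streaming and vertexai cannot be used together.") when streaming=True is combined with vertexai=True, and streaming defaults to True — pass streaming=False when enabling Vertex AI.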
model
Optional Gemini-TTS engine, e.g. "gemini-3.1-flash-tts-preview", "gemini-2.5-flash-tts", "gemini-2.5-flash-lite-preview-tts", or "gemini-2.5-pro-tts". When set, voice_config.name is the bare Gemini voice (e.g. "Kore", "Charon") — not a Chirp 3 HD locale-prefixed name. When None, the plugin uses standard Cloud TTS (Chirp 3 HD via voice name).
prompt
Natural-language style instruction for Gemini-TTS (e.g. "Speak in a warm, professional tone"). Only valid when model is a Gemini-TTS engine.

Requires: pip install google-cloud-texttospeech

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the client transport, then run the base-class cleanup.

    The base-class ``aclose`` runs even when closing the transport raises,
    so a transport error cannot skip it; the error still propagates.
    """
    try:
        if self._client:
            await self._client.transport.close()
    finally:
        await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Flag the current synthesis as interrupted and interrupt the audio track.

    The track is only touched when one is attached.
    """
    self._interrupted = True
    track = self.audio_track
    if not track:
        return
    track.interrupt()

Interrupt the TTS process

async def prewarm(self) ‑> None
Expand source code
async def prewarm(self) -> None:
    """Open the client's gRPC channel ahead of the first request.

    Awaits ``channel_ready()`` on the transport's channel when the channel
    exposes that method, so the first ``synthesize()`` call does not pay
    the TLS + HTTP/2 SETTINGS + auth handshake (~150–400ms). Any failure
    is logged as a warning and swallowed. Intended to be callable
    repeatedly.
    """
    try:
        grpc_channel = self._client.transport.grpc_channel
        if not hasattr(grpc_channel, "channel_ready"):
            # Nothing to wait on for this channel type.
            return
        await grpc_channel.channel_ready()
    except Exception as e:
        logger.warning(f"Google TTS prewarm failed (non-fatal): {e}")

Pre-warm the gRPC channel so the first synthesize() call doesn't pay the TLS + HTTP/2 SETTINGS + auth handshake (~150–400ms). Safe to call repeatedly.

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the "first chunk sent" flag so the next TTS task starts fresh."""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self, text: AsyncIterator[Union[str, FlushMarker]] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    **kwargs: Any,
) -> None:
    """Convert ``text`` to speech and push the audio to the audio track.

    When the audio track or loop is unset, an "error" event is emitted and
    the call returns without synthesizing. Otherwise the interrupted flag
    is cleared and the work is routed to the streaming path, a single
    request for a plain string, or one request per ``segment_text`` segment
    for an async iterator (stopping early once interrupted).

    Args:
        text: A string, or an async iterator of str chunks / FlushMarker.
        **kwargs: Accepted but not read by this method.

    Raises:
        Exception: Any synthesis failure is re-raised after an "error"
            event has been emitted.
    """
    ready = self.audio_track and self.loop
    if not ready:
        self.emit("error", "Audio track or loop not initialized")
        return

    self._interrupted = False
    try:
        if self.streaming:
            await self._synthesize_streaming(text)
            return
        if isinstance(text, str):
            await self._synthesize_audio(text)
            return
        async for piece in segment_text(text):
            if self._interrupted:
                break
            await self._synthesize_audio(piece)
    except Exception as exc:
        self.emit("error", f"Google TTS synthesis failed: {str(exc)}")
        raise

Convert text to speech

Args

text
Text to convert to speech. Either a plain string or an async iterator that may yield str chunks and FlushMarker segment-boundary markers. Plugins that don't support per-segment flushing should drop the markers with an inline isinstance check (or rely on segment_text which already drops them).
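voice_id
Optional voice identifier. Not a named parameter of this implementation; if supplied it is collected into **kwargs, which the method body shown above does not read.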
**kwargs
Additional provider-specific arguments

Returns

None

class GoogleVoiceConfig (languageCode: str = 'en-US',
name: str = 'en-US-Chirp3-HD-Charon',
ssmlGender: str = 'MALE')
Expand source code
@dataclass
class GoogleVoiceConfig:
    """Voice selection settings used when building Google TTS requests."""

    # Sent as VoiceSelectionParams.language_code.
    languageCode: str = "en-US"
    # Sent as VoiceSelectionParams.name; streaming without a Gemini model
    # requires a name containing "chirp3-hd" (case-insensitive).
    name: str = "en-US-Chirp3-HD-Charon"
    # Member name of SsmlVoiceGender; only applied on the non-streaming path
    # for non-Studio voices when no Gemini model is set.
    ssmlGender: str = "MALE"

GoogleVoiceConfig(languageCode: 'str' = 'en-US', name: 'str' = 'en-US-Chirp3-HD-Charon', ssmlGender: 'str' = 'MALE')

Instance variables

var languageCode : str
var name : str
var ssmlGender : str
class VertexAIConfig (project_id: str | None = None, location: str = 'us-central1')
Expand source code
@dataclass
class VertexAIConfig:
    """Project / location settings used when the Vertex AI endpoint is enabled."""

    # GCP project ID; when unset the plugin falls back to the
    # GOOGLE_CLOUD_PROJECT / GCLOUD_PROJECT environment variables and then to
    # the service-account file named by GOOGLE_APPLICATION_CREDENTIALS.
    project_id: str | None = None
    # Region used to build the "{location}-texttospeech.googleapis.com" endpoint.
    location: str = "us-central1"

VertexAIConfig(project_id: 'str | None' = None, location: 'str' = 'us-central1')

Instance variables

var location : str
var project_id : str | None