Module videosdk.plugins.google.tts

Classes

class GoogleTTS (*,
api_key: str | None = None,
speed: float = 1.0,
pitch: float = 0.0,
response_format: "Literal['pcm']" = 'pcm',
voice_config: GoogleVoiceConfig | None = None,
custom_pronunciations: list[dict] | dict | None = None,
vertexai: bool = False,
vertexai_config: VertexAIConfig | None = None,
streaming: bool = True)
Expand source code
class GoogleTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        speed: float = 1.0,
        pitch: float = 0.0,
        response_format: Literal["pcm"] = "pcm",
        voice_config: GoogleVoiceConfig | None = None,
        custom_pronunciations: list[dict] | dict | None = None,
        vertexai: bool = False,
        vertexai_config: VertexAIConfig | None = None,
        streaming: bool = True,
    ) -> None:
        """Initialize the Google TTS plugin.

        Args:
            api_key: Google API key. Only used when ``vertexai`` is False; when
                omitted, the ``GOOGLE_API_KEY`` environment variable is used.
            speed: Speaking rate sent with every synthesis request. Defaults to 1.0.
            pitch: Pitch sent with non-streaming requests. The streaming audio
                config built by this class does not set a pitch. Defaults to 0.0.
            response_format: Stored on the instance; only "pcm" is accepted by
                the type annotation. Defaults to "pcm".
            voice_config: Voice to use. Defaults to ``GoogleVoiceConfig()``.
            custom_pronunciations: Pronunciation overrides. Accepted shapes:
                a mapping ``{phrase: pronunciation}`` (treated as IPA), a list
                of such mappings, e.g. ``[{"tomato": "təˈmeɪtoʊ"}]``, or a list
                of dicts with ``"phrase"``, ``"pronunciation"`` and an optional
                ``"encoding"`` (``"ipa"`` or ``"x-sampa"``).
            vertexai: Use the regional Vertex AI TTS endpoint with ADC
                authentication instead of an API key.
            vertexai_config: Project / location settings for Vertex AI.
                Defaults to ``VertexAIConfig()``.
            streaming: Use gRPC StreamingSynthesize for lower latency. Requires
                a Chirp 3 HD voice and cannot be combined with
                ``vertexai=True``. Because this defaults to True, pass
                ``streaming=False`` explicitly when enabling ``vertexai``.

        Raises:
            ImportError: If google-cloud-texttospeech is not installed.
            ValueError: If ``streaming`` and ``vertexai`` are both enabled, if
                ``streaming`` is enabled with a voice that is not Chirp 3 HD,
                or if the client cannot be built because no API key /
                project ID could be resolved.

        Requires: pip install google-cloud-texttospeech
        """
        super().__init__(sample_rate=GOOGLE_SAMPLE_RATE, num_channels=GOOGLE_CHANNELS)

        # Imported lazily so a missing SDK produces an actionable install hint.
        try:
            from google.cloud import texttospeech_v1
        except ImportError as exc:
            raise ImportError(
                "google-cloud-texttospeech is required. "
                "Install it with: pip install google-cloud-texttospeech"
            ) from exc

        self._tts = texttospeech_v1

        self.speed = speed
        self.pitch = pitch
        self.response_format = response_format
        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False
        self.voice_config = voice_config or GoogleVoiceConfig()
        self.custom_pronunciations = custom_pronunciations
        self.vertexai = vertexai
        self.vertexai_config = vertexai_config or VertexAIConfig()
        self.streaming = streaming

        if self.streaming and self.vertexai:
            raise ValueError("Streaming and vertexai cannot be used together.")

        # Reuse the already-resolved config instead of building a second default.
        resolved_voice = self.voice_config.name
        if self.streaming and not self._is_chirp3_hd_voice(resolved_voice):
            raise ValueError(
                "Streaming synthesis only supports Chirp 3 HD voices "
                "(e.g. 'en-US-Chirp3-HD-Aoede'). "
                f"Got: '{resolved_voice}'. "
                "See https://cloud.google.com/text-to-speech/docs/chirp3-hd for available voices."
            )

        # Validation runs first so no client is created for an invalid setup.
        self._client = self._build_client(api_key)

    @staticmethod
    def _is_chirp3_hd_voice(name: str) -> bool:
        return "chirp3-hd" in name.lower()

    def _build_client(self, api_key: str | None) -> Any:
        """Construct a TextToSpeechAsyncClient.

        With ``vertexai`` disabled the client authenticates with an API key
        (``api_key`` argument, else ``GOOGLE_API_KEY``). With ``vertexai``
        enabled the client targets ``{location}-texttospeech.googleapis.com``
        and no API key is passed; the resolved project ID and location are
        written back to ``self.vertexai_config``.

        Args:
            api_key: Explicit API key, or None to fall back to the environment.

        Returns:
            The constructed ``TextToSpeechAsyncClient``.

        Raises:
            ValueError: If no API key (API-key mode) or no project ID
                (Vertex AI mode) can be resolved.
        """
        from google.api_core.client_options import ClientOptions

        if not self.vertexai:
            resolved_key = api_key or os.getenv("GOOGLE_API_KEY")
            if not resolved_key:
                raise ValueError(
                    "Google TTS API key required. Provide either:\n"
                    "1. api_key parameter, OR\n"
                    "2. GOOGLE_API_KEY environment variable"
                )
            return self._tts.TextToSpeechAsyncClient(
                client_options=ClientOptions(api_key=resolved_key)
            )

        project_id = (
            self.vertexai_config.project_id
            or os.getenv("GOOGLE_CLOUD_PROJECT")
            or os.getenv("GCLOUD_PROJECT")
        )

        # Falsy check (not ``is None``): an env var set to "" must not count as
        # a resolved project ID.
        if not project_id:
            service_account_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
            if service_account_path:
                try:
                    from google.oauth2 import service_account

                    creds = service_account.Credentials.from_service_account_file(
                        service_account_path
                    )
                    project_id = creds.project_id
                except Exception as exc:
                    # Best-effort lookup: keep going, but record why it failed
                    # so the ValueError below is diagnosable.
                    logger.warning(
                        "Could not read project ID from service-account file %r: %s",
                        service_account_path,
                        exc,
                    )

        if not project_id:
            raise ValueError(
                "Vertex AI TTS requires a GCP project ID. Provide one of:\n"
                "1. vertexai_config=VertexAIConfig(project_id='my-project')\n"
                "2. GOOGLE_CLOUD_PROJECT environment variable\n"
                "3. GOOGLE_APPLICATION_CREDENTIALS pointing to a service-account file"
            )

        # NOTE(review): GOOGLE_CLOUD_LOCATION is only consulted when the config's
        # location is falsy; a config whose location is already populated wins.
        location = (
            self.vertexai_config.location
            or os.getenv("GOOGLE_CLOUD_LOCATION")
            or "us-central1"
        )
        self.vertexai_config.project_id = project_id
        self.vertexai_config.location = location

        return self._tts.TextToSpeechAsyncClient(
            client_options=ClientOptions(
                api_endpoint=f"{location}-texttospeech.googleapis.com"
            )
        )
    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        **kwargs: Any,
    ) -> None:
        try:
            if self.streaming:
                await self._synthesize_streaming(text)
            elif isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    await self._synthesize_audio(segment)
            else:
                await self._synthesize_audio(text)

            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or loop not initialized")
                return

        except Exception as e:
            self.emit("error", f"Google TTS synthesis failed: {str(e)}")
            raise

    async def _synthesize_audio(self, text: str) -> None:
        """Synthesize ``text`` with a single SynthesizeSpeech request.

        The returned audio is forwarded to ``_stream_audio_chunks``; an
        "error" event is emitted when the response carries no audio.
        """
        tts = self._tts
        voice = self.voice_config

        # Attach custom pronunciations only when some were configured.
        input_fields: dict = {"text": text}
        if self.custom_pronunciations:
            input_fields["custom_pronunciations"] = (
                self._build_custom_pronunciations_proto()
            )
        synthesis_input = tts.SynthesisInput(**input_fields)

        uses_studio_voice = voice.name.startswith("en-US-Studio")
        voice_params = tts.VoiceSelectionParams(
            language_code=voice.languageCode,
            name=voice.name,
        )
        # The gender field is left unset for "en-US-Studio" voices.
        if not uses_studio_voice:
            voice_params.ssml_gender = tts.SsmlVoiceGender[voice.ssmlGender]

        audio_config = tts.AudioConfig(
            audio_encoding=tts.AudioEncoding.LINEAR16,
            speaking_rate=self.speed,
            pitch=self.pitch,
            sample_rate_hertz=GOOGLE_SAMPLE_RATE,
        )
        response = await self._client.synthesize_speech(
            input=synthesis_input,
            voice=voice_params,
            audio_config=audio_config,
        )

        audio = response.audio_content
        if audio:
            await self._stream_audio_chunks(audio)
        else:
            self.emit("error", "No audio content received from Google TTS")

    async def _synthesize_streaming(self, text: AsyncIterator[str] | str) -> None:
        """Synthesize via the bidirectional StreamingSynthesize gRPC call.

        A config request is sent first, followed by one input request per text
        piece; audio from each response is forwarded as header-less PCM.
        Failures emit an "error" event and are re-raised.
        """
        tts = self._tts

        config_fields: dict = {
            "voice": tts.VoiceSelectionParams(
                language_code=self.voice_config.languageCode,
                name=self.voice_config.name,
            ),
            "streaming_audio_config": tts.StreamingAudioConfig(
                audio_encoding=tts.AudioEncoding.PCM,
                sample_rate_hertz=GOOGLE_SAMPLE_RATE,
                speaking_rate=self.speed,
            ),
        }
        if self.custom_pronunciations:
            config_fields["custom_pronunciations"] = (
                self._build_custom_pronunciations_proto()
            )
        streaming_config = tts.StreamingSynthesizeConfig(**config_fields)

        def input_request(piece: str) -> Any:
            """Wrap one piece of text in a StreamingSynthesizeRequest."""
            return tts.StreamingSynthesizeRequest(
                input=tts.StreamingSynthesisInput(text=piece)
            )

        async def request_generator() -> AsyncIterator[Any]:
            # The configuration request always goes out first.
            yield tts.StreamingSynthesizeRequest(streaming_config=streaming_config)
            if isinstance(text, str):
                yield input_request(text)
                return
            async for piece in text:
                # Falsy (empty) pieces from the iterator are skipped.
                if piece:
                    yield input_request(piece)

        try:
            responses = await self._client.streaming_synthesize(request_generator())
            async for response in responses:
                audio = response.audio_content
                if audio:
                    await self._stream_audio_chunks(audio, has_wav_header=False)
        except Exception as e:
            self.emit("error", f"Google TTS streaming error: {str(e)}")
            raise

    def _build_custom_pronunciations_proto(self) -> Any:
        """Convert self.custom_pronunciations to a CustomPronunciations proto."""
        tts = self._tts
        params = []
        try:
            from google.cloud.texttospeech_v1.types import CustomPronunciationParams as _CPP
            PE = _CPP.PhoneticEncoding
            ENCODING_MAP = {
                "ipa":    PE.PHONETIC_ENCODING_IPA,
                "x-sampa": PE.PHONETIC_ENCODING_X_SAMPA,
            }
        except (ImportError, AttributeError):
            ENCODING_MAP = {"ipa": 1, "x-sampa": 2}

        if not self.custom_pronunciations:
            return tts.CustomPronunciations(pronunciations=[])

        raw = self.custom_pronunciations
        entries: list[tuple[str, str, Any]] = []

        if isinstance(raw, dict):
            for phrase, pronunciation in raw.items():
                entries.append((phrase, pronunciation, ENCODING_MAP["ipa"]))
        else:
            for item in raw:
                if not isinstance(item, dict):
                    continue
                if "phrase" in item and "pronunciation" in item:
                    enc_key = item.get("encoding", "ipa").lower()
                    enc = ENCODING_MAP.get(enc_key, ENCODING_MAP["ipa"])
                    if enc_key not in ENCODING_MAP:
                        logger.warning(
                            f"Unknown encoding '{enc_key}' for phrase '{item['phrase']}'. "
                            f"Supported: {list(ENCODING_MAP.keys())}. Falling back to IPA.",
                            UserWarning, stacklevel=3,
                        )
                    entries.append((item["phrase"], item["pronunciation"], enc))
                else:
                    for phrase, pronunciation in item.items():
                        entries.append((phrase, pronunciation, ENCODING_MAP["ipa"]))


        if self.voice_config.languageCode.lower() != "en-us":
            logger.warning(
                f"custom_pronunciations is only supported for en-US. "
                f"Got '{self.voice_config.languageCode}' — pronunciations will be ignored.",
                UserWarning,
                stacklevel=3,
            )

        for phrase, pronunciation, encoding in entries:
            if not phrase or not pronunciation:
                continue
            try:
                params.append(
                    tts.CustomPronunciationParams(
                        phrase=phrase,
                        pronunciation=pronunciation,
                        phonetic_encoding=encoding,
                    )
                )
            except Exception as e:
                logger.warning(
                    f"Skipping custom pronunciation for '{phrase}': {e}",
                    UserWarning,
                    stacklevel=3,
                )

        if not params:
            logger.warning(
                "custom_pronunciations was set but no valid entries were built. "
                "Check your phrase/pronunciation format.",
                UserWarning,
                stacklevel=3,
            )

        return tts.CustomPronunciations(pronunciations=params)


    async def _stream_audio_chunks(
        self, audio_bytes: bytes, has_wav_header: bool = True
    ) -> None:
        """Chunk raw PCM and forward to the audio track."""
        chunk_size = 960
        audio_data = self._remove_wav_header(audio_bytes) if has_wav_header else audio_bytes

        for i in range(0, len(audio_data), chunk_size):
            chunk = audio_data[i:i + chunk_size]

            if len(chunk) < chunk_size and len(chunk) > 0:
                padding_needed = chunk_size - len(chunk)
                chunk += b'\x00' * padding_needed

            if len(chunk) == chunk_size:
                if not self._first_chunk_sent and self._first_audio_callback:
                    self._first_chunk_sent = True
                    await self._first_audio_callback()

                asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                await asyncio.sleep(0.001)

    def _remove_wav_header(self, audio_bytes: bytes) -> bytes:
        """Remove WAV header if present to get raw PCM data"""
        if audio_bytes.startswith(b"RIFF"):
            data_pos = audio_bytes.find(b"data")
            if data_pos != -1:
                return audio_bytes[data_pos + 8:]

        return audio_bytes

    async def aclose(self) -> None:
        """Close the client transport when a client exists, then run base cleanup."""
        client = self._client
        if client:
            await client.transport.close()
        await super().aclose()

    async def interrupt(self) -> None:
        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the Google TTS plugin.

Args

api_key : Optional[str], optional
Google API key. Defaults to None.
speed : float
The speed to use for the TTS plugin. Defaults to 1.0.
pitch : float
The pitch to use for the TTS plugin. Defaults to 0.0.
response_format : Literal["pcm"]
The response format to use for the TTS plugin. Defaults to "pcm".
voice_config : GoogleVoiceConfig | None
The voice configuration to use for the TTS plugin. Defaults to None.
custom_pronunciations
IPA pronunciation overrides, e.g. [{"tomato": "təˈmeɪtoʊ"}].
vertexai
Use Vertex AI TTS endpoint with ADC authentication.
vertexai_config
Project / location settings for Vertex AI.
streaming
Use gRPC StreamingSynthesize for lower latency. Requires a Chirp 3 HD voice and cannot be combined with vertexai=True: the constructor raises ValueError for that combination, so pass streaming=False when using Vertex AI.

Requires: pip install google-cloud-texttospeech

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the client transport when a client exists, then run base cleanup."""
    client = self._client
    if client:
        await client.transport.close()
    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt the audio track, if one is attached."""
    track = self.audio_track
    if not track:
        return
    track.interrupt()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-chunk flag so the next TTS task starts untracked."""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    **kwargs: Any,
) -> None:
    """Convert text to speech and push the audio to the audio track.

    Args:
        text: A complete string, or an async iterator of text pieces.
        **kwargs: Accepted for interface compatibility; not used here.

    Raises:
        Exception: Any synthesis failure is re-raised after an "error"
            event has been emitted.
    """
    # Verify the output side BEFORE synthesizing: the chunk writer
    # dereferences the audio track, so checking afterwards is too late.
    if not self.audio_track or not self.loop:
        self.emit("error", "Audio track or loop not initialized")
        return

    try:
        if self.streaming:
            await self._synthesize_streaming(text)
        elif isinstance(text, AsyncIterator):
            # Non-streaming mode: one request per text segment.
            async for segment in segment_text(text):
                await self._synthesize_audio(segment)
        else:
            await self._synthesize_audio(text)
    except Exception as e:
        self.emit("error", f"Google TTS synthesis failed: {str(e)}")
        raise

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
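voice_id
Optional voice identifier. This implementation has no voice_id parameter; a value passed by keyword is absorbed by **kwargs and not used.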
**kwargs
Additional provider-specific arguments

Returns

None

class GoogleVoiceConfig (languageCode: str = 'en-US',
name: str = 'en-US-Chirp3-HD-Charon',
ssmlGender: str = 'MALE')
Expand source code
@dataclass
class GoogleVoiceConfig:
    """Voice selection settings used by GoogleTTS when building requests."""

    # Sent to the API as the voice's ``language_code``.
    languageCode: str = "en-US"
    # Voice name; streaming mode requires a name containing "chirp3-hd".
    name: str = "en-US-Chirp3-HD-Charon"
    # Looked up by name in the SsmlVoiceGender enum; not applied to voices
    # whose name starts with "en-US-Studio".
    ssmlGender: str = "MALE"

GoogleVoiceConfig(languageCode: 'str' = 'en-US', name: 'str' = 'en-US-Chirp3-HD-Charon', ssmlGender: 'str' = 'MALE')

Instance variables

var languageCode : str
var name : str
var ssmlGender : str
class VertexAIConfig (project_id: str | None = None, location: str = 'us-central1')
Expand source code
@dataclass
class VertexAIConfig:
    """Project and location settings used by GoogleTTS when vertexai=True."""

    # When None, GoogleTTS falls back to GOOGLE_CLOUD_PROJECT, GCLOUD_PROJECT,
    # then the project in the GOOGLE_APPLICATION_CREDENTIALS service-account file.
    project_id: str | None = None
    # Used to form the endpoint "{location}-texttospeech.googleapis.com".
    location: str = "us-central1"

VertexAIConfig(project_id: 'str | None' = None, location: 'str' = 'us-central1')

Instance variables

var location : str
var project_id : str | None