Package videosdk.plugins.google

Sub-modules

videosdk.plugins.google.live_api
videosdk.plugins.google.llm
videosdk.plugins.google.stt
videosdk.plugins.google.tts

Classes

class GeminiLiveConfig (voice: Voice | None = 'Puck',
language_code: str | None = 'en-US',
temperature: float | None = None,
top_p: float | None = None,
top_k: float | None = None,
candidate_count: int | None = 1,
max_output_tokens: int | None = None,
presence_penalty: float | None = None,
frequency_penalty: float | None = None,
response_modalities: List[Modality] | None = <factory>,
input_audio_transcription: AudioTranscriptionConfig | None = <factory>,
output_audio_transcription: AudioTranscriptionConfig | None = <factory>,
thinking_config: Optional[ThinkingConfig] | None = <factory>,
realtime_input_config: Optional[RealtimeInputConfig] | None = <factory>,
context_window_compression: Optional[ContextWindowCompressionConfig] | None = <factory>)
Expand source code
@dataclass
class GeminiLiveConfig:
    """Configuration for the Gemini Live API

    Args:
        voice: Voice ID for audio output. Options: 'Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'. Defaults to 'Puck'
        language_code: Language code for speech synthesis. Defaults to 'en-US'
        temperature: Controls randomness in response generation. Higher values (e.g. 0.8) make output more random,
                    lower values (e.g. 0.2) make it more focused. Defaults to None
        top_p: Nucleus sampling parameter. Controls diversity via cumulative probability cutoff. Range 0-1. Defaults to None
        top_k: Limits the number of tokens considered for each step of text generation. Defaults to None
        candidate_count: Number of response candidates to generate. Defaults to 1
        max_output_tokens: Maximum number of tokens allowed in model responses. Defaults to None
        presence_penalty: Penalizes tokens based on their presence in the text so far. Range -2.0 to 2.0. Defaults to None
        frequency_penalty: Penalizes tokens based on their frequency in the text so far. Range -2.0 to 2.0. Defaults to None
        response_modalities: List of enabled response types. Options: ["TEXT", "AUDIO"]. Defaults to ["AUDIO"]
        input_audio_transcription: Configuration for transcribing the user's input audio. Defaults to an empty config ({})
        output_audio_transcription: Configuration for transcribing the model's output audio. Defaults to an empty config ({})
        thinking_config: Configuration for model's "thinking" behavior. Defaults to an empty config ({})
        realtime_input_config: Configuration for realtime input handling. Defaults to an empty config ({})
        context_window_compression: Configuration for context window compression. Defaults to an empty config ({})
    """

    voice: Voice | None = "Puck"
    language_code: str | None = "en-US"
    temperature: float | None = None
    top_p: float | None = None
    top_k: float | None = None
    candidate_count: int | None = 1
    max_output_tokens: int | None = None
    presence_penalty: float | None = None
    frequency_penalty: float | None = None
    response_modalities: List[Modality] | None = field(
        default_factory=lambda: ["AUDIO"]
    )
    # NOTE(review): the config-object fields below default to a plain empty dict
    # ({}) via default_factory=dict — not to None, and not to an instance of the
    # annotated type. Confirm downstream consumers accept a bare dict here.
    input_audio_transcription: AudioTranscriptionConfig | None = field(
        default_factory=dict
    )
    output_audio_transcription: AudioTranscriptionConfig | None = field(
        default_factory=dict
    )
    # `Optional[X] | None` was redundant (Optional already includes None);
    # normalized to the `X | None` style used by the rest of this class.
    thinking_config: ThinkingConfig | None = field(default_factory=dict)
    realtime_input_config: RealtimeInputConfig | None = field(default_factory=dict)
    context_window_compression: ContextWindowCompressionConfig | None = field(default_factory=dict)
    # TODO
    # proactivity: ProactivityConfig | None = field(default_factory=dict)
    # enable_affective_dialog: bool | None = field(default=None)

    @property
    def is_text_only_mode(self) -> bool:
        """Check if configured for text-only responses (no audio)"""
        return self.response_modalities == ["TEXT"]

Configuration for the Gemini Live API

Args

voice
Voice ID for audio output. Options: 'Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'. Defaults to 'Puck'
language_code
Language code for speech synthesis. Defaults to 'en-US'
temperature
Controls randomness in response generation. Higher values (e.g. 0.8) make output more random, lower values (e.g. 0.2) make it more focused. Defaults to None
top_p
Nucleus sampling parameter. Controls diversity via cumulative probability cutoff. Range 0-1. Defaults to None
top_k
Limits the number of tokens considered for each step of text generation. Defaults to None
candidate_count
Number of response candidates to generate. Defaults to 1
max_output_tokens
Maximum number of tokens allowed in model responses. Defaults to None
presence_penalty
Penalizes tokens based on their presence in the text so far. Range -2.0 to 2.0. Defaults to None
frequency_penalty
Penalizes tokens based on their frequency in the text so far. Range -2.0 to 2.0. Defaults to None
response_modalities
List of enabled response types. Options: ["TEXT", "AUDIO"]. Defaults to ["AUDIO"]
input_audio_transcription
Configuration for transcribing the user's input audio. Defaults to an empty config ({}), not None
output_audio_transcription
Configuration for transcribing the model's output audio. Defaults to an empty config ({}), not None
thinking_config
Configuration for model's "thinking" behavior. Defaults to an empty config ({})
realtime_input_config
Configuration for realtime input handling. Defaults to an empty config ({})
context_window_compression
Configuration for context window compression. Defaults to an empty config ({})

Instance variables

var candidate_count : int | None
var context_window_compression : google.genai.types.ContextWindowCompressionConfig | None
var frequency_penalty : float | None
var input_audio_transcription : google.genai.types.AudioTranscriptionConfig | None
prop is_text_only_mode : bool
Expand source code
@property
def is_text_only_mode(self) -> bool:
    """True when the only requested response modality is TEXT (no audio)."""
    modalities = self.response_modalities
    return modalities == ["TEXT"]

Check if configured for text-only responses (no audio)

var language_code : str | None
var max_output_tokens : int | None
var output_audio_transcription : google.genai.types.AudioTranscriptionConfig | None
var presence_penalty : float | None
var realtime_input_config : google.genai.types.RealtimeInputConfig | None
var response_modalities : List[google.genai.types.Modality] | None
var temperature : float | None
var thinking_config : google.genai.types.ThinkingConfig | None
var top_k : float | None
var top_p : float | None
var voice : Literal['Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'] | None
class GeminiRealtime (*,
api_key: str | None = None,
model: str | None = 'gemini-3.1-flash-live-preview',
config: GeminiLiveConfig | None = None,
service_account_path: str | None = None,
vertexai: bool = False,
vertexai_config: VertexAIConfig | None = None)
Expand source code
class GeminiRealtime(RealtimeBaseModel[GeminiEventTypes]):
    """Gemini's realtime model for audio-only communication"""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str | None = "gemini-3.1-flash-live-preview",
        config: GeminiLiveConfig | None = None,
        service_account_path: str | None = None,
        vertexai: bool = False,
        vertexai_config: VertexAIConfig | None = None,
    ) -> None:
        """
        Initialize Gemini realtime model.

        Args:
            api_key: Gemini API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
            service_account_path: Path to Google service account JSON file.
            model: The Gemini model identifier to use (e.g. 'gemini-pro', 'gemini-pro-vision').
            config: Optional configuration object for customizing model behavior. Contains settings for:
                   - voice: Voice ID for audio output ('Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'). Defaults to 'Puck'
                   - language_code: Language code for speech synthesis. Defaults to 'en-US'
                   - temperature: Controls randomness in responses. Higher values (0.8) more random, lower (0.2) more focused
                   - top_p: Nucleus sampling parameter. Controls diversity via probability cutoff. Range 0-1
                   - top_k: Limits number of tokens considered for each generation step
                   - candidate_count: Number of response candidates to generate. Defaults to 1
                   - max_output_tokens: Maximum tokens allowed in model responses
                   - presence_penalty: Penalizes token presence in text. Range -2.0 to 2.0
                   - frequency_penalty: Penalizes token frequency in text. Range -2.0 to 2.0
                   - response_modalities: List of enabled response types ["TEXT", "AUDIO"]. Defaults to ["AUDIO"]
                   - input_audio_transcription: Configuration for audio transcription features
                   - output_audio_transcription: Configuration for audio transcription features
                   - thinking_config: Configuration for model's "thinking" behavior
                   - realtime_input_config: Configuration for realtime input handling
            vertexai: When True, connect through Vertex AI instead of the Gemini API.
            vertexai_config: Optional project_id/location settings used by the Vertex AI client.
        Raises:
            ValueError: If neither api_key nor service_account_path is provided and no GOOGLE_API_KEY in env vars
        """
        super().__init__()
        self.model = model
        self.vertexai = vertexai
        self.vertexai_config = vertexai_config
        # May raise ValueError if no credentials can be resolved (see _init_client).
        self._init_client(api_key, service_account_path)
        self._session: Optional[GeminiSession] = None
        self._closing = False
        # Signals the session loop to tear down / recycle the current session.
        self._session_should_close = asyncio.Event()
        self._main_task = None
        self._buffered_audio = bytearray()
        self._is_speaking = False
        self._last_audio_time = 0.0
        self._audio_processing_task = None
        self.tools = []
        # Default system prompt; replaced by the agent's instructions in set_agent().
        self._instructions: str = (
            "You are a helpful voice assistant that can answer questions and help with tasks."
        )
        self.config: GeminiLiveConfig = config or GeminiLiveConfig()
        # PCM sample rates: 24 kHz for model audio output, 48 kHz for input audio
        # (keep-alive silence is sized from these — see _keep_alive).
        self.target_sample_rate = 24000
        self.input_sample_rate = 48000
        # Speech-state flags driving user/agent speech start/end events.
        self._user_speaking = False
        self._agent_speaking = False

    def set_agent(self, agent: Agent) -> None:
        """Adopt the agent's instructions and tools for this realtime session."""
        self._instructions = agent.instructions
        self.tools = agent.tools
        formatted = self._convert_tools_to_gemini_format(self.tools)
        # Both aliases are kept populated; _create_session reads formatted_tools.
        self.tools_formatted = formatted
        self.formatted_tools = formatted

    def _init_client(self, api_key: str | None, service_account_path: str | None):
        """Build the google-genai client, for Vertex AI or the Gemini API.

        Vertex AI path: project id comes from vertexai_config, else the
        GOOGLE_CLOUD_PROJECT env var, else (last resort) the service-account
        file named by GOOGLE_APPLICATION_CREDENTIALS; location comes from
        vertexai_config, else GOOGLE_CLOUD_LOCATION, else "us-central1".

        Gemini API path: authenticates via a service-account file (exported to
        GOOGLE_APPLICATION_CREDENTIALS) or an API key (argument or
        GOOGLE_API_KEY env var).

        Raises:
            ValueError: On the Gemini API path when neither an API key nor a
                service account is available (also emitted as an "error" event).
        """
        if self.vertexai:
            project_id = (self.vertexai_config.project_id if self.vertexai_config else None) or os.getenv("GOOGLE_CLOUD_PROJECT")
            if project_id is None:
                # Fall back to reading the project id out of the service-account file.
                service_account_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
                if service_account_path:
                    from google.oauth2 import service_account
                    creds = service_account.Credentials.from_service_account_file(service_account_path)
                    project_id = creds.project_id

            location = (self.vertexai_config.location if self.vertexai_config else None) or os.getenv("GOOGLE_CLOUD_LOCATION") or "us-central1"

            self.client = genai.Client(
                vertexai=True,
                project=project_id,
                location=location,
            )
        else:
            if service_account_path:
                # NOTE(review): mutates process-global env so the SDK picks up the
                # credentials file — this affects other clients in the same process.
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_path
                self.client = genai.Client(http_options={"api_version": "v1beta"})
            else:
                self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
                if not self.api_key:
                    self.emit("error", "GOOGLE_API_KEY or service account required")
                    raise ValueError("GOOGLE_API_KEY or service account required")
                self.client = genai.Client(
                    api_key=self.api_key, http_options={"api_version": "v1beta"}
                )

    async def connect(self) -> None:
        """Connect to the Gemini Live API"""
        # Recycle any previous session before (re)connecting.
        if self._session:
            await self._cleanup_session(self._session)
            self._session = None

        self._closing = False
        self._session_should_close.clear()

        try:

            # NOTE(review): self.audio_track is not initialized in __init__ — per the
            # warning below it is assigned externally by the pipeline before connect().
            # If it were never set at all, this attribute access would fail — confirm.
            if not self.audio_track and "AUDIO" in self.config.response_modalities:
                logger.warning("audio_track not set — it should be assigned externally by the pipeline before connect().")

            # First connection attempt is best-effort; on failure the session
            # loop below retries with backoff.
            try:
                initial_session = await self._create_session()
                if initial_session:
                    self._session = initial_session
            except Exception as e:
                self.emit("error", f"Initial session creation failed, will retry: {e}")

            # Start (or restart) the background session loop exactly once.
            if not self._main_task or self._main_task.done():
                self._main_task = asyncio.create_task(
                    self._session_loop(), name="gemini-main-loop"
                )

        except Exception as e:
            self.emit("error", f"Error connecting to Gemini Live API: {e}")
            traceback.print_exc()
            raise

    async def _create_session(self) -> GeminiSession:
        """Create and open a new Gemini Live API session.

        Builds a LiveConnectConfig from self.config, opens the async live
        connection, and wraps it in a GeminiSession with an empty task list.

        Returns:
            GeminiSession: The open session plus its context manager.

        Raises:
            Exception: Re-raises any connection failure after emitting "error".
        """
        has_audio_output = "AUDIO" in self.config.response_modalities

        # Speech (voice/language) settings only apply when audio output is enabled.
        speech_config_obj = None
        if has_audio_output:
            speech_config_obj = SpeechConfig(
                voice_config=VoiceConfig(
                    prebuilt_voice_config=PrebuiltVoiceConfig(
                        voice_name=self.config.voice
                    )
                ),
                language_code=self.config.language_code,
            )

        logger.info(f"Creating Gemini Session with modalities: {self.config.response_modalities}, has_audio_output: {has_audio_output}")

        # The original wrapped every value in `x if x is not None else None`,
        # which is identical to `x`; pass the config values straight through.
        # NOTE(review): self.formatted_tools is only assigned in set_agent() —
        # confirm set_agent() always runs before connect(), else this raises
        # AttributeError.
        config = LiveConnectConfig(
            response_modalities=self.config.response_modalities,
            generation_config=GenerationConfig(
                candidate_count=self.config.candidate_count,
                temperature=self.config.temperature,
                top_p=self.config.top_p,
                top_k=self.config.top_k,
                max_output_tokens=self.config.max_output_tokens,
                presence_penalty=self.config.presence_penalty,
                frequency_penalty=self.config.frequency_penalty,
            ),
            system_instruction=self._instructions,
            speech_config=speech_config_obj,
            tools=self.formatted_tools or None,
            # Transcription only makes sense when the model produces audio.
            input_audio_transcription=self.config.input_audio_transcription if has_audio_output else None,
            output_audio_transcription=self.config.output_audio_transcription if has_audio_output else None,
            # Empty-dict defaults are falsy, so they collapse to None here.
            realtime_input_config=self.config.realtime_input_config or None,
            context_window_compression=self.config.context_window_compression or None,
        )

        # Native-audio models take thinking_config nested inside generation_config,
        # which LiveConnectConfig does not model — downgrade to a plain dict first.
        if self.is_native_audio_model():
            config = config.model_dump()
            if self.config.thinking_config:
                config["generation_config"]["thinking_config"] = self.config.thinking_config
                logger.info("Added thinking_config to generation_config")

        try:
            session_cm = self.client.aio.live.connect(model=self.model, config=config)
            # Enter the async context manager manually; _cleanup_session is
            # responsible for the matching __aexit__.
            session = await session_cm.__aenter__()
            return GeminiSession(session=session, session_cm=session_cm, tasks=[])
        except Exception as e:
            self.emit("error", f"Connection error: {e}")
            traceback.print_exc()
            raise

    async def _session_loop(self) -> None:
        """Main processing loop for Gemini sessions.

        While not closing: (re)creates a session when none exists (exponential
        backoff doubling up to 30s, max 5 consecutive failures), spawns the
        receive and keep-alive tasks, waits for _session_should_close, then
        cancels the tasks and — unless we are shutting down — recycles the
        session so the next iteration reconnects.
        """
        reconnect_attempts = 0
        max_reconnect_attempts = 5
        reconnect_delay = 1

        while not self._closing:
            if not self._session:
                try:
                    self._session = await self._create_session()
                    # Successful (re)connect: reset backoff state.
                    reconnect_attempts = 0
                    reconnect_delay = 1
                except Exception as e:
                    reconnect_attempts += 1
                    reconnect_delay = min(30, reconnect_delay * 2)
                    self.emit(
                        "error",
                        f"session creation attempt {reconnect_attempts} failed: {e}",
                    )
                    if reconnect_attempts >= max_reconnect_attempts:
                        self.emit("error", "Max reconnection attempts reached")
                        break
                    await asyncio.sleep(reconnect_delay)
                    continue

            session = self._session

            recv_task = asyncio.create_task(
                self._receive_loop(session), name="gemini_receive"
            )
            keep_alive_task = asyncio.create_task(
                self._keep_alive(session), name="gemini_keepalive"
            )
            session.tasks.extend([recv_task, keep_alive_task])

            try:
                # Block until the receive loop (or close()) asks for teardown.
                await self._session_should_close.wait()
            finally:
                # Cancel and drain both worker tasks before touching the session.
                for task in session.tasks:
                    if not task.done():
                        task.cancel()
                try:
                    await asyncio.gather(*session.tasks, return_exceptions=True)
                except Exception as e:
                    self.emit("error", f"Error during task cleanup: {e}")

            if not self._closing:
                # Not a final shutdown: drop the session and loop to reconnect.
                await self._cleanup_session(session)
                self._session = None
                await asyncio.sleep(reconnect_delay)
                self._session_should_close.clear()

    async def _handle_tool_calls(
        self, response, active_response_id: str, accumulated_input_text: str
    ) -> str:
        """Handle tool calls from Gemini.

        Matches each requested function call against the registered function
        tools, records metrics, awaits the tool, and sends its result back via
        send_tool_response. Tool failures are emitted as "error" events rather
        than raised.

        Args:
            response: Live API server message; no-op unless response.tool_call is set.
            active_response_id: Currently unused in this method; passed through
                by the receive loop for signature parity.
            accumulated_input_text: Pending user transcript; flushed to the
                metrics collector before the first matching tool runs.

        Returns:
            The remaining accumulated_input_text ("" once flushed).
        """
        if not response.tool_call:
            return accumulated_input_text
        for tool_call in response.tool_call.function_calls:

            if self.tools:
                for tool in self.tools:
                    if not is_function_tool(tool):
                        continue
                    tool_info = get_tool_info(tool)
                    if tool_info.name == tool_call.name:
                        # Flush the user's transcript first so metrics attribute
                        # it to this turn before the tool executes.
                        if accumulated_input_text:
                            metrics_collector.set_user_transcript(
                                accumulated_input_text
                            )
                            accumulated_input_text = ""
                        try:
                            metrics_collector.add_function_tool_call(
                                tool_name=tool_info.name
                            )
                            result = await tool(**tool_call.args)
                            await self.send_tool_response(
                                [
                                    FunctionResponse(
                                        id=tool_call.id,
                                        name=tool_call.name,
                                        response=result,
                                    )
                                ]
                            )
                        except Exception as e:
                            self.emit(
                                "error",
                                f"Error executing function {tool_call.name}: {e}",
                            )
                            traceback.print_exc()
                        # Only the first matching tool runs for this call.
                        break
        return accumulated_input_text

    async def _receive_loop(self, session: GeminiSession) -> None:
        """Process incoming messages from Gemini.

        Consumes session.session.receive() until the server closes the
        connection or _closing is set. Handles, in order: tool calls,
        input/output transcription streaming, interruption signals, audio/text
        model output, and turn-complete bookkeeping (usage metrics, final
        transcript events). Sets _session_should_close on errors so the
        session loop can tear down / reconnect.
        """
        try:
            # Per-turn accumulation state, reset on turn_complete / interrupt.
            active_response_id = None
            chunk_number = 0
            accumulated_text = ""
            final_transcription = ""
            accumulated_input_text = ""

            while not self._closing:
                try:
                    async for response in session.session.receive():
                        if self._closing:
                            break

                        if response.tool_call:
                            accumulated_input_text = await self._handle_tool_calls(
                                response, active_response_id, accumulated_input_text
                            )

                        if server_content := response.server_content:
                            # --- streaming transcription of the user's audio ---
                            if (
                                input_transcription := server_content.input_transcription
                            ):
                                if input_transcription.text:
                                    if not self._user_speaking:
                                        self.emit("user_speech_started", {})
                                        metrics_collector.on_user_speech_start()
                                        metrics_collector.start_turn()
                                        self._user_speaking = True
                                    self.emit("user_speech_ended", {"type": "done"})
                                    accumulated_input_text += input_transcription.text
                                    global_event_emitter.emit(
                                        "input_transcription",
                                        {
                                            "text": accumulated_input_text,
                                            "is_final": False,
                                        },
                                    )

                                    # If agent is still producing audio, re-emit to restore SPEAKING state
                                    # (user speech events override it to LISTENING → THINKING)
                                    if self._agent_speaking:
                                        self.emit("agent_speech_started", {})
                                        metrics_collector.on_agent_speech_start()

                            # --- streaming transcription of the model's audio ---
                            if (
                                output_transcription := server_content.output_transcription
                            ):
                                if output_transcription.text:
                                    final_transcription += output_transcription.text
                                    global_event_emitter.emit(
                                        "output_transcription",
                                        {
                                            "text": final_transcription,
                                            "is_final": False,
                                        },
                                    )

                            # Start a new logical response on first content.
                            if not active_response_id:
                                active_response_id = f"response_{id(response)}"
                                chunk_number = 0
                                accumulated_text = ""
                            # --- server-side barge-in / interruption ---
                            if server_content.interrupted:
                                if self.current_utterance and not self.current_utterance.is_interruptible:
                                    logger.info("Interruption is disabled for the current utterance. Ignoring server interrupt signal.")
                                    continue

                                if active_response_id:
                                    active_response_id = None
                                    accumulated_text = ""
                                if (
                                    self.audio_track
                                    and "AUDIO" in self.config.response_modalities
                                ):
                                    self.audio_track.interrupt()
                                continue

                            # --- model output (audio bytes and/or text parts) ---
                            if model_turn := server_content.model_turn:
                                if self._user_speaking:
                                    metrics_collector.on_user_speech_end()
                                    self._user_speaking = False
                                # Model started answering: finalize the user transcript.
                                if accumulated_input_text:
                                    metrics_collector.set_user_transcript(
                                        accumulated_input_text
                                    )
                                    try:
                                        self.emit(
                                            "realtime_model_transcription",
                                            {
                                                "role": "user",
                                                "text": accumulated_input_text,
                                                "is_final": True,
                                            },
                                        )
                                    except Exception:
                                        pass
                                    accumulated_input_text = ""
                                for part in model_turn.parts:
                                    if (
                                        hasattr(part, "inline_data")
                                        and part.inline_data
                                    ):
                                        raw_audio = part.inline_data.data
                                        if not raw_audio or len(raw_audio) < 2:
                                            continue

                                        if "AUDIO" in self.config.response_modalities:
                                            chunk_number += 1
                                            if not self._agent_speaking:
                                                self.emit("agent_speech_started", {})
                                                metrics_collector.on_agent_speech_start()
                                                self._agent_speaking = True

                                            if self.audio_track and self.loop:
                                                # Pad to an even byte count (16-bit PCM).
                                                if len(raw_audio) % 2 != 0:
                                                    raw_audio += b"\x00"

                                                asyncio.create_task(
                                                    self.audio_track.add_new_bytes(
                                                        raw_audio
                                                    ),
                                                    name=f"audio_chunk_{chunk_number}",
                                                )

                                    elif hasattr(part, "text") and part.text:
                                        accumulated_text += part.text

                            # --- end of turn: flush metrics and final transcripts ---
                            if server_content.turn_complete and active_response_id:
                                usage_metadata = self.get_usage_details(response.usage_metadata)
                                metrics_collector.set_realtime_usage(usage_metadata)
                                if accumulated_input_text:
                                    metrics_collector.set_user_transcript(
                                        accumulated_input_text
                                    )
                                    accumulated_input_text = ""
                                if final_transcription:
                                    metrics_collector.set_agent_response(
                                        final_transcription
                                    )
                                    try:
                                        self.emit(
                                            "realtime_model_transcription",
                                            {
                                                "role": "agent",
                                                "text": final_transcription,
                                                "is_final": True,
                                            },
                                        )
                                    except Exception:
                                        pass

                                # Prefer text parts in TEXT mode, else the audio transcript.
                                response_text = None
                                if (
                                    "TEXT" in self.config.response_modalities
                                    and accumulated_text
                                ):
                                    response_text = accumulated_text
                                    global_event_emitter.emit(
                                        "text_response",
                                        {"type": "done", "text": accumulated_text},
                                    )
                                elif (
                                    "TEXT" not in self.config.response_modalities
                                    and final_transcription
                                ):
                                    response_text = final_transcription
                                    global_event_emitter.emit(
                                        "text_response",
                                        {"type": "done", "text": final_transcription},
                                    )

                                if response_text:
                                    self.emit("llm_text_output", {"text": response_text})

                                # Reset per-turn state for the next response.
                                active_response_id = None
                                accumulated_text = ""
                                final_transcription = ""
                                if self.audio_track:
                                    self.audio_track.mark_synthesis_complete()
                                self._agent_speaking = False

                except Exception as e:
                    # Classify the failure: a server-initiated close (websocket
                    # 1000/1011) ends the model for good; anything else just
                    # recycles the session.
                    err_msg = str(e)
                    is_server_disconnect = (
                        "ConnectionClosed" in type(e).__name__ 
                        or "1011" in err_msg 
                        or "1000" in err_msg
                    )

                    if is_server_disconnect:
                        logger.info(f"Session ended by server ({err_msg}). Stopping local connection.")
                        # CRITICAL: We set _closing to True to stop the outer loop
                        # from attempting to reconnect.
                        self._closing = True
                        self._session_should_close.set()
                        break

                    logger.error(f"Error in receive loop: {e}")
                    traceback.print_exc()
                    self._session_should_close.set()
                    break

                await asyncio.sleep(0.1)

        except asyncio.CancelledError:
            self.emit("error", "Receive loop cancelled")
        except Exception as e:
            self.emit("error", e)
            traceback.print_exc()
            self._session_should_close.set()

    def _is_gemini_3(self) -> bool:
        """True when ``self.model`` names a Gemini 3.x family model."""
        model_name = self.model
        return "gemini-3" in model_name

    async def _send_text(self, session: AsyncSession, text: str, *, turn_complete: bool = True) -> None:
        """Send text via the appropriate method for the current model.

        Gemini 3.x restricts ``send_client_content`` to initial history
        seeding, so all runtime text must go through ``send_realtime_input``.
        When ``turn_complete`` is True, the text is bracketed with
        activity_start / activity_end signals so the model treats it as a
        complete user turn and generates a response.
        """
        if not self._is_gemini_3():
            # Pre-3.x models: a single client-content message carries the turn.
            await session.send_client_content(
                turns=[Content(parts=[Part(text=text)], role="user")],
                turn_complete=turn_complete,
            )
            return

        if turn_complete:
            await session.send_realtime_input(activity_start={})
        await session.send_realtime_input(text=text)
        if turn_complete:
            await session.send_realtime_input(activity_end={})

    async def _keep_alive(self, session: GeminiSession) -> None:
        """Send periodic keep-alive pings as silent audio every 10 seconds.

        A tiny chunk of silence is used instead of text so the ping is not
        interpreted as user input (which would trigger model responses).
        Stops when the connection is closing or the server reports a close.
        """
        # Must match the rate used by handle_audio_input: the API locks onto
        # the first sample rate it sees and rejects later changes.
        rate = 24000 if self.vertexai else 48000
        # 10 ms of 16-bit PCM silence (2 bytes per sample).
        silent_chunk = b"\x00" * ((rate // 100) * 2)

        try:
            while not self._closing:
                await asyncio.sleep(10)

                if self._closing:
                    break

                try:
                    await session.session.send_realtime_input(
                        audio=Blob(data=silent_chunk, mime_type=f"audio/pcm;rate={rate}")
                    )
                except Exception as e:
                    message = str(e)
                    # A closed socket / 1011 close code means the server ended
                    # the session: stop pinging and signal the session loop.
                    if "closed" in message.lower() or "1011" in message:
                        logger.info("Keep-alive detected closed session. Stopping.")
                        self._closing = True
                        self._session_should_close.set()
                        break
                    self.emit("error", f"Keep-alive error: {e}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error in keep-alive: {e}")
            self._session_should_close.set()

    def _resample_audio(self, audio_bytes: bytes) -> bytes:
        """Resample audio from input sample rate to output sample rate and convert to mono."""
        try:
            if not audio_bytes:
                return b''

            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b''

            if self.vertexai:
                stereo_audio = raw_audio.reshape(-1, 2)
                mono_audio = stereo_audio.astype(np.float32).mean(axis=1)
            else:
                mono_audio = raw_audio.astype(np.float32)

            if self.input_sample_rate != self.target_sample_rate:
                output_length = int(len(mono_audio) * self.target_sample_rate / self.input_sample_rate)
                resampled_data = signal.resample(mono_audio, output_length)
            else:
                resampled_data = mono_audio

            resampled_data = np.clip(resampled_data, -32767, 32767)
            return resampled_data.astype(np.int16).tobytes()

        except Exception as e:
            logger.error(f"Error resampling audio: {e}")
            return b''


    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Handle incoming audio data from the user.

        Resamples the raw PCM chunk to the session's target rate and forwards
        it to the live session as realtime input. Audio is silently dropped
        when there is no session, the session is closing, the current
        utterance is non-interruptible, or AUDIO responses are disabled.
        """
        if not self._session or self._closing:
            return

        if self.current_utterance and not self.current_utterance.is_interruptible:
            logger.info("Interruption is disabled for the current utterance. Not processing audio input.")
            return

        if "AUDIO" not in self.config.response_modalities:
            return

        # Rate declared in the mime type sent to the API.
        AUDIO_SAMPLE_RATE = 24000 if self.vertexai else 48000
        # NOTE(review): for Vertex AI the audio is resampled to 16 kHz below,
        # yet the mime type declares rate=24000 — confirm this mismatch is
        # intentional (the API may resample server-side).
        # NOTE(review): mutating self.target_sample_rate on every call is
        # redundant after the first invocation — consider moving to __init__.
        self.target_sample_rate = 16000 if self.vertexai else self.target_sample_rate
        audio_data = self._resample_audio(audio_data)
        try:
            await self._session.session.send_realtime_input(
                audio=Blob(data=audio_data, mime_type=f"audio/pcm;rate={AUDIO_SAMPLE_RATE}")
            )
        except Exception as e:
            # Close code 1011 / "closed" errors mean the server ended the
            # session: mark closing and signal the session loop to stop.
            if "1011" in str(e) or "closed" in str(e).lower():
                logger.info("Cannot send audio (session closed).")
                self._closing = True
                self._session_should_close.set()
            else:
                logger.error(f"Error sending audio: {e}")

    async def interrupt(self) -> None:
        """Interrupt the model's current response.

        Notifies listeners that agent speech ended, records the interruption
        metric, and flushes any audio already queued on the output track.
        No-op when there is no session, the connection is closing, or the
        current utterance was marked non-interruptible.
        """
        if not self._session or self._closing:
            return
        
        if self.current_utterance and not self.current_utterance.is_interruptible:
            logger.info("Interruption is disabled for the current utterance. Not interrupting Google Live API.")
            return
        
        try:
            # NOTE(review): interruption is implemented by sending the literal
            # text "stop" as a user turn — confirm the model treats this as a
            # barge-in rather than answering it.
            await self._send_text(self._session.session, "stop")
            self.emit("agent_speech_ended", {})
            metrics_collector.on_interrupted()
            # Drop any synthesized audio already buffered on the track.
            if self.audio_track and "AUDIO" in self.config.response_modalities:
                self.audio_track.interrupt()
        except Exception as e:
            self.emit("error", f"Interrupt error: {e}")

    async def send_message(self, message: str) -> None:
        """Send a text message to be spoken back as an audio response.

        Waits (up to 5 one-second retries) for an active session, then
        instructs the model to speak ``message`` verbatim.

        Raises:
            RuntimeError: If no session becomes available within the retries.
        """
        attempts = 0
        while not self._session or not self._session.session:
            if attempts >= 5:
                raise RuntimeError("No active Gemini session after maximum retries")
            logger.debug("No active session, waiting for connection...")
            await asyncio.sleep(1)
            attempts += 1

        try:
            # Gemini 3 needs a firmer instruction to avoid free-form replies.
            if self._is_gemini_3():
                prompt_text = (
                    "[SYSTEM INSTRUCTION] Repeat the following message out loud VERBATIM. "
                    "Do NOT add any commentary, do NOT answer or continue the conversation. "
                    "Only say these exact words then stop: " + message
                )
            else:
                prompt_text = (
                    "Please start the conversation by saying exactly this, "
                    "without any additional text: '" + message + "'"
                )
            await self._send_text(self._session.session, prompt_text)
            await asyncio.sleep(0.1)
        except Exception as e:
            self.emit("error", f"Error sending message: {e}")
            self._session_should_close.set()

    async def send_text_message(self, message: str) -> None:
        """Send a text message for text-only communication.

        Waits (up to 5 one-second retries) for an active session before
        sending the message as a completed user turn.

        Raises:
            RuntimeError: If no session becomes available within the retries.
        """
        retry_count = 0
        max_retries = 5
        while not self._session or not self._session.session:
            if retry_count >= max_retries:
                raise RuntimeError("No active Gemini session after maximum retries")
            # Waiting for a session is expected during startup — log it rather
            # than emitting an "error" event (consistent with send_message).
            logger.debug("No active session, waiting for connection...")
            await asyncio.sleep(1)
            retry_count += 1

        try:
            await self._send_text(self._session.session, message)
        except Exception as e:
            self.emit("error", f"Error sending text message: {e}")
            self._session_should_close.set()

    async def handle_video_input(self, video_data: av.VideoFrame) -> None:
        """Handle an incoming video frame from the user.

        Frames are throttled to at most one every 0.5 s, JPEG-encoded, and
        forwarded to the live session as realtime video input. Empty or
        invalid frames are dropped. (Body re-indented to the file's standard
        8-space method-body convention.)
        """
        if not self._session or self._closing:
            return

        try:
            if not video_data or not video_data.planes:
                return

            # Throttle: forward at most ~2 frames per second.
            now = time.monotonic()
            if (
                hasattr(self, "_last_video_frame")
                and (now - self._last_video_frame) < 0.5
            ):
                return
            self._last_video_frame = now

            processed_jpeg = encode_image(video_data, DEFAULT_IMAGE_ENCODE_OPTIONS)

            # Sanity check: a valid JPEG should never be this small.
            if not processed_jpeg or len(processed_jpeg) < 100:
                logger.warning("Invalid JPEG data generated")
                return

            await self._session.session.send_realtime_input(
                video=Blob(data=processed_jpeg, mime_type="image/jpeg")
            )
        except Exception as e:
            self.emit("error", f"Video processing error: {str(e)}")

    async def send_message_with_frames(self, message: str, frames: list[av.VideoFrame]) -> None:
        """Send a text message with video frames for vision-enabled communication.

        Waits (up to 5 one-second retries) for an active session, JPEG-encodes
        each frame, and submits text + images as a single completed user turn.

        Args:
            message: Text content of the user turn.
            frames: Video frames to attach; frames that fail to encode are
                skipped with a log message rather than aborting the send.

        Raises:
            RuntimeError: If no session becomes available within the retries.
        """
        retry_count = 0
        max_retries = 5
        while not self._session or not self._session.session:
            if retry_count >= max_retries:
                raise RuntimeError("No active Gemini session after maximum retries")
            # Waiting for a session is expected during startup — log it rather
            # than emitting an "error" event (consistent with send_message).
            logger.debug("No active session, waiting for connection...")
            await asyncio.sleep(1)
            retry_count += 1

        try:
            parts = [Part(text=message)]

            for frame in frames:
                try:
                    processed_jpeg = encode_image(frame, DEFAULT_IMAGE_ENCODE_OPTIONS)
                    # Sanity check: a valid JPEG should never be this small.
                    if processed_jpeg and len(processed_jpeg) >= 100:
                        parts.append(
                            Part(
                                inline_data=Blob(
                                    data=processed_jpeg,
                                    mime_type="image/jpeg"
                                )
                            )
                        )
                    else:
                        logger.warning("Invalid JPEG data generated for frame")
                except Exception as e:
                    logger.error(f"Error processing frame for send_message_with_frames: {e}")

            # NOTE(review): _send_text documents that Gemini 3.x restricts
            # send_client_content to initial history seeding — confirm this
            # path still works for gemini-3 models.
            await self._session.session.send_client_content(
                turns=Content(parts=parts, role="user"),
                turn_complete=True,
            )
        except Exception as e:
            self.emit("error", f"Error sending message with frames: {e}")
            self._session_should_close.set()

    async def _cleanup_session(self, session: GeminiSession) -> None:
        """Cancel a session's pending tasks and close its context manager."""
        # Cancel any background tasks still running for this session.
        for pending in session.tasks:
            if not pending.done():
                pending.cancel()

        try:
            await session.session_cm.__aexit__(None, None, None)
        except Exception as e:
            # Server close (1011) / already-closed errors are expected here
            # and not worth surfacing; report anything else.
            message = str(e)
            if "1011" not in message and "closed" not in message.lower():
                self.emit("error", f"Error closing session: {e}")

    async def aclose(self) -> None:
        """Clean up all resources.

        Idempotent: returns immediately if a close is already in progress.
        Cancels the audio-processing and main session tasks (with short
        timeouts so shutdown cannot hang on a stuck task), tears down the
        active session, clears buffered audio, and finally delegates to the
        base class.
        """
        if self._closing:
            return

        # Flag first so loops and keep-alive tasks stop scheduling work.
        self._closing = True
        self._session_should_close.set()

        if self._audio_processing_task and not self._audio_processing_task.done():
            self._audio_processing_task.cancel()
            try:
                # Bounded wait so a misbehaving task cannot block shutdown.
                await asyncio.wait_for(self._audio_processing_task, timeout=1.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass

        if self._main_task and not self._main_task.done():
            self._main_task.cancel()
            try:
                await asyncio.wait_for(self._main_task, timeout=2.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass

        if self._session:
            await self._cleanup_session(self._session)
            self._session = None

        self._buffered_audio = bytearray()
        await super().aclose()

    async def _reconnect(self) -> None:
        """Replace the active session with a freshly created one."""
        stale = self._session
        if stale:
            await self._cleanup_session(stale)
            self._session = None
        self._session = await self._create_session()

    async def send_tool_response(
        self, function_responses: List[FunctionResponse]
    ) -> None:
        """Forward tool (function-call) results back to Gemini.

        Silently returns when no session is active; on send failure, emits an
        error event and signals the session loop to recycle the connection.
        """
        active = self._session
        if not active or not active.session:
            return

        try:
            await active.session.send_tool_response(
                function_responses=function_responses
            )
        except Exception as e:
            self.emit("error", f"Error sending tool response: {e}")
            self._session_should_close.set()

    def _convert_tools_to_gemini_format(self, tools: List[FunctionTool]) -> List[Tool]:
        """Convert tool definitions into Gemini's ``Tool`` format.

        Non-function tools are skipped; tools that fail schema conversion emit
        an error event and are dropped. Returns an empty list when nothing
        converts successfully.
        """
        declarations = []

        for candidate in tools:
            if not is_function_tool(candidate):
                continue

            try:
                declarations.append(build_gemini_schema(candidate))
            except Exception as e:
                self.emit("error", f"Failed to format tool {candidate}: {e}")

        if not declarations:
            return []
        return [Tool(function_declarations=declarations)]
    def is_native_audio_model(self) -> bool:
        """Whether the configured model emits native audio.

        True for every Gemini 3.x model and for the 2.5 native-audio preview.
        """
        if self._is_gemini_3():
            return True
        return "gemini-2.5-flash-native-audio-preview-12-2025" in self.model


    def get_usage_details(self,usage_metadata) -> dict:
        """
        Flatten Gemini Live UsageMetadata into the shared pricing dictionary.

        Missing attributes default to 0, so a partially-populated (or empty)
        metadata object is handled gracefully. Supports a TEXT/AUDIO/IMAGE
        per-modality breakdown plus thoughts tokens.
        """
        def _bucket_by_modality(details):
            # Sum per-modality token counts into (text, audio, image).
            text = audio = image = 0
            for entry in details:
                label = str(getattr(entry, "modality", "")).upper()
                tokens = getattr(entry, "token_count", 0)
                if "TEXT" in label:
                    text += tokens
                elif "AUDIO" in label:
                    audio += tokens
                elif "IMAGE" in label:
                    image += tokens
            return text, audio, image

        in_text, in_audio, in_image = _bucket_by_modality(
            getattr(usage_metadata, "prompt_tokens_details", None) or []
        )
        out_text, out_audio, out_image = _bucket_by_modality(
            getattr(usage_metadata, "response_tokens_details", None) or []
        )

        return {
            "total_tokens": getattr(usage_metadata, "total_token_count", 0),
            "input_tokens": getattr(usage_metadata, "prompt_token_count", 0),
            "output_tokens": getattr(usage_metadata, "response_token_count", 0),

            "input_text_tokens": in_text,
            "input_audio_tokens": in_audio,
            "input_image_tokens": in_image,

            "output_text_tokens": out_text,
            "output_audio_tokens": out_audio,
            "output_image_tokens": out_image,

            "thoughts_tokens": getattr(usage_metadata, "thoughts_token_count", 0),
        }

Gemini's realtime model for low-latency communication — primarily audio, with optional text responses and video frame input.

Initialize Gemini realtime model.

Args

api_key
Gemini API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
service_account_path
Path to Google service account JSON file.
model
The Gemini Live model identifier to use (e.g. 'gemini-2.5-flash-native-audio-preview-12-2025').
config
Optional configuration object for customizing model behavior. Contains settings for: - voice: Voice ID for audio output ('Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'). Defaults to 'Puck' - language_code: Language code for speech synthesis. Defaults to 'en-US' - temperature: Controls randomness in responses. Higher values (0.8) more random, lower (0.2) more focused - top_p: Nucleus sampling parameter. Controls diversity via probability cutoff. Range 0-1 - top_k: Limits number of tokens considered for each generation step - candidate_count: Number of response candidates to generate. Defaults to 1 - max_output_tokens: Maximum tokens allowed in model responses - presence_penalty: Penalizes token presence in text. Range -2.0 to 2.0 - frequency_penalty: Penalizes token frequency in text. Range -2.0 to 2.0 - response_modalities: List of enabled response types ["TEXT", "AUDIO"]. Defaults to ["AUDIO"] - input_audio_transcription: Configuration for audio transcription features - output_audio_transcription: Configuration for audio transcription features - thinking_config: Configuration for model's "thinking" behavior - realtime_input_config: Configuration for realtime input handling - context_window_compression: Configuration for context window compression

Raises

ValueError
If neither api_key nor service_account_path is provided and no GOOGLE_API_KEY in env vars

Ancestors

  • videosdk.agents.realtime_base_model.RealtimeBaseModel
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic
  • abc.ABC

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Clean up all resources"""
    if self._closing:
        return

    self._closing = True
    self._session_should_close.set()

    if self._audio_processing_task and not self._audio_processing_task.done():
        self._audio_processing_task.cancel()
        try:
            await asyncio.wait_for(self._audio_processing_task, timeout=1.0)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass

    if self._main_task and not self._main_task.done():
        self._main_task.cancel()
        try:
            await asyncio.wait_for(self._main_task, timeout=2.0)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass

    if self._session:
        await self._cleanup_session(self._session)
        self._session = None

    self._buffered_audio = bytearray()
    await super().aclose()

Clean up all resources

async def connect(self) ‑> None
Expand source code
async def connect(self) -> None:
    """Connect to the Gemini Live API"""
    if self._session:
        await self._cleanup_session(self._session)
        self._session = None

    self._closing = False
    self._session_should_close.clear()

    try:

        if not self.audio_track and "AUDIO" in self.config.response_modalities:
            logger.warning("audio_track not set — it should be assigned externally by the pipeline before connect().")

        try:
            initial_session = await self._create_session()
            if initial_session:
                self._session = initial_session
        except Exception as e:
            self.emit("error", f"Initial session creation failed, will retry: {e}")

        if not self._main_task or self._main_task.done():
            self._main_task = asyncio.create_task(
                self._session_loop(), name="gemini-main-loop"
            )

    except Exception as e:
        self.emit("error", f"Error connecting to Gemini Live API: {e}")
        traceback.print_exc()
        raise

Connect to the Gemini Live API

def get_usage_details(self, usage_metadata) ‑> dict
Expand source code
def get_usage_details(self,usage_metadata) -> dict:
    """
    Flatten Gemini Live UsageMetadata into the same pricing dictionary format.

    Supports TEXT/AUDIO/IMAGE breakdown + thoughts tokens.
    """
    total_tokens = getattr(usage_metadata, "total_token_count", 0)
    input_tokens = getattr(usage_metadata, "prompt_token_count", 0)
    output_tokens = getattr(usage_metadata, "response_token_count", 0)

    thoughts_tokens = getattr(usage_metadata, "thoughts_token_count", 0)

    prompt_details = getattr(usage_metadata, "prompt_tokens_details", None) or []
    response_details = getattr(usage_metadata, "response_tokens_details", None) or []

    input_text_tokens = 0
    input_audio_tokens = 0
    input_image_tokens = 0

    output_text_tokens = 0
    output_audio_tokens = 0
    output_image_tokens = 0

    for item in prompt_details:
        modality = str(getattr(item, "modality", "")).upper()
        count = getattr(item, "token_count", 0)

        if "TEXT" in modality:
            input_text_tokens += count
        elif "AUDIO" in modality:
            input_audio_tokens += count
        elif "IMAGE" in modality:
            input_image_tokens += count

    for item in response_details:
        modality = str(getattr(item, "modality", "")).upper()
        count = getattr(item, "token_count", 0)

        if "TEXT" in modality:
            output_text_tokens += count
        elif "AUDIO" in modality:
            output_audio_tokens += count
        elif "IMAGE" in modality:
            output_image_tokens += count

    return {
        "total_tokens": total_tokens,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,

        "input_text_tokens": input_text_tokens,
        "input_audio_tokens": input_audio_tokens,
        "input_image_tokens": input_image_tokens,

        "output_text_tokens": output_text_tokens,
        "output_audio_tokens": output_audio_tokens,
        "output_image_tokens": output_image_tokens,

        "thoughts_tokens": thoughts_tokens,
    }

Flatten Gemini Live UsageMetadata into the same pricing dictionary format.

Supports TEXT/AUDIO/IMAGE breakdown + thoughts tokens.

async def handle_audio_input(self, audio_data: bytes) ‑> None
Expand source code
async def handle_audio_input(self, audio_data: bytes) -> None:
    """Handle incoming audio data from the user"""
    if not self._session or self._closing:
        return

    if self.current_utterance and not self.current_utterance.is_interruptible:
        logger.info("Interruption is disabled for the current utterance. Not processing audio input.")
        return

    if "AUDIO" not in self.config.response_modalities:
        return

    AUDIO_SAMPLE_RATE = 24000 if self.vertexai else 48000
    self.target_sample_rate = 16000 if self.vertexai else self.target_sample_rate
    audio_data = self._resample_audio(audio_data)
    try:
        await self._session.session.send_realtime_input(
            audio=Blob(data=audio_data, mime_type=f"audio/pcm;rate={AUDIO_SAMPLE_RATE}")
        )
    except Exception as e:
        if "1011" in str(e) or "closed" in str(e).lower():
            logger.info("Cannot send audio (session closed).")
            self._closing = True
            self._session_should_close.set()
        else:
            logger.error(f"Error sending audio: {e}")

Handle incoming audio data from the user

async def handle_video_input(self, video_data: av.VideoFrame) ‑> None
Expand source code
async def handle_video_input(self, video_data: av.VideoFrame) -> None:
        """Improved video input handler with error prevention"""
        if not self._session or self._closing:
            return

        try:
            if not video_data or not video_data.planes:
                return

            now = time.monotonic()
            if (
                hasattr(self, "_last_video_frame")
                and (now - self._last_video_frame) < 0.5
            ):
                return
            self._last_video_frame = now

            processed_jpeg = encode_image(video_data, DEFAULT_IMAGE_ENCODE_OPTIONS)

            if not processed_jpeg or len(processed_jpeg) < 100:
                logger.warning("Invalid JPEG data generated")
                return

            await self._session.session.send_realtime_input(
                video=Blob(data=processed_jpeg, mime_type="image/jpeg")
            )
        except Exception as e:
            self.emit("error", f"Video processing error: {str(e)}")

Improved video input handler with error prevention

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt current response"""
    if not self._session or self._closing:
        return
    
    if self.current_utterance and not self.current_utterance.is_interruptible:
        logger.info("Interruption is disabled for the current utterance. Not interrupting Google Live API.")
        return
    
    try:
        await self._send_text(self._session.session, "stop")
        self.emit("agent_speech_ended", {})
        metrics_collector.on_interrupted()
        if self.audio_track and "AUDIO" in self.config.response_modalities:
            self.audio_track.interrupt()
    except Exception as e:
        self.emit("error", f"Interrupt error: {e}")

Interrupt current response

def is_native_audio_model(self) ‑> bool
Expand source code
def is_native_audio_model(self) -> bool:
    """Check if the model is a native audio model based on its name"""
    return self._is_gemini_3() or "gemini-2.5-flash-native-audio-preview-12-2025" in self.model

Check if the model is a native audio model based on its name

async def send_message(self, message: str) ‑> None
Expand source code
async def send_message(self, message: str) -> None:
    """Send a text message to get audio response"""
    retry_count = 0
    max_retries = 5
    while not self._session or not self._session.session:
        if retry_count >= max_retries:
            raise RuntimeError("No active Gemini session after maximum retries")
        logger.debug("No active session, waiting for connection...")
        await asyncio.sleep(1)
        retry_count += 1

    try:
        if self._is_gemini_3():
            prompt_text = (
                "[SYSTEM INSTRUCTION] Repeat the following message out loud VERBATIM. "
                "Do NOT add any commentary, do NOT answer or continue the conversation. "
                "Only say these exact words then stop: " + message
            )
        else:
            prompt_text = (
                "Please start the conversation by saying exactly this, "
                "without any additional text: '" + message + "'"
            )
        await self._send_text(self._session.session, prompt_text)
        await asyncio.sleep(0.1)
    except Exception as e:
        self.emit("error", f"Error sending message: {e}")
        self._session_should_close.set()

Send a text message to get audio response

async def send_message_with_frames(self, message: str, frames: list[av.VideoFrame]) ‑> None
Expand source code
async def send_message_with_frames(self, message: str, frames: list[av.VideoFrame]) -> None:
    """Send a text message with video frames for vision-enabled communication"""
    retry_count = 0
    max_retries = 5
    while not self._session or not self._session.session:
        if retry_count >= max_retries:
            raise RuntimeError("No active Gemini session after maximum retries")
        self.emit("error", "No active session, waiting for connection...")
        await asyncio.sleep(1)
        retry_count += 1

    try:
        parts = [Part(text=message)]
        
        for frame in frames:
            try:
                processed_jpeg = encode_image(frame, DEFAULT_IMAGE_ENCODE_OPTIONS)
                if processed_jpeg and len(processed_jpeg) >= 100:
                    parts.append(
                        Part(
                            inline_data=Blob(
                                data=processed_jpeg,
                                mime_type="image/jpeg"
                            )
                        )
                    )
                else:
                    logger.warning("Invalid JPEG data generated for frame")
            except Exception as e:
                logger.error(f"Error processing frame for send_message_with_frames: {e}")
        
        await self._session.session.send_client_content(
            turns=Content(parts=parts, role="user"),
            turn_complete=True,
        )
    except Exception as e:
        self.emit("error", f"Error sending message with frames: {e}")
        self._session_should_close.set()

Send a text message with video frames for vision-enabled communication

async def send_text_message(self, message: str) ‑> None
Expand source code
async def send_text_message(self, message: str) -> None:
    """Send a text message for text-only communication"""
    retry_count = 0
    max_retries = 5
    while not self._session or not self._session.session:
        if retry_count >= max_retries:
            raise RuntimeError("No active Gemini session after maximum retries")
        self.emit("error", "No active session, waiting for connection...")
        await asyncio.sleep(1)
        retry_count += 1

    try:
        await self._send_text(self._session.session, message)
    except Exception as e:
        self.emit("error", f"Error sending text message: {e}")
        self._session_should_close.set()

Send a text message for text-only communication

async def send_tool_response(self, function_responses: List[FunctionResponse]) ‑> None
Expand source code
async def send_tool_response(
    self, function_responses: List[FunctionResponse]
) -> None:
    """Send tool responses back to Gemini"""
    if not self._session or not self._session.session:
        return

    try:
        await self._session.session.send_tool_response(
            function_responses=function_responses
        )
    except Exception as e:
        self.emit("error", f"Error sending tool response: {e}")
        self._session_should_close.set()

Send tool responses back to Gemini

def set_agent(self, agent: Agent) ‑> None
Expand source code
def set_agent(self, agent: Agent) -> None:
    self._instructions = agent.instructions
    self.tools = agent.tools
    self.tools_formatted = self._convert_tools_to_gemini_format(self.tools)
    self.formatted_tools = self.tools_formatted
class GoogleLLM (*,
api_key: str | None = None,
model: str = 'gemini-2.5-flash-lite',
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_output_tokens: int | None = None,
top_p: float | None = None,
top_k: int | None = None,
presence_penalty: float | None = None,
frequency_penalty: float | None = None,
vertexai: bool = False,
vertexai_config: VertexAIConfig | None = None,
thinking_budget: int | None = 0,
seed: int | None = None,
safety_settings: list | None = None,
http_options: Any | None = None,
thinking_level: str | None = None,
include_thoughts: bool | None = None)
Expand source code
class GoogleLLM(LLM):
    
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "gemini-2.5-flash-lite",
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_output_tokens: int | None = None,
        top_p: float | None = None,
        top_k: int | None = None,
        presence_penalty: float | None = None,
        frequency_penalty: float | None = None,
        vertexai: bool = False,
        vertexai_config: VertexAIConfig | None = None,
        thinking_budget: int | None = 0,
        seed: int | None = None,
        safety_settings: list | None = None,
        http_options: Any | None = None,
        thinking_level: str | None = None,
        include_thoughts: bool | None = None,
    ) -> None:
        """Initialize the Google LLM plugin.

        Args:
            api_key: Google API key. Falls back to ``GOOGLE_API_KEY`` env var.
            model: Gemini model name. Defaults to "gemini-2.5-flash-lite".
            temperature: Sampling temperature.
            tool_choice: Controls which (if any) tool is called.
            max_output_tokens: Maximum tokens in the response.
            top_p: Nucleus sampling probability mass.
            top_k: Top-K tokens considered during sampling.
            presence_penalty: Penalise tokens that have already appeared.
            frequency_penalty: Penalise repeated tokens by frequency.
            vertexai: Use Vertex AI backend instead of the public Gemini API.
            vertexai_config: Vertex AI project/location override.
            thinking_budget: Token budget for extended thinking (Gemini 2.5).
                Set to ``None`` to disable the thinking config entirely.
            seed: Seed for deterministic sampling.
            safety_settings: List of ``types.SafetySetting`` / dicts forwarded to the API.
            http_options: ``types.HttpOptions`` for the underlying Google client.
            thinking_level: Thinking effort level for Gemini 3+ models
                (``"low"``, ``"medium"``, ``"high"``, ``"minimal"``).
            include_thoughts: Whether to surface model thoughts in the response.

        Raises:
            ValueError: When Vertex AI mode is off and no API key is available.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
        self.vertexai = vertexai
        self.vertexai_config = vertexai_config
        # An API key is only mandatory for the public Gemini API; Vertex AI
        # authenticates via application-default credentials instead.
        if not self.vertexai and not self.api_key:
            raise ValueError(
                "For VertexAI: Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of the service account key file. "
                "The Google Cloud project and location can be set via VertexAIConfig or the environment variables `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION`. location defaults to `us-central1`. "
                "For Google Gemini API: Set the `api_key` argument or the `GOOGLE_API_KEY` environment variable."
            )
        self.model = model
        self.temperature = temperature
        self.tool_choice = tool_choice
        self.max_output_tokens = max_output_tokens
        self.top_p = top_p
        self.top_k = top_k
        self.presence_penalty = presence_penalty
        self.frequency_penalty = frequency_penalty
        self.thinking_budget = thinking_budget
        self.seed = seed
        self.safety_settings = safety_settings
        self.http_options = http_options
        self.thinking_level = thinking_level
        self.include_thoughts = include_thoughts
        self._cancelled = False
        # Maps function-call name -> thought_signature bytes captured during
        # streaming, replayed on later turns for tool-calling continuity.
        self._thought_signatures: dict[str, bytes] = {}
        # Shared httpx client, created lazily by _get_http_client().
        self._http_client: httpx.AsyncClient | None = None
        if self.vertexai:
            # Resolve project id: explicit config > env var > service-account file.
            project_id = (self.vertexai_config.project_id if self.vertexai_config else None) or os.getenv("GOOGLE_CLOUD_PROJECT")
            if project_id is None:
                service_account_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
                if service_account_path:
                    from google.oauth2 import service_account
                    creds = service_account.Credentials.from_service_account_file(service_account_path)
                    project_id = creds.project_id

            location = (self.vertexai_config.location if self.vertexai_config else None) or os.getenv("GOOGLE_CLOUD_LOCATION") or "us-central1"
            self._client = Client(
                vertexai=True,
                project=project_id,
                location=location,
            )
        else:
            # The non-Vertex path was already validated above, so api_key is
            # guaranteed to be set here (a redundant re-check was removed).
            self._client = Client(api_key=self.api_key)

    def _build_thinking_config(self) -> "types.ThinkingConfig | None":
        """Return the ThinkingConfig matching the configured model, or None.

        Gemini 3+ models take a level-based knob (``thinking_level``); older
        models take a token budget. Returns None when thinking is disabled
        (``thinking_budget is None`` on a pre-Gemini-3 model).
        """
        if "gemini-3" in self.model:
            level = self.thinking_level if self.thinking_level else "low"
            return types.ThinkingConfig(thinking_level=level)
        if self.thinking_budget is None:
            return None
        extra: dict = {}
        if self.include_thoughts is not None:
            extra["include_thoughts"] = self.include_thoughts
        return types.ThinkingConfig(thinking_budget=self.thinking_budget, **extra)

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        conversational_graph: Any | None = None,
        **kwargs: Any
    ) -> AsyncIterator[LLMResponse]:
        """
        Implement chat functionality using Google's Gemini API.

        Args:
            messages: ChatContext containing conversation history.
            tools: Optional list of function tools available to the model.
            conversational_graph: Optional helper; when provided, the model is
                constrained to JSON matching ``ConversationalGraphResponse``
                and text chunks are routed through the graph's streamer.
            **kwargs: Additional arguments merged into the generation config.

        Yields:
            LLMResponse objects containing the model's responses.

        Raises:
            Exception: On prompt-feedback rejections or Google API errors.
        """
        self._cancelled = False

        try:
            (
                contents,
                system_instruction,
            ) = await self._convert_messages_to_contents_async(messages)
            # Only non-None knobs are forwarded so API-side defaults apply.
            config_params: dict = {
                "temperature": self.temperature,
                **kwargs
            }
            thinking_config = self._build_thinking_config()
            if thinking_config is not None:
                config_params["thinking_config"] = thinking_config
            if conversational_graph:
                config_params["response_mime_type"] = "application/json"
                config_params["response_json_schema"] = ConversationalGraphResponse.model_json_schema()

            if system_instruction:
                config_params["system_instruction"] = [types.Part(text=system_instruction)]

            if self.max_output_tokens is not None:
                config_params["max_output_tokens"] = self.max_output_tokens
            if self.top_p is not None:
                config_params["top_p"] = self.top_p
            if self.top_k is not None:
                config_params["top_k"] = self.top_k
            if self.presence_penalty is not None:
                config_params["presence_penalty"] = self.presence_penalty
            if self.frequency_penalty is not None:
                config_params["frequency_penalty"] = self.frequency_penalty
            if self.seed is not None:
                config_params["seed"] = self.seed
            if self.safety_settings is not None:
                config_params["safety_settings"] = self.safety_settings
            if self.http_options is not None:
                config_params["http_options"] = self.http_options

            # Convert FunctionTools into Gemini declarations; tools that fail
            # to convert are logged and skipped rather than aborting the call.
            if tools:
                function_declarations = []
                for tool in tools:
                    if is_function_tool(tool):
                        try:
                            gemini_tool = build_gemini_schema(tool)
                            function_declarations.append(gemini_tool)
                        except Exception as e:
                            logger.error(f"Failed to format tool {tool}: {e}")
                            continue

                if function_declarations:
                    config_params["tools"] = [types.Tool(function_declarations=function_declarations)]

                    if self.tool_choice == "required":
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(
                                mode=types.FunctionCallingConfigMode.ANY
                            )
                        )
                    elif self.tool_choice == "auto":
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(
                                mode=types.FunctionCallingConfigMode.AUTO
                            )
                        )
                    elif self.tool_choice == "none":
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(
                                mode=types.FunctionCallingConfigMode.NONE
                            )
                        )

            config = types.GenerateContentConfig(**config_params)

            response_stream = None
            try:
                response_stream = await self._client.aio.models.generate_content_stream(
                    model=self.model,
                    contents=contents,
                    config=config,
                )

                current_content = ""
                current_function_calls: list = []
                usage = None

                # State shared with the conversational-graph streamer.
                streaming_state = {
                    "in_response": False,
                    "response_start_index": -1,
                    "yielded_content_length": 0,
                }

                async for response in response_stream:
                    if self._cancelled:
                        break

                    if response.prompt_feedback:
                        error_msg = f"Prompt feedback error: {response.prompt_feedback}"
                        self.emit("error", Exception(error_msg))
                        raise Exception(error_msg)

                    if not response.candidates or not response.candidates[0].content:
                        continue

                    candidate = response.candidates[0]
                    if not candidate.content.parts:
                        continue

                    if response.usage_metadata:
                        usage = {
                            "prompt_tokens": response.usage_metadata.prompt_token_count,
                            "completion_tokens": response.usage_metadata.candidates_token_count,
                            "total_tokens": response.usage_metadata.total_token_count,
                            "prompt_cached_tokens": getattr(
                                response.usage_metadata, "cached_content_token_count", 0
                            ),
                        }

                    for part in candidate.content.parts:
                        if part.function_call:
                            function_call = {
                                "name": part.function_call.name,
                                "arguments": dict(part.function_call.args),
                            }
                            # Capture thought_signature for multi-turn tool calling continuity
                            thought_sig = getattr(part, "thought_signature", None)
                            if thought_sig:
                                self._thought_signatures[part.function_call.name] = thought_sig
                                function_call["thought_signature"] = base64.b64encode(thought_sig).decode("utf-8")

                            current_function_calls.append(function_call)

                            yield LLMResponse(
                                content="",
                                role=ChatRole.ASSISTANT,
                                metadata={"function_call": function_call, "usage": usage},
                            )
                        elif part.text:
                            current_content += part.text
                            if conversational_graph:
                                for content_chunk in conversational_graph.stream_conversational_graph_response(
                                    current_content, streaming_state
                                ):
                                    yield LLMResponse(
                                        content=content_chunk,
                                        role=ChatRole.ASSISTANT,
                                        metadata={"usage": usage},
                                    )
                            else:
                                yield LLMResponse(
                                    content=part.text,
                                    role=ChatRole.ASSISTANT,
                                    metadata={"usage": usage},
                                )

                # Graph mode: surface the fully-accumulated JSON payload once
                # streaming finishes (falls back to raw text if unparseable).
                if current_content and not self._cancelled and conversational_graph:
                    try:
                        parsed_json = json.loads(current_content.strip())
                        yield LLMResponse(
                            content="",
                            role=ChatRole.ASSISTANT,
                            metadata={"graph_response": parsed_json, "usage": usage},
                        )
                    except json.JSONDecodeError:
                        yield LLMResponse(
                            content=current_content,
                            role=ChatRole.ASSISTANT,
                            metadata={"usage": usage},
                        )

            finally:
                if response_stream is not None:
                    try:
                        await response_stream.close()
                    except Exception:
                        pass

        except (ClientError, ServerError, APIError) as e:
            # Build the message unconditionally: it is used by the ``raise``
            # below even when generation was cancelled. Previously error_msg
            # was only bound inside the `if`, so a cancelled generation hit an
            # UnboundLocalError here and masked the real API error.
            error_msg = f"Google API error: {e}"
            if not self._cancelled:
                self.emit("error", Exception(error_msg))
            raise Exception(error_msg) from e
        except Exception as e:
            if not self._cancelled:
                self.emit("error", e)
            raise

    async def cancel_current_generation(self) -> None:
        """Cancel the in-flight chat generation, if any.

        Sets a flag that the streaming loop in ``chat`` checks on every
        chunk; the stream is then abandoned without emitting an error.
        """
        self._cancelled = True

    def _get_http_client(self) -> httpx.AsyncClient:
        """Return the shared httpx client, creating it lazily when needed."""
        existing = self._http_client
        if existing is not None and not existing.is_closed:
            return existing
        timeout_value = (
            getattr(self.http_options, "timeout", None)
            if self.http_options is not None
            else None
        )
        self._http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(timeout_value or 30.0),
            follow_redirects=True,
        )
        return self._http_client

    async def _convert_messages_to_contents_async(
        self, messages: ChatContext
    ) -> tuple[list[types.Content], str | None]:
        """Convert ChatContext to Google Content format.

        Returns:
            A ``(contents, system_instruction)`` pair. System messages are
            not placed into ``contents``; the last one seen wins as the
            system instruction.
        """

        async def _format_content_parts_async(
            content: Union[str, List[ChatContent]]
        ) -> List[types.Part]:
            # Normalise message content (plain string or mixed list of text
            # and images) into Gemini Part objects.
            if isinstance(content, str):
                return [types.Part(text=content)]

            if len(content) == 1 and isinstance(content[0], str):
                return [types.Part(text=content[0])]

            formatted_parts = []
            for part in content:
                if isinstance(part, str):
                    formatted_parts.append(types.Part(text=part))
                elif isinstance(part, ImageContent):
                    data_url = part.to_data_url()
                    if data_url.startswith("data:"):
                        # Inline base64 image: decode locally, no network hit.
                        header, b64_data = data_url.split(",", 1)
                        media_type = header.split(";")[0].split(":")[1]
                        image_bytes = base64.b64decode(b64_data)
                        formatted_parts.append(
                            types.Part(
                                inline_data=types.Blob(
                                    mime_type=media_type, data=image_bytes
                                )
                            )
                        )
                    else:
                        # Reuse the shared client; do NOT create a new one per image
                        client = self._get_http_client()
                        try:
                            response = await client.get(data_url)
                            response.raise_for_status()
                            image_bytes = response.content
                            media_type = response.headers.get("Content-Type", "image/jpeg")
                            formatted_parts.append(
                                types.Part(
                                    inline_data=types.Blob(
                                        mime_type=media_type, data=image_bytes
                                    )
                                )
                            )
                        except httpx.HTTPError as e:
                            # HTTPError also covers connect/timeout failures
                            # (httpx.RequestError), not only bad status codes,
                            # so an unreachable image URL degrades to a
                            # logged skip instead of aborting the conversion.
                            logger.error(f"Failed to fetch image from URL {data_url}: {e}")
                            continue
                # NOTE(review): parts that are neither str nor ImageContent
                # are silently dropped — confirm this is intentional.

            return formatted_parts

        contents = []
        system_instruction = None

        for item in messages.items:
            if isinstance(item, ChatMessage):
                if item.role == ChatRole.SYSTEM:
                    if isinstance(item.content, list):
                        # Use the first plain-string part of a list-valued
                        # system message; non-string parts are ignored.
                        system_instruction = next(
                            (str(p) for p in item.content if isinstance(p, str)), ""
                        )
                    else:
                        system_instruction = str(item.content)
                    continue
                elif item.role == ChatRole.USER:
                    parts = await _format_content_parts_async(item.content)
                    contents.append(types.Content(role="user", parts=parts))
                elif item.role == ChatRole.ASSISTANT:
                    parts = await _format_content_parts_async(item.content)
                    contents.append(types.Content(role="model", parts=parts))
            elif isinstance(item, FunctionCall):
                args = (
                    json.loads(item.arguments)
                    if isinstance(item.arguments, str)
                    else item.arguments
                )
                function_call = types.FunctionCall(name=item.name, args=args)
                fc_part = types.Part(function_call=function_call)

                # Re-attach the thought signature captured during streaming so
                # multi-turn tool calling keeps the model's reasoning chain.
                thought_sig_b64 = (item.metadata or {}).get("thought_signature")
                if thought_sig_b64:
                    try:
                        sig_bytes = base64.b64decode(thought_sig_b64)
                        fc_part = types.Part(
                            function_call=function_call,
                            thought_signature=sig_bytes,
                        )
                    except Exception:
                        pass
                elif item.name in self._thought_signatures:
                    fc_part = types.Part(
                        function_call=function_call,
                        thought_signature=self._thought_signatures[item.name],
                    )
                contents.append(
                    types.Content(role="model", parts=[fc_part])
                )
            elif isinstance(item, FunctionCallOutput):
                function_response = types.FunctionResponse(
                    name=item.name, response={"output": item.output}
                )
                contents.append(
                    types.Content(
                        role="user", parts=[types.Part(function_response=function_response)]
                    )
                )

        return contents, system_instruction

    async def aclose(self) -> None:
        """Cancel any active generation and release the shared HTTP client."""
        await self.cancel_current_generation()
        http_client = self._http_client
        if http_client is not None and not http_client.is_closed:
            await http_client.aclose()
        await super().aclose()

Base class for LLM implementations.

Initialize the Google LLM plugin.

Args

api_key
Google API key. Falls back to GOOGLE_API_KEY env var.
model
Gemini model name. Defaults to "gemini-2.5-flash-lite".
temperature
Sampling temperature.
tool_choice
Controls which (if any) tool is called.
max_output_tokens
Maximum tokens in the response.
top_p
Nucleus sampling probability mass.
top_k
Top-K tokens considered during sampling.
presence_penalty
Penalise tokens that have already appeared.
frequency_penalty
Penalise repeated tokens by frequency.
vertexai
Use Vertex AI backend instead of the public Gemini API.
vertexai_config
Vertex AI project/location override.
thinking_budget
Token budget for extended thinking (Gemini 2.5). Set to None to disable the thinking config entirely.
seed
Seed for deterministic sampling.
safety_settings
List of types.SafetySetting / dicts forwarded to the API.
http_options
types.HttpOptions for the underlying Google client.
thinking_level
Thinking effort level for Gemini 3+ models ("low", "medium", "high", "minimal").
include_thoughts
Whether to surface model thoughts in the response.

Ancestors

  • videosdk.agents.llm.llm.LLM
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cancel any active generation and release the shared HTTP client."""
    await self.cancel_current_generation()
    http_client = self._http_client
    if http_client is not None and not http_client.is_closed:
        await http_client.aclose()
    await super().aclose()

Cleanup resources.

async def cancel_current_generation(self) ‑> None
Expand source code
async def cancel_current_generation(self) -> None:
    """Cancel the in-flight chat generation, if any.

    Sets a flag that the streaming loop in ``chat`` checks on every
    chunk; the stream is then abandoned without emitting an error.
    """
    self._cancelled = True

Cancel the current LLM generation if active.

Raises

NotImplementedError
Raised only by the abstract base-class implementation; this subclass overrides the method and simply sets an internal cancellation flag.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
Expand source code
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    conversational_graph: Any | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMResponse]:
    """
    Implement chat functionality using Google's Gemini API.

    Args:
        messages: ChatContext containing conversation history.
        tools: Optional list of function tools available to the model.
        conversational_graph: Optional helper; when provided, the model is
            constrained to JSON matching ``ConversationalGraphResponse``
            and text chunks are routed through the graph's streamer.
        **kwargs: Additional arguments merged into the generation config.

    Yields:
        LLMResponse objects containing the model's responses.

    Raises:
        Exception: On prompt-feedback rejections or Google API errors.
    """
    self._cancelled = False

    try:
        (
            contents,
            system_instruction,
        ) = await self._convert_messages_to_contents_async(messages)
        # Only non-None knobs are forwarded so API-side defaults apply.
        config_params: dict = {
            "temperature": self.temperature,
            **kwargs
        }
        thinking_config = self._build_thinking_config()
        if thinking_config is not None:
            config_params["thinking_config"] = thinking_config
        if conversational_graph:
            config_params["response_mime_type"] = "application/json"
            config_params["response_json_schema"] = ConversationalGraphResponse.model_json_schema()

        if system_instruction:
            config_params["system_instruction"] = [types.Part(text=system_instruction)]

        if self.max_output_tokens is not None:
            config_params["max_output_tokens"] = self.max_output_tokens
        if self.top_p is not None:
            config_params["top_p"] = self.top_p
        if self.top_k is not None:
            config_params["top_k"] = self.top_k
        if self.presence_penalty is not None:
            config_params["presence_penalty"] = self.presence_penalty
        if self.frequency_penalty is not None:
            config_params["frequency_penalty"] = self.frequency_penalty
        if self.seed is not None:
            config_params["seed"] = self.seed
        if self.safety_settings is not None:
            config_params["safety_settings"] = self.safety_settings
        if self.http_options is not None:
            config_params["http_options"] = self.http_options

        # Convert FunctionTools into Gemini declarations; tools that fail
        # to convert are logged and skipped rather than aborting the call.
        if tools:
            function_declarations = []
            for tool in tools:
                if is_function_tool(tool):
                    try:
                        gemini_tool = build_gemini_schema(tool)
                        function_declarations.append(gemini_tool)
                    except Exception as e:
                        logger.error(f"Failed to format tool {tool}: {e}")
                        continue

            if function_declarations:
                config_params["tools"] = [types.Tool(function_declarations=function_declarations)]

                if self.tool_choice == "required":
                    config_params["tool_config"] = types.ToolConfig(
                        function_calling_config=types.FunctionCallingConfig(
                            mode=types.FunctionCallingConfigMode.ANY
                        )
                    )
                elif self.tool_choice == "auto":
                    config_params["tool_config"] = types.ToolConfig(
                        function_calling_config=types.FunctionCallingConfig(
                            mode=types.FunctionCallingConfigMode.AUTO
                        )
                    )
                elif self.tool_choice == "none":
                    config_params["tool_config"] = types.ToolConfig(
                        function_calling_config=types.FunctionCallingConfig(
                            mode=types.FunctionCallingConfigMode.NONE
                        )
                    )

        config = types.GenerateContentConfig(**config_params)

        response_stream = None
        try:
            response_stream = await self._client.aio.models.generate_content_stream(
                model=self.model,
                contents=contents,
                config=config,
            )

            current_content = ""
            current_function_calls: list = []
            usage = None

            # State shared with the conversational-graph streamer.
            streaming_state = {
                "in_response": False,
                "response_start_index": -1,
                "yielded_content_length": 0,
            }

            async for response in response_stream:
                if self._cancelled:
                    break

                if response.prompt_feedback:
                    error_msg = f"Prompt feedback error: {response.prompt_feedback}"
                    self.emit("error", Exception(error_msg))
                    raise Exception(error_msg)

                if not response.candidates or not response.candidates[0].content:
                    continue

                candidate = response.candidates[0]
                if not candidate.content.parts:
                    continue

                if response.usage_metadata:
                    usage = {
                        "prompt_tokens": response.usage_metadata.prompt_token_count,
                        "completion_tokens": response.usage_metadata.candidates_token_count,
                        "total_tokens": response.usage_metadata.total_token_count,
                        "prompt_cached_tokens": getattr(
                            response.usage_metadata, "cached_content_token_count", 0
                        ),
                    }

                for part in candidate.content.parts:
                    if part.function_call:
                        function_call = {
                            "name": part.function_call.name,
                            "arguments": dict(part.function_call.args),
                        }
                        # Capture thought_signature for multi-turn tool calling continuity
                        thought_sig = getattr(part, "thought_signature", None)
                        if thought_sig:
                            self._thought_signatures[part.function_call.name] = thought_sig
                            function_call["thought_signature"] = base64.b64encode(thought_sig).decode("utf-8")

                        current_function_calls.append(function_call)

                        yield LLMResponse(
                            content="",
                            role=ChatRole.ASSISTANT,
                            metadata={"function_call": function_call, "usage": usage},
                        )
                    elif part.text:
                        current_content += part.text
                        if conversational_graph:
                            for content_chunk in conversational_graph.stream_conversational_graph_response(
                                current_content, streaming_state
                            ):
                                yield LLMResponse(
                                    content=content_chunk,
                                    role=ChatRole.ASSISTANT,
                                    metadata={"usage": usage},
                                )
                        else:
                            yield LLMResponse(
                                content=part.text,
                                role=ChatRole.ASSISTANT,
                                metadata={"usage": usage},
                            )

            # Graph mode: surface the fully-accumulated JSON payload once
            # streaming finishes (falls back to raw text if unparseable).
            if current_content and not self._cancelled and conversational_graph:
                try:
                    parsed_json = json.loads(current_content.strip())
                    yield LLMResponse(
                        content="",
                        role=ChatRole.ASSISTANT,
                        metadata={"graph_response": parsed_json, "usage": usage},
                    )
                except json.JSONDecodeError:
                    yield LLMResponse(
                        content=current_content,
                        role=ChatRole.ASSISTANT,
                        metadata={"usage": usage},
                    )

        finally:
            if response_stream is not None:
                try:
                    await response_stream.close()
                except Exception:
                    pass

    except (ClientError, ServerError, APIError) as e:
        # Build the message unconditionally: it is used by the ``raise``
        # below even when generation was cancelled. Previously error_msg
        # was only bound inside the `if`, so a cancelled generation hit an
        # UnboundLocalError here and masked the real API error.
        error_msg = f"Google API error: {e}"
        if not self._cancelled:
            self.emit("error", Exception(error_msg))
        raise Exception(error_msg) from e
    except Exception as e:
        if not self._cancelled:
            self.emit("error", e)
        raise

Implement chat functionality using Google's Gemini API.

Args

messages
ChatContext containing conversation history.
tools
Optional list of function tools available to the model.
**kwargs
Additional arguments passed to the Google API.

Yields

LLMResponse objects containing the model's responses.

class GoogleSTT (*,
api_key: Optional[str] = None,
languages: Union[str, list[str]] = 'en-US',
model: str = 'latest_long',
sample_rate: int = 16000,
interim_results: bool = True,
punctuate: bool = True,
min_confidence_threshold: float = 0.1,
location: str = 'global',
profanity_filter: bool = False,
enable_voice_activity_events: bool = False,
voice_activity_timeout: VoiceActivityConfig = None,
**kwargs: Any)
Expand source code
class GoogleSTT(BaseSTT):
    """Google Cloud Speech-to-Text (V2) streaming STT plugin.

    Accepts 48 kHz 16-bit PCM input, resamples it to the configured target
    rate (when scipy is available), and streams it to Google for recognition.
    Transcripts are delivered via the transcript callback wired into the
    underlying SpeechStream.
    """

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        languages: Union[str, list[str]] = "en-US",
        model: str = "latest_long",
        sample_rate: int = 16000,
        interim_results: bool = True,
        punctuate: bool = True,
        min_confidence_threshold: float = 0.1,
        location: str = "global",
        profanity_filter:bool=False,
        enable_voice_activity_events:bool=False,
        voice_activity_timeout: Optional[VoiceActivityConfig] = None,
        **kwargs: Any
    ) -> None:
        """Initialize the Google STT plugin.

        Args:
            api_key (Optional[str], optional): Path to a Google service-account
                credentials file (exported as GOOGLE_APPLICATION_CREDENTIALS),
                not a raw API key. Defaults to None, in which case Application
                Default Credentials must already be configured.
            languages (Union[str, list[str]]): The languages to use for the STT plugin. Defaults to "en-US".
            model (str): The model to use for the STT plugin. Defaults to "latest_long".
            sample_rate (int): The sample rate to use for the STT plugin. Defaults to 16000.
            interim_results (bool): Whether to use interim results for the STT plugin. Defaults to True.
            punctuate (bool): Whether to use punctuation for the STT plugin. Defaults to True.
            min_confidence_threshold (float): The minimum confidence threshold for the STT plugin. Defaults to 0.1.
            location (str): The location to use for the STT plugin. Defaults to "global".
            profanity_filter (bool): Detect profane words and return only the first letter followed by asterisks in the transcript.
            enable_voice_activity_events (bool): Whether to enable voice activity timeout events.
            voice_activity_timeout (VoiceActivityConfig): Configure speech_start_timeout and speech_end_timeout.

        Raises:
            ImportError: If google-cloud-speech is not installed.
            ValueError: If Google credentials cannot be resolved.
        """
        super().__init__()
        if not GOOGLE_V2_AVAILABLE:
            logger.error("Google Cloud Speech V2 is not available")
            raise ImportError("google-cloud-speech is not installed")

        # NOTE(review): GOOGLE_APPLICATION_CREDENTIALS is read by Google as a
        # path to a service-account JSON file, so despite its name `api_key`
        # must be a file path here — confirm with callers.
        if api_key:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key
        try:
            # Fail fast if Application Default Credentials cannot be resolved.
            gauth_default()
        except DefaultCredentialsError as e:
            logger.error("Google credentials are not configured", exc_info=True)
            raise ValueError("Google credentials are not configured.") from e

        # Input audio is assumed to arrive at 48 kHz; process_audio resamples
        # it down to target_sample_rate before pushing it to the recognizer.
        self.input_sample_rate = 48000
        self.target_sample_rate = sample_rate
        if isinstance(languages, str):
            languages = [languages]
        
        # Flattened recognition settings consumed by SpeechStream.
        self._config = {
            "languages": languages,
            "model": model,
            "sample_rate": self.target_sample_rate,
            "interim_results": interim_results,
            "punctuate": punctuate,
            "min_confidence_threshold": min_confidence_threshold,
            "location": location,
            "profanity_filter":profanity_filter,
            "voice_activity_timeout":voice_activity_timeout,
            "enable_voice_activity_events":enable_voice_activity_events,
            "speech_start_timeout":voice_activity_timeout.speech_start_timeout if voice_activity_timeout else None,
            "speech_end_timeout":voice_activity_timeout.speech_end_timeout if voice_activity_timeout else None,
        }

        self._client: Optional[SpeechAsyncClient] = None  # created lazily in _ensure_client
        self._stream: Optional[SpeechStream] = None  # active recognition stream, if any

    async def _ensure_client(self):
        """Create the SpeechAsyncClient on first use.

        Uses a regional endpoint when the configured location is not "global".
        """
        if self._client:
            return
        try:
            opts = None
            if self._config["location"] != "global":
                opts = ClientOptions(api_endpoint=f"{self._config['location']}-speech.googleapis.com")
            self._client = SpeechAsyncClient(client_options=opts)
        except Exception as e:
            logger.error("Failed to create SpeechAsyncClient", exc_info=True)
            raise e

    async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
        """Resample one chunk of PCM audio and push it to the recognition stream.

        Starts the stream lazily on first call. When scipy is unavailable the
        audio is forwarded unresampled.

        Args:
            audio_frames (bytes): Raw 16-bit PCM audio at input_sample_rate.
            **kwargs: Unused provider-specific arguments.
        """
        try:
            if not self._stream:
                await self._start_stream()
            
            if self._stream:
                if SCIPY_AVAILABLE:
                    try:
                        audio_data = np.frombuffer(audio_frames, dtype=np.int16)
                        resampled_data = signal.resample(audio_data, int(len(audio_data) * self.target_sample_rate / self.input_sample_rate))
                        resampled_bytes = resampled_data.astype(np.int16).tobytes()
                        await self._stream.push_audio(resampled_bytes)
                    except Exception as e:
                        logger.error("Error resampling audio", exc_info=True)
                        self.emit("error", {"message": "Error resampling audio", "error": str(e)})
                else:
                    # No scipy: forward bytes as-is. NOTE(review): this leaves a
                    # sample-rate mismatch for the recognizer — confirm acceptable.
                    await self._stream.push_audio(audio_frames)
        except Exception as e:
            logger.error("process_audio failed", exc_info=True)
            # Errors are only emitted once a stream exists; startup failures
            # are logged but not surfaced to listeners.
            if self._stream:
                self.emit("error", {"message": "Failed to process audio", "error": str(e)})

    async def _start_stream(self):
        """Open a new SpeechStream bound to this plugin's config and transcript callback."""
        await self._ensure_client()
        try:
            self._stream = SpeechStream(self._client, self._config, self._transcript_callback)
            await self._stream.start()
        except Exception as e:
            logger.error("Failed to start SpeechStream", exc_info=True)
            raise e

    async def aclose(self) -> None:
        """Close the active stream (if any) and drop the client reference."""
        try:
            if self._stream:
                await self._stream.close()
                self._stream = None
            self._client = None
        except Exception as e:
            logger.error("Error during aclose", exc_info=True)

Base class for Speech-to-Text implementations

Initialize the Google STT plugin.

Args

api_key : Optional[str], optional
Google API key. Defaults to None.
languages : Union[str, list[str]]
The languages to use for the STT plugin. Defaults to "en-US".
model : str
The model to use for the STT plugin. Defaults to "latest_long".
sample_rate : int
The sample rate to use for the STT plugin. Defaults to 16000.
interim_results : bool
Whether to use interim results for the STT plugin. Defaults to True.
punctuate : bool
Whether to use punctuation for the STT plugin. Defaults to True.
min_confidence_threshold : float
The minimum confidence threshold for the STT plugin. Defaults to 0.1.
location : str
The location to use for the STT plugin. Defaults to "global".
profanity_filter : bool
Detect profane words and return only the first letter followed by asterisks in the transcript.
enable_voice_activity_events : bool
Whether to enable voice activity timeout events.
voice_activity_timeout : VoiceActivityConfig
configure speech_start_timeout and speech_end_timeout.

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Tear down the recognition stream, if one is open, then release the client."""
    try:
        stream = self._stream
        if stream:
            await stream.close()
            self._stream = None
        self._client = None
    except Exception:
        logger.error("Error during aclose", exc_info=True)

Cleanup resources

async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
Expand source code
async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
    """Feed one chunk of PCM audio to the recognition stream.

    Lazily opens the stream, then resamples the 48 kHz input down to the
    target rate when scipy is available; otherwise the bytes are forwarded
    untouched.
    """
    try:
        if not self._stream:
            await self._start_stream()

        if not self._stream:
            return

        if not SCIPY_AVAILABLE:
            await self._stream.push_audio(audio_frames)
            return

        try:
            samples = np.frombuffer(audio_frames, dtype=np.int16)
            out_len = int(len(samples) * self.target_sample_rate / self.input_sample_rate)
            resampled = signal.resample(samples, out_len)
            await self._stream.push_audio(resampled.astype(np.int16).tobytes())
        except Exception as exc:
            logger.error("Error resampling audio", exc_info=True)
            self.emit("error", {"message": "Error resampling audio", "error": str(exc)})
    except Exception as exc:
        logger.error("process_audio failed", exc_info=True)
        if self._stream:
            self.emit("error", {"message": "Failed to process audio", "error": str(exc)})

Process audio frames and convert to text

Args

audio_frames
Raw PCM audio bytes to process
**kwargs
Additional provider-specific arguments

Returns

None. Transcripts are delivered asynchronously via the registered transcript callback.

class GoogleTTS (*,
api_key: str | None = None,
speed: float = 1.0,
pitch: float = 0.0,
response_format: "Literal['pcm']" = 'pcm',
voice_config: GoogleVoiceConfig | None = None,
custom_pronunciations: list[dict] | dict | None = None,
vertexai: bool = False,
vertexai_config: VertexAIConfig | None = None,
streaming: bool = True)
Expand source code
class GoogleTTS(TTS):
    """Google Cloud Text-to-Speech plugin.

    Synthesizes 16-bit PCM audio and forwards it in fixed-size chunks to the
    attached audio track. Supports one-shot SynthesizeSpeech requests,
    bidirectional gRPC StreamingSynthesize (Chirp 3 HD voices only), and the
    Vertex AI regional endpoint (non-streaming only).
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        speed: float = 1.0,
        pitch: float = 0.0,
        response_format: Literal["pcm"] = "pcm",
        voice_config: GoogleVoiceConfig | None = None,
        custom_pronunciations: list[dict] | dict | None = None,
        vertexai: bool = False,
        vertexai_config: VertexAIConfig | None = None,
        streaming: bool = True,
    ) -> None:
        """Initialize the Google TTS plugin.

        Args:
            api_key (Optional[str], optional): Google API key. Defaults to None.
                Ignored when vertexai=True (ADC authentication is used instead).
            speed (float): The speaking rate to use for the TTS plugin. Defaults to 1.0.
            pitch (float): The pitch to use for the TTS plugin. Defaults to 0.0.
                Only applied on the non-streaming path (StreamingAudioConfig
                carries no pitch field).
            response_format (Literal["pcm"]): The response format to use for the TTS plugin. Defaults to "pcm".
            voice_config (GoogleVoiceConfig | None): The voice configuration to use for the TTS plugin. Defaults to None.
            custom_pronunciations: IPA pronunciation overrides,
                                   e.g. [{"tomato": "təˈmeɪtoʊ"}].
            vertexai: Use Vertex AI TTS endpoint with ADC authentication.
            vertexai_config: Project / location settings for Vertex AI.
            streaming: Use gRPC StreamingSynthesize for lower latency.
                       Not compatible with vertexai=True — a ValueError is
                       raised if both are enabled. Requires a Chirp 3 HD voice.

        Raises:
            ImportError: If google-cloud-texttospeech is not installed.
            ValueError: If streaming and vertexai are both enabled, if a
                non-Chirp-3-HD voice is configured with streaming=True, or if
                credentials / project configuration cannot be resolved.

        Requires: pip install google-cloud-texttospeech
        """
        super().__init__(sample_rate=GOOGLE_SAMPLE_RATE, num_channels=GOOGLE_CHANNELS)

        try:
            from google.cloud import texttospeech_v1
        except ImportError as exc:
            raise ImportError(
                "google-cloud-texttospeech is required. "
                "Install it with: pip install google-cloud-texttospeech"
            ) from exc

        self._tts = texttospeech_v1

        self.speed = speed
        self.pitch = pitch
        self.response_format = response_format
        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False  # tracks the first-audio latency callback
        self.voice_config = voice_config or GoogleVoiceConfig()
        self.custom_pronunciations = custom_pronunciations
        self.vertexai = vertexai
        self.vertexai_config = vertexai_config or VertexAIConfig()
        self.streaming = streaming
        # Streaming synthesis is only wired up for the public endpoint.
        if self.streaming and self.vertexai:
            raise ValueError("Streaming and vertexai cannot be used together.")
        resolved_voice = (voice_config or GoogleVoiceConfig()).name
        if streaming and not self._is_chirp3_hd_voice(resolved_voice):
            raise ValueError(
                f"Streaming synthesis only supports Chirp 3 HD voices "
                f"(e.g. 'en-US-Chirp3-HD-Aoede'). "
                f"Got: '{resolved_voice}'. "
                f"See https://cloud.google.com/text-to-speech/docs/chirp3-hd for available voices."
            )

        self._client = self._build_client(api_key)

    @staticmethod
    def _is_chirp3_hd_voice(name: str) -> bool:
        """Return True when `name` is a Chirp 3 HD voice (required for streaming)."""
        return "chirp3-hd" in name.lower()

    def _build_client(self, api_key: str | None) -> Any:
        """Construct a TextToSpeechAsyncClient.

        Vertex AI mode resolves the project ID (config, env vars, or the
        service-account file) and targets a regional endpoint with ADC;
        otherwise an API key is required.
        """
        from google.api_core.client_options import ClientOptions

        if self.vertexai:
            project_id = (
                self.vertexai_config.project_id
                or os.getenv("GOOGLE_CLOUD_PROJECT")
                or os.getenv("GCLOUD_PROJECT")
            )

            if project_id is None:
                # Last resort: read the project ID from the service-account file.
                service_account_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
                if service_account_path:
                    try:
                        from google.oauth2 import service_account
                        creds = service_account.Credentials.from_service_account_file(
                            service_account_path
                        )
                        project_id = creds.project_id
                    except Exception:
                        # Best-effort only; fall through to the explicit error below.
                        pass

            if project_id is None:
                raise ValueError(
                    "Vertex AI TTS requires a GCP project ID. Provide one of:\n"
                    "1. vertexai_config=VertexAIConfig(project_id='my-project')\n"
                    "2. GOOGLE_CLOUD_PROJECT environment variable\n"
                    "3. GOOGLE_APPLICATION_CREDENTIALS pointing to a service-account file"
                )

            location = (
                self.vertexai_config.location
                or os.getenv("GOOGLE_CLOUD_LOCATION")
                or "us-central1"
            )
            # Persist the resolved values so later code sees a complete config.
            self.vertexai_config.project_id = project_id
            self.vertexai_config.location = location

            return self._tts.TextToSpeechAsyncClient(
                client_options=ClientOptions(
                    api_endpoint=f"{location}-texttospeech.googleapis.com"
                )
            )

        else:
            resolved_key = api_key or os.getenv("GOOGLE_API_KEY")
            if not resolved_key:
                raise ValueError(
                    "Google TTS API key required. Provide either:\n"
                    "1. api_key parameter, OR\n"
                    "2. GOOGLE_API_KEY environment variable"
                )
            return self._tts.TextToSpeechAsyncClient(
                client_options=ClientOptions(api_key=resolved_key)
            )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        **kwargs: Any,
    ) -> None:
        """Convert text to speech and push the audio to the attached track.

        Args:
            text: Text to synthesize — a plain string or an async iterator of
                string chunks.
            **kwargs: Additional provider-specific arguments (unused).
        """
        try:
            if self.streaming:
                await self._synthesize_streaming(text)
            elif isinstance(text, AsyncIterator):
                # Batch endpoint: synthesize sentence-sized segments as they arrive.
                async for segment in segment_text(text):
                    await self._synthesize_audio(segment)
            else:
                await self._synthesize_audio(text)

            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or loop not initialized")
                return

        except Exception as e:
            self.emit("error", f"Google TTS synthesis failed: {str(e)}")
            raise

    async def _synthesize_audio(self, text: str) -> None:
        """Single-request synthesis via SynthesizeSpeech."""
        tts = self._tts

        if self.custom_pronunciations:
            synthesis_input = tts.SynthesisInput(
                text=text,
                custom_pronunciations=self._build_custom_pronunciations_proto(),
            )
        else:
            synthesis_input = tts.SynthesisInput(text=text)
        # Studio voices reject an explicit ssml_gender, so skip it for them.
        is_studio = self.voice_config.name.startswith("en-US-Studio")

        voice_params = tts.VoiceSelectionParams(
            language_code=self.voice_config.languageCode,
            name=self.voice_config.name,
        )
        if not is_studio:
            voice_params.ssml_gender = tts.SsmlVoiceGender[self.voice_config.ssmlGender]
        response = await self._client.synthesize_speech(
            input=synthesis_input,
            voice=voice_params,
            audio_config=tts.AudioConfig(
                audio_encoding=tts.AudioEncoding.LINEAR16,
                speaking_rate=self.speed,
                pitch=self.pitch,
                sample_rate_hertz=GOOGLE_SAMPLE_RATE,
            ),
        )

        if not response.audio_content:
            self.emit("error", "No audio content received from Google TTS")
            return

        # LINEAR16 responses are WAV-framed; _stream_audio_chunks strips the header.
        await self._stream_audio_chunks(response.audio_content)

    async def _synthesize_streaming(self, text: AsyncIterator[str] | str) -> None:
        """Bidirectional gRPC streaming via StreamingSynthesize."""
        tts = self._tts

        streaming_config_kwargs: dict = dict(
            voice=tts.VoiceSelectionParams(
                language_code=self.voice_config.languageCode,
                name=self.voice_config.name,
            ),
            streaming_audio_config=tts.StreamingAudioConfig(
                audio_encoding=tts.AudioEncoding.PCM,
                sample_rate_hertz=GOOGLE_SAMPLE_RATE,
                speaking_rate=self.speed,
            ),
        )
        if self.custom_pronunciations:
            streaming_config_kwargs["custom_pronunciations"] = (
                self._build_custom_pronunciations_proto()
            )

        streaming_config = tts.StreamingSynthesizeConfig(**streaming_config_kwargs)

        async def request_generator() -> AsyncIterator[Any]:
            # The first request must carry only the config; text follows.
            yield tts.StreamingSynthesizeRequest(streaming_config=streaming_config)
            if isinstance(text, str):
                yield tts.StreamingSynthesizeRequest(
                    input=tts.StreamingSynthesisInput(text=text)
                )
            else:
                async for chunk in text:
                    if chunk:
                        yield tts.StreamingSynthesizeRequest(
                            input=tts.StreamingSynthesisInput(text=chunk)
                        )

        try:
            async for response in await self._client.streaming_synthesize(
                request_generator()
            ):
                if response.audio_content:
                    # Streaming responses are raw PCM with no WAV header.
                    await self._stream_audio_chunks(response.audio_content, has_wav_header=False)
        except Exception as e:
            self.emit("error", f"Google TTS streaming error: {str(e)}")
            raise

    def _build_custom_pronunciations_proto(self) -> Any:
        """Convert self.custom_pronunciations to a CustomPronunciations proto.

        Accepts either a {phrase: pronunciation} dict (IPA assumed) or a list
        of dicts, each either {"phrase": ..., "pronunciation": ..., "encoding": ...}
        or a plain {phrase: pronunciation} mapping. Invalid entries are skipped
        with a warning.
        """
        tts = self._tts
        params = []
        try:
            from google.cloud.texttospeech_v1.types import CustomPronunciationParams as _CPP
            PE = _CPP.PhoneticEncoding
            ENCODING_MAP = {
                "ipa":    PE.PHONETIC_ENCODING_IPA,
                "x-sampa": PE.PHONETIC_ENCODING_X_SAMPA,
            }
        except (ImportError, AttributeError):
            # Fall back to the raw enum values if the proto types moved.
            ENCODING_MAP = {"ipa": 1, "x-sampa": 2}

        if not self.custom_pronunciations:
            return tts.CustomPronunciations(pronunciations=[])

        raw = self.custom_pronunciations
        entries: list[tuple[str, str, Any]] = []

        if isinstance(raw, dict):
            for phrase, pronunciation in raw.items():
                entries.append((phrase, pronunciation, ENCODING_MAP["ipa"]))
        else:
            for item in raw:
                if not isinstance(item, dict):
                    continue
                if "phrase" in item and "pronunciation" in item:
                    enc_key = item.get("encoding", "ipa").lower()
                    enc = ENCODING_MAP.get(enc_key, ENCODING_MAP["ipa"])
                    if enc_key not in ENCODING_MAP:
                        # FIX: logger.warning previously received warnings.warn-style
                        # arguments (UserWarning, stacklevel=3); the extra positional
                        # arg broke %-formatting and triggered logging errors.
                        logger.warning(
                            f"Unknown encoding '{enc_key}' for phrase '{item['phrase']}'. "
                            f"Supported: {list(ENCODING_MAP.keys())}. Falling back to IPA."
                        )
                    entries.append((item["phrase"], item["pronunciation"], enc))
                else:
                    for phrase, pronunciation in item.items():
                        entries.append((phrase, pronunciation, ENCODING_MAP["ipa"]))


        if self.voice_config.languageCode.lower() != "en-us":
            logger.warning(
                f"custom_pronunciations is only supported for en-US. "
                f"Got '{self.voice_config.languageCode}' — pronunciations will be ignored."
            )

        for phrase, pronunciation, encoding in entries:
            if not phrase or not pronunciation:
                continue
            try:
                params.append(
                    tts.CustomPronunciationParams(
                        phrase=phrase,
                        pronunciation=pronunciation,
                        phonetic_encoding=encoding,
                    )
                )
            except Exception as e:
                logger.warning(
                    f"Skipping custom pronunciation for '{phrase}': {e}"
                )

        if not params:
            logger.warning(
                "custom_pronunciations was set but no valid entries were built. "
                "Check your phrase/pronunciation format."
            )

        return tts.CustomPronunciations(pronunciations=params)


    async def _stream_audio_chunks(
        self, audio_bytes: bytes, has_wav_header: bool = True
    ) -> None:
        """Chunk raw PCM and forward to the audio track.

        Args:
            audio_bytes: PCM payload, optionally WAV-framed.
            has_wav_header: Strip a leading RIFF/WAV header before chunking.
        """
        # 960 bytes = 480 int16 samples per frame; the last partial frame is
        # zero-padded so the track always receives full frames.
        chunk_size = 960
        audio_data = self._remove_wav_header(audio_bytes) if has_wav_header else audio_bytes

        for i in range(0, len(audio_data), chunk_size):
            chunk = audio_data[i:i + chunk_size]

            if len(chunk) < chunk_size and len(chunk) > 0:
                padding_needed = chunk_size - len(chunk)
                chunk += b'\x00' * padding_needed

            if len(chunk) == chunk_size:
                # Fire the first-audio callback exactly once per synthesis task.
                if not self._first_chunk_sent and self._first_audio_callback:
                    self._first_chunk_sent = True
                    await self._first_audio_callback()

                # Fire-and-forget write; the short sleep yields to the event
                # loop so playback tasks can run.
                asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                await asyncio.sleep(0.001)

    def _remove_wav_header(self, audio_bytes: bytes) -> bytes:
        """Remove WAV header if present to get raw PCM data"""
        if audio_bytes.startswith(b"RIFF"):
            data_pos = audio_bytes.find(b"data")
            if data_pos != -1:
                # Skip the 4-byte "data" tag plus the 4-byte chunk length.
                return audio_bytes[data_pos + 8:]

        return audio_bytes

    async def aclose(self) -> None:
        """Close the client's gRPC transport, then run base-class cleanup."""
        if self._client:
            await self._client.transport.close()
        await super().aclose()

    async def interrupt(self) -> None:
        """Stop any in-flight audio playback on the attached track."""
        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the Google TTS plugin.

Args

api_key : Optional[str], optional
Google API key. Defaults to None.
speed : float
The speed to use for the TTS plugin. Defaults to 1.0.
pitch : float
The pitch to use for the TTS plugin. Defaults to 0.0.
response_format : Literal["pcm"]
The response format to use for the TTS plugin. Defaults to "pcm".
voice_config : GoogleVoiceConfig | None
The voice configuration to use for the TTS plugin. Defaults to None.
custom_pronunciations
IPA pronunciation overrides, e.g. [{"tomato": "təˈmeɪtoʊ"}].
vertexai
Use Vertex AI TTS endpoint with ADC authentication.
vertexai_config
Project / location settings for Vertex AI.
streaming
Use gRPC StreamingSynthesize for lower latency. Not compatible with vertexai=True — a ValueError is raised if both are enabled.

Requires: pip install google-cloud-texttospeech

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Release the client's gRPC transport, then defer to base-class cleanup."""
    client = self._client
    if client:
        await client.transport.close()
    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Cut off any audio currently being played on the attached track."""
    track = self.audio_track
    if track:
        track.interrupt()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Re-arm first-audio-chunk detection ahead of the next synthesis task."""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    **kwargs: Any,
) -> None:
    """Dispatch text (plain string or async stream) to the configured synthesis path."""
    try:
        if self.streaming:
            await self._synthesize_streaming(text)
        elif isinstance(text, AsyncIterator):
            async for piece in segment_text(text):
                await self._synthesize_audio(piece)
        else:
            await self._synthesize_audio(text)

        if self.audio_track and self.loop:
            return
        self.emit("error", "Audio track or loop not initialized")

    except Exception as e:
        self.emit("error", f"Google TTS synthesis failed: {str(e)}")
        raise

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None

class GoogleVoiceConfig (languageCode: str = 'en-US',
name: str = 'en-US-Chirp3-HD-Charon',
ssmlGender: str = 'MALE')
Expand source code
@dataclass
class GoogleVoiceConfig:
    """Voice selection settings for Google TTS.

    Field names use camelCase to mirror the Google TTS API's voice fields.
    """

    languageCode: str = "en-US"  # BCP-47 language code
    name: str = "en-US-Chirp3-HD-Charon"  # full Google voice name
    ssmlGender: str = "MALE"  # SSML voice gender key; not applied for en-US-Studio voices

GoogleVoiceConfig(languageCode: 'str' = 'en-US', name: 'str' = 'en-US-Chirp3-HD-Charon', ssmlGender: 'str' = 'MALE')

Instance variables

var languageCode : str
var name : str
var ssmlGender : str
class VertexAIConfig (project_id: str | None = None, location: str | None = None)
Expand source code
@dataclass
class VertexAIConfig:
    """Project/location settings for the Vertex AI TTS endpoint.

    Unset fields are resolved from the environment at client-build time.
    """

    project_id: str | None = None  # falls back to GOOGLE_CLOUD_PROJECT / GCLOUD_PROJECT
    location: str | None = None  # falls back to GOOGLE_CLOUD_LOCATION, then "us-central1"

VertexAIConfig(project_id: 'str | None' = None, location: 'str | None' = None)

Instance variables

var location : str | None
var project_id : str | None
class VoiceActivityConfig (speech_start_timeout: float = 1.0, speech_end_timeout: float = 5.0)
Expand source code
@dataclass
class VoiceActivityConfig:
    """Voice-activity timeouts (in seconds) for Google STT."""

    speech_start_timeout: float = 1.0  # max wait for speech to begin
    speech_end_timeout: float = 5.0  # silence duration that ends an utterance

VoiceActivityConfig(speech_start_timeout: 'float' = 1.0, speech_end_timeout: 'float' = 5.0)

Instance variables

var speech_end_timeout : float
var speech_start_timeout : float