Package videosdk.plugins.google

Sub-modules

videosdk.plugins.google.live_api
videosdk.plugins.google.llm
videosdk.plugins.google.stt
videosdk.plugins.google.tts

Classes

class GeminiLiveConfig (voice: Voice | None = 'Puck',
language_code: str | None = 'en-US',
temperature: float | None = None,
top_p: float | None = None,
top_k: float | None = None,
candidate_count: int | None = 1,
max_output_tokens: int | None = None,
presence_penalty: float | None = None,
frequency_penalty: float | None = None,
response_modalities: List[Modality] | None = <factory>,
input_audio_transcription: AudioTranscriptionConfig | None = <factory>,
output_audio_transcription: AudioTranscriptionConfig | None = <factory>)
Expand source code
@dataclass
class GeminiLiveConfig:
    """Configuration for the Gemini Live API

    Every field is optional; fields left as None are forwarded as None when the
    session configuration is built.

    Args:
        voice: Voice ID for audio output. Options: 'Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'. Defaults to 'Puck'
        language_code: Language code for speech synthesis. Defaults to 'en-US'
        temperature: Controls randomness in response generation. Higher values (e.g. 0.8) make output more random,
                    lower values (e.g. 0.2) make it more focused. Defaults to None
        top_p: Nucleus sampling parameter. Controls diversity via cumulative probability cutoff. Range 0-1. Defaults to None
        top_k: Limits the number of tokens considered for each step of text generation. Defaults to None
        candidate_count: Number of response candidates to generate. Defaults to 1
        max_output_tokens: Maximum number of tokens allowed in model responses. Defaults to None
        presence_penalty: Penalizes tokens based on their presence in the text so far. Range -2.0 to 2.0. Defaults to None
        frequency_penalty: Penalizes tokens based on their frequency in the text so far. Range -2.0 to 2.0. Defaults to None
        response_modalities: List of enabled response types. Options: ["TEXT", "AUDIO"]. Defaults to both
        input_audio_transcription: Transcription configuration for incoming (user) audio. Defaults to an
                    empty configuration ({}), not None
        output_audio_transcription: Transcription configuration for generated (model) audio. Defaults to an
                    empty configuration ({}), not None
    """

    voice: Voice | None = "Puck"
    language_code: str | None = "en-US"
    temperature: float | None = None
    top_p: float | None = None
    top_k: float | None = None
    candidate_count: int | None = 1
    max_output_tokens: int | None = None
    presence_penalty: float | None = None
    frequency_penalty: float | None = None
    # A factory is used so every instance gets its own list rather than a shared one.
    response_modalities: List[Modality] | None = field(
        default_factory=lambda: ["TEXT", "AUDIO"]
    )
    # Default is a fresh empty dict per instance.
    # NOTE(review): presumably an empty config enables transcription with service defaults — confirm.
    input_audio_transcription: AudioTranscriptionConfig | None = field(
        default_factory=dict
    )
    # Default is a fresh empty dict per instance (same caveat as above).
    output_audio_transcription: AudioTranscriptionConfig | None = field(
        default_factory=dict
    )

Configuration for the Gemini Live API

Args

voice
Voice ID for audio output. Options: 'Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'. Defaults to 'Puck'
language_code
Language code for speech synthesis. Defaults to 'en-US'
temperature
Controls randomness in response generation. Higher values (e.g. 0.8) make output more random, lower values (e.g. 0.2) make it more focused. Defaults to None
top_p
Nucleus sampling parameter. Controls diversity via cumulative probability cutoff. Range 0-1. Defaults to None
top_k
Limits the number of tokens considered for each step of text generation. Defaults to None
candidate_count
Number of response candidates to generate. Defaults to 1
max_output_tokens
Maximum number of tokens allowed in model responses. Defaults to None
presence_penalty
Penalizes tokens based on their presence in the text so far. Range -2.0 to 2.0. Defaults to None
frequency_penalty
Penalizes tokens based on their frequency in the text so far. Range -2.0 to 2.0. Defaults to None
response_modalities
List of enabled response types. Options: ["TEXT", "AUDIO"]. Defaults to both
input_audio_transcription
Transcription configuration for incoming (user) audio. Defaults to an empty configuration ({}), not None
output_audio_transcription
Transcription configuration for generated (model) audio. Defaults to an empty configuration ({}), not None

Instance variables

var candidate_count : int | None
var frequency_penalty : float | None
var input_audio_transcription : google.genai.types.AudioTranscriptionConfig | None
var language_code : str | None
var max_output_tokens : int | None
var output_audio_transcription : google.genai.types.AudioTranscriptionConfig | None
var presence_penalty : float | None
var response_modalities : List[google.genai.types.Modality] | None
var temperature : float | None
var top_k : float | None
var top_p : float | None
var voice : Literal['Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'] | None
class GeminiRealtime (*,
api_key: str | None = None,
model: str,
config: GeminiLiveConfig | None = None,
service_account_path: str | None = None)
Expand source code
class GeminiRealtime(RealtimeBaseModel[GeminiEventTypes]):
    """Gemini's realtime model for audio-only communication"""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str,
        config: GeminiLiveConfig | None = None,
        service_account_path: str | None = None,
    ) -> None:
        """
        Initialize Gemini realtime model.

        Args:
            api_key: Gemini API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
            service_account_path: Path to Google service account JSON file.
            model: The Gemini model identifier to use (e.g. 'gemini-pro', 'gemini-pro-vision').
            config: Optional configuration object for customizing model behavior. Contains settings for:
                   - voice: Voice ID for audio output ('Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'). Defaults to 'Puck'
                   - language_code: Language code for speech synthesis. Defaults to 'en-US'
                   - temperature: Controls randomness in responses. Higher values (0.8) more random, lower (0.2) more focused
                   - top_p: Nucleus sampling parameter. Controls diversity via probability cutoff. Range 0-1
                   - top_k: Limits number of tokens considered for each generation step
                   - candidate_count: Number of response candidates to generate. Defaults to 1
                   - max_output_tokens: Maximum tokens allowed in model responses
                   - presence_penalty: Penalizes token presence in text. Range -2.0 to 2.0
                   - frequency_penalty: Penalizes token frequency in text. Range -2.0 to 2.0
                   - response_modalities: List of enabled response types ["TEXT", "AUDIO"]. Defaults to both
                   - input_audio_transcription: Configuration for transcribing incoming (user) audio
                   - output_audio_transcription: Configuration for transcribing generated (model) audio

        Raises:
            ValueError: If neither api_key nor service_account_path is provided and no GOOGLE_API_KEY in env vars
        """
        super().__init__()
        self.model = model
        self._init_client(api_key, service_account_path)
        self._session: Optional[GeminiSession] = None
        self._closing = False
        self._session_should_close = asyncio.Event()
        self._main_task = None
        self.loop = None
        self.audio_track = None
        self._buffered_audio = bytearray()
        self._is_speaking = False
        self._last_audio_time = 0.0
        self._audio_processing_task = None
        self.tools = []
        # Session creation reads ``formatted_tools``; give both aliases a default
        # so connecting before ``set_agent`` is called does not raise
        # AttributeError. ``set_agent`` overwrites them with the real tool list.
        self.tools_formatted = []
        self.formatted_tools = []
        self._instructions: str = (
            "You are a helpful voice assistant that can answer questions and help with tasks."
        )
        self.config: GeminiLiveConfig = config or GeminiLiveConfig()
        # Rate (Hz) that incoming audio is resampled to before being sent.
        self.target_sample_rate = 24000
        # Rate (Hz) that incoming audio is assumed to have been captured at.
        self.input_sample_rate = 48000
        self._user_speaking = False
        self._agent_speaking = False

    def set_agent(self, agent: Agent) -> None:
        """Adopt the agent's instructions and tools for subsequent sessions.

        The tools are converted to Gemini's format once and the result is
        published under both ``tools_formatted`` and ``formatted_tools``.
        """
        self._instructions = agent.instructions
        self.tools = agent.tools
        gemini_tools = self._convert_tools_to_gemini_format(self.tools)
        self.tools_formatted = gemini_tools
        self.formatted_tools = gemini_tools

    def _init_client(self, api_key: str | None, service_account_path: str | None):
        """Create ``self.client`` from a service account file or an API key.

        A service account path takes precedence and is exported through the
        ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable. Otherwise the
        explicit ``api_key`` is used, falling back to ``GOOGLE_API_KEY``.

        Raises:
            ValueError: If no service account path and no API key are available.
        """
        http_options = {"api_version": "v1beta"}

        if service_account_path:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_path
            self.client = genai.Client(http_options=http_options)
            return

        self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            self.emit("error", "GOOGLE_API_KEY or service account required")
            raise ValueError("GOOGLE_API_KEY or service account required")

        self.client = genai.Client(api_key=self.api_key, http_options=http_options)

    async def connect(self) -> None:
        """Connect to the Gemini Live API.

        Tears down any existing session, prepares the audio track when audio
        output is enabled, attempts to create a first session, and ensures the
        background session loop is running.

        Raises:
            RuntimeError: If audio output is enabled but ``self.loop`` is unset.
        """
        stale = self._session
        if stale:
            await self._cleanup_session(stale)
            self._session = None

        self._closing = False
        self._session_should_close.clear()

        try:
            if self.loop:
                # Lazily create the output track the first time audio is needed.
                if not self.audio_track and "AUDIO" in self.config.response_modalities:
                    self.audio_track = CustomAudioStreamTrack(self.loop)
            elif "AUDIO" in self.config.response_modalities:
                self.emit(
                    "error", "Event loop not initialized. Audio playback will not work."
                )
                raise RuntimeError(
                    "Event loop not initialized. Audio playback will not work."
                )

            # A failed first attempt is only reported; the session loop retries.
            try:
                first_session = await self._create_session()
            except Exception as e:
                self.emit("error", f"Initial session creation failed, will retry: {e}")
            else:
                if first_session:
                    self._session = first_session

            loop_task = self._main_task
            if not loop_task or loop_task.done():
                self._main_task = asyncio.create_task(
                    self._session_loop(), name="gemini-main-loop"
                )

        except Exception as e:
            self.emit("error", f"Error connecting to Gemini Live API: {e}")
            traceback.print_exc()
            raise

    async def _create_session(self) -> GeminiSession:
        """Create a new Gemini Live API session.

        Builds the connection configuration from ``self.config``, the current
        instructions and the formatted tools, then enters the live-connect
        context manager. The context manager is stored on the returned
        ``GeminiSession`` so it can be exited during cleanup.

        Raises:
            Exception: Any connection error is reported via an ``error`` event
                and re-raised.
        """
        settings = self.config

        # Unset (None) sampling options are forwarded unchanged.
        generation = GenerationConfig(
            candidate_count=settings.candidate_count,
            temperature=settings.temperature,
            top_p=settings.top_p,
            top_k=settings.top_k,
            max_output_tokens=settings.max_output_tokens,
            presence_penalty=settings.presence_penalty,
            frequency_penalty=settings.frequency_penalty,
        )
        prebuilt_voice = PrebuiltVoiceConfig(voice_name=settings.voice)
        speech = SpeechConfig(
            voice_config=VoiceConfig(prebuilt_voice_config=prebuilt_voice),
            language_code=settings.language_code,
        )
        live_config = LiveConnectConfig(
            response_modalities=settings.response_modalities,
            generation_config=generation,
            system_instruction=self._instructions,
            speech_config=speech,
            # An empty tool list is sent as None.
            tools=self.formatted_tools or None,
            input_audio_transcription=settings.input_audio_transcription,
            output_audio_transcription=settings.output_audio_transcription,
        )

        try:
            connection_cm = self.client.aio.live.connect(
                model=self.model, config=live_config
            )
            live_session = await connection_cm.__aenter__()
            return GeminiSession(
                session=live_session, session_cm=connection_cm, tasks=[]
            )
        except Exception as e:
            self.emit("error", f"Connection error: {e}")
            traceback.print_exc()
            raise

    async def _session_loop(self) -> None:
        """Main processing loop for Gemini sessions.

        Runs until ``self._closing`` is set or session creation has failed
        ``max_reconnect_attempts`` times in a row. For each session it starts a
        receive task and a keep-alive task, waits for
        ``self._session_should_close`` to be set, cancels those tasks, and
        (unless closing) cleans the session up so the next iteration creates a
        new one.
        """
        reconnect_attempts = 0
        max_reconnect_attempts = 5
        reconnect_delay = 1

        while not self._closing:
            if not self._session:
                try:
                    self._session = await self._create_session()
                    # A successful connection resets the backoff state.
                    reconnect_attempts = 0
                    reconnect_delay = 1
                except Exception as e:
                    reconnect_attempts += 1
                    # The delay is doubled (capped at 30s) before sleeping, so
                    # the first retry waits 2 seconds.
                    reconnect_delay = min(30, reconnect_delay * 2)
                    self.emit(
                        "error",
                        f"session creation attempt {reconnect_attempts} failed: {e}",
                    )
                    if reconnect_attempts >= max_reconnect_attempts:
                        self.emit("error", "Max reconnection attempts reached")
                        break
                    await asyncio.sleep(reconnect_delay)
                    continue

            session = self._session

            recv_task = asyncio.create_task(
                self._receive_loop(session), name="gemini_receive"
            )
            keep_alive_task = asyncio.create_task(
                self._keep_alive(session), name="gemini_keepalive"
            )
            session.tasks.extend([recv_task, keep_alive_task])

            try:
                # Blocks until a worker task or a sender requests a teardown.
                await self._session_should_close.wait()
            finally:
                # Runs on normal wake-up and when this loop itself is cancelled.
                for task in session.tasks:
                    if not task.done():
                        task.cancel()
                try:
                    await asyncio.gather(*session.tasks, return_exceptions=True)
                except Exception as e:
                    self.emit("error", f"Error during task cleanup: {e}")

            if not self._closing:
                await self._cleanup_session(session)
                self._session = None
                await asyncio.sleep(reconnect_delay)
                # Cleared only after the sleep, so a close request raised while
                # sleeping is discarded along with the one that woke the loop.
                self._session_should_close.clear()

    async def _handle_tool_calls(
        self, response, active_response_id: str, accumulated_input_text: str
    ) -> str:
        """Execute the tools requested by Gemini and send back their results.

        For each requested function call, the first registered function tool
        with a matching name is awaited and its result is sent as a
        ``FunctionResponse``. Pending user transcript text is flushed to the
        metrics collector before the first matching tool runs.

        Args:
            response: Server message whose ``tool_call`` is inspected.
            active_response_id: Not used by this method.
            accumulated_input_text: User transcript text collected so far.

        Returns:
            The remaining transcript text: empty once it has been flushed,
            otherwise the value passed in.
        """
        pending_text = accumulated_input_text
        if not response.tool_call:
            return pending_text

        for call in response.tool_call.function_calls:
            if not self.tools:
                continue

            for candidate in self.tools:
                if not is_function_tool(candidate):
                    continue
                info = get_tool_info(candidate)
                if info.name != call.name:
                    continue

                if pending_text:
                    await realtime_metrics_collector.set_user_transcript(pending_text)
                    pending_text = ""

                try:
                    await realtime_metrics_collector.add_tool_call(info.name)
                    outcome = await candidate(**call.args)
                    reply = FunctionResponse(
                        id=call.id,
                        name=call.name,
                        response=outcome,
                    )
                    await self.send_tool_response([reply])
                except Exception as e:
                    self.emit(
                        "error",
                        f"Error executing function {call.name}: {e}",
                    )
                    traceback.print_exc()
                # Only the first tool with a matching name handles the call.
                break

        return pending_text

    async def _receive_loop(self, session: GeminiSession) -> None:
        """Process incoming messages from Gemini.

        Iterates ``session.session.receive()`` until ``self._closing`` is set or
        receiving fails. Each message may carry tool calls, input (user) and
        output (agent) transcription fragments, model audio/text parts, and
        interruption or turn-completion flags. Any exception raised while
        receiving sets ``self._session_should_close`` so the session loop can
        tear the session down.

        Args:
            session: Session wrapper whose ``session`` attribute is read from.
        """
        try:
            # Per-response state, reset on interruption or turn completion.
            active_response_id = None
            chunk_number = 0
            accumulated_text = ""
            final_transcription = ""
            accumulated_input_text = ""

            while not self._closing:
                try:
                    async for response in session.session.receive():
                        if self._closing:
                            break

                        if response.tool_call:
                            accumulated_input_text = await self._handle_tool_calls(
                                response, active_response_id, accumulated_input_text
                            )

                        if server_content := response.server_content:
                            if (
                                input_transcription := server_content.input_transcription
                            ):
                                if input_transcription.text:
                                    if not self._user_speaking:
                                        await realtime_metrics_collector.set_user_speech_start()
                                        self._user_speaking = True
                                    # Emitted for every non-empty input fragment,
                                    # not only the first one of an utterance.
                                    self.emit("user_speech_started", {"type": "done"})
                                    accumulated_input_text += input_transcription.text
                                    global_event_emitter.emit(
                                        "input_transcription",
                                        {
                                            "text": accumulated_input_text,
                                            "is_final": False,
                                        },
                                    )

                            if (
                                output_transcription := server_content.output_transcription
                            ):
                                if output_transcription.text:
                                    final_transcription += output_transcription.text
                                    global_event_emitter.emit(
                                        "output_transcription",
                                        {
                                            "text": final_transcription,
                                            "is_final": False,
                                        },
                                    )

                            # Any server content starts a response if none is active.
                            if not active_response_id:
                                active_response_id = f"response_{id(response)}"
                                chunk_number = 0
                                accumulated_text = ""
                            if server_content.interrupted:
                                if active_response_id:
                                    active_response_id = None
                                    accumulated_text = ""
                                if (
                                    self.audio_track
                                    and "AUDIO" in self.config.response_modalities
                                ):
                                    self.audio_track.interrupt()
                                # Skip the rest of this message's handling.
                                continue

                            if model_turn := server_content.model_turn:
                                # The model responding marks the end of user speech.
                                if self._user_speaking:
                                    await realtime_metrics_collector.set_user_speech_end()
                                    self._user_speaking = False
                                if accumulated_input_text:
                                    await realtime_metrics_collector.set_user_transcript(
                                        accumulated_input_text
                                    )
                                    try:
                                        self.emit(
                                            "realtime_model_transcription",
                                            {
                                                "role": "user",
                                                "text": accumulated_input_text,
                                                "is_final": True,
                                            },
                                        )
                                    except Exception:
                                        # Listener failures are ignored here.
                                        pass
                                    accumulated_input_text = ""
                                for part in model_turn.parts:
                                    if (
                                        hasattr(part, "inline_data")
                                        and part.inline_data
                                    ):
                                        raw_audio = part.inline_data.data
                                        if not raw_audio or len(raw_audio) < 2:
                                            continue

                                        if "AUDIO" in self.config.response_modalities:
                                            chunk_number += 1
                                            if not self._agent_speaking:
                                                await realtime_metrics_collector.set_agent_speech_start()
                                                self._agent_speaking = True

                                            if self.audio_track and self.loop:
                                                # Pad odd-length payloads to an even byte count.
                                                if len(raw_audio) % 2 != 0:
                                                    raw_audio += b"\x00"

                                                # Scheduled without awaiting; no
                                                # reference to the task is kept.
                                                asyncio.create_task(
                                                    self.audio_track.add_new_bytes(
                                                        raw_audio
                                                    ),
                                                    name=f"audio_chunk_{chunk_number}",
                                                )

                                    elif hasattr(part, "text") and part.text:
                                        accumulated_text += part.text

                            if server_content.turn_complete and active_response_id:
                                if accumulated_input_text:
                                    await realtime_metrics_collector.set_user_transcript(
                                        accumulated_input_text
                                    )
                                    accumulated_input_text = ""
                                if final_transcription:
                                    await realtime_metrics_collector.set_agent_response(
                                        final_transcription
                                    )
                                    try:
                                        self.emit(
                                            "realtime_model_transcription",
                                            {
                                                "role": "agent",
                                                "text": final_transcription,
                                                "is_final": True,
                                            },
                                        )
                                    except Exception:
                                        # Listener failures are ignored here.
                                        pass
                                # With TEXT enabled the model's text parts are
                                # reported; otherwise the output transcription is.
                                if (
                                    "TEXT" in self.config.response_modalities
                                    and accumulated_text
                                ):
                                    global_event_emitter.emit(
                                        "text_response",
                                        {"type": "done", "text": accumulated_text},
                                    )
                                elif (
                                    "TEXT" not in self.config.response_modalities
                                    and final_transcription
                                ):
                                    global_event_emitter.emit(
                                        "text_response",
                                        {"type": "done", "text": final_transcription},
                                    )
                                active_response_id = None
                                accumulated_text = ""
                                final_transcription = ""
                                await realtime_metrics_collector.set_agent_speech_end(
                                    timeout=1.0
                                )
                                self._agent_speaking = False

                except Exception as e:
                    # A normal close is recognised by matching the error text.
                    if "1000 (OK)" in str(e):
                        logger.info("Normal WebSocket closure")
                    else:
                        logger.error(f"Error in receive loop: {e}")
                        traceback.print_exc()

                    self._session_should_close.set()
                    break

                # receive() finished without error; pause before iterating again.
                await asyncio.sleep(0.1)

        except asyncio.CancelledError:
            # Cancellation is reported as an error event and not re-raised.
            self.emit("error", "Receive loop cancelled")
        except Exception as e:
            self.emit("error", e)
            traceback.print_exc()
            self._session_should_close.set()

    async def _keep_alive(self, session: GeminiSession) -> None:
        """Send a small client message every 10 seconds to keep the session open.

        A send failure whose text mentions "closed" requests a session teardown
        and stops the loop; other send failures are reported and the loop
        continues. Cancellation ends the loop silently.
        """
        try:
            while True:
                if self._closing:
                    return
                await asyncio.sleep(10)
                # Closing may have started while sleeping.
                if self._closing:
                    return

                try:
                    await session.session.send_client_content(
                        turns=Content(parts=[Part(text=".")], role="user"),
                        turn_complete=False,
                    )
                except Exception as e:
                    if "closed" not in str(e).lower():
                        self.emit("error", f"Keep-alive error: {e}")
                        continue
                    self._session_should_close.set()
                    return
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error in keep-alive: {e}")
            self._session_should_close.set()

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Handle incoming audio data from the user.

        The bytes are interpreted as 16-bit PCM samples, resampled from
        ``self.input_sample_rate`` to ``self.target_sample_rate`` and sent to
        the active session. Nothing is sent when there is no session, the model
        is closing, "AUDIO" is not an enabled response modality, or the buffer
        holds no complete sample.

        Args:
            audio_data: Raw 16-bit PCM audio bytes.
        """
        if not self._session or self._closing:
            return

        if "AUDIO" not in self.config.response_modalities:
            return

        # np.frombuffer rejects buffers that are not a whole number of int16
        # samples, so drop a dangling trailing byte instead of raising.
        usable_bytes = len(audio_data) - (len(audio_data) % 2) if audio_data else 0
        if usable_bytes <= 0:
            return
        samples = np.frombuffer(audio_data[:usable_bytes], dtype=np.int16)

        target_len = int(
            len(samples) * self.target_sample_rate / self.input_sample_rate
        )
        if target_len <= 0:
            # Too short to yield even one output sample.
            return

        resampled = signal.resample(samples, target_len)
        # Resampling can overshoot the int16 range; clip first so the cast
        # saturates instead of wrapping around.
        pcm_bytes = np.clip(resampled, -32768, 32767).astype(np.int16).tobytes()

        # Advertise the rate the samples were actually resampled to.
        await self._session.session.send_realtime_input(
            audio=Blob(
                data=pcm_bytes,
                mime_type=f"audio/pcm;rate={self.target_sample_rate}",
            )
        )

    async def handle_video_input(self, video_data: av.VideoFrame) -> None:
        """Encode a video frame as JPEG and send it to the active session.

        Frames are throttled to at most one every 0.5 seconds. Empty frames and
        implausibly small JPEG outputs are dropped. Any failure is reported via
        an ``error`` event rather than raised.
        """
        if not self._session or self._closing:
            return

        try:
            if not (video_data and video_data.planes):
                return

            now = time.monotonic()
            too_soon = (
                hasattr(self, "_last_video_frame")
                and (now - self._last_video_frame) < 0.5
            )
            if too_soon:
                return
            self._last_video_frame = now

            jpeg_bytes = encode_image(video_data, DEFAULT_IMAGE_ENCODE_OPTIONS)

            if not jpeg_bytes or len(jpeg_bytes) < 100:
                logger.warning("Invalid JPEG data generated")
                return

            frame_blob = Blob(data=jpeg_bytes, mime_type="image/jpeg")
            await self._session.session.send_realtime_input(video=frame_blob)
        except Exception as e:
            self.emit("error", f"Video processing error: {str(e)}")

    async def interrupt(self) -> None:
        """Interrupt the current response.

        Sends a completed "stop" user turn, records the interruption in the
        metrics collector and, when audio output is enabled, interrupts the
        audio track. Failures are reported via an ``error`` event.
        """
        if self._closing or not self._session:
            return

        try:
            stop_turn = Content(parts=[Part(text="stop")], role="user")
            await self._session.session.send_client_content(
                turns=stop_turn,
                turn_complete=True,
            )
            await realtime_metrics_collector.set_interrupted()
            track = self.audio_track
            if track and "AUDIO" in self.config.response_modalities:
                track.interrupt()
        except Exception as e:
            self.emit("error", f"Interrupt error: {e}")

    async def send_message(self, message: str) -> None:
        """Ask the model to speak ``message`` back verbatim.

        Waits up to five seconds (polling once per second) for a session to
        become available, then sends a model-role instruction followed by a
        user turn to trigger the response.

        Raises:
            RuntimeError: If no session is available after the retries.
        """
        waits_remaining = 5
        while not (self._session and self._session.session):
            if waits_remaining <= 0:
                raise RuntimeError("No active Gemini session after maximum retries")
            logger.debug("No active session, waiting for connection...")
            await asyncio.sleep(1)
            waits_remaining -= 1

        try:
            instruction = Content(
                parts=[
                    Part(
                        text="Repeat the user's exact message back to them [DO NOT ADD ANYTHING ELSE]:"
                        + message
                    )
                ],
                role="model",
            )
            trigger = Content(parts=[Part(text=".")], role="user")
            await self._session.session.send_client_content(
                turns=[instruction, trigger],
                turn_complete=True,
            )
            await asyncio.sleep(0.1)
        except Exception as e:
            self.emit("error", f"Error sending message: {e}")
            self._session_should_close.set()

    async def send_text_message(self, message: str) -> None:
        """Send ``message`` as a completed user text turn.

        Waits up to five seconds (polling once per second, emitting an
        ``error`` event on each wait) for a session to become available.

        Raises:
            RuntimeError: If no session is available after the retries.
        """
        waits_remaining = 5
        while not (self._session and self._session.session):
            if waits_remaining <= 0:
                raise RuntimeError("No active Gemini session after maximum retries")
            self.emit("error", "No active session, waiting for connection...")
            await asyncio.sleep(1)
            waits_remaining -= 1

        try:
            user_turn = Content(parts=[Part(text=message)], role="user")
            await self._session.session.send_client_content(
                turns=user_turn,
                turn_complete=True,
            )
        except Exception as e:
            self.emit("error", f"Error sending text message: {e}")
            self._session_should_close.set()

    async def _cleanup_session(self, session: GeminiSession) -> None:
        """Cancel a session's unfinished tasks and exit its connection context.

        Errors raised while exiting the context are reported via an ``error``
        event instead of propagating.
        """
        for unfinished in (t for t in session.tasks if not t.done()):
            unfinished.cancel()

        try:
            await session.session_cm.__aexit__(None, None, None)
        except Exception as e:
            self.emit("error", f"Error closing session: {e}")

    async def aclose(self) -> None:
        """Shut the model down and release its resources.

        Does nothing if closing has already begun. Otherwise marks the model as
        closing, cancels the background tasks (waiting briefly for each),
        cleans up the active session and the audio track, and clears the
        buffered audio.
        """
        if self._closing:
            return

        self._closing = True
        self._session_should_close.set()

        # Each background task is paired with how long to wait for it to unwind.
        for attr_name, grace in (("_audio_processing_task", 1.0), ("_main_task", 2.0)):
            pending = getattr(self, attr_name)
            if not pending or pending.done():
                continue
            pending.cancel()
            try:
                await asyncio.wait_for(pending, timeout=grace)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass

        if self._session:
            await self._cleanup_session(self._session)
            self._session = None

        track = self.audio_track
        if track and hasattr(track, "cleanup"):
            try:
                await track.cleanup()
            except Exception as e:
                self.emit("error", f"Error cleaning up audio track: {e}")

        self._buffered_audio = bytearray()

    async def _reconnect(self) -> None:
        """Replace the current session (if any) with a newly created one."""
        stale = self._session
        if stale:
            await self._cleanup_session(stale)
            self._session = None
        self._session = await self._create_session()

    async def send_tool_response(
        self, function_responses: List[FunctionResponse]
    ) -> None:
        """Send tool results back to Gemini.

        Silently does nothing when there is no active session. A send failure
        is reported via an ``error`` event and requests a session teardown.
        """
        if not (self._session and self._session.session):
            return

        live = self._session.session
        try:
            await live.send_tool_response(function_responses=function_responses)
        except Exception as e:
            self.emit("error", f"Error sending tool response: {e}")
            self._session_should_close.set()

    def _convert_tools_to_gemini_format(self, tools: List[FunctionTool]) -> List[Tool]:
        """Convert tool definitions to Gemini's Tool format.

        Entries that are not function tools are skipped. A tool whose schema
        cannot be built is reported via an ``error`` event and skipped.

        Returns:
            A single-element list wrapping all declarations, or an empty list
            when no declaration could be built.
        """
        declarations = []

        for candidate in filter(is_function_tool, tools):
            try:
                declarations.append(build_gemini_schema(candidate))
            except Exception as e:
                self.emit("error", f"Failed to format tool {candidate}: {e}")

        if not declarations:
            return []
        return [Tool(function_declarations=declarations)]

Gemini's realtime model for audio-only communication

Initialize Gemini realtime model.

Args

api_key
Gemini API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
service_account_path
Path to Google service account JSON file.
model
The Gemini model identifier to use (e.g. 'gemini-pro', 'gemini-pro-vision').
config
Optional configuration object for customizing model behavior. Contains settings for: - voice: Voice ID for audio output ('Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'). Defaults to 'Puck' - language_code: Language code for speech synthesis. Defaults to 'en-US' - temperature: Controls randomness in responses. Higher values (0.8) more random, lower (0.2) more focused - top_p: Nucleus sampling parameter. Controls diversity via probability cutoff. Range 0-1 - top_k: Limits number of tokens considered for each generation step - candidate_count: Number of response candidates to generate. Defaults to 1 - max_output_tokens: Maximum tokens allowed in model responses - presence_penalty: Penalizes token presence in text. Range -2.0 to 2.0 - frequency_penalty: Penalizes token frequency in text. Range -2.0 to 2.0 - response_modalities: List of enabled response types ["TEXT", "AUDIO"]. Defaults to both - output_audio_transcription: Configuration for audio transcription features

Raises

ValueError
If neither api_key nor service_account_path is provided and no GOOGLE_API_KEY in env vars

Ancestors

  • videosdk.agents.realtime_base_model.RealtimeBaseModel
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic
  • abc.ABC

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Shut the model down and release the resources it holds.

    The call is idempotent: once ``_closing`` is set, later calls return
    immediately. Background tasks are cancelled and awaited for a short
    grace period, the current session is cleaned up, the audio track is
    cleaned up when it exposes ``cleanup``, and buffered audio is dropped.
    """
    if self._closing:
        return

    self._closing = True
    self._session_should_close.set()

    # Cancel each background task and wait briefly for it to unwind;
    # cancellation and timeout are both acceptable outcomes here.
    for task_attr, grace_seconds in (
        ("_audio_processing_task", 1.0),
        ("_main_task", 2.0),
    ):
        pending = getattr(self, task_attr)
        if not pending or pending.done():
            continue
        pending.cancel()
        try:
            await asyncio.wait_for(pending, timeout=grace_seconds)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass

    if self._session:
        await self._cleanup_session(self._session)
        self._session = None

    track = self.audio_track
    if hasattr(track, "cleanup") and track:
        try:
            await track.cleanup()
        except Exception as e:
            # Best effort: report the failure but keep shutting down.
            self.emit("error", f"Error cleaning up audio track: {e}")

    self._buffered_audio = bytearray()

Clean up all resources

async def connect(self) ‑> None
Expand source code
async def connect(self) -> None:
    """Connect to the Gemini Live API.

    Any existing session is cleaned up first and the closing flags are
    reset. When audio output is enabled an audio track is created on the
    event loop. An initial session is attempted (failure is reported but
    not fatal), and the main session loop task is started if it is not
    already running.

    Raises:
        RuntimeError: If audio output is enabled but no event loop is set.
    """
    if self._session:
        await self._cleanup_session(self._session)
        self._session = None

    self._closing = False
    self._session_should_close.clear()

    try:
        if not self.loop:
            if "AUDIO" in self.config.response_modalities:
                self.emit(
                    "error", "Event loop not initialized. Audio playback will not work."
                )
                raise RuntimeError(
                    "Event loop not initialized. Audio playback will not work."
                )
        elif not self.audio_track and "AUDIO" in self.config.response_modalities:
            self.audio_track = CustomAudioStreamTrack(self.loop)

        try:
            first_session = await self._create_session()
            if first_session:
                self._session = first_session
        except Exception as e:
            # Not fatal: the error is reported and the main loop task is still started.
            self.emit("error", f"Initial session creation failed, will retry: {e}")

        loop_task = self._main_task
        if not loop_task or loop_task.done():
            self._main_task = asyncio.create_task(
                self._session_loop(), name="gemini-main-loop"
            )

    except Exception as e:
        self.emit("error", f"Error connecting to Gemini Live API: {e}")
        traceback.print_exc()
        raise

Connect to the Gemini Live API

async def handle_audio_input(self, audio_data: bytes) ‑> None
Expand source code
async def handle_audio_input(self, audio_data: bytes) -> None:
    """Resample incoming user audio and stream it to the live session.

    Does nothing when there is no session, the model is closing, or
    "AUDIO" is not among the configured response modalities.

    Args:
        audio_data: Raw audio bytes, interpreted as 16-bit integer samples.
    """
    if not self._session or self._closing:
        return
    if "AUDIO" not in self.config.response_modalities:
        return

    samples = np.frombuffer(audio_data, dtype=np.int16)
    target_length = int(
        len(samples) * self.target_sample_rate / self.input_sample_rate
    )
    resampled = signal.resample(samples, target_length)
    pcm_bytes = resampled.astype(np.int16).tobytes()

    # NOTE(review): the advertised rate comes from AUDIO_SAMPLE_RATE while the
    # data is resampled to self.target_sample_rate - confirm these agree.
    payload = Blob(data=pcm_bytes, mime_type=f"audio/pcm;rate={AUDIO_SAMPLE_RATE}")
    await self._session.session.send_realtime_input(audio=payload)

Handle incoming audio data from the user

async def handle_video_input(self, video_data: av.VideoFrame) ‑> None
Expand source code
async def handle_video_input(self, video_data: av.VideoFrame) -> None:
    """Encode a video frame as JPEG and send it to the live session.

    Frames are dropped when there is no session, the model is closing, the
    frame is empty, less than 0.5 seconds have passed since the last
    accepted frame, or the encoder output is shorter than 100 bytes. Any
    exception is reported through an "error" event instead of propagating.

    Args:
        video_data: The frame to forward.
    """
    if not self._session or self._closing:
        return

    try:
        if not video_data or not video_data.planes:
            return

        current_time = time.monotonic()
        # Throttle: accept at most one frame per half second.
        too_soon = (
            hasattr(self, "_last_video_frame")
            and (current_time - self._last_video_frame) < 0.5
        )
        if too_soon:
            return
        self._last_video_frame = current_time

        jpeg_bytes = encode_image(video_data, DEFAULT_IMAGE_ENCODE_OPTIONS)
        if not jpeg_bytes or len(jpeg_bytes) < 100:
            logger.warning("Invalid JPEG data generated")
            return

        frame_blob = Blob(data=jpeg_bytes, mime_type="image/jpeg")
        await self._session.session.send_realtime_input(video=frame_blob)
    except Exception as e:
        self.emit("error", f"Video processing error: {str(e)}")

Improved video input handler with error prevention

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt the response currently being generated.

    Sends a completed user turn containing "stop", records the interruption
    with the realtime metrics collector and, when audio output is enabled,
    interrupts the audio track. Failures are reported as "error" events.
    No-op when there is no session or the model is closing.
    """
    if not self._session or self._closing:
        return

    try:
        stop_turn = Content(parts=[Part(text="stop")], role="user")
        await self._session.session.send_client_content(
            turns=stop_turn,
            turn_complete=True,
        )
        await realtime_metrics_collector.set_interrupted()

        track = self.audio_track
        if track and "AUDIO" in self.config.response_modalities:
            track.interrupt()
    except Exception as e:
        self.emit("error", f"Interrupt error: {e}")

Interrupt current response

async def send_message(self, message: str) ‑> None
Expand source code
async def send_message(self, message: str) -> None:
    """Ask the model to speak ``message`` back verbatim.

    Waits up to five seconds (one-second polls) for an active session, then
    sends a model turn carrying the repeat instruction plus the message,
    followed by a minimal user turn that completes the exchange. Send
    failures are reported as "error" events and request a session close.

    Args:
        message: Text the model should repeat.

    Raises:
        RuntimeError: If no session becomes available after the retries.
    """
    max_retries = 5
    attempts = 0
    while True:
        if self._session and self._session.session:
            break
        if attempts >= max_retries:
            raise RuntimeError("No active Gemini session after maximum retries")
        logger.debug("No active session, waiting for connection...")
        await asyncio.sleep(1)
        attempts += 1

    try:
        echo_instruction = Content(
            parts=[
                Part(
                    text="Repeat the user's exact message back to them [DO NOT ADD ANYTHING ELSE]:"
                    + message
                )
            ],
            role="model",
        )
        user_nudge = Content(parts=[Part(text=".")], role="user")
        await self._session.session.send_client_content(
            turns=[echo_instruction, user_nudge],
            turn_complete=True,
        )
        await asyncio.sleep(0.1)
    except Exception as e:
        self.emit("error", f"Error sending message: {e}")
        self._session_should_close.set()

Send a text message to get audio response

async def send_text_message(self, message: str) ‑> None
Expand source code
async def send_text_message(self, message: str) -> None:
    """Send a text message for text-only communication.

    Waits up to five seconds (one-second polls) for an active session and
    then sends ``message`` as a completed user turn. Send failures are
    reported as "error" events and request a session close.

    Args:
        message: The user text to send.

    Raises:
        RuntimeError: If no session becomes available after the retries.
    """
    retry_count = 0
    max_retries = 5
    while not self._session or not self._session.session:
        if retry_count >= max_retries:
            raise RuntimeError("No active Gemini session after maximum retries")
        # Waiting for the connection is not an error condition; log it at
        # debug level exactly as send_message does instead of emitting an
        # "error" event on every poll.
        logger.debug("No active session, waiting for connection...")
        await asyncio.sleep(1)
        retry_count += 1

    try:
        await self._session.session.send_client_content(
            turns=Content(parts=[Part(text=message)], role="user"),
            turn_complete=True,
        )
    except Exception as e:
        self.emit("error", f"Error sending text message: {e}")
        self._session_should_close.set()

Send a text message for text-only communication

async def send_tool_response(self, function_responses: List[FunctionResponse]) ‑> None
Expand source code
async def send_tool_response(
    self, function_responses: List[FunctionResponse]
) -> None:
    """Deliver tool (function call) results back to Gemini.

    Silently returns when there is no active session. A failure while
    sending is reported as an "error" event and requests a session close.

    Args:
        function_responses: Results of the function calls the model issued.
    """
    if not (self._session and self._session.session):
        return

    live_session = self._session.session
    try:
        await live_session.send_tool_response(function_responses=function_responses)
    except Exception as e:
        self.emit("error", f"Error sending tool response: {e}")
        self._session_should_close.set()

Send tool responses back to Gemini

def set_agent(self, agent: Agent) ‑> None
Expand source code
def set_agent(self, agent: Agent) -> None:
    """Adopt the agent's instructions and tools.

    The tools are converted to Gemini's format once and stored under both
    ``tools_formatted`` and ``formatted_tools``.

    Args:
        agent: Agent whose ``instructions`` and ``tools`` are copied.
    """
    self._instructions = agent.instructions
    self.tools = agent.tools
    gemini_tools = self._convert_tools_to_gemini_format(self.tools)
    self.tools_formatted = gemini_tools
    # Same object under a second attribute name.
    self.formatted_tools = gemini_tools
class GoogleLLM (*,
api_key: str | None = None,
model: str = 'gemini-2.0-flash-001',
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_output_tokens: int | None = None,
top_p: float | None = None,
top_k: int | None = None,
presence_penalty: float | None = None,
frequency_penalty: float | None = None)
Expand source code
class GoogleLLM(LLM):
    """LLM plugin that streams chat responses from Google's Gemini API."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "gemini-2.0-flash-001",
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_output_tokens: int | None = None,
        top_p: float | None = None,
        top_k: int | None = None,
        presence_penalty: float | None = None,
        frequency_penalty: float | None = None,
    ) -> None:
        """Initialize the Google LLM plugin

        Args:
            api_key (str): Google API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
            model (str): The model to use for the LLM plugin.
            temperature (float): The temperature to use for the LLM plugin
            tool_choice (ToolChoice): The tool choice to use for the LLM plugin
            max_output_tokens (int): The maximum output tokens to use for the LLM plugin
            top_p (float): The top P to use for the LLM plugin
            top_k (int): The top K to use for the LLM plugin
            presence_penalty (float): The presence penalty to use for the LLM plugin
            frequency_penalty (float): The frequency penalty to use for the LLM plugin

        Raises:
            ValueError: If no API key is given and GOOGLE_API_KEY is not set.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            raise ValueError("Google API key must be provided either through api_key parameter or GOOGLE_API_KEY environment variable")

        self.model = model
        self.temperature = temperature
        self.tool_choice = tool_choice
        self.max_output_tokens = max_output_tokens
        self.top_p = top_p
        self.top_k = top_k
        self.presence_penalty = presence_penalty
        self.frequency_penalty = frequency_penalty
        self._cancelled = False

        self._client = Client(api_key=self.api_key)

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        **kwargs: Any
    ) -> AsyncIterator[LLMResponse]:
        """
        Implement chat functionality using Google's Gemini API

        Args:
            messages: ChatContext containing conversation history
            tools: Optional list of function tools available to the model
            **kwargs: Additional arguments passed to the Google API

        Yields:
            LLMResponse objects containing the model's responses. A function
            call is yielded with empty content and the call stored under
            ``metadata["function_call"]``.

        Raises:
            Exception: When the response carries prompt feedback, or wrapping
                a ClientError/ServerError/APIError raised by the client.
        """
        self._cancelled = False

        try:
            (
                contents,
                system_instruction,
            ) = await self._convert_messages_to_contents_async(messages)
            config_params = {
                "temperature": self.temperature,
                **kwargs
            }
            if system_instruction:
                config_params["system_instruction"] = [types.Part(text=system_instruction)]

            # Optional sampling settings are only forwarded when configured.
            for option in (
                "max_output_tokens",
                "top_p",
                "top_k",
                "presence_penalty",
                "frequency_penalty",
            ):
                value = getattr(self, option)
                if value is not None:
                    config_params[option] = value

            if tools:
                function_declarations = []
                for tool in tools:
                    if not is_function_tool(tool):
                        continue
                    try:
                        function_declarations.append(build_gemini_schema(tool))
                    except Exception as e:
                        # A tool that cannot be converted is skipped, not fatal.
                        logger.error(f"Failed to format tool {tool}: {e}")

                if function_declarations:
                    config_params["tools"] = [types.Tool(function_declarations=function_declarations)]

                    if self.tool_choice == "required":
                        mode = types.FunctionCallingConfigMode.ANY
                    elif self.tool_choice == "auto":
                        mode = types.FunctionCallingConfigMode.AUTO
                    elif self.tool_choice == "none":
                        mode = types.FunctionCallingConfigMode.NONE
                    else:
                        mode = None

                    if mode is not None:
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(mode=mode)
                        )

            config = types.GenerateContentConfig(**config_params)

            response_stream = await self._client.aio.models.generate_content_stream(
                model=self.model,
                contents=contents,
                config=config,
            )

            async for response in response_stream:
                if self._cancelled:
                    break

                if response.prompt_feedback:
                    # The generic handler below emits the "error" event, so
                    # emitting here as well would report the failure twice.
                    raise Exception(f"Prompt feedback error: {response.prompt_feedback}")

                if not response.candidates or not response.candidates[0].content:
                    continue

                candidate = response.candidates[0]
                if not candidate.content.parts:
                    continue

                for part in candidate.content.parts:
                    if part.function_call:
                        function_call = {
                            "name": part.function_call.name,
                            # Guard against a call that carries no args.
                            "arguments": dict(part.function_call.args or {}),
                        }
                        yield LLMResponse(
                            content="",
                            role=ChatRole.ASSISTANT,
                            metadata={"function_call": function_call}
                        )
                    elif part.text:
                        yield LLMResponse(
                            content=part.text,
                            role=ChatRole.ASSISTANT
                        )

        except (ClientError, ServerError, APIError) as e:
            # Build the message before the cancel check so the raise below
            # never references an unbound name.
            error_msg = f"Google API error: {e}"
            if not self._cancelled:
                self.emit("error", Exception(error_msg))
            raise Exception(error_msg) from e
        except Exception as e:
            if not self._cancelled:
                self.emit("error", e)
            raise

    async def cancel_current_generation(self) -> None:
        """Flag the current generation as cancelled; ``chat`` stops at the next chunk."""
        self._cancelled = True

    async def _convert_messages_to_contents_async(
        self, messages: ChatContext
    ) -> tuple[list[types.Content], str | None]:
        """Convert ChatContext to Google Content format

        Returns:
            A ``(contents, system_instruction)`` pair. System messages are
            not added to ``contents``; the text of the last one seen is
            returned separately (``None`` when there is none).
        """

        async def _format_content_parts_async(
            content: Union[str, List[ChatContent]]
        ) -> List[types.Part]:
            """Turn message content (text and images) into Gemini parts."""
            if isinstance(content, str):
                return [types.Part(text=content)]

            if len(content) == 1 and isinstance(content[0], str):
                return [types.Part(text=content[0])]

            formatted_parts = []
            for part in content:
                if isinstance(part, str):
                    formatted_parts.append(types.Part(text=part))
                elif isinstance(part, ImageContent):
                    data_url = part.to_data_url()
                    if data_url.startswith("data:"):
                        # data:<media type>;base64,<payload>
                        header, b64_data = data_url.split(",", 1)
                        media_type = header.split(";")[0].split(":")[1]
                        image_bytes = base64.b64decode(b64_data)

                        formatted_parts.append(
                            types.Part(
                                inline_data=types.Blob(
                                    mime_type=media_type, data=image_bytes
                                )
                            )
                        )
                    else: # Fetch image from URL
                        async with httpx.AsyncClient() as client:
                            try:
                                response = await client.get(data_url)
                                response.raise_for_status()
                                image_bytes = response.content
                                media_type = response.headers.get(
                                    "Content-Type", "image/jpeg"
                                )
                                formatted_parts.append(
                                    types.Part(
                                        inline_data=types.Blob(
                                            mime_type=media_type, data=image_bytes
                                        )
                                    )
                                )
                            except httpx.HTTPError as e:
                                # Covers bad status codes and transport
                                # failures alike: skip the image rather than
                                # abort the whole conversation.
                                logger.error(f"Failed to fetch image from URL {data_url}: {e}")
                                continue

            return formatted_parts

        contents = []
        system_instruction = None

        for item in messages.items:
            if isinstance(item, ChatMessage):
                if item.role == ChatRole.SYSTEM:
                    if isinstance(item.content, list):
                        # Only the first plain-text entry is used.
                        system_instruction = next(
                            (str(p) for p in item.content if isinstance(p, str)), ""
                        )
                    else:
                        system_instruction = str(item.content)
                    continue
                elif item.role == ChatRole.USER:
                    parts = await _format_content_parts_async(item.content)
                    contents.append(types.Content(role="user", parts=parts))
                elif item.role == ChatRole.ASSISTANT:
                    parts = await _format_content_parts_async(item.content)
                    contents.append(types.Content(role="model", parts=parts))
            elif isinstance(item, FunctionCall):
                function_call = types.FunctionCall(
                    name=item.name,
                    args=(
                        json.loads(item.arguments)
                        if isinstance(item.arguments, str)
                        else item.arguments
                    ),
                )
                contents.append(
                    types.Content(role="model", parts=[types.Part(function_call=function_call)])
                )
            elif isinstance(item, FunctionCallOutput):
                function_response = types.FunctionResponse(
                    name=item.name, response={"output": item.output}
                )
                contents.append(
                    types.Content(
                        role="user", parts=[types.Part(function_response=function_response)]
                    )
                )

        return contents, system_instruction

    async def aclose(self) -> None:
        """Clean up by cancelling any generation in progress."""
        await self.cancel_current_generation()

Base class for LLM implementations.

Initialize the Google LLM plugin

Args

api_key : str
Google API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
model : str
The model to use for the LLM plugin.
temperature : float
The temperature to use for the LLM plugin
tool_choice : ToolChoice
The tool choice to use for the LLM plugin
max_output_tokens : int
The maximum output tokens to use for the LLM plugin
top_p : float
The top P to use for the LLM plugin
top_k : int
The top K to use for the LLM plugin
presence_penalty : float
The presence penalty to use for the LLM plugin
frequency_penalty : float
The frequency penalty to use for the LLM plugin

Ancestors

  • videosdk.agents.llm.llm.LLM
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Clean up by cancelling any generation that is still in progress."""
    await self.cancel_current_generation()

Cleanup resources.

async def cancel_current_generation(self) ‑> None
Expand source code
async def cancel_current_generation(self) -> None:
    """Set the cancellation flag on this instance."""
    self._cancelled = True

Cancel the current LLM generation if active.

Raises

NotImplementedError
This method must be implemented by subclasses.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
Expand source code
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMResponse]:
    """
    Implement chat functionality using Google's Gemini API

    Args:
        messages: ChatContext containing conversation history
        tools: Optional list of function tools available to the model
        **kwargs: Additional arguments passed to the Google API

    Yields:
        LLMResponse objects containing the model's responses. A function
        call is yielded with empty content and the call stored under
        ``metadata["function_call"]``.

    Raises:
        Exception: When the response carries prompt feedback, or wrapping
            a ClientError/ServerError/APIError raised by the client.
    """
    self._cancelled = False

    try:
        (
            contents,
            system_instruction,
        ) = await self._convert_messages_to_contents_async(messages)
        config_params = {
            "temperature": self.temperature,
            **kwargs
        }
        if system_instruction:
            config_params["system_instruction"] = [types.Part(text=system_instruction)]

        # Optional sampling settings are only forwarded when configured.
        for option in (
            "max_output_tokens",
            "top_p",
            "top_k",
            "presence_penalty",
            "frequency_penalty",
        ):
            value = getattr(self, option)
            if value is not None:
                config_params[option] = value

        if tools:
            function_declarations = []
            for tool in tools:
                if not is_function_tool(tool):
                    continue
                try:
                    function_declarations.append(build_gemini_schema(tool))
                except Exception as e:
                    # A tool that cannot be converted is skipped, not fatal.
                    logger.error(f"Failed to format tool {tool}: {e}")

            if function_declarations:
                config_params["tools"] = [types.Tool(function_declarations=function_declarations)]

                if self.tool_choice == "required":
                    mode = types.FunctionCallingConfigMode.ANY
                elif self.tool_choice == "auto":
                    mode = types.FunctionCallingConfigMode.AUTO
                elif self.tool_choice == "none":
                    mode = types.FunctionCallingConfigMode.NONE
                else:
                    mode = None

                if mode is not None:
                    config_params["tool_config"] = types.ToolConfig(
                        function_calling_config=types.FunctionCallingConfig(mode=mode)
                    )

        config = types.GenerateContentConfig(**config_params)

        response_stream = await self._client.aio.models.generate_content_stream(
            model=self.model,
            contents=contents,
            config=config,
        )

        async for response in response_stream:
            if self._cancelled:
                break

            if response.prompt_feedback:
                # The generic handler below emits the "error" event, so
                # emitting here as well would report the failure twice.
                raise Exception(f"Prompt feedback error: {response.prompt_feedback}")

            if not response.candidates or not response.candidates[0].content:
                continue

            candidate = response.candidates[0]
            if not candidate.content.parts:
                continue

            for part in candidate.content.parts:
                if part.function_call:
                    function_call = {
                        "name": part.function_call.name,
                        # Guard against a call that carries no args.
                        "arguments": dict(part.function_call.args or {}),
                    }
                    yield LLMResponse(
                        content="",
                        role=ChatRole.ASSISTANT,
                        metadata={"function_call": function_call}
                    )
                elif part.text:
                    yield LLMResponse(
                        content=part.text,
                        role=ChatRole.ASSISTANT
                    )

    except (ClientError, ServerError, APIError) as e:
        # Build the message before the cancel check so the raise below
        # never references an unbound name.
        error_msg = f"Google API error: {e}"
        if not self._cancelled:
            self.emit("error", Exception(error_msg))
        raise Exception(error_msg) from e
    except Exception as e:
        if not self._cancelled:
            self.emit("error", e)
        raise

Implement chat functionality using Google's Gemini API

Args

messages
ChatContext containing conversation history
tools
Optional list of function tools available to the model
**kwargs
Additional arguments passed to the Google API

Yields

LLMResponse objects containing the model's responses

class GoogleSTT (*,
api_key: Optional[str] = None,
languages: Union[str, list[str]] = 'en-US',
model: str = 'latest_long',
sample_rate: int = 16000,
interim_results: bool = True,
punctuate: bool = True,
min_confidence_threshold: float = 0.1,
location: str = 'global',
**kwargs: Any)
Expand source code
class GoogleSTT(BaseSTT):
    """Streaming speech-to-text plugin backed by Google Cloud Speech V2."""

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        languages: Union[str, list[str]] = "en-US",
        model: str = "latest_long",
        sample_rate: int = 16000,
        interim_results: bool = True,
        punctuate: bool = True,
        min_confidence_threshold: float = 0.1,
        location: str = "global",
        **kwargs: Any
    ) -> None:
        """Initialize the Google STT plugin.

        Args:
            api_key (Optional[str], optional): Value exported as the
                GOOGLE_APPLICATION_CREDENTIALS environment variable before
                default credentials are resolved. Defaults to None.
            languages (Union[str, list[str]]): The languages to use for the STT plugin. Defaults to "en-US".
            model (str): The model to use for the STT plugin. Defaults to "latest_long".
            sample_rate (int): The sample rate to use for the STT plugin. Defaults to 16000.
            interim_results (bool): Whether to use interim results for the STT plugin. Defaults to True.
            punctuate (bool): Whether to use punctuation for the STT plugin. Defaults to True.
            min_confidence_threshold (float): The minimum confidence threshold for the STT plugin. Defaults to 0.1.
            location (str): The location to use for the STT plugin. Defaults to "global".
            **kwargs: Accepted but not used by this constructor.

        Raises:
            ImportError: If Google Cloud Speech V2 is not available.
            ValueError: If Google default credentials cannot be resolved.
        """
        super().__init__()
        if not GOOGLE_V2_AVAILABLE:
            logger.error("Google Cloud Speech V2 is not available")
            raise ImportError("google-cloud-speech is not installed")

        if api_key:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key
        try:
            gauth_default()
        except DefaultCredentialsError as e:
            logger.error("Google credentials are not configured", exc_info=True)
            raise ValueError("Google credentials are not configured.") from e

        # Incoming audio is resampled from input_sample_rate to
        # target_sample_rate before it is pushed to the stream.
        self.input_sample_rate = 48000
        self.target_sample_rate = sample_rate
        if isinstance(languages, str):
            languages = [languages]

        self._config = {
            "languages": languages,
            "model": model,
            "sample_rate": self.target_sample_rate,
            "interim_results": interim_results,
            "punctuate": punctuate,
            "min_confidence_threshold": min_confidence_threshold,
            "location": location,
        }

        self._client: Optional[SpeechAsyncClient] = None
        self._stream: Optional[SpeechStream] = None

    async def _ensure_client(self):
        """Create the SpeechAsyncClient on first use.

        A regional endpoint is selected when ``location`` is not "global".
        """
        if self._client:
            return
        try:
            opts = None
            if self._config["location"] != "global":
                opts = ClientOptions(api_endpoint=f"{self._config['location']}-speech.googleapis.com")
            self._client = SpeechAsyncClient(client_options=opts)
        except Exception:
            logger.error("Failed to create SpeechAsyncClient", exc_info=True)
            raise

    async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
        """Push audio to the recognition stream, starting it when needed.

        When SciPy is available the audio is resampled from
        ``input_sample_rate`` to ``target_sample_rate`` first; otherwise the
        bytes are forwarded unchanged. Failures are logged and reported via
        "error" events rather than raised.

        Args:
            audio_frames: Raw audio bytes, interpreted as 16-bit samples.
            **kwargs: Accepted but not used by this method.
        """
        try:
            if not self._stream:
                await self._start_stream()

            if self._stream:
                if SCIPY_AVAILABLE:
                    try:
                        audio_data = np.frombuffer(audio_frames, dtype=np.int16)
                        resampled_data = signal.resample(audio_data, int(len(audio_data) * self.target_sample_rate / self.input_sample_rate))
                        resampled_bytes = resampled_data.astype(np.int16).tobytes()
                        await self._stream.push_audio(resampled_bytes)
                    except Exception as e:
                        logger.error("Error resampling audio", exc_info=True)
                        self.emit("error", {"message": "Error resampling audio", "error": str(e)})
                else:
                    await self._stream.push_audio(audio_frames)
        except Exception as e:
            logger.error("process_audio failed", exc_info=True)
            if self._stream:
                self.emit("error", {"message": "Failed to process audio", "error": str(e)})

    async def _start_stream(self):
        """Create and start a SpeechStream bound to the transcript callback."""
        await self._ensure_client()
        try:
            self._stream = SpeechStream(self._client, self._config, self._transcript_callback)
            await self._stream.start()
        except Exception:
            logger.error("Failed to start SpeechStream", exc_info=True)
            raise

    async def aclose(self) -> None:
        """Close the stream and drop the client.

        The references are cleared even when closing the stream fails, so a
        broken stream is never reused by a later ``process_audio`` call.
        """
        try:
            if self._stream:
                await self._stream.close()
        except Exception:
            logger.error("Error during aclose", exc_info=True)
        finally:
            self._stream = None
            self._client = None

Base class for Speech-to-Text implementations

Initialize the Google STT plugin.

Args

api_key : Optional[str], optional
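Value exported as the GOOGLE_APPLICATION_CREDENTIALS environment variable before Google default credentials are resolved (i.e. a credentials file path rather than an API key string). Defaults to None.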
languages : Union[str, list[str]]
The languages to use for the STT plugin. Defaults to "en-US".
model : str
The model to use for the STT plugin. Defaults to "latest_long".
sample_rate : int
The sample rate to use for the STT plugin. Defaults to 16000.
interim_results : bool
Whether to use interim results for the STT plugin. Defaults to True.
punctuate : bool
Whether to use punctuation for the STT plugin. Defaults to True.
min_confidence_threshold : float
The minimum confidence threshold for the STT plugin. Defaults to 0.1.
location : str
The location to use for the STT plugin. Defaults to "global".

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the active speech stream, if any, and drop stream/client references.

    Errors raised while closing the stream are logged and not propagated.
    ``_stream`` and ``_client`` are cleared even when closing fails, so a
    stream that could not be closed is not kept around for reuse.
    """
    try:
        if self._stream:
            await self._stream.close()
    except Exception:
        logger.error("Error during aclose", exc_info=True)
    finally:
        # Release the references on every path; a failing close() must not
        # leave the instance pointing at a dead stream/client.
        self._stream = None
        self._client = None

Cleanup resources

async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
Expand source code
async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
    """Forward one buffer of audio to the speech stream, starting it on first use.

    When ``SCIPY_AVAILABLE`` is true the buffer is read as 16-bit integer
    samples and resampled from ``self.input_sample_rate`` to
    ``self.target_sample_rate`` before being pushed; otherwise the bytes are
    pushed unchanged.

    Exceptions are caught and logged. An ``"error"`` event is emitted for
    resampling failures and, while a stream is set, for any other failure.

    Args:
        audio_frames: Raw audio bytes. Empty buffers are ignored.
        **kwargs: Accepted for interface compatibility; not read here.
    """
    if not audio_frames:
        # Nothing to transcribe; skip before the resampler sees zero samples.
        return

    try:
        if not self._stream:
            await self._start_stream()

        if not self._stream:
            return

        payload = audio_frames
        if SCIPY_AVAILABLE:
            # Only the resampling itself is reported as a resampling error;
            # push failures fall through to the generic handler below, the
            # same as on the non-SciPy path.
            try:
                samples = np.frombuffer(audio_frames, dtype=np.int16)
                target_len = int(len(samples) * self.target_sample_rate / self.input_sample_rate)
                resampled = signal.resample(samples, target_len)
                # The resampler returns floats that can overshoot the int16
                # range; clip first so the cast cannot wrap around.
                int16_info = np.iinfo(np.int16)
                payload = np.clip(resampled, int16_info.min, int16_info.max).astype(np.int16).tobytes()
            except Exception as e:
                logger.error("Error resampling audio", exc_info=True)
                self.emit("error", {"message": "Error resampling audio", "error": str(e)})
                return

        await self._stream.push_audio(payload)
    except Exception as e:
        logger.error("process_audio failed", exc_info=True)
        if self._stream:
            self.emit("error", {"message": "Failed to process audio", "error": str(e)})

Process audio frames and convert to text

Args

audio_frames
Raw audio bytes to process (a single bytes buffer, not an iterator)
language
Optional language code for recognition. Not a named parameter of this implementation; it is absorbed by **kwargs and not read.
**kwargs
Additional provider-specific arguments

Returns

None. This implementation does not return an AsyncIterator of STTResponse objects; the audio is pushed to the underlying speech stream.

class GoogleTTS (*,
api_key: str | None = None,
speed: float = 1.0,
pitch: float = 0.0,
response_format: "Literal['pcm']" = 'pcm',
voice_config: GoogleVoiceConfig | None = None)
Expand source code
class GoogleTTS(TTS):
    """Text-to-speech plugin that calls the Google TTS REST endpoint.

    Audio is requested as ``LINEAR16`` at ``GOOGLE_SAMPLE_RATE`` and handed to
    ``self.audio_track`` in fixed-size chunks. Problems are reported through
    ``"error"`` events.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        speed: float = 1.0,
        pitch: float = 0.0,
        response_format: Literal["pcm"] = "pcm",
        voice_config: GoogleVoiceConfig | None = None,
    ) -> None:
        """Initialize the Google TTS plugin.

        Args:
            api_key (Optional[str], optional): Google API key. Falls back to the
                ``GOOGLE_API_KEY`` environment variable. Defaults to None.
            speed (float): The speed to use for the TTS plugin. Defaults to 1.0.
            pitch (float): The pitch to use for the TTS plugin. Defaults to 0.0.
            response_format (str): The response format to use for the TTS plugin. Defaults to "pcm".
            voice_config (GoogleVoiceConfig | None): The voice configuration to use for the TTS plugin. Defaults to None.

        Raises:
            ValueError: If no API key is passed and ``GOOGLE_API_KEY`` is unset.
        """
        super().__init__(sample_rate=GOOGLE_SAMPLE_RATE, num_channels=GOOGLE_CHANNELS)

        self.speed = speed
        self.pitch = pitch
        self.response_format = response_format
        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False
        # Strong references to in-flight chunk tasks; the event loop itself
        # only keeps weak references to tasks.
        self._pending_tasks = set()
        self.voice_config = voice_config or GoogleVoiceConfig()
        self.api_key = api_key or os.getenv("GOOGLE_API_KEY")

        if not self.api_key:
            raise ValueError(
                "Google TTS API key required. Provide either:\n"
                "1. api_key parameter, OR\n"
                "2. GOOGLE_API_KEY environment variable"
            )

        self._http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0,
                                  write=5.0, pool=5.0),
            follow_redirects=True,
        )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        **kwargs: Any,
    ) -> None:
        """Convert ``text`` to speech and stream it to the audio track.

        Args:
            text: A single string, or an async iterator of strings that is
                split with ``segment_text`` and synthesized segment by segment.
            **kwargs: Accepted for interface compatibility; not read here.

        Nothing is raised to the caller: failures are emitted as ``"error"``
        events.
        """
        try:
            # Check the output side first, so no request is made when the
            # audio could not be delivered anyway.
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or loop not initialized")
                return

            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    await self._synthesize_audio(segment)
            else:
                await self._synthesize_audio(text)

        except Exception as e:
            # HTTP status errors have already been emitted in detail by
            # _synthesize_audio; this generic event follows them.
            self.emit("error", f"Google TTS synthesis failed: {str(e)}")

    async def _synthesize_audio(self, text: str) -> None:
        """Synthesize text to speech using Google TTS REST API.

        Emits an ``"error"`` event and returns when the response carries no
        audio. HTTP status errors are emitted with a status-specific message
        and then re-raised; other exceptions propagate unchanged.
        """
        try:
            voice_config = {
                "languageCode": self.voice_config.languageCode,
                "name": self.voice_config.name,
            }

            # "en-US-Studio*" voices are sent without an ssmlGender field.
            if not self.voice_config.name.startswith("en-US-Studio"):
                voice_config["ssmlGender"] = self.voice_config.ssmlGender

            payload = {
                "input": {"text": text},
                "voice": voice_config,
                "audioConfig": {
                    "audioEncoding": "LINEAR16",
                    "speakingRate": self.speed,
                    "pitch": self.pitch,
                    "sampleRateHertz": GOOGLE_SAMPLE_RATE,
                },
            }

            response = await self._http_client.post(
                GOOGLE_TTS_ENDPOINT, params={"key": self.api_key}, json=payload
            )
            response.raise_for_status()

            response_data = response.json()
            audio_content = response_data.get("audioContent")
            if not audio_content:
                self.emit("error", "No audio content received from Google TTS")
                return

            audio_bytes = base64.b64decode(audio_content)

            if not audio_bytes:
                self.emit("error", "Decoded audio bytes are empty")
                return

            await self._stream_audio_chunks(audio_bytes)

        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status == 403:
                self.emit(
                    "error", "Google TTS authentication failed. Please check your API key.")
            elif status == 400:
                try:
                    error_data = e.response.json()
                    error_msg = error_data.get("error", {}).get(
                        "message", "Bad request")
                    self.emit(
                        "error", f"Google TTS request error: {error_msg}")
                except Exception:
                    # The error body was not the expected JSON shape; fall
                    # back to a generic message. Not a bare ``except`` so that
                    # cancellation and interpreter-exit signals pass through.
                    self.emit(
                        "error", "Google TTS bad request. Please check your configuration.")
            else:
                self.emit(
                    "error", f"Google TTS HTTP error: {status}")
            raise

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Stream audio data in chunks to avoid beeps and ensure smooth playback.

        The data (minus any WAV header) is cut into 960-byte chunks; the last
        chunk is zero-padded to full size. ``_first_audio_callback`` is awaited
        once, before the first chunk after a reset.
        """
        chunk_size = 960
        audio_data = self._remove_wav_header(audio_bytes)

        for start in range(0, len(audio_data), chunk_size):
            # ljust only changes the final, short chunk.
            chunk = audio_data[start:start + chunk_size].ljust(chunk_size, b"\x00")

            if not self._first_chunk_sent and self._first_audio_callback:
                self._first_chunk_sent = True
                await self._first_audio_callback()

            task = asyncio.create_task(self.audio_track.add_new_bytes(chunk))
            # Keep the task referenced until it finishes so it cannot be
            # garbage-collected mid-execution.
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)
            await asyncio.sleep(0.001)

    def _remove_wav_header(self, audio_bytes: bytes) -> bytes:
        """Remove WAV header if present to get raw PCM data.

        Returns the input unchanged when it does not start with ``RIFF`` or
        contains no ``data`` marker.
        """
        if audio_bytes.startswith(b"RIFF"):
            data_pos = audio_bytes.find(b"data")
            if data_pos != -1:
                # Skip the 4-byte "data" marker and the 4 bytes that follow it.
                return audio_bytes[data_pos + 8:]

        return audio_bytes

    async def aclose(self) -> None:
        """Close the HTTP client, then run the base-class cleanup.

        The base-class cleanup runs even if closing the client raises.
        """
        try:
            if self._http_client:
                await self._http_client.aclose()
        finally:
            await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt playback on the audio track, if one is set."""
        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the Google TTS plugin.

Args

api_key : Optional[str], optional
Google API key. Defaults to None.
speed : float
The speed to use for the TTS plugin. Defaults to 1.0.
pitch : float
The pitch to use for the TTS plugin. Defaults to 0.0.
response_format : Literal["pcm"]
The response format to use for the TTS plugin. Defaults to "pcm".
voice_config : GoogleVoiceConfig | None
The voice configuration to use for the TTS plugin. Defaults to None.

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the HTTP client, then run the base-class cleanup.

    The base-class cleanup runs even if closing the client raises; in that
    case the client's exception still propagates afterwards.
    """
    try:
        if self._http_client:
            await self._http_client.aclose()
    finally:
        await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt playback on the audio track; do nothing when no track is set."""
    track = self.audio_track
    if not track:
        return
    track.interrupt()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for the next TTS task.

    Clears ``_first_chunk_sent`` so that the next streamed chunk is treated
    as the first one again.
    """
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    **kwargs: Any,
) -> None:
    try:
        if isinstance(text, AsyncIterator):
            async for segment in segment_text(text):
                await self._synthesize_audio(segment)
        else:
            await self._synthesize_audio(text)

        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or loop not initialized")
            return

    except Exception as e:
        self.emit("error", f"Google TTS synthesis failed: {str(e)}")

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
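Optional voice identifier. Not a named parameter of this implementation; the voice is taken from the voice_config given to the constructor.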
**kwargs
Additional provider-specific arguments

Returns

None

class GoogleVoiceConfig (languageCode: str = 'en-US',
name: str = 'en-US-Chirp3-HD-Aoede',
ssmlGender: str = 'FEMALE')
Expand source code
@dataclass
class GoogleVoiceConfig:
    """Voice selection settings sent with Google TTS synthesis requests.

    ``GoogleTTS`` copies ``languageCode`` and ``name`` into the request's
    ``voice`` object, and adds ``ssmlGender`` unless ``name`` starts with
    ``"en-US-Studio"``.
    """

    # Sent as the request's "languageCode" field.
    languageCode: str = "en-US"
    # Sent as the request's "name" field.
    name: str = "en-US-Chirp3-HD-Aoede"
    # Sent as "ssmlGender"; left out for voices named "en-US-Studio*".
    ssmlGender: str = "FEMALE"

GoogleVoiceConfig(languageCode: 'str' = 'en-US', name: 'str' = 'en-US-Chirp3-HD-Aoede', ssmlGender: 'str' = 'FEMALE')

Instance variables

var languageCode : str
var name : str
var ssmlGender : str