Package videosdk.plugins.google
Sub-modules
videosdk.plugins.google.live_api
videosdk.plugins.google.llm
videosdk.plugins.google.stt
videosdk.plugins.google.tts
Classes
class GeminiLiveConfig (voice: Voice | None = 'Puck',
language_code: str | None = 'en-US',
temperature: float | None = None,
top_p: float | None = None,
top_k: float | None = None,
candidate_count: int | None = 1,
max_output_tokens: int | None = None,
presence_penalty: float | None = None,
frequency_penalty: float | None = None,
response_modalities: List[Modality] | None = <factory>,
input_audio_transcription: AudioTranscriptionConfig | None = <factory>,
output_audio_transcription: AudioTranscriptionConfig | None = <factory>,
thinking_config: Optional[ThinkingConfig] | None = <factory>,
realtime_input_config: Optional[RealtimeInputConfig] | None = <factory>,
context_window_compression: Optional[ContextWindowCompressionConfig] | None = <factory>)-
Expand source code
@dataclass
class GeminiLiveConfig:
    """Configuration for the Gemini Live API.

    Args:
        voice: Voice ID for audio output. Options: 'Puck', 'Charon', 'Kore',
            'Fenrir', 'Aoede'. Defaults to 'Puck'
        language_code: Language code for speech synthesis. Defaults to 'en-US'
        temperature: Controls randomness in response generation. Higher values
            (e.g. 0.8) make output more random, lower values (e.g. 0.2) make it
            more focused. Defaults to None
        top_p: Nucleus sampling parameter. Controls diversity via cumulative
            probability cutoff. Range 0-1. Defaults to None
        top_k: Limits the number of tokens considered for each step of text
            generation. Defaults to None
        candidate_count: Number of response candidates to generate. Defaults to 1
        max_output_tokens: Maximum number of tokens allowed in model responses.
            Defaults to None
        presence_penalty: Penalizes tokens based on their presence in the text
            so far. Range -2.0 to 2.0. Defaults to None
        frequency_penalty: Penalizes tokens based on their frequency in the text
            so far. Range -2.0 to 2.0. Defaults to None
        response_modalities: List of enabled response types. Options:
            ["TEXT", "AUDIO"]. Defaults to ["AUDIO"]
        input_audio_transcription: Configuration for transcribing user input
            audio. Defaults to an empty config dict (``{}``), NOT None
            (presumably enables transcription by default — TODO confirm
            against the Live API docs)
        output_audio_transcription: Configuration for transcribing model output
            audio. Defaults to an empty config dict (``{}``), NOT None
        thinking_config: Configuration for the model's "thinking" behavior.
            Defaults to an empty config dict (``{}``)
        realtime_input_config: Configuration for realtime input handling.
            Defaults to an empty config dict (``{}``)
        context_window_compression: Configuration for context window
            compression. Defaults to an empty config dict (``{}``)
    """

    voice: Voice | None = "Puck"
    language_code: str | None = "en-US"
    temperature: float | None = None
    top_p: float | None = None
    top_k: float | None = None
    candidate_count: int | None = 1
    max_output_tokens: int | None = None
    presence_penalty: float | None = None
    frequency_penalty: float | None = None
    response_modalities: List[Modality] | None = field(
        default_factory=lambda: ["AUDIO"]
    )
    input_audio_transcription: AudioTranscriptionConfig | None = field(
        default_factory=dict
    )
    output_audio_transcription: AudioTranscriptionConfig | None = field(
        default_factory=dict
    )
    # The original annotations here were `Optional[X] | None`, which is
    # redundant (Optional[X] is already Union[X, None]); simplified to
    # `X | None` — the resulting type is identical.
    thinking_config: ThinkingConfig | None = field(default_factory=dict)
    realtime_input_config: RealtimeInputConfig | None = field(
        default_factory=dict
    )
    context_window_compression: ContextWindowCompressionConfig | None = field(
        default_factory=dict
    )

    # TODO (carried over from original source):
    # proactivity: ProactivityConfig | None = field(default_factory=dict)
    # enable_affective_dialog: bool | None = field(default=None)

    @property
    def is_text_only_mode(self) -> bool:
        """Check if configured for text-only responses (no audio)."""
        return self.response_modalities == ["TEXT"]
Args
voice- Voice ID for audio output. Options: 'Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'. Defaults to 'Puck'
language_code- Language code for speech synthesis. Defaults to 'en-US'
temperature- Controls randomness in response generation. Higher values (e.g. 0.8) make output more random, lower values (e.g. 0.2) make it more focused. Defaults to None
top_p- Nucleus sampling parameter. Controls diversity via cumulative probability cutoff. Range 0-1. Defaults to None
top_k- Limits the number of tokens considered for each step of text generation. Defaults to None
candidate_count- Number of response candidates to generate. Defaults to 1
max_output_tokens- Maximum number of tokens allowed in model responses. Defaults to None
presence_penalty- Penalizes tokens based on their presence in the text so far. Range -2.0 to 2.0. Defaults to None
frequency_penalty- Penalizes tokens based on their frequency in the text so far. Range -2.0 to 2.0. Defaults to None
response_modalities- List of enabled response types. Options: ["TEXT", "AUDIO"]. Defaults to ["AUDIO"]
input_audio_transcription- Configuration for transcribing user input audio. Defaults to an empty config (`field(default_factory=dict)`), not None
output_audio_transcription- Configuration for transcribing model output audio. Defaults to an empty config (`field(default_factory=dict)`), not None
thinking_config- Configuration for model's "thinking" behavior. Defaults to an empty config (`field(default_factory=dict)`), not None
realtime_input_config- Configuration for realtime input handling. Defaults to an empty config (`field(default_factory=dict)`), not None
context_window_compression- Configuration for context window compression. Defaults to an empty config (`field(default_factory=dict)`), not None
Instance variables
var candidate_count : int | None
var context_window_compression : google.genai.types.ContextWindowCompressionConfig | None
var frequency_penalty : float | None
var input_audio_transcription : google.genai.types.AudioTranscriptionConfig | None
prop is_text_only_mode : bool-
Expand source code
@property def is_text_only_mode(self) -> bool: """Check if configured for text-only responses (no audio)""" return self.response_modalities == ["TEXT"]Check if configured for text-only responses (no audio)
var language_code : str | None
var max_output_tokens : int | None
var output_audio_transcription : google.genai.types.AudioTranscriptionConfig | None
var presence_penalty : float | None
var realtime_input_config : google.genai.types.RealtimeInputConfig | None
var response_modalities : List[google.genai.types.Modality] | None
var temperature : float | None
var thinking_config : google.genai.types.ThinkingConfig | None
var top_k : float | None
var top_p : float | None
var voice : Literal['Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'] | None
class GeminiRealtime (*,
api_key: str | None = None,
model: str | None = 'gemini-3.1-flash-live-preview',
config: GeminiLiveConfig | None = None,
service_account_path: str | None = None,
vertexai: bool = False,
vertexai_config: VertexAIConfig | None = None)-
Expand source code
class GeminiRealtime(RealtimeBaseModel[GeminiEventTypes]):
    """Gemini's realtime model for audio-only communication.

    Manages one Gemini Live API session at a time: a session loop creates the
    session and reconnects with exponential backoff, a receive loop processes
    server events (transcriptions, audio chunks, tool calls), and a keep-alive
    task sends silent audio to keep the connection open.

    NOTE(review): this listing was reconstructed from a whitespace-flattened
    source dump. Statement order is preserved, but block nesting at a few
    ambiguous points is marked with NOTE(review) comments below — verify
    against the canonical source file.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str | None = "gemini-3.1-flash-live-preview",
        config: GeminiLiveConfig | None = None,
        service_account_path: str | None = None,
        vertexai: bool = False,
        vertexai_config: VertexAIConfig | None = None,
    ) -> None:
        """
        Initialize Gemini realtime model.

        Args:
            api_key: Gemini API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
            service_account_path: Path to Google service account JSON file.
            model: The Gemini model identifier to use (e.g. 'gemini-pro', 'gemini-pro-vision').
            config: Optional configuration object for customizing model behavior. Contains settings for:
                - voice: Voice ID for audio output ('Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'). Defaults to 'Puck'
                - language_code: Language code for speech synthesis. Defaults to 'en-US'
                - temperature: Controls randomness in responses. Higher values (0.8) more random, lower (0.2) more focused
                - top_p: Nucleus sampling parameter. Controls diversity via probability cutoff. Range 0-1
                - top_k: Limits number of tokens considered for each generation step
                - candidate_count: Number of response candidates to generate. Defaults to 1
                - max_output_tokens: Maximum tokens allowed in model responses
                - presence_penalty: Penalizes token presence in text. Range -2.0 to 2.0
                - frequency_penalty: Penalizes token frequency in text. Range -2.0 to 2.0
                - response_modalities: List of enabled response types ["TEXT", "AUDIO"]. Defaults to ["AUDIO"]
                - input_audio_transcription: Configuration for audio transcription features
                - output_audio_transcription: Configuration for audio transcription features
                - thinking_config: Configuration for model's "thinking" behavior
                - realtime_input_config: Configuration for realtime input handling
            vertexai: Route requests through Vertex AI instead of the public Gemini API.
            vertexai_config: Optional project/location overrides used when vertexai is True.

        Raises:
            ValueError: If neither api_key nor service_account_path is provided and no GOOGLE_API_KEY in env vars
        """
        super().__init__()
        self.model = model
        self.vertexai = vertexai
        self.vertexai_config = vertexai_config
        self._init_client(api_key, service_account_path)
        # Session/lifecycle state
        self._session: Optional[GeminiSession] = None
        self._closing = False
        self._session_should_close = asyncio.Event()
        self._main_task = None
        # Audio buffering/speech state
        self._buffered_audio = bytearray()
        self._is_speaking = False
        self._last_audio_time = 0.0
        self._audio_processing_task = None
        self.tools = []
        self._instructions: str = (
            "You are a helpful voice assistant that can answer questions and help with tasks."
        )
        self.config: GeminiLiveConfig = config or GeminiLiveConfig()
        # Sample rates; handle_audio_input lowers target to 16 kHz on Vertex AI.
        self.target_sample_rate = 24000
        self.input_sample_rate = 48000
        self._user_speaking = False
        self._agent_speaking = False

    def set_agent(self, agent: Agent) -> None:
        # Adopt the agent's instructions and tools; keep both attribute
        # spellings (tools_formatted / formatted_tools) since other code
        # reads formatted_tools.
        self._instructions = agent.instructions
        self.tools = agent.tools
        self.tools_formatted = self._convert_tools_to_gemini_format(self.tools)
        self.formatted_tools = self.tools_formatted

    def _init_client(self, api_key: str | None, service_account_path: str | None):
        # Build a genai.Client either against Vertex AI (project/location
        # resolved from config, env vars, or a service-account file) or the
        # public API (API key or GOOGLE_APPLICATION_CREDENTIALS).
        if self.vertexai:
            project_id = (self.vertexai_config.project_id if self.vertexai_config else None) or os.getenv("GOOGLE_CLOUD_PROJECT")
            if project_id is None:
                service_account_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
                if service_account_path:
                    from google.oauth2 import service_account
                    creds = service_account.Credentials.from_service_account_file(service_account_path)
                    project_id = creds.project_id
            location = (self.vertexai_config.location if self.vertexai_config else None) or os.getenv("GOOGLE_CLOUD_LOCATION") or "us-central1"
            self.client = genai.Client(
                vertexai=True,
                project=project_id,
                location=location,
            )
        else:
            if service_account_path:
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_path
                self.client = genai.Client(http_options={"api_version": "v1beta"})
            else:
                self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
                if not self.api_key:
                    self.emit("error", "GOOGLE_API_KEY or service account required")
                    raise ValueError("GOOGLE_API_KEY or service account required")
                self.client = genai.Client(
                    api_key=self.api_key, http_options={"api_version": "v1beta"}
                )

    async def connect(self) -> None:
        """Connect to the Gemini Live API"""
        # Tear down any previous session before (re)connecting.
        if self._session:
            await self._cleanup_session(self._session)
            self._session = None
        self._closing = False
        self._session_should_close.clear()
        try:
            if not self.audio_track and "AUDIO" in self.config.response_modalities:
                logger.warning("audio_track not set — it should be assigned externally by the pipeline before connect().")
            # Initial session creation is best-effort: the session loop will
            # retry with backoff if it fails here.
            try:
                initial_session = await self._create_session()
                if initial_session:
                    self._session = initial_session
            except Exception as e:
                self.emit("error", f"Initial session creation failed, will retry: {e}")
            if not self._main_task or self._main_task.done():
                self._main_task = asyncio.create_task(
                    self._session_loop(), name="gemini-main-loop"
                )
        except Exception as e:
            self.emit("error", f"Error connecting to Gemini Live API: {e}")
            traceback.print_exc()
            raise

    async def _create_session(self) -> GeminiSession:
        """Create a new Gemini Live API session"""
        has_audio_output = "AUDIO" in self.config.response_modalities
        speech_config_obj = None
        if has_audio_output:
            speech_config_obj = SpeechConfig(
                voice_config=VoiceConfig(
                    prebuilt_voice_config=PrebuiltVoiceConfig(
                        voice_name=self.config.voice
                    )
                ),
                language_code=self.config.language_code,
            )
        logger.info(f"Creating Gemini Session with modalities: {self.config.response_modalities}, has_audio_output: {has_audio_output}")
        config = LiveConnectConfig(
            response_modalities=self.config.response_modalities,
            generation_config=GenerationConfig(
                candidate_count=(
                    self.config.candidate_count
                    if self.config.candidate_count is not None
                    else None
                ),
                temperature=(
                    self.config.temperature
                    if self.config.temperature is not None
                    else None
                ),
                top_p=self.config.top_p if self.config.top_p is not None else None,
                top_k=self.config.top_k if self.config.top_k is not None else None,
                max_output_tokens=(
                    self.config.max_output_tokens
                    if self.config.max_output_tokens is not None
                    else None
                ),
                presence_penalty=(
                    self.config.presence_penalty
                    if self.config.presence_penalty is not None
                    else None
                ),
                frequency_penalty=(
                    self.config.frequency_penalty
                    if self.config.frequency_penalty is not None
                    else None
                ),
            ),
            system_instruction=self._instructions,
            speech_config=speech_config_obj,
            tools=self.formatted_tools or None,
            # Transcription configs are only sent when audio output is on.
            input_audio_transcription=self.config.input_audio_transcription if has_audio_output else None,
            output_audio_transcription=self.config.output_audio_transcription if has_audio_output else None,
            # NOTE: the config defaults for these two are {} (falsy), so the
            # default effectively sends None here.
            realtime_input_config=self.config.realtime_input_config if self.config.realtime_input_config else None,
            context_window_compression=self.config.context_window_compression if self.config.context_window_compression else None
        )
        if self.is_native_audio_model():
            # Native-audio models take a plain dict config so thinking_config
            # can be injected into generation_config.
            config = config.model_dump()
            if self.config.thinking_config:
                config["generation_config"]["thinking_config"] = self.config.thinking_config
                logger.info("Added thinking_config to generation_config")
        try:
            session_cm = self.client.aio.live.connect(model=self.model, config=config)
            session = await session_cm.__aenter__()
            return GeminiSession(session=session, session_cm=session_cm, tasks=[])
        except Exception as e:
            self.emit("error", f"Connection error: {e}")
            traceback.print_exc()
            raise

    async def _session_loop(self) -> None:
        """Main processing loop for Gemini sessions"""
        reconnect_attempts = 0
        max_reconnect_attempts = 5
        reconnect_delay = 1  # seconds; doubled on each failure, capped at 30
        while not self._closing:
            if not self._session:
                try:
                    self._session = await self._create_session()
                    reconnect_attempts = 0
                    reconnect_delay = 1
                except Exception as e:
                    reconnect_attempts += 1
                    reconnect_delay = min(30, reconnect_delay * 2)
                    self.emit(
                        "error",
                        f"session creation attempt {reconnect_attempts} failed: {e}",
                    )
                    if reconnect_attempts >= max_reconnect_attempts:
                        self.emit("error", "Max reconnection attempts reached")
                        break
                    await asyncio.sleep(reconnect_delay)
                    continue
            session = self._session
            recv_task = asyncio.create_task(
                self._receive_loop(session), name="gemini_receive"
            )
            keep_alive_task = asyncio.create_task(
                self._keep_alive(session), name="gemini_keepalive"
            )
            session.tasks.extend([recv_task, keep_alive_task])
            try:
                # Block until something signals the session should close
                # (receive-loop error, keep-alive failure, or aclose()).
                await self._session_should_close.wait()
            finally:
                for task in session.tasks:
                    if not task.done():
                        task.cancel()
                try:
                    await asyncio.gather(*session.tasks, return_exceptions=True)
                except Exception as e:
                    self.emit("error", f"Error during task cleanup: {e}")
                # NOTE(review): nesting of the cleanup/clear() below was
                # reconstructed from a flattened listing — verify.
                if not self._closing:
                    await self._cleanup_session(session)
                    self._session = None
                    await asyncio.sleep(reconnect_delay)
                self._session_should_close.clear()

    async def _handle_tool_calls(
        self, response, active_response_id: str, accumulated_input_text: str
    ) -> str:
        """Handle tool calls from Gemini.

        Executes the first matching registered tool for each function call in
        the response and sends the result back. Returns the (possibly cleared)
        accumulated user transcript.
        """
        if not response.tool_call:
            return accumulated_input_text
        for tool_call in response.tool_call.function_calls:
            if self.tools:
                for tool in self.tools:
                    if not is_function_tool(tool):
                        continue
                    tool_info = get_tool_info(tool)
                    if tool_info.name == tool_call.name:
                        # Flush the user transcript before running the tool.
                        if accumulated_input_text:
                            metrics_collector.set_user_transcript(
                                accumulated_input_text
                            )
                            accumulated_input_text = ""
                        try:
                            metrics_collector.add_function_tool_call(
                                tool_name=tool_info.name
                            )
                            result = await tool(**tool_call.args)
                            await self.send_tool_response(
                                [
                                    FunctionResponse(
                                        id=tool_call.id,
                                        name=tool_call.name,
                                        response=result,
                                    )
                                ]
                            )
                        except Exception as e:
                            self.emit(
                                "error",
                                f"Error executing function {tool_call.name}: {e}",
                            )
                            traceback.print_exc()
                        break
        return accumulated_input_text

    async def _receive_loop(self, session: GeminiSession) -> None:
        """Process incoming messages from Gemini.

        NOTE(review): branch nesting in this method was reconstructed from a
        flattened listing; statement order is faithful, but confirm nesting
        against the canonical source.
        """
        try:
            active_response_id = None
            chunk_number = 0
            accumulated_text = ""
            final_transcription = ""
            accumulated_input_text = ""
            while not self._closing:
                try:
                    async for response in session.session.receive():
                        if self._closing:
                            break
                        if response.tool_call:
                            accumulated_input_text = await self._handle_tool_calls(
                                response, active_response_id, accumulated_input_text
                            )
                        if server_content := response.server_content:
                            # --- user (input) transcription ---
                            if (
                                input_transcription := server_content.input_transcription
                            ):
                                if input_transcription.text:
                                    if not self._user_speaking:
                                        self.emit("user_speech_started", {})
                                        metrics_collector.on_user_speech_start()
                                        metrics_collector.start_turn()
                                        self._user_speaking = True
                                    self.emit("user_speech_ended", {"type": "done"})
                                    accumulated_input_text += input_transcription.text
                                    global_event_emitter.emit(
                                        "input_transcription",
                                        {
                                            "text": accumulated_input_text,
                                            "is_final": False,
                                        },
                                    )
                                    # If agent is still producing audio, re-emit to restore SPEAKING state
                                    # (user speech events override it to LISTENING → THINKING)
                                    if self._agent_speaking:
                                        self.emit("agent_speech_started", {})
                                        metrics_collector.on_agent_speech_start()
                            # --- agent (output) transcription ---
                            if (
                                output_transcription := server_content.output_transcription
                            ):
                                if output_transcription.text:
                                    final_transcription += output_transcription.text
                                    global_event_emitter.emit(
                                        "output_transcription",
                                        {
                                            "text": final_transcription,
                                            "is_final": False,
                                        },
                                    )
                            # Start tracking a response if one isn't active.
                            if not active_response_id:
                                active_response_id = f"response_{id(response)}"
                                chunk_number = 0
                                accumulated_text = ""
                            # --- server-side interruption ---
                            if server_content.interrupted:
                                if self.current_utterance and not self.current_utterance.is_interruptible:
                                    logger.info("Interruption is disabled for the current utterance. Ignoring server interrupt signal.")
                                    continue
                                if active_response_id:
                                    active_response_id = None
                                    accumulated_text = ""
                                if (
                                    self.audio_track
                                    and "AUDIO" in self.config.response_modalities
                                ):
                                    self.audio_track.interrupt()
                                continue
                            # --- model output (audio chunks / text parts) ---
                            if model_turn := server_content.model_turn:
                                if self._user_speaking:
                                    metrics_collector.on_user_speech_end()
                                    self._user_speaking = False
                                    if accumulated_input_text:
                                        metrics_collector.set_user_transcript(
                                            accumulated_input_text
                                        )
                                        try:
                                            self.emit(
                                                "realtime_model_transcription",
                                                {
                                                    "role": "user",
                                                    "text": accumulated_input_text,
                                                    "is_final": True,
                                                },
                                            )
                                        except Exception:
                                            pass
                                        accumulated_input_text = ""
                                for part in model_turn.parts:
                                    if (
                                        hasattr(part, "inline_data")
                                        and part.inline_data
                                    ):
                                        raw_audio = part.inline_data.data
                                        if not raw_audio or len(raw_audio) < 2:
                                            continue
                                        if "AUDIO" in self.config.response_modalities:
                                            chunk_number += 1
                                            if not self._agent_speaking:
                                                self.emit("agent_speech_started", {})
                                                metrics_collector.on_agent_speech_start()
                                                self._agent_speaking = True
                                            if self.audio_track and self.loop:
                                                # Pad to an even byte count (16-bit PCM).
                                                if len(raw_audio) % 2 != 0:
                                                    raw_audio += b"\x00"
                                                asyncio.create_task(
                                                    self.audio_track.add_new_bytes(
                                                        raw_audio
                                                    ),
                                                    name=f"audio_chunk_{chunk_number}",
                                                )
                                    elif hasattr(part, "text") and part.text:
                                        accumulated_text += part.text
                            # --- end of turn: flush transcripts, usage, text ---
                            if server_content.turn_complete and active_response_id:
                                usage_metadata = self.get_usage_details(response.usage_metadata)
                                metrics_collector.set_realtime_usage(usage_metadata)
                                if accumulated_input_text:
                                    metrics_collector.set_user_transcript(
                                        accumulated_input_text
                                    )
                                    accumulated_input_text = ""
                                if final_transcription:
                                    metrics_collector.set_agent_response(
                                        final_transcription
                                    )
                                    try:
                                        self.emit(
                                            "realtime_model_transcription",
                                            {
                                                "role": "agent",
                                                "text": final_transcription,
                                                "is_final": True,
                                            },
                                        )
                                    except Exception:
                                        pass
                                response_text = None
                                if (
                                    "TEXT" in self.config.response_modalities
                                    and accumulated_text
                                ):
                                    response_text = accumulated_text
                                    global_event_emitter.emit(
                                        "text_response",
                                        {"type": "done", "text": accumulated_text},
                                    )
                                elif (
                                    "TEXT" not in self.config.response_modalities
                                    and final_transcription
                                ):
                                    response_text = final_transcription
                                    global_event_emitter.emit(
                                        "text_response",
                                        {"type": "done", "text": final_transcription},
                                    )
                                if response_text:
                                    self.emit("llm_text_output", {"text": response_text})
                                active_response_id = None
                                accumulated_text = ""
                                final_transcription = ""
                                if self.audio_track:
                                    self.audio_track.mark_synthesis_complete()
                                self._agent_speaking = False
                except Exception as e:
                    err_msg = str(e)
                    # Treat websocket close codes 1000/1011 (and ConnectionClosed
                    # exception types) as a server-initiated disconnect.
                    is_server_disconnect = (
                        "ConnectionClosed" in type(e).__name__
                        or "1011" in err_msg
                        or "1000" in err_msg
                    )
                    if is_server_disconnect:
                        logger.info(f"Session ended by server ({err_msg}). Stopping local connection.")
                        # CRITICAL: We set _closing to True to stop the outer loop
                        # from attempting to reconnect.
                        self._closing = True
                        self._session_should_close.set()
                        break
                    logger.error(f"Error in receive loop: {e}")
                    traceback.print_exc()
                    self._session_should_close.set()
                    break
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            self.emit("error", "Receive loop cancelled")
        except Exception as e:
            self.emit("error", e)
            traceback.print_exc()
            self._session_should_close.set()

    def _is_gemini_3(self) -> bool:
        """Check if the model is a Gemini 3.x family model."""
        return "gemini-3" in self.model

    async def _send_text(self, session: AsyncSession, text: str, *, turn_complete: bool = True) -> None:
        """Send text via the appropriate method for the current model.

        Gemini 3.x restricts ``send_client_content`` to initial history
        seeding, so all runtime text must go through ``send_realtime_input``.
        When ``turn_complete`` is True, we bracket the text with
        activity_start / activity_end signals so the model treats it as a
        complete user turn and generates a response.
        """
        if self._is_gemini_3():
            if turn_complete:
                await session.send_realtime_input(activity_start={})
            await session.send_realtime_input(text=text)
            if turn_complete:
                await session.send_realtime_input(activity_end={})
        else:
            await session.send_client_content(
                turns=[Content(parts=[Part(text=text)], role="user")],
                turn_complete=turn_complete,
            )

    async def _keep_alive(self, session: GeminiSession) -> None:
        """Send periodic keep-alive messages via silent audio.

        Uses a tiny silent audio chunk instead of text to avoid being
        interpreted as user input (which would trigger model responses).
        """
        # Use the same sample rate as handle_audio_input to avoid
        # "Sample rate changed" errors (API locks to the first rate it sees).
        sample_rate = 24000 if self.vertexai else 48000
        # 10ms of silence at the chosen rate (16-bit PCM = 2 bytes/sample)
        silence_samples = sample_rate // 100  # 10ms worth
        SILENT_AUDIO = b"\x00" * (silence_samples * 2)
        try:
            while not self._closing:
                await asyncio.sleep(10)
                if self._closing:
                    break
                try:
                    await session.session.send_realtime_input(
                        audio=Blob(data=SILENT_AUDIO, mime_type=f"audio/pcm;rate={sample_rate}")
                    )
                except Exception as e:
                    if "closed" in str(e).lower() or "1011" in str(e):
                        logger.info("Keep-alive detected closed session. Stopping.")
                        self._closing = True
                        self._session_should_close.set()
                        break
                    self.emit("error", f"Keep-alive error: {e}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error in keep-alive: {e}")
            self._session_should_close.set()

    def _resample_audio(self, audio_bytes: bytes) -> bytes:
        """Resample audio from input sample rate to output sample rate and convert to mono.

        Expects 16-bit PCM bytes; on Vertex AI the input is treated as
        interleaved stereo. Returns b'' on empty input or any failure.
        """
        try:
            if not audio_bytes:
                return b''
            raw_audio = np.frombuffer(audio_bytes, dtype=np.int16)
            if raw_audio.size == 0:
                return b''
            if self.vertexai:
                # Downmix interleaved stereo to mono by averaging channels.
                stereo_audio = raw_audio.reshape(-1, 2)
                mono_audio = stereo_audio.astype(np.float32).mean(axis=1)
            else:
                mono_audio = raw_audio.astype(np.float32)
            if self.input_sample_rate != self.target_sample_rate:
                output_length = int(len(mono_audio) * self.target_sample_rate / self.input_sample_rate)
                resampled_data = signal.resample(mono_audio, output_length)
            else:
                resampled_data = mono_audio
            # Clamp to int16 range before converting back.
            resampled_data = np.clip(resampled_data, -32767, 32767)
            return resampled_data.astype(np.int16).tobytes()
        except Exception as e:
            logger.error(f"Error resampling audio: {e}")
            return b''

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Handle incoming audio data from the user"""
        if not self._session or self._closing:
            return
        if self.current_utterance and not self.current_utterance.is_interruptible:
            logger.info("Interruption is disabled for the current utterance. Not processing audio input.")
            return
        if "AUDIO" not in self.config.response_modalities:
            return
        # Vertex AI sessions use 24 kHz input / 16 kHz resample target.
        AUDIO_SAMPLE_RATE = 24000 if self.vertexai else 48000
        self.target_sample_rate = 16000 if self.vertexai else self.target_sample_rate
        audio_data = self._resample_audio(audio_data)
        try:
            await self._session.session.send_realtime_input(
                audio=Blob(data=audio_data, mime_type=f"audio/pcm;rate={AUDIO_SAMPLE_RATE}")
            )
        except Exception as e:
            if "1011" in str(e) or "closed" in str(e).lower():
                logger.info("Cannot send audio (session closed).")
                self._closing = True
                self._session_should_close.set()
            else:
                logger.error(f"Error sending audio: {e}")

    async def interrupt(self) -> None:
        """Interrupt current response"""
        if not self._session or self._closing:
            return
        if self.current_utterance and not self.current_utterance.is_interruptible:
            logger.info("Interruption is disabled for the current utterance. Not interrupting Google Live API.")
            return
        try:
            await self._send_text(self._session.session, "stop")
            self.emit("agent_speech_ended", {})
            metrics_collector.on_interrupted()
            if self.audio_track and "AUDIO" in self.config.response_modalities:
                self.audio_track.interrupt()
        except Exception as e:
            self.emit("error", f"Interrupt error: {e}")

    async def send_message(self, message: str) -> None:
        """Send a text message to get audio response"""
        retry_count = 0
        max_retries = 5
        # Wait (up to 5s) for a live session to exist.
        while not self._session or not self._session.session:
            if retry_count >= max_retries:
                raise RuntimeError("No active Gemini session after maximum retries")
            logger.debug("No active session, waiting for connection...")
            await asyncio.sleep(1)
            retry_count += 1
        try:
            if self._is_gemini_3():
                prompt_text = (
                    "[SYSTEM INSTRUCTION] Repeat the following message out loud VERBATIM. "
                    "Do NOT add any commentary, do NOT answer or continue the conversation. "
                    "Only say these exact words then stop: " + message
                )
            else:
                prompt_text = (
                    "Please start the conversation by saying exactly this, "
                    "without any additional text: '" + message + "'"
                )
            await self._send_text(self._session.session, prompt_text)
            await asyncio.sleep(0.1)
        except Exception as e:
            self.emit("error", f"Error sending message: {e}")
            self._session_should_close.set()

    async def send_text_message(self, message: str) -> None:
        """Send a text message for text-only communication"""
        retry_count = 0
        max_retries = 5
        while not self._session or not self._session.session:
            if retry_count >= max_retries:
                raise RuntimeError("No active Gemini session after maximum retries")
            self.emit("error", "No active session, waiting for connection...")
            await asyncio.sleep(1)
            retry_count += 1
        try:
            await self._send_text(self._session.session, message)
        except Exception as e:
            self.emit("error", f"Error sending text message: {e}")
            self._session_should_close.set()

    async def handle_video_input(self, video_data: av.VideoFrame) -> None:
        """Improved video input handler with error prevention"""
        if not self._session or self._closing:
            return
        try:
            if not video_data or not video_data.planes:
                return
            # Throttle to at most one frame every 0.5 s.
            now = time.monotonic()
            if (
                hasattr(self, "_last_video_frame")
                and (now - self._last_video_frame) < 0.5
            ):
                return
            self._last_video_frame = now
            processed_jpeg = encode_image(video_data, DEFAULT_IMAGE_ENCODE_OPTIONS)
            # < 100 bytes is treated as a failed/invalid encode.
            if not processed_jpeg or len(processed_jpeg) < 100:
                logger.warning("Invalid JPEG data generated")
                return
            await self._session.session.send_realtime_input(
                video=Blob(data=processed_jpeg, mime_type="image/jpeg")
            )
        except Exception as e:
            self.emit("error", f"Video processing error: {str(e)}")

    async def send_message_with_frames(self, message: str, frames: list[av.VideoFrame]) -> None:
        """Send a text message with video frames for vision-enabled communication"""
        retry_count = 0
        max_retries = 5
        while not self._session or not self._session.session:
            if retry_count >= max_retries:
                raise RuntimeError("No active Gemini session after maximum retries")
            self.emit("error", "No active session, waiting for connection...")
            await asyncio.sleep(1)
            retry_count += 1
        try:
            parts = [Part(text=message)]
            for frame in frames:
                try:
                    processed_jpeg = encode_image(frame, DEFAULT_IMAGE_ENCODE_OPTIONS)
                    if processed_jpeg and len(processed_jpeg) >= 100:
                        parts.append(
                            Part(
                                inline_data=Blob(
                                    data=processed_jpeg, mime_type="image/jpeg"
                                )
                            )
                        )
                    else:
                        logger.warning("Invalid JPEG data generated for frame")
                except Exception as e:
                    logger.error(f"Error processing frame for send_message_with_frames: {e}")
            await self._session.session.send_client_content(
                turns=Content(parts=parts, role="user"),
                turn_complete=True,
            )
        except Exception as e:
            self.emit("error", f"Error sending message with frames: {e}")
            self._session_should_close.set()

    async def _cleanup_session(self, session: GeminiSession) -> None:
        """Clean up a session's resources"""
        for task in session.tasks:
            if not task.done():
                task.cancel()
        try:
            await session.session_cm.__aexit__(None, None, None)
        except Exception as e:
            # Expected close errors (code 1011 / "closed") are not re-emitted.
            if "1011" not in str(e) and "closed" not in str(e).lower():
                self.emit("error", f"Error closing session: {e}")

    async def aclose(self) -> None:
        """Clean up all resources"""
        if self._closing:
            return
        self._closing = True
        self._session_should_close.set()
        if self._audio_processing_task and not self._audio_processing_task.done():
            self._audio_processing_task.cancel()
            try:
                await asyncio.wait_for(self._audio_processing_task, timeout=1.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass
        if self._main_task and not self._main_task.done():
            self._main_task.cancel()
            try:
                await asyncio.wait_for(self._main_task, timeout=2.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass
        if self._session:
            await self._cleanup_session(self._session)
            self._session = None
        self._buffered_audio = bytearray()
        await super().aclose()

    async def _reconnect(self) -> None:
        # Drop the current session (if any) and create a fresh one.
        if self._session:
            await self._cleanup_session(self._session)
            self._session = None
        self._session = await self._create_session()

    async def send_tool_response(
        self, function_responses: List[FunctionResponse]
    ) -> None:
        """Send tool responses back to Gemini"""
        if not self._session or not self._session.session:
            return
        try:
            await self._session.session.send_tool_response(
                function_responses=function_responses
            )
        except Exception as e:
            self.emit("error", f"Error sending tool response: {e}")
            self._session_should_close.set()

    def _convert_tools_to_gemini_format(self, tools: List[FunctionTool]) -> List[Tool]:
        """Convert tool definitions to Gemini's Tool format"""
        function_declarations = []
        for tool in tools:
            if not is_function_tool(tool):
                continue
            try:
                function_declaration = build_gemini_schema(tool)
                function_declarations.append(function_declaration)
            except Exception as e:
                self.emit("error", f"Failed to format tool {tool}: {e}")
                continue
        return (
            [Tool(function_declarations=function_declarations)]
            if function_declarations
            else []
        )

    def is_native_audio_model(self) -> bool:
        """Check if the model is a native audio model based on its name"""
        return self._is_gemini_3() or "gemini-2.5-flash-native-audio-preview-12-2025" in self.model

    def get_usage_details(self, usage_metadata) -> dict:
        """
        Flatten Gemini Live UsageMetadata into the same pricing dictionary format.
        Supports TEXT/AUDIO/IMAGE breakdown + thoughts tokens.
        """
        total_tokens = getattr(usage_metadata, "total_token_count", 0)
        input_tokens = getattr(usage_metadata, "prompt_token_count", 0)
        output_tokens = getattr(usage_metadata, "response_token_count", 0)
        thoughts_tokens = getattr(usage_metadata, "thoughts_token_count", 0)
        prompt_details = getattr(usage_metadata, "prompt_tokens_details", None) or []
        response_details = getattr(usage_metadata, "response_tokens_details", None) or []
        input_text_tokens = 0
        input_audio_tokens = 0
        input_image_tokens = 0
        output_text_tokens = 0
        output_audio_tokens = 0
        output_image_tokens = 0
        # Bucket per-modality token counts from the detail lists.
        for item in prompt_details:
            modality = str(getattr(item, "modality", "")).upper()
            count = getattr(item, "token_count", 0)
            if "TEXT" in modality:
                input_text_tokens += count
            elif "AUDIO" in modality:
                input_audio_tokens += count
            elif "IMAGE" in modality:
                input_image_tokens += count
        for item in response_details:
            modality = str(getattr(item, "modality", "")).upper()
            count = getattr(item, "token_count", 0)
            if "TEXT" in modality:
                output_text_tokens += count
            elif "AUDIO" in modality:
                output_audio_tokens += count
            elif "IMAGE" in modality:
                output_image_tokens += count
        return {
            "total_tokens": total_tokens,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "input_text_tokens": input_text_tokens,
            "input_audio_tokens": input_audio_tokens,
            "input_image_tokens": input_image_tokens,
            "output_text_tokens": output_text_tokens,
            "output_audio_tokens": output_audio_tokens,
            "output_image_tokens": output_image_tokens,
            "thoughts_tokens": thoughts_tokens,
        }
Initialize Gemini realtime model.
Args
api_key- Gemini API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
service_account_path- Path to Google service account JSON file.
model- The Gemini model identifier to use (e.g. 'gemini-pro', 'gemini-pro-vision').
config- Optional configuration object for customizing model behavior. Contains settings for:
  - voice: Voice ID for audio output ('Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'). Defaults to 'Puck'
  - language_code: Language code for speech synthesis. Defaults to 'en-US'
  - temperature: Controls randomness in responses. Higher values (0.8) more random, lower (0.2) more focused
  - top_p: Nucleus sampling parameter. Controls diversity via probability cutoff. Range 0-1
  - top_k: Limits number of tokens considered for each generation step
  - candidate_count: Number of response candidates to generate. Defaults to 1
  - max_output_tokens: Maximum tokens allowed in model responses
  - presence_penalty: Penalizes token presence in text. Range -2.0 to 2.0
  - frequency_penalty: Penalizes token frequency in text. Range -2.0 to 2.0
  - response_modalities: List of enabled response types ["TEXT", "AUDIO"]. Defaults to ["AUDIO"]
  - input_audio_transcription: Configuration for transcribing user input audio
  - output_audio_transcription: Configuration for transcribing model output audio
  - thinking_config: Configuration for model's "thinking" behavior
  - realtime_input_config: Configuration for realtime input handling
Raises
ValueError- If neither api_key nor service_account_path is provided and no GOOGLE_API_KEY in env vars
Ancestors
- videosdk.agents.realtime_base_model.RealtimeBaseModel
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
- abc.ABC
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Clean up all resources.

    Idempotent: a second call returns immediately once ``_closing`` is set.
    Shutdown order is deliberate — signal loops to stop first, then cancel
    the audio task (short timeout), then the main loop (longer timeout),
    then tear down the live session.
    """
    if self._closing:
        return
    self._closing = True
    # Tell the session loop to wind down before cancelling tasks.
    self._session_should_close.set()
    if self._audio_processing_task and not self._audio_processing_task.done():
        self._audio_processing_task.cancel()
        try:
            # Bounded wait so shutdown can't hang on a stuck task.
            await asyncio.wait_for(self._audio_processing_task, timeout=1.0)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass
    if self._main_task and not self._main_task.done():
        self._main_task.cancel()
        try:
            await asyncio.wait_for(self._main_task, timeout=2.0)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass
    if self._session:
        await self._cleanup_session(self._session)
        self._session = None
    # Drop any audio that was buffered but never sent.
    self._buffered_audio = bytearray()
    await super().aclose()
async def connect(self) ‑> None-
Expand source code
async def connect(self) -> None:
    """Connect to the Gemini Live API.

    Tears down any previous session, then attempts an initial session
    creation; a failure there is only emitted as an error because the
    background ``_session_loop`` task retries connection on its own.
    """
    if self._session:
        await self._cleanup_session(self._session)
        self._session = None
    self._closing = False
    self._session_should_close.clear()
    try:
        if not self.audio_track and "AUDIO" in self.config.response_modalities:
            logger.warning("audio_track not set — it should be assigned externally by the pipeline before connect().")
        try:
            initial_session = await self._create_session()
            if initial_session:
                self._session = initial_session
        except Exception as e:
            # Non-fatal: the main loop below will retry session creation.
            self.emit("error", f"Initial session creation failed, will retry: {e}")
        # Start (or restart) the long-lived session-management loop.
        if not self._main_task or self._main_task.done():
            self._main_task = asyncio.create_task(
                self._session_loop(), name="gemini-main-loop"
            )
    except Exception as e:
        self.emit("error", f"Error connecting to Gemini Live API: {e}")
        traceback.print_exc()
        raise
def get_usage_details(self, usage_metadata) ‑> dict-
Expand source code
def get_usage_details(self, usage_metadata) -> dict:
    """Flatten Gemini Live UsageMetadata into the flat pricing dictionary
    format, with a TEXT/AUDIO/IMAGE breakdown plus thoughts tokens.

    All attribute reads are defensive (``getattr`` with defaults) so a
    partially-populated metadata object still yields a complete dict.
    """

    def tally_by_modality(detail_entries):
        # Bucket token counts by modality substring, mirroring the
        # TEXT -> AUDIO -> IMAGE precedence of the original checks.
        buckets = {"TEXT": 0, "AUDIO": 0, "IMAGE": 0}
        for entry in detail_entries:
            label = str(getattr(entry, "modality", "")).upper()
            amount = getattr(entry, "token_count", 0)
            for key in ("TEXT", "AUDIO", "IMAGE"):
                if key in label:
                    buckets[key] += amount
                    break
        return buckets

    prompt_side = tally_by_modality(
        getattr(usage_metadata, "prompt_tokens_details", None) or []
    )
    response_side = tally_by_modality(
        getattr(usage_metadata, "response_tokens_details", None) or []
    )

    return {
        "total_tokens": getattr(usage_metadata, "total_token_count", 0),
        "input_tokens": getattr(usage_metadata, "prompt_token_count", 0),
        "output_tokens": getattr(usage_metadata, "response_token_count", 0),
        "input_text_tokens": prompt_side["TEXT"],
        "input_audio_tokens": prompt_side["AUDIO"],
        "input_image_tokens": prompt_side["IMAGE"],
        "output_text_tokens": response_side["TEXT"],
        "output_audio_tokens": response_side["AUDIO"],
        "output_image_tokens": response_side["IMAGE"],
        "thoughts_tokens": getattr(usage_metadata, "thoughts_token_count", 0),
    }
async def handle_audio_input(self, audio_data: bytes) ‑> None-
Expand source code
async def handle_audio_input(self, audio_data: bytes) -> None:
    """Handle incoming audio data from the user.

    Silently drops audio when there is no live session, when shutdown is
    in progress, when the current utterance is non-interruptible, or when
    AUDIO is not an enabled response modality.
    """
    if not self._session or self._closing:
        return
    if self.current_utterance and not self.current_utterance.is_interruptible:
        logger.info("Interruption is disabled for the current utterance. Not processing audio input.")
        return
    if "AUDIO" not in self.config.response_modalities:
        return
    # NOTE(review): the advertised mime-type rate (24k for Vertex, 48k
    # otherwise) and the resample target (16k for Vertex) are set from
    # different expressions — confirm _resample_audio output actually
    # matches AUDIO_SAMPLE_RATE in the non-Vertex path.
    AUDIO_SAMPLE_RATE = 24000 if self.vertexai else 48000
    self.target_sample_rate = 16000 if self.vertexai else self.target_sample_rate
    audio_data = self._resample_audio(audio_data)
    try:
        await self._session.session.send_realtime_input(
            audio=Blob(data=audio_data, mime_type=f"audio/pcm;rate={AUDIO_SAMPLE_RATE}")
        )
    except Exception as e:
        # A 1011/"closed" error means the websocket died: flag the session
        # for teardown instead of spamming errors.
        if "1011" in str(e) or "closed" in str(e).lower():
            logger.info("Cannot send audio (session closed).")
            self._closing = True
            self._session_should_close.set()
        else:
            logger.error(f"Error sending audio: {e}")
async def handle_video_input(self, video_data: av.VideoFrame) ‑> None-
Expand source code
async def handle_video_input(self, video_data: av.VideoFrame) -> None:
    """Improved video input handler with error prevention.

    Rate-limits outgoing frames to one every 0.5 s, encodes to JPEG, and
    skips frames whose encoded payload looks invalid (< 100 bytes).
    """
    if not self._session or self._closing:
        return
    try:
        # Guard against empty/degenerate frames from the decoder.
        if not video_data or not video_data.planes:
            return
        now = time.monotonic()
        # Throttle: at most one frame per 0.5 seconds.
        if (
            hasattr(self, "_last_video_frame")
            and (now - self._last_video_frame) < 0.5
        ):
            return
        self._last_video_frame = now
        processed_jpeg = encode_image(video_data, DEFAULT_IMAGE_ENCODE_OPTIONS)
        if not processed_jpeg or len(processed_jpeg) < 100:
            logger.warning("Invalid JPEG data generated")
            return
        await self._session.session.send_realtime_input(
            video=Blob(data=processed_jpeg, mime_type="image/jpeg")
        )
    except Exception as e:
        self.emit("error", f"Video processing error: {str(e)}")
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None:
    """Interrupt current response.

    Sends a literal "stop" text turn to the live session, emits
    ``agent_speech_ended``, records the interruption in metrics, and
    flushes the local audio track so queued audio stops playing.
    """
    if not self._session or self._closing:
        return
    if self.current_utterance and not self.current_utterance.is_interruptible:
        logger.info("Interruption is disabled for the current utterance. Not interrupting Google Live API.")
        return
    try:
        await self._send_text(self._session.session, "stop")
        self.emit("agent_speech_ended", {})
        metrics_collector.on_interrupted()
        if self.audio_track and "AUDIO" in self.config.response_modalities:
            self.audio_track.interrupt()
    except Exception as e:
        self.emit("error", f"Interrupt error: {e}")
def is_native_audio_model(self) ‑> bool-
Expand source code
def is_native_audio_model(self) -> bool:
    """Whether the configured model produces audio natively.

    True for any Gemini 3 model, or for the specific 2.5 native-audio
    preview build.
    """
    if self._is_gemini_3():
        return True
    return "gemini-2.5-flash-native-audio-preview-12-2025" in self.model
async def send_message(self, message: str) ‑> None-
Expand source code
async def send_message(self, message: str) -> None:
    """Send a text message to get audio response.

    Waits up to ``max_retries`` seconds for an active session, then sends
    a "say this verbatim" prompt wrapped for the model generation in use.

    Raises:
        RuntimeError: if no session becomes available after max_retries.
    """
    retry_count = 0
    max_retries = 5
    while not self._session or not self._session.session:
        if retry_count >= max_retries:
            raise RuntimeError("No active Gemini session after maximum retries")
        logger.debug("No active session, waiting for connection...")
        await asyncio.sleep(1)
        retry_count += 1
    try:
        # Gemini 3 follows stricter verbatim instructions; older models
        # get the softer "start the conversation" phrasing.
        if self._is_gemini_3():
            prompt_text = (
                "[SYSTEM INSTRUCTION] Repeat the following message out loud VERBATIM. "
                "Do NOT add any commentary, do NOT answer or continue the conversation. "
                "Only say these exact words then stop: " + message
            )
        else:
            prompt_text = (
                "Please start the conversation by saying exactly this, "
                "without any additional text: '" + message + "'"
            )
        await self._send_text(self._session.session, prompt_text)
        # Small pause to let the send settle before callers proceed.
        await asyncio.sleep(0.1)
    except Exception as e:
        self.emit("error", f"Error sending message: {e}")
        self._session_should_close.set()
async def send_message_with_frames(self, message: str, frames: list[av.VideoFrame]) ‑> None-
Expand source code
async def send_message_with_frames(self, message: str, frames: list[av.VideoFrame]) -> None:
    """Send a text message with video frames for vision-enabled communication.

    Builds one user turn containing the text part plus a JPEG part per
    valid frame; frames that fail to encode are logged and skipped.

    Raises:
        RuntimeError: if no session becomes available after max_retries.
    """
    retry_count = 0
    max_retries = 5
    while not self._session or not self._session.session:
        if retry_count >= max_retries:
            raise RuntimeError("No active Gemini session after maximum retries")
        self.emit("error", "No active session, waiting for connection...")
        await asyncio.sleep(1)
        retry_count += 1
    try:
        parts = [Part(text=message)]
        for frame in frames:
            try:
                processed_jpeg = encode_image(frame, DEFAULT_IMAGE_ENCODE_OPTIONS)
                # Same sanity threshold as handle_video_input.
                if processed_jpeg and len(processed_jpeg) >= 100:
                    parts.append(
                        Part(
                            inline_data=Blob(
                                data=processed_jpeg, mime_type="image/jpeg"
                            )
                        )
                    )
                else:
                    logger.warning("Invalid JPEG data generated for frame")
            except Exception as e:
                logger.error(f"Error processing frame for send_message_with_frames: {e}")
        await self._session.session.send_client_content(
            turns=Content(parts=parts, role="user"),
            turn_complete=True,
        )
    except Exception as e:
        self.emit("error", f"Error sending message with frames: {e}")
        self._session_should_close.set()
async def send_text_message(self, message: str) ‑> None-
Expand source code
async def send_text_message(self, message: str) -> None:
    """Send a text message for text-only communication.

    Raises:
        RuntimeError: if no session becomes available after max_retries.
    """
    retry_count = 0
    max_retries = 5
    while not self._session or not self._session.session:
        if retry_count >= max_retries:
            raise RuntimeError("No active Gemini session after maximum retries")
        self.emit("error", "No active session, waiting for connection...")
        await asyncio.sleep(1)
        retry_count += 1
    try:
        await self._send_text(self._session.session, message)
    except Exception as e:
        self.emit("error", f"Error sending text message: {e}")
        self._session_should_close.set()
async def send_tool_response(self, function_responses: List[FunctionResponse]) ‑> None-
Expand source code
async def send_tool_response(
    self, function_responses: List[FunctionResponse]
) -> None:
    """Forward tool/function results to the live Gemini session.

    No-op when there is no active session; on failure, emits an error
    and flags the session for teardown.
    """
    if not (self._session and self._session.session):
        return
    try:
        await self._session.session.send_tool_response(
            function_responses=function_responses
        )
    except Exception as e:
        self.emit("error", f"Error sending tool response: {e}")
        self._session_should_close.set()
def set_agent(self, agent: Agent) ‑> None-
Expand source code
def set_agent(self, agent: Agent) -> None:
    """Adopt the agent's instructions and tools, pre-converting the tools
    to Gemini's declaration format (kept under both attribute names)."""
    self._instructions = agent.instructions
    self.tools = agent.tools
    converted = self._convert_tools_to_gemini_format(self.tools)
    self.tools_formatted = converted
    self.formatted_tools = converted
class GoogleLLM (*,
api_key: str | None = None,
model: str = 'gemini-2.5-flash-lite',
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_output_tokens: int | None = None,
top_p: float | None = None,
top_k: int | None = None,
presence_penalty: float | None = None,
frequency_penalty: float | None = None,
vertexai: bool = False,
vertexai_config: VertexAIConfig | None = None,
thinking_budget: int | None = 0,
seed: int | None = None,
safety_settings: list | None = None,
http_options: Any | None = None,
thinking_level: str | None = None,
include_thoughts: bool | None = None)-
Expand source code
class GoogleLLM(LLM):
    """Google Gemini LLM plugin (public Gemini API or Vertex AI backend)."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "gemini-2.5-flash-lite",
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_output_tokens: int | None = None,
        top_p: float | None = None,
        top_k: int | None = None,
        presence_penalty: float | None = None,
        frequency_penalty: float | None = None,
        vertexai: bool = False,
        vertexai_config: VertexAIConfig | None = None,
        thinking_budget: int | None = 0,
        seed: int | None = None,
        safety_settings: list | None = None,
        http_options: Any | None = None,
        thinking_level: str | None = None,
        include_thoughts: bool | None = None,
    ) -> None:
        """Initialize the Google LLM plugin.

        Args:
            api_key: Google API key. Falls back to ``GOOGLE_API_KEY`` env var.
            model: Gemini model name. Defaults to "gemini-2.5-flash-lite".
            temperature: Sampling temperature.
            tool_choice: Controls which (if any) tool is called.
            max_output_tokens: Maximum tokens in the response.
            top_p: Nucleus sampling probability mass.
            top_k: Top-K tokens considered during sampling.
            presence_penalty: Penalise tokens that have already appeared.
            frequency_penalty: Penalise repeated tokens by frequency.
            vertexai: Use Vertex AI backend instead of the public Gemini API.
            vertexai_config: Vertex AI project/location override.
            thinking_budget: Token budget for extended thinking (Gemini 2.5).
                Set to ``None`` to disable the thinking config entirely.
            seed: Seed for deterministic sampling.
            safety_settings: List of ``types.SafetySetting`` / dicts forwarded
                to the API.
            http_options: ``types.HttpOptions`` for the underlying Google client.
            thinking_level: Thinking effort level for Gemini 3+ models
                (``"low"``, ``"medium"``, ``"high"``, ``"minimal"``).
            include_thoughts: Whether to surface model thoughts in the response.
        """
        super().__init__()
        self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
        self.vertexai = vertexai
        self.vertexai_config = vertexai_config
        # NOTE(review): this guard fires only for the non-Vertex path
        # (no key), yet the message also describes Vertex setup — confirm
        # the intended credential-validation behavior.
        if not self.vertexai and not self.api_key:
            raise ValueError(
                "For VertexAI: Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of the service account key file. "
                "The Google Cloud project and location can be set via VertexAIConfig or the environment variables `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION`. location defaults to `us-central1`. "
                "For Google Gemini API: Set the `api_key` argument or the `GOOGLE_API_KEY` environment variable."
            )
        self.model = model
        self.temperature = temperature
        self.tool_choice = tool_choice
        self.max_output_tokens = max_output_tokens
        self.top_p = top_p
        self.top_k = top_k
        self.presence_penalty = presence_penalty
        self.frequency_penalty = frequency_penalty
        self.thinking_budget = thinking_budget
        self.seed = seed
        self.safety_settings = safety_settings
        self.http_options = http_options
        self.thinking_level = thinking_level
        self.include_thoughts = include_thoughts
        self._cancelled = False
        # tool name -> raw thought_signature bytes, replayed on later turns.
        self._thought_signatures: dict[str, bytes] = {}
        self._http_client: httpx.AsyncClient | None = None
        if self.vertexai:
            project_id = (self.vertexai_config.project_id if self.vertexai_config else None) or os.getenv("GOOGLE_CLOUD_PROJECT")
            if project_id is None:
                # Fall back to the project baked into the service account file.
                service_account_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
                if service_account_path:
                    from google.oauth2 import service_account
                    creds = service_account.Credentials.from_service_account_file(service_account_path)
                    project_id = creds.project_id
            location = (self.vertexai_config.location if self.vertexai_config else None) or os.getenv("GOOGLE_CLOUD_LOCATION") or "us-central1"
            self._client = Client(
                vertexai=True,
                project=project_id,
                location=location,
            )
        else:
            if not self.api_key:
                raise ValueError("GOOGLE_API_KEY required")
            self._client = Client(api_key=self.api_key)

    def _build_thinking_config(self) -> "types.ThinkingConfig | None":
        """Return the appropriate ThinkingConfig for the configured model.

        Gemini 3 models use ``thinking_level``; Gemini 2.5 models use
        ``thinking_budget``; returns None when thinking is disabled.
        """
        if "gemini-3" in self.model:
            return types.ThinkingConfig(thinking_level=self.thinking_level or "low")
        if self.thinking_budget is not None:
            return types.ThinkingConfig(
                thinking_budget=self.thinking_budget,
                **({"include_thoughts": self.include_thoughts} if self.include_thoughts is not None else {}),
            )
        return None

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        conversational_graph: Any | None = None,
        **kwargs: Any
    ) -> AsyncIterator[LLMResponse]:
        """
        Implement chat functionality using Google's Gemini API.

        Args:
            messages: ChatContext containing conversation history.
            tools: Optional list of function tools available to the model.
            **kwargs: Additional arguments passed to the Google API.

        Yields:
            LLMResponse objects containing the model's responses.
        """
        self._cancelled = False
        try:
            (
                contents,
                system_instruction,
            ) = await self._convert_messages_to_contents_async(messages)
            config_params: dict = {
                "temperature": self.temperature,
                **kwargs
            }
            thinking_config = self._build_thinking_config()
            if thinking_config is not None:
                config_params["thinking_config"] = thinking_config
            if conversational_graph:
                # Graph mode forces structured JSON output.
                config_params["response_mime_type"] = "application/json"
                config_params["response_json_schema"] = ConversationalGraphResponse.model_json_schema()
            if system_instruction:
                config_params["system_instruction"] = [types.Part(text=system_instruction)]
            if self.max_output_tokens is not None:
                config_params["max_output_tokens"] = self.max_output_tokens
            if self.top_p is not None:
                config_params["top_p"] = self.top_p
            if self.top_k is not None:
                config_params["top_k"] = self.top_k
            if self.presence_penalty is not None:
                config_params["presence_penalty"] = self.presence_penalty
            if self.frequency_penalty is not None:
                config_params["frequency_penalty"] = self.frequency_penalty
            if self.seed is not None:
                config_params["seed"] = self.seed
            if self.safety_settings is not None:
                config_params["safety_settings"] = self.safety_settings
            if self.http_options is not None:
                config_params["http_options"] = self.http_options
            if tools:
                function_declarations = []
                for tool in tools:
                    if is_function_tool(tool):
                        try:
                            gemini_tool = build_gemini_schema(tool)
                            function_declarations.append(gemini_tool)
                        except Exception as e:
                            logger.error(f"Failed to format tool {tool}: {e}")
                            continue
                if function_declarations:
                    config_params["tools"] = [types.Tool(function_declarations=function_declarations)]
                    if self.tool_choice == "required":
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(
                                mode=types.FunctionCallingConfigMode.ANY
                            )
                        )
                    elif self.tool_choice == "auto":
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(
                                mode=types.FunctionCallingConfigMode.AUTO
                            )
                        )
                    elif self.tool_choice == "none":
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(
                                mode=types.FunctionCallingConfigMode.NONE
                            )
                        )
            config = types.GenerateContentConfig(**config_params)
            response_stream = None
            try:
                response_stream = await self._client.aio.models.generate_content_stream(
                    model=self.model,
                    contents=contents,
                    config=config,
                )
                current_content = ""
                current_function_calls: list = []
                usage = None
                streaming_state = {
                    "in_response": False,
                    "response_start_index": -1,
                    "yielded_content_length": 0,
                }
                async for response in response_stream:
                    if self._cancelled:
                        break
                    if response.prompt_feedback:
                        error_msg = f"Prompt feedback error: {response.prompt_feedback}"
                        self.emit("error", Exception(error_msg))
                        raise Exception(error_msg)
                    if not response.candidates or not response.candidates[0].content:
                        continue
                    candidate = response.candidates[0]
                    if not candidate.content.parts:
                        continue
                    if response.usage_metadata:
                        usage = {
                            "prompt_tokens": response.usage_metadata.prompt_token_count,
                            "completion_tokens": response.usage_metadata.candidates_token_count,
                            "total_tokens": response.usage_metadata.total_token_count,
                            "prompt_cached_tokens": getattr(
                                response.usage_metadata, "cached_content_token_count", 0
                            ),
                        }
                    for part in candidate.content.parts:
                        if part.function_call:
                            function_call = {
                                "name": part.function_call.name,
                                "arguments": dict(part.function_call.args),
                            }
                            # Capture thought_signature for multi-turn tool calling continuity
                            thought_sig = getattr(part, "thought_signature", None)
                            if thought_sig:
                                self._thought_signatures[part.function_call.name] = thought_sig
                                function_call["thought_signature"] = base64.b64encode(thought_sig).decode("utf-8")
                            current_function_calls.append(function_call)
                            yield LLMResponse(
                                content="",
                                role=ChatRole.ASSISTANT,
                                metadata={"function_call": function_call, "usage": usage},
                            )
                        elif part.text:
                            current_content += part.text
                            if conversational_graph:
                                for content_chunk in conversational_graph.stream_conversational_graph_response(
                                    current_content, streaming_state
                                ):
                                    yield LLMResponse(
                                        content=content_chunk,
                                        role=ChatRole.ASSISTANT,
                                        metadata={"usage": usage},
                                    )
                            else:
                                yield LLMResponse(
                                    content=part.text,
                                    role=ChatRole.ASSISTANT,
                                    metadata={"usage": usage},
                                )
                # Graph mode: emit the fully-accumulated JSON once streaming ends.
                if current_content and not self._cancelled and conversational_graph:
                    try:
                        parsed_json = json.loads(current_content.strip())
                        yield LLMResponse(
                            content="",
                            role=ChatRole.ASSISTANT,
                            metadata={"graph_response": parsed_json, "usage": usage},
                        )
                    except json.JSONDecodeError:
                        yield LLMResponse(
                            content=current_content,
                            role=ChatRole.ASSISTANT,
                            metadata={"usage": usage},
                        )
            finally:
                if response_stream is not None:
                    try:
                        await response_stream.close()
                    except Exception:
                        pass
        except (ClientError, ServerError, APIError) as e:
            if not self._cancelled:
                error_msg = f"Google API error: {e}"
                self.emit("error", Exception(error_msg))
                raise Exception(error_msg) from e
        except Exception as e:
            if not self._cancelled:
                self.emit("error", e)
                raise

    async def cancel_current_generation(self) -> None:
        # Only sets the flag; the chat() streaming loop observes it.
        self._cancelled = True

    def _get_http_client(self) -> httpx.AsyncClient:
        """Return a shared httpx client (lazy-initialised)."""
        if self._http_client is None or self._http_client.is_closed:
            timeout_seconds = None
            if self.http_options is not None:
                timeout_seconds = getattr(self.http_options, "timeout", None)
            self._http_client = httpx.AsyncClient(
                timeout=httpx.Timeout(timeout_seconds or 30.0),
                follow_redirects=True,
            )
        return self._http_client

    async def _convert_messages_to_contents_async(
        self, messages: ChatContext
    ) -> tuple[list[types.Content], str | None]:
        """Convert ChatContext to Google Content format."""

        async def _format_content_parts_async(
            content: Union[str, List[ChatContent]]
        ) -> List[types.Part]:
            if isinstance(content, str):
                return [types.Part(text=content)]
            if len(content) == 1 and isinstance(content[0], str):
                return [types.Part(text=content[0])]
            formatted_parts = []
            for part in content:
                if isinstance(part, str):
                    formatted_parts.append(types.Part(text=part))
                elif isinstance(part, ImageContent):
                    data_url = part.to_data_url()
                    if data_url.startswith("data:"):
                        # Inline base64 image: decode directly.
                        header, b64_data = data_url.split(",", 1)
                        media_type = header.split(";")[0].split(":")[1]
                        image_bytes = base64.b64decode(b64_data)
                        formatted_parts.append(
                            types.Part(
                                inline_data=types.Blob(
                                    mime_type=media_type, data=image_bytes
                                )
                            )
                        )
                    else:
                        # Reuse the shared client; do NOT create a new one per image
                        client = self._get_http_client()
                        try:
                            response = await client.get(data_url)
                            response.raise_for_status()
                            image_bytes = response.content
                            media_type = response.headers.get("Content-Type", "image/jpeg")
                            formatted_parts.append(
                                types.Part(
                                    inline_data=types.Blob(
                                        mime_type=media_type, data=image_bytes
                                    )
                                )
                            )
                        except httpx.HTTPStatusError as e:
                            logger.error(f"Failed to fetch image from URL {data_url}: {e}")
                            continue
            return formatted_parts

        contents = []
        system_instruction = None
        for item in messages.items:
            if isinstance(item, ChatMessage):
                if item.role == ChatRole.SYSTEM:
                    # System message becomes the system_instruction, not a turn.
                    if isinstance(item.content, list):
                        system_instruction = next(
                            (str(p) for p in item.content if isinstance(p, str)), ""
                        )
                    else:
                        system_instruction = str(item.content)
                    continue
                elif item.role == ChatRole.USER:
                    parts = await _format_content_parts_async(item.content)
                    contents.append(types.Content(role="user", parts=parts))
                elif item.role == ChatRole.ASSISTANT:
                    parts = await _format_content_parts_async(item.content)
                    contents.append(types.Content(role="model", parts=parts))
            elif isinstance(item, FunctionCall):
                args = (
                    json.loads(item.arguments)
                    if isinstance(item.arguments, str)
                    else item.arguments
                )
                function_call = types.FunctionCall(name=item.name, args=args)
                fc_part = types.Part(function_call=function_call)
                # Prefer the signature stored in metadata; fall back to the cache.
                thought_sig_b64 = (item.metadata or {}).get("thought_signature")
                if thought_sig_b64:
                    try:
                        sig_bytes = base64.b64decode(thought_sig_b64)
                        fc_part = types.Part(
                            function_call=function_call,
                            thought_signature=sig_bytes,
                        )
                    except Exception:
                        pass
                elif item.name in self._thought_signatures:
                    fc_part = types.Part(
                        function_call=function_call,
                        thought_signature=self._thought_signatures[item.name],
                    )
                contents.append(
                    types.Content(role="model", parts=[fc_part])
                )
            elif isinstance(item, FunctionCallOutput):
                function_response = types.FunctionResponse(
                    name=item.name, response={"output": item.output}
                )
                contents.append(
                    types.Content(
                        role="user",
                        parts=[types.Part(function_response=function_response)]
                    )
                )
        return contents, system_instruction

    async def aclose(self) -> None:
        # Stop any in-flight generation, close the shared HTTP client,
        # then delegate to the base class.
        await self.cancel_current_generation()
        if self._http_client is not None and not self._http_client.is_closed:
            await self._http_client.aclose()
        await super().aclose()
Initialize the Google LLM plugin.
Args
api_key- Google API key. Falls back to
GOOGLE_API_KEY env var. model- Gemini model name. Defaults to "gemini-2.5-flash-lite".
temperature- Sampling temperature.
tool_choice- Controls which (if any) tool is called.
max_output_tokens- Maximum tokens in the response.
top_p- Nucleus sampling probability mass.
top_k- Top-K tokens considered during sampling.
presence_penalty- Penalise tokens that have already appeared.
frequency_penalty- Penalise repeated tokens by frequency.
vertexai- Use Vertex AI backend instead of the public Gemini API.
vertexai_config- Vertex AI project/location override.
thinking_budget- Token budget for extended thinking (Gemini 2.5).
Set to
None to disable the thinking config entirely. seed- Seed for deterministic sampling.
safety_settings- List of
types.SafetySetting / dicts forwarded to the API. http_options- types.HttpOptions for the underlying Google client. thinking_level- Thinking effort level for Gemini 3+ models
(
"low","medium","high","minimal"). include_thoughts- Whether to surface model thoughts in the response.
Ancestors
- videosdk.agents.llm.llm.LLM
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources: cancel any in-flight generation, close the
    shared httpx client if open, then delegate to the base class."""
    await self.cancel_current_generation()
    if self._http_client is not None and not self._http_client.is_closed:
        await self._http_client.aclose()
    await super().aclose()
async def cancel_current_generation(self) ‑> None-
Expand source code
async def cancel_current_generation(self) -> None:
    """Cancel the current LLM generation if active.

    Only sets the ``_cancelled`` flag; the streaming loop in ``chat``
    observes it and stops yielding.
    """
    self._cancelled = True
Raises
NotImplementedError- This method must be implemented by subclasses.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]-
Expand source code
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    conversational_graph: Any | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMResponse]:
    """
    Implement chat functionality using Google's Gemini API.

    Args:
        messages: ChatContext containing conversation history.
        tools: Optional list of function tools available to the model.
        **kwargs: Additional arguments passed to the Google API.

    Yields:
        LLMResponse objects containing the model's responses.
    """
    self._cancelled = False
    try:
        (
            contents,
            system_instruction,
        ) = await self._convert_messages_to_contents_async(messages)
        config_params: dict = {
            "temperature": self.temperature,
            **kwargs
        }
        thinking_config = self._build_thinking_config()
        if thinking_config is not None:
            config_params["thinking_config"] = thinking_config
        if conversational_graph:
            # Graph mode forces structured JSON output.
            config_params["response_mime_type"] = "application/json"
            config_params["response_json_schema"] = ConversationalGraphResponse.model_json_schema()
        if system_instruction:
            config_params["system_instruction"] = [types.Part(text=system_instruction)]
        if self.max_output_tokens is not None:
            config_params["max_output_tokens"] = self.max_output_tokens
        if self.top_p is not None:
            config_params["top_p"] = self.top_p
        if self.top_k is not None:
            config_params["top_k"] = self.top_k
        if self.presence_penalty is not None:
            config_params["presence_penalty"] = self.presence_penalty
        if self.frequency_penalty is not None:
            config_params["frequency_penalty"] = self.frequency_penalty
        if self.seed is not None:
            config_params["seed"] = self.seed
        if self.safety_settings is not None:
            config_params["safety_settings"] = self.safety_settings
        if self.http_options is not None:
            config_params["http_options"] = self.http_options
        if tools:
            function_declarations = []
            for tool in tools:
                if is_function_tool(tool):
                    try:
                        gemini_tool = build_gemini_schema(tool)
                        function_declarations.append(gemini_tool)
                    except Exception as e:
                        logger.error(f"Failed to format tool {tool}: {e}")
                        continue
            if function_declarations:
                config_params["tools"] = [types.Tool(function_declarations=function_declarations)]
                if self.tool_choice == "required":
                    config_params["tool_config"] = types.ToolConfig(
                        function_calling_config=types.FunctionCallingConfig(
                            mode=types.FunctionCallingConfigMode.ANY
                        )
                    )
                elif self.tool_choice == "auto":
                    config_params["tool_config"] = types.ToolConfig(
                        function_calling_config=types.FunctionCallingConfig(
                            mode=types.FunctionCallingConfigMode.AUTO
                        )
                    )
                elif self.tool_choice == "none":
                    config_params["tool_config"] = types.ToolConfig(
                        function_calling_config=types.FunctionCallingConfig(
                            mode=types.FunctionCallingConfigMode.NONE
                        )
                    )
        config = types.GenerateContentConfig(**config_params)
        response_stream = None
        try:
            response_stream = await self._client.aio.models.generate_content_stream(
                model=self.model,
                contents=contents,
                config=config,
            )
            current_content = ""
            current_function_calls: list = []
            usage = None
            streaming_state = {
                "in_response": False,
                "response_start_index": -1,
                "yielded_content_length": 0,
            }
            async for response in response_stream:
                if self._cancelled:
                    break
                if response.prompt_feedback:
                    error_msg = f"Prompt feedback error: {response.prompt_feedback}"
                    self.emit("error", Exception(error_msg))
                    raise Exception(error_msg)
                if not response.candidates or not response.candidates[0].content:
                    continue
                candidate = response.candidates[0]
                if not candidate.content.parts:
                    continue
                if response.usage_metadata:
                    usage = {
                        "prompt_tokens": response.usage_metadata.prompt_token_count,
                        "completion_tokens": response.usage_metadata.candidates_token_count,
                        "total_tokens": response.usage_metadata.total_token_count,
                        "prompt_cached_tokens": getattr(
                            response.usage_metadata, "cached_content_token_count", 0
                        ),
                    }
                for part in candidate.content.parts:
                    if part.function_call:
                        function_call = {
                            "name": part.function_call.name,
                            "arguments": dict(part.function_call.args),
                        }
                        # Capture thought_signature for multi-turn tool calling continuity
                        thought_sig = getattr(part, "thought_signature", None)
                        if thought_sig:
                            self._thought_signatures[part.function_call.name] = thought_sig
                            function_call["thought_signature"] = base64.b64encode(thought_sig).decode("utf-8")
                        current_function_calls.append(function_call)
                        yield LLMResponse(
                            content="",
                            role=ChatRole.ASSISTANT,
                            metadata={"function_call": function_call, "usage": usage},
                        )
                    elif part.text:
                        current_content += part.text
                        if conversational_graph:
                            for content_chunk in conversational_graph.stream_conversational_graph_response(
                                current_content, streaming_state
                            ):
                                yield LLMResponse(
                                    content=content_chunk,
                                    role=ChatRole.ASSISTANT,
                                    metadata={"usage": usage},
                                )
                        else:
                            yield LLMResponse(
                                content=part.text,
                                role=ChatRole.ASSISTANT,
                                metadata={"usage": usage},
                            )
            # Graph mode: emit the fully-accumulated JSON once streaming ends.
            if current_content and not self._cancelled and conversational_graph:
                try:
                    parsed_json = json.loads(current_content.strip())
                    yield LLMResponse(
                        content="",
                        role=ChatRole.ASSISTANT,
                        metadata={"graph_response": parsed_json, "usage": usage},
                    )
                except json.JSONDecodeError:
                    yield LLMResponse(
                        content=current_content,
                        role=ChatRole.ASSISTANT,
                        metadata={"usage": usage},
                    )
        finally:
            if response_stream is not None:
                try:
                    await response_stream.close()
                except Exception:
                    pass
    except (ClientError, ServerError, APIError) as e:
        if not self._cancelled:
            error_msg = f"Google API error: {e}"
            self.emit("error", Exception(error_msg))
            raise Exception(error_msg) from e
    except Exception as e:
        if not self._cancelled:
            self.emit("error", e)
            raise
Args
messages- ChatContext containing conversation history.
tools- Optional list of function tools available to the model.
**kwargs- Additional arguments passed to the Google API.
Yields
LLMResponse objects containing the model's responses.
class GoogleSTT (*,
api_key: Optional[str] = None,
languages: Union[str, list[str]] = 'en-US',
model: str = 'latest_long',
sample_rate: int = 16000,
interim_results: bool = True,
punctuate: bool = True,
min_confidence_threshold: float = 0.1,
location: str = 'global',
profanity_filter: bool = False,
enable_voice_activity_events: bool = False,
voice_activity_timeout: VoiceActivityConfig = None,
**kwargs: Any)-
Expand source code
class GoogleSTT(BaseSTT): def __init__( self, *, api_key: Optional[str] = None, languages: Union[str, list[str]] = "en-US", model: str = "latest_long", sample_rate: int = 16000, interim_results: bool = True, punctuate: bool = True, min_confidence_threshold: float = 0.1, location: str = "global", profanity_filter:bool=False, enable_voice_activity_events:bool=False, voice_activity_timeout:VoiceActivityConfig = None, **kwargs: Any ) -> None: """Initialize the Google STT plugin. Args: api_key (Optional[str], optional): Google API key. Defaults to None. languages (Union[str, list[str]]): The languages to use for the STT plugin. Defaults to "en-US". model (str): The model to use for the STT plugin. Defaults to "latest_long". sample_rate (int): The sample rate to use for the STT plugin. Defaults to 16000. interim_results (bool): Whether to use interim results for the STT plugin. Defaults to True. punctuate (bool): Whether to use punctuation for the STT plugin. Defaults to True. min_confidence_threshold (float): The minimum confidence threshold for the STT plugin. Defaults to 0.1. location (str): The location to use for the STT plugin. Defaults to "global". profanity_filter(bool): detect profane words and return only the first letter followed by asterisks in the transcript enable_voice_activity_events(bool): whether to enable voice activity timeout. voice_activity_timeout (VoiceActivityConfig): configure speech_start_timeout and speech_end_timeout. 
""" super().__init__() if not GOOGLE_V2_AVAILABLE: logger.error("Google Cloud Speech V2 is not available") raise ImportError("google-cloud-speech is not installed") if api_key: os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key try: gauth_default() except DefaultCredentialsError as e: logger.error("Google credentials are not configured", exc_info=True) raise ValueError("Google credentials are not configured.") from e self.input_sample_rate = 48000 self.target_sample_rate = sample_rate if isinstance(languages, str): languages = [languages] self._config = { "languages": languages, "model": model, "sample_rate": self.target_sample_rate, "interim_results": interim_results, "punctuate": punctuate, "min_confidence_threshold": min_confidence_threshold, "location": location, "profanity_filter":profanity_filter, "voice_activity_timeout":voice_activity_timeout, "enable_voice_activity_events":enable_voice_activity_events, "speech_start_timeout":voice_activity_timeout.speech_start_timeout if voice_activity_timeout else None, "speech_end_timeout":voice_activity_timeout.speech_end_timeout if voice_activity_timeout else None, } self._client: Optional[SpeechAsyncClient] = None self._stream: Optional[SpeechStream] = None async def _ensure_client(self): if self._client: return try: opts = None if self._config["location"] != "global": opts = ClientOptions(api_endpoint=f"{self._config['location']}-speech.googleapis.com") self._client = SpeechAsyncClient(client_options=opts) except Exception as e: logger.error("Failed to create SpeechAsyncClient", exc_info=True) raise e async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None: try: if not self._stream: await self._start_stream() if self._stream: if SCIPY_AVAILABLE: try: audio_data = np.frombuffer(audio_frames, dtype=np.int16) resampled_data = signal.resample(audio_data, int(len(audio_data) * self.target_sample_rate / self.input_sample_rate)) resampled_bytes = resampled_data.astype(np.int16).tobytes() await 
self._stream.push_audio(resampled_bytes) except Exception as e: logger.error("Error resampling audio", exc_info=True) self.emit("error", {"message": "Error resampling audio", "error": str(e)}) else: await self._stream.push_audio(audio_frames) except Exception as e: logger.error("process_audio failed", exc_info=True) if self._stream: self.emit("error", {"message": "Failed to process audio", "error": str(e)}) async def _start_stream(self): await self._ensure_client() try: self._stream = SpeechStream(self._client, self._config, self._transcript_callback) await self._stream.start() except Exception as e: logger.error("Failed to start SpeechStream", exc_info=True) raise e async def aclose(self) -> None: try: if self._stream: await self._stream.close() self._stream = None self._client = None except Exception as e: logger.error("Error during aclose", exc_info=True)Base class for Speech-to-Text implementations
Initialize the Google STT plugin.
Args
api_key:Optional[str]- Google API key. Defaults to None.
languages:Union[str, list[str]]- The languages to use for the STT plugin. Defaults to "en-US".
model:str- The model to use for the STT plugin. Defaults to "latest_long".
sample_rate:int- The sample rate to use for the STT plugin. Defaults to 16000.
interim_results:bool- Whether to use interim results for the STT plugin. Defaults to True.
punctuate:bool- Whether to use punctuation for the STT plugin. Defaults to True.
min_confidence_threshold:float- The minimum confidence threshold for the STT plugin. Defaults to 0.1.
location:str- The location to use for the STT plugin. Defaults to "global".
profanity_filter:bool- Whether to detect profane words and mask them in the transcript (only the first letter is kept, followed by asterisks). Defaults to False.
enable_voice_activity_events:bool- Whether to enable voice activity timeout events. Defaults to False.
voice_activity_timeout:VoiceActivityConfig- configure speech_start_timeout and speech_end_timeout.
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: try: if self._stream: await self._stream.close() self._stream = None self._client = None except Exception as e: logger.error("Error during aclose", exc_info=True)Cleanup resources
async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None-
Expand source code
async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None: try: if not self._stream: await self._start_stream() if self._stream: if SCIPY_AVAILABLE: try: audio_data = np.frombuffer(audio_frames, dtype=np.int16) resampled_data = signal.resample(audio_data, int(len(audio_data) * self.target_sample_rate / self.input_sample_rate)) resampled_bytes = resampled_data.astype(np.int16).tobytes() await self._stream.push_audio(resampled_bytes) except Exception as e: logger.error("Error resampling audio", exc_info=True) self.emit("error", {"message": "Error resampling audio", "error": str(e)}) else: await self._stream.push_audio(audio_frames) except Exception as e: logger.error("process_audio failed", exc_info=True) if self._stream: self.emit("error", {"message": "Failed to process audio", "error": str(e)})Process audio frames and convert to text
Args
audio_frames- Raw PCM audio bytes to process
**kwargs- Additional provider-specific arguments
Returns
None — transcripts are delivered asynchronously via the stream's transcript callback
class GoogleTTS (*,
api_key: str | None = None,
speed: float = 1.0,
pitch: float = 0.0,
response_format: "Literal['pcm']" = 'pcm',
voice_config: GoogleVoiceConfig | None = None,
custom_pronunciations: list[dict] | dict | None = None,
vertexai: bool = False,
vertexai_config: VertexAIConfig | None = None,
streaming: bool = True)-
Expand source code
class GoogleTTS(TTS): def __init__( self, *, api_key: str | None = None, speed: float = 1.0, pitch: float = 0.0, response_format: Literal["pcm"] = "pcm", voice_config: GoogleVoiceConfig | None = None, custom_pronunciations: list[dict] | dict | None = None, vertexai: bool = False, vertexai_config: VertexAIConfig | None = None, streaming: bool = True, ) -> None: """Initialize the Google TTS plugin. Args: api_key (Optional[str], optional): Google API key. Defaults to None. speed (float): The speed to use for the TTS plugin. Defaults to 1.0. pitch (float): The pitch to use for the TTS plugin. Defaults to 0.0. response_format (Literal["pcm"]): The response format to use for the TTS plugin. Defaults to "pcm". voice_config (GoogleVoiceConfig | None): The voice configuration to use for the TTS plugin. Defaults to None. custom_pronunciations: IPA pronunciation overrides, e.g. [{"tomato": "təˈmeɪtoʊ"}]. vertexai: Use Vertex AI TTS endpoint with ADC authentication. vertexai_config: Project / location settings for Vertex AI. streaming: Use gRPC StreamingSynthesize for lower latency. Compatible with vertexai=True — routes over gRPC to the regional endpoint. Requires: pip install google-cloud-texttospeech """ super().__init__(sample_rate=GOOGLE_SAMPLE_RATE, num_channels=GOOGLE_CHANNELS) try: from google.cloud import texttospeech_v1 except ImportError as exc: raise ImportError( "google-cloud-texttospeech is required. 
" "Install it with: pip install google-cloud-texttospeech" ) from exc self._tts = texttospeech_v1 self.speed = speed self.pitch = pitch self.response_format = response_format self.audio_track = None self.loop = None self._first_chunk_sent = False self.voice_config = voice_config or GoogleVoiceConfig() self.custom_pronunciations = custom_pronunciations self.vertexai = vertexai self.vertexai_config = vertexai_config or VertexAIConfig() self.streaming = streaming if self.streaming and self.vertexai: raise ValueError("Streaming and vertexai cannot be used together.") resolved_voice = (voice_config or GoogleVoiceConfig()).name if streaming and not self._is_chirp3_hd_voice(resolved_voice): raise ValueError( f"Streaming synthesis only supports Chirp 3 HD voices " f"(e.g. 'en-US-Chirp3-HD-Aoede'). " f"Got: '{resolved_voice}'. " f"See https://cloud.google.com/text-to-speech/docs/chirp3-hd for available voices." ) self._client = self._build_client(api_key) @staticmethod def _is_chirp3_hd_voice(name: str) -> bool: return "chirp3-hd" in name.lower() def _build_client(self, api_key: str | None) -> Any: """Construct a TextToSpeechAsyncClient.""" from google.api_core.client_options import ClientOptions if self.vertexai: project_id = ( self.vertexai_config.project_id or os.getenv("GOOGLE_CLOUD_PROJECT") or os.getenv("GCLOUD_PROJECT") ) if project_id is None: service_account_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") if service_account_path: try: from google.oauth2 import service_account creds = service_account.Credentials.from_service_account_file( service_account_path ) project_id = creds.project_id except Exception: pass if project_id is None: raise ValueError( "Vertex AI TTS requires a GCP project ID. Provide one of:\n" "1. vertexai_config=VertexAIConfig(project_id='my-project')\n" "2. GOOGLE_CLOUD_PROJECT environment variable\n" "3. 
GOOGLE_APPLICATION_CREDENTIALS pointing to a service-account file" ) location = ( self.vertexai_config.location or os.getenv("GOOGLE_CLOUD_LOCATION") or "us-central1" ) self.vertexai_config.project_id = project_id self.vertexai_config.location = location return self._tts.TextToSpeechAsyncClient( client_options=ClientOptions( api_endpoint=f"{location}-texttospeech.googleapis.com" ) ) else: resolved_key = api_key or os.getenv("GOOGLE_API_KEY") if not resolved_key: raise ValueError( "Google TTS API key required. Provide either:\n" "1. api_key parameter, OR\n" "2. GOOGLE_API_KEY environment variable" ) return self._tts.TextToSpeechAsyncClient( client_options=ClientOptions(api_key=resolved_key) ) def reset_first_audio_tracking(self) -> None: """Reset the first audio tracking state for next TTS task""" self._first_chunk_sent = False async def synthesize( self, text: AsyncIterator[str] | str, **kwargs: Any, ) -> None: try: if self.streaming: await self._synthesize_streaming(text) elif isinstance(text, AsyncIterator): async for segment in segment_text(text): await self._synthesize_audio(segment) else: await self._synthesize_audio(text) if not self.audio_track or not self.loop: self.emit("error", "Audio track or loop not initialized") return except Exception as e: self.emit("error", f"Google TTS synthesis failed: {str(e)}") raise async def _synthesize_audio(self, text: str) -> None: """Single-request synthesis via SynthesizeSpeech.""" tts = self._tts if self.custom_pronunciations: synthesis_input = tts.SynthesisInput( text=text, custom_pronunciations=self._build_custom_pronunciations_proto(), ) else: synthesis_input = tts.SynthesisInput(text=text) is_studio = self.voice_config.name.startswith("en-US-Studio") voice_params = tts.VoiceSelectionParams( language_code=self.voice_config.languageCode, name=self.voice_config.name, ) if not is_studio: voice_params.ssml_gender = tts.SsmlVoiceGender[self.voice_config.ssmlGender] response = await self._client.synthesize_speech( 
input=synthesis_input, voice=voice_params, audio_config=tts.AudioConfig( audio_encoding=tts.AudioEncoding.LINEAR16, speaking_rate=self.speed, pitch=self.pitch, sample_rate_hertz=GOOGLE_SAMPLE_RATE, ), ) if not response.audio_content: self.emit("error", "No audio content received from Google TTS") return await self._stream_audio_chunks(response.audio_content) async def _synthesize_streaming(self, text: AsyncIterator[str] | str) -> None: """Bidirectional gRPC streaming via StreamingSynthesize.""" tts = self._tts streaming_config_kwargs: dict = dict( voice=tts.VoiceSelectionParams( language_code=self.voice_config.languageCode, name=self.voice_config.name, ), streaming_audio_config=tts.StreamingAudioConfig( audio_encoding=tts.AudioEncoding.PCM, sample_rate_hertz=GOOGLE_SAMPLE_RATE, speaking_rate=self.speed, ), ) if self.custom_pronunciations: streaming_config_kwargs["custom_pronunciations"] = ( self._build_custom_pronunciations_proto() ) streaming_config = tts.StreamingSynthesizeConfig(**streaming_config_kwargs) async def request_generator() -> AsyncIterator[Any]: yield tts.StreamingSynthesizeRequest(streaming_config=streaming_config) if isinstance(text, str): yield tts.StreamingSynthesizeRequest( input=tts.StreamingSynthesisInput(text=text) ) else: async for chunk in text: if chunk: yield tts.StreamingSynthesizeRequest( input=tts.StreamingSynthesisInput(text=chunk) ) try: async for response in await self._client.streaming_synthesize( request_generator() ): if response.audio_content: await self._stream_audio_chunks(response.audio_content, has_wav_header=False) except Exception as e: self.emit("error", f"Google TTS streaming error: {str(e)}") raise def _build_custom_pronunciations_proto(self) -> Any: """Convert self.custom_pronunciations to a CustomPronunciations proto.""" tts = self._tts params = [] try: from google.cloud.texttospeech_v1.types import CustomPronunciationParams as _CPP PE = _CPP.PhoneticEncoding ENCODING_MAP = { "ipa": PE.PHONETIC_ENCODING_IPA, 
"x-sampa": PE.PHONETIC_ENCODING_X_SAMPA, } except (ImportError, AttributeError): ENCODING_MAP = {"ipa": 1, "x-sampa": 2} if not self.custom_pronunciations: return tts.CustomPronunciations(pronunciations=[]) raw = self.custom_pronunciations entries: list[tuple[str, str, Any]] = [] if isinstance(raw, dict): for phrase, pronunciation in raw.items(): entries.append((phrase, pronunciation, ENCODING_MAP["ipa"])) else: for item in raw: if not isinstance(item, dict): continue if "phrase" in item and "pronunciation" in item: enc_key = item.get("encoding", "ipa").lower() enc = ENCODING_MAP.get(enc_key, ENCODING_MAP["ipa"]) if enc_key not in ENCODING_MAP: logger.warning( f"Unknown encoding '{enc_key}' for phrase '{item['phrase']}'. " f"Supported: {list(ENCODING_MAP.keys())}. Falling back to IPA.", UserWarning, stacklevel=3, ) entries.append((item["phrase"], item["pronunciation"], enc)) else: for phrase, pronunciation in item.items(): entries.append((phrase, pronunciation, ENCODING_MAP["ipa"])) if self.voice_config.languageCode.lower() != "en-us": logger.warning( f"custom_pronunciations is only supported for en-US. " f"Got '{self.voice_config.languageCode}' — pronunciations will be ignored.", UserWarning, stacklevel=3, ) for phrase, pronunciation, encoding in entries: if not phrase or not pronunciation: continue try: params.append( tts.CustomPronunciationParams( phrase=phrase, pronunciation=pronunciation, phonetic_encoding=encoding, ) ) except Exception as e: logger.warning( f"Skipping custom pronunciation for '{phrase}': {e}", UserWarning, stacklevel=3, ) if not params: logger.warning( "custom_pronunciations was set but no valid entries were built. 
" "Check your phrase/pronunciation format.", UserWarning, stacklevel=3, ) return tts.CustomPronunciations(pronunciations=params) async def _stream_audio_chunks( self, audio_bytes: bytes, has_wav_header: bool = True ) -> None: """Chunk raw PCM and forward to the audio track.""" chunk_size = 960 audio_data = self._remove_wav_header(audio_bytes) if has_wav_header else audio_bytes for i in range(0, len(audio_data), chunk_size): chunk = audio_data[i:i + chunk_size] if len(chunk) < chunk_size and len(chunk) > 0: padding_needed = chunk_size - len(chunk) chunk += b'\x00' * padding_needed if len(chunk) == chunk_size: if not self._first_chunk_sent and self._first_audio_callback: self._first_chunk_sent = True await self._first_audio_callback() asyncio.create_task(self.audio_track.add_new_bytes(chunk)) await asyncio.sleep(0.001) def _remove_wav_header(self, audio_bytes: bytes) -> bytes: """Remove WAV header if present to get raw PCM data""" if audio_bytes.startswith(b"RIFF"): data_pos = audio_bytes.find(b"data") if data_pos != -1: return audio_bytes[data_pos + 8:] return audio_bytes async def aclose(self) -> None: if self._client: await self._client.transport.close() await super().aclose() async def interrupt(self) -> None: if self.audio_track: self.audio_track.interrupt()Base class for Text-to-Speech implementations
Initialize the Google TTS plugin.
Args
api_key:str | None- Google API key. Defaults to None.
speed:float- The speed to use for the TTS plugin. Defaults to 1.0.
pitch:float- The pitch to use for the TTS plugin. Defaults to 0.0.
response_format:Literal["pcm"]- The response format to use for the TTS plugin. Defaults to "pcm".
voice_config:GoogleVoiceConfig | None- The voice configuration to use for the TTS plugin. Defaults to None.
custom_pronunciations- IPA or X-SAMPA pronunciation overrides, given as a dict or a list of dicts, e.g. [{"tomato": "təˈmeɪtoʊ"}].
vertexai- Use Vertex AI TTS endpoint with ADC authentication.
vertexai_config- Project / location settings for Vertex AI.
streaming- Use gRPC StreamingSynthesize for lower latency. Not compatible with vertexai=True — the constructor raises ValueError when both streaming and vertexai are enabled. Streaming also requires a Chirp 3 HD voice (e.g. 'en-US-Chirp3-HD-Aoede').
Requires: pip install google-cloud-texttospeech
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: if self._client: await self._client.transport.close() await super().aclose()Cleanup resources
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None: if self.audio_track: self.audio_track.interrupt()Interrupt the TTS process
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None: """Reset the first audio tracking state for next TTS task""" self._first_chunk_sent = FalseReset the first audio tracking state for next TTS task
async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None-
Expand source code
async def synthesize( self, text: AsyncIterator[str] | str, **kwargs: Any, ) -> None: try: if self.streaming: await self._synthesize_streaming(text) elif isinstance(text, AsyncIterator): async for segment in segment_text(text): await self._synthesize_audio(segment) else: await self._synthesize_audio(text) if not self.audio_track or not self.loop: self.emit("error", "Audio track or loop not initialized") return except Exception as e: self.emit("error", f"Google TTS synthesis failed: {str(e)}") raiseConvert text to speech
Args
text- Text to convert to speech (either a string or an async iterator of strings)
**kwargs- Additional provider-specific arguments
Returns
None
class GoogleVoiceConfig (languageCode: str = 'en-US',
name: str = 'en-US-Chirp3-HD-Charon',
ssmlGender: str = 'MALE')-
Expand source code
@dataclass
class GoogleVoiceConfig:
    languageCode: str = "en-US"
    name: str = "en-US-Chirp3-HD-Charon"
    ssmlGender: str = "MALE"

GoogleVoiceConfig(languageCode: 'str' = 'en-US', name: 'str' = 'en-US-Chirp3-HD-Charon', ssmlGender: 'str' = 'MALE')
Instance variables
var languageCode : strvar name : strvar ssmlGender : str
class VertexAIConfig (project_id: str | None = None, location: str | None = None)-
Expand source code
@dataclass
class VertexAIConfig:
    project_id: str | None = None
    location: str | None = None

VertexAIConfig(project_id: 'str | None' = None, location: 'str | None' = None)
Instance variables
var location : str | Nonevar project_id : str | None
class VoiceActivityConfig (speech_start_timeout: float = 1.0, speech_end_timeout: float = 5.0)-
Expand source code
@dataclass
class VoiceActivityConfig:
    speech_start_timeout: float = 1.0
    speech_end_timeout: float = 5.0

VoiceActivityConfig(speech_start_timeout: 'float' = 1.0, speech_end_timeout: 'float' = 5.0)
Instance variables
var speech_end_timeout : floatvar speech_start_timeout : float