Package videosdk.plugins.google
Sub-modules
videosdk.plugins.google.live_api
videosdk.plugins.google.llm
videosdk.plugins.google.stt
videosdk.plugins.google.tts
Classes
class GeminiLiveConfig (voice: Voice | None = 'Puck',
language_code: str | None = 'en-US',
temperature: float | None = None,
top_p: float | None = None,
top_k: float | None = None,
candidate_count: int | None = 1,
max_output_tokens: int | None = None,
presence_penalty: float | None = None,
frequency_penalty: float | None = None,
response_modalities: List[Modality] | None = <factory>,
input_audio_transcription: AudioTranscriptionConfig | None = <factory>,
output_audio_transcription: AudioTranscriptionConfig | None = <factory>)
@dataclass
class GeminiLiveConfig:
    """Configuration for the Gemini Live API

    Args:
        voice: Voice ID for audio output. Options: 'Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'. Defaults to 'Puck'
        language_code: Language code for speech synthesis. Defaults to 'en-US'
        temperature: Controls randomness in response generation. Higher values (e.g. 0.8) make output more random, lower values (e.g. 0.2) make it more focused. Defaults to None
        top_p: Nucleus sampling parameter. Controls diversity via cumulative probability cutoff. Range 0-1. Defaults to None
        top_k: Limits the number of tokens considered for each step of text generation. Defaults to None
        candidate_count: Number of response candidates to generate. Defaults to 1
        max_output_tokens: Maximum number of tokens allowed in model responses. Defaults to None
        presence_penalty: Penalizes tokens based on their presence in the text so far. Range -2.0 to 2.0. Defaults to None
        frequency_penalty: Penalizes tokens based on their frequency in the text so far. Range -2.0 to 2.0. Defaults to None
        response_modalities: List of enabled response types. Options: ["TEXT", "AUDIO"]. Defaults to both
        input_audio_transcription: Configuration for audio transcription features. Defaults to None
        output_audio_transcription: Configuration for audio transcription features. Defaults to None
    """

    voice: Voice | None = "Puck"
    language_code: str | None = "en-US"
    temperature: float | None = None
    top_p: float | None = None
    top_k: float | None = None
    candidate_count: int | None = 1
    max_output_tokens: int | None = None
    presence_penalty: float | None = None
    frequency_penalty: float | None = None
    response_modalities: List[Modality] | None = field(
        default_factory=lambda: ["TEXT", "AUDIO"]
    )
    input_audio_transcription: AudioTranscriptionConfig | None = field(
        default_factory=dict
    )
    output_audio_transcription: AudioTranscriptionConfig | None = field(
        default_factory=dict
    )
Configuration for the Gemini Live API
Args
voice
- Voice ID for audio output. Options: 'Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'. Defaults to 'Puck'
language_code
- Language code for speech synthesis. Defaults to 'en-US'
temperature
- Controls randomness in response generation. Higher values (e.g. 0.8) make output more random, lower values (e.g. 0.2) make it more focused. Defaults to None
top_p
- Nucleus sampling parameter. Controls diversity via cumulative probability cutoff. Range 0-1. Defaults to None
top_k
- Limits the number of tokens considered for each step of text generation. Defaults to None
candidate_count
- Number of response candidates to generate. Defaults to 1
max_output_tokens
- Maximum number of tokens allowed in model responses. Defaults to None
presence_penalty
- Penalizes tokens based on their presence in the text so far. Range -2.0 to 2.0. Defaults to None
frequency_penalty
- Penalizes tokens based on their frequency in the text so far. Range -2.0 to 2.0. Defaults to None
response_modalities
- List of enabled response types. Options: ["TEXT", "AUDIO"]. Defaults to both
input_audio_transcription
- Configuration for transcribing the user's audio input. Defaults to an empty config (transcription enabled with default settings)
output_audio_transcription
- Configuration for transcribing the model's audio output. Defaults to an empty config (transcription enabled with default settings)
Instance variables
var candidate_count : int | None
var frequency_penalty : float | None
var input_audio_transcription : google.genai.types.AudioTranscriptionConfig | None
var language_code : str | None
var max_output_tokens : int | None
var output_audio_transcription : google.genai.types.AudioTranscriptionConfig | None
var presence_penalty : float | None
var response_modalities : List[google.genai.types.Modality] | None
var temperature : float | None
var top_k : float | None
var top_p : float | None
var voice : Literal['Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'] | None
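A minimal construction sketch (illustrative values; fields left unset fall back to the defaults documented above):

from videosdk.plugins.google import GeminiLiveConfig

# Audio-only responses with a different prebuilt voice; sampling
# parameters left as None defer to the Gemini Live API defaults.
config = GeminiLiveConfig(
    voice="Kore",
    language_code="en-US",
    temperature=0.6,
    response_modalities=["AUDIO"],
)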
class GeminiRealtime (*,
api_key: str | None = None,
model: str,
config: GeminiLiveConfig | None = None,
service_account_path: str | None = None)
class GeminiRealtime(RealtimeBaseModel[GeminiEventTypes]):
    """Gemini's realtime model for audio-only communication"""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str,
        config: GeminiLiveConfig | None = None,
        service_account_path: str | None = None,
    ) -> None:
        """
        Initialize Gemini realtime model.

        Args:
            api_key: Gemini API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
            service_account_path: Path to Google service account JSON file.
            model: The Gemini model identifier to use (e.g. 'gemini-pro', 'gemini-pro-vision').
            config: Optional configuration object for customizing model behavior. Contains settings for:
                - voice: Voice ID for audio output ('Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'). Defaults to 'Puck'
                - language_code: Language code for speech synthesis. Defaults to 'en-US'
                - temperature: Controls randomness in responses. Higher values (0.8) more random, lower (0.2) more focused
                - top_p: Nucleus sampling parameter. Controls diversity via probability cutoff. Range 0-1
                - top_k: Limits number of tokens considered for each generation step
                - candidate_count: Number of response candidates to generate. Defaults to 1
                - max_output_tokens: Maximum tokens allowed in model responses
                - presence_penalty: Penalizes token presence in text. Range -2.0 to 2.0
                - frequency_penalty: Penalizes token frequency in text. Range -2.0 to 2.0
                - response_modalities: List of enabled response types ["TEXT", "AUDIO"]. Defaults to both
                - output_audio_transcription: Configuration for audio transcription features

        Raises:
            ValueError: If neither api_key nor service_account_path is provided and no GOOGLE_API_KEY in env vars
        """
        super().__init__()
        self.model = model
        self._init_client(api_key, service_account_path)
        self._session: Optional[GeminiSession] = None
        self._closing = False
        self._session_should_close = asyncio.Event()
        self._main_task = None
        self.loop = None
        self.audio_track = None
        self._buffered_audio = bytearray()
        self._is_speaking = False
        self._last_audio_time = 0.0
        self._audio_processing_task = None
        self.tools = []
        self._instructions: str = (
            "You are a helpful voice assistant that can answer questions and help with tasks."
        )
        self.config: GeminiLiveConfig = config or GeminiLiveConfig()
        self.target_sample_rate = 24000
        self.input_sample_rate = 48000
        self._user_speaking = False
        self._agent_speaking = False

    def set_agent(self, agent: Agent) -> None:
        self._instructions = agent.instructions
        self.tools = agent.tools
        self.tools_formatted = self._convert_tools_to_gemini_format(self.tools)
        self.formatted_tools = self.tools_formatted

    def _init_client(self, api_key: str | None, service_account_path: str | None):
        if service_account_path:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_path
            self.client = genai.Client(http_options={"api_version": "v1beta"})
        else:
            self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
            if not self.api_key:
                self.emit("error", "GOOGLE_API_KEY or service account required")
                raise ValueError("GOOGLE_API_KEY or service account required")
            self.client = genai.Client(
                api_key=self.api_key, http_options={"api_version": "v1beta"}
            )

    async def connect(self) -> None:
        """Connect to the Gemini Live API"""
        if self._session:
            await self._cleanup_session(self._session)
            self._session = None
        self._closing = False
        self._session_should_close.clear()
        try:
            if (
                not self.audio_track
                and self.loop
                and "AUDIO" in self.config.response_modalities
            ):
                self.audio_track = CustomAudioStreamTrack(self.loop)
            elif not self.loop and "AUDIO" in self.config.response_modalities:
                self.emit(
                    "error", "Event loop not initialized. Audio playback will not work."
                )
                raise RuntimeError(
                    "Event loop not initialized. Audio playback will not work."
                )
            try:
                initial_session = await self._create_session()
                if initial_session:
                    self._session = initial_session
            except Exception as e:
                self.emit("error", f"Initial session creation failed, will retry: {e}")
            if not self._main_task or self._main_task.done():
                self._main_task = asyncio.create_task(
                    self._session_loop(), name="gemini-main-loop"
                )
        except Exception as e:
            self.emit("error", f"Error connecting to Gemini Live API: {e}")
            traceback.print_exc()
            raise

    async def _create_session(self) -> GeminiSession:
        """Create a new Gemini Live API session"""
        config = LiveConnectConfig(
            response_modalities=self.config.response_modalities,
            generation_config=GenerationConfig(
                candidate_count=(
                    self.config.candidate_count
                    if self.config.candidate_count is not None
                    else None
                ),
                temperature=(
                    self.config.temperature
                    if self.config.temperature is not None
                    else None
                ),
                top_p=self.config.top_p if self.config.top_p is not None else None,
                top_k=self.config.top_k if self.config.top_k is not None else None,
                max_output_tokens=(
                    self.config.max_output_tokens
                    if self.config.max_output_tokens is not None
                    else None
                ),
                presence_penalty=(
                    self.config.presence_penalty
                    if self.config.presence_penalty is not None
                    else None
                ),
                frequency_penalty=(
                    self.config.frequency_penalty
                    if self.config.frequency_penalty is not None
                    else None
                ),
            ),
            system_instruction=self._instructions,
            speech_config=SpeechConfig(
                voice_config=VoiceConfig(
                    prebuilt_voice_config=PrebuiltVoiceConfig(
                        voice_name=self.config.voice
                    )
                ),
                language_code=self.config.language_code,
            ),
            tools=self.formatted_tools or None,
            input_audio_transcription=self.config.input_audio_transcription,
            output_audio_transcription=self.config.output_audio_transcription,
        )
        try:
            session_cm = self.client.aio.live.connect(model=self.model, config=config)
            session = await session_cm.__aenter__()
            return GeminiSession(session=session, session_cm=session_cm, tasks=[])
        except Exception as e:
            self.emit("error", f"Connection error: {e}")
            traceback.print_exc()
            raise

    async def _session_loop(self) -> None:
        """Main processing loop for Gemini sessions"""
        reconnect_attempts = 0
        max_reconnect_attempts = 5
        reconnect_delay = 1
        while not self._closing:
            if not self._session:
                try:
                    self._session = await self._create_session()
                    reconnect_attempts = 0
                    reconnect_delay = 1
                except Exception as e:
                    reconnect_attempts += 1
                    reconnect_delay = min(30, reconnect_delay * 2)
                    self.emit(
                        "error",
                        f"session creation attempt {reconnect_attempts} failed: {e}",
                    )
                    if reconnect_attempts >= max_reconnect_attempts:
                        self.emit("error", "Max reconnection attempts reached")
                        break
                    await asyncio.sleep(reconnect_delay)
                    continue
            session = self._session
            recv_task = asyncio.create_task(
                self._receive_loop(session), name="gemini_receive"
            )
            keep_alive_task = asyncio.create_task(
                self._keep_alive(session), name="gemini_keepalive"
            )
            session.tasks.extend([recv_task, keep_alive_task])
            try:
                await self._session_should_close.wait()
            finally:
                for task in session.tasks:
                    if not task.done():
                        task.cancel()
                try:
                    await asyncio.gather(*session.tasks, return_exceptions=True)
                except Exception as e:
                    self.emit("error", f"Error during task cleanup: {e}")
                if not self._closing:
                    await self._cleanup_session(session)
                    self._session = None
                    await asyncio.sleep(reconnect_delay)
                self._session_should_close.clear()

    async def _handle_tool_calls(
        self, response, active_response_id: str, accumulated_input_text: str
    ) -> str:
        """Handle tool calls from Gemini"""
        if not response.tool_call:
            return accumulated_input_text
        for tool_call in response.tool_call.function_calls:
            if self.tools:
                for tool in self.tools:
                    if not is_function_tool(tool):
                        continue
                    tool_info = get_tool_info(tool)
                    if tool_info.name == tool_call.name:
                        if accumulated_input_text:
                            await realtime_metrics_collector.set_user_transcript(
                                accumulated_input_text
                            )
                            accumulated_input_text = ""
                        try:
                            await realtime_metrics_collector.add_tool_call(
                                tool_info.name
                            )
                            result = await tool(**tool_call.args)
                            await self.send_tool_response(
                                [
                                    FunctionResponse(
                                        id=tool_call.id,
                                        name=tool_call.name,
                                        response=result,
                                    )
                                ]
                            )
                        except Exception as e:
                            self.emit(
                                "error",
                                f"Error executing function {tool_call.name}: {e}",
                            )
                            traceback.print_exc()
                        break
        return accumulated_input_text

    async def _receive_loop(self, session: GeminiSession) -> None:
        """Process incoming messages from Gemini"""
        try:
            active_response_id = None
            chunk_number = 0
            accumulated_text = ""
            final_transcription = ""
            accumulated_input_text = ""
            while not self._closing:
                try:
                    async for response in session.session.receive():
                        if self._closing:
                            break
                        if response.tool_call:
                            accumulated_input_text = await self._handle_tool_calls(
                                response, active_response_id, accumulated_input_text
                            )
                        if server_content := response.server_content:
                            if (
                                input_transcription := server_content.input_transcription
                            ):
                                if input_transcription.text:
                                    if not self._user_speaking:
                                        await realtime_metrics_collector.set_user_speech_start()
                                        self._user_speaking = True
                                        self.emit("user_speech_started", {"type": "done"})
                                    accumulated_input_text += input_transcription.text
                                    global_event_emitter.emit(
                                        "input_transcription",
                                        {
                                            "text": accumulated_input_text,
                                            "is_final": False,
                                        },
                                    )
                            if (
                                output_transcription := server_content.output_transcription
                            ):
                                if output_transcription.text:
                                    final_transcription += output_transcription.text
                                    global_event_emitter.emit(
                                        "output_transcription",
                                        {
                                            "text": final_transcription,
                                            "is_final": False,
                                        },
                                    )
                            if not active_response_id:
                                active_response_id = f"response_{id(response)}"
                                chunk_number = 0
                                accumulated_text = ""
                            if server_content.interrupted:
                                if active_response_id:
                                    active_response_id = None
                                    accumulated_text = ""
                                if (
                                    self.audio_track
                                    and "AUDIO" in self.config.response_modalities
                                ):
                                    self.audio_track.interrupt()
                                continue
                            if model_turn := server_content.model_turn:
                                if self._user_speaking:
                                    await realtime_metrics_collector.set_user_speech_end()
                                    self._user_speaking = False
                                if accumulated_input_text:
                                    await realtime_metrics_collector.set_user_transcript(
                                        accumulated_input_text
                                    )
                                    try:
                                        self.emit(
                                            "realtime_model_transcription",
                                            {
                                                "role": "user",
                                                "text": accumulated_input_text,
                                                "is_final": True,
                                            },
                                        )
                                    except Exception:
                                        pass
                                    accumulated_input_text = ""
                                for part in model_turn.parts:
                                    if (
                                        hasattr(part, "inline_data")
                                        and part.inline_data
                                    ):
                                        raw_audio = part.inline_data.data
                                        if not raw_audio or len(raw_audio) < 2:
                                            continue
                                        if "AUDIO" in self.config.response_modalities:
                                            chunk_number += 1
                                            if not self._agent_speaking:
                                                await realtime_metrics_collector.set_agent_speech_start()
                                                self._agent_speaking = True
                                            if self.audio_track and self.loop:
                                                if len(raw_audio) % 2 != 0:
                                                    raw_audio += b"\x00"
                                                asyncio.create_task(
                                                    self.audio_track.add_new_bytes(
                                                        raw_audio
                                                    ),
                                                    name=f"audio_chunk_{chunk_number}",
                                                )
                                    elif hasattr(part, "text") and part.text:
                                        accumulated_text += part.text
                            if server_content.turn_complete and active_response_id:
                                if accumulated_input_text:
                                    await realtime_metrics_collector.set_user_transcript(
                                        accumulated_input_text
                                    )
                                    accumulated_input_text = ""
                                if final_transcription:
                                    await realtime_metrics_collector.set_agent_response(
                                        final_transcription
                                    )
                                    try:
                                        self.emit(
                                            "realtime_model_transcription",
                                            {
                                                "role": "agent",
                                                "text": final_transcription,
                                                "is_final": True,
                                            },
                                        )
                                    except Exception:
                                        pass
                                if (
                                    "TEXT" in self.config.response_modalities
                                    and accumulated_text
                                ):
                                    global_event_emitter.emit(
                                        "text_response",
                                        {"type": "done", "text": accumulated_text},
                                    )
                                elif (
                                    "TEXT" not in self.config.response_modalities
                                    and final_transcription
                                ):
                                    global_event_emitter.emit(
                                        "text_response",
                                        {"type": "done", "text": final_transcription},
                                    )
                                active_response_id = None
                                accumulated_text = ""
                                final_transcription = ""
                                await realtime_metrics_collector.set_agent_speech_end(
                                    timeout=1.0
                                )
                                self._agent_speaking = False
                except Exception as e:
                    if "1000 (OK)" in str(e):
                        logger.info("Normal WebSocket closure")
                    else:
                        logger.error(f"Error in receive loop: {e}")
                        traceback.print_exc()
                    self._session_should_close.set()
                    break
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            self.emit("error", "Receive loop cancelled")
        except Exception as e:
            self.emit("error", e)
            traceback.print_exc()
            self._session_should_close.set()

    async def _keep_alive(self, session: GeminiSession) -> None:
        """Send periodic keep-alive messages"""
        try:
            while not self._closing:
                await asyncio.sleep(10)
                if self._closing:
                    break
                try:
                    await session.session.send_client_content(
                        turns=Content(parts=[Part(text=".")], role="user"),
                        turn_complete=False,
                    )
                except Exception as e:
                    if "closed" in str(e).lower():
                        self._session_should_close.set()
                        break
                    self.emit("error", f"Keep-alive error: {e}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error in keep-alive: {e}")
            self._session_should_close.set()

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Handle incoming audio data from the user"""
        if not self._session or self._closing:
            return
        if "AUDIO" not in self.config.response_modalities:
            return
        audio_data = np.frombuffer(audio_data, dtype=np.int16)
        audio_data = signal.resample(
            audio_data,
            int(len(audio_data) * self.target_sample_rate / self.input_sample_rate),
        )
        audio_data = audio_data.astype(np.int16).tobytes()
        await self._session.session.send_realtime_input(
            audio=Blob(data=audio_data, mime_type=f"audio/pcm;rate={AUDIO_SAMPLE_RATE}")
        )

    async def handle_video_input(self, video_data: av.VideoFrame) -> None:
        """Improved video input handler with error prevention"""
        if not self._session or self._closing:
            return
        try:
            if not video_data or not video_data.planes:
                return
            now = time.monotonic()
            if (
                hasattr(self, "_last_video_frame")
                and (now - self._last_video_frame) < 0.5
            ):
                return
            self._last_video_frame = now
            processed_jpeg = encode_image(video_data, DEFAULT_IMAGE_ENCODE_OPTIONS)
            if not processed_jpeg or len(processed_jpeg) < 100:
                logger.warning("Invalid JPEG data generated")
                return
            await self._session.session.send_realtime_input(
                video=Blob(data=processed_jpeg, mime_type="image/jpeg")
            )
        except Exception as e:
            self.emit("error", f"Video processing error: {str(e)}")

    async def interrupt(self) -> None:
        """Interrupt current response"""
        if not self._session or self._closing:
            return
        try:
            await self._session.session.send_client_content(
                turns=Content(parts=[Part(text="stop")], role="user"),
                turn_complete=True,
            )
            await realtime_metrics_collector.set_interrupted()
            if self.audio_track and "AUDIO" in self.config.response_modalities:
                self.audio_track.interrupt()
        except Exception as e:
            self.emit("error", f"Interrupt error: {e}")

    async def send_message(self, message: str) -> None:
        """Send a text message to get audio response"""
        retry_count = 0
        max_retries = 5
        while not self._session or not self._session.session:
            if retry_count >= max_retries:
                raise RuntimeError("No active Gemini session after maximum retries")
            logger.debug("No active session, waiting for connection...")
            await asyncio.sleep(1)
            retry_count += 1
        try:
            await self._session.session.send_client_content(
                turns=[
                    Content(
                        parts=[
                            Part(
                                text="Repeat the user's exact message back to them [DO NOT ADD ANYTHING ELSE]:"
                                + message
                            )
                        ],
                        role="model",
                    ),
                    Content(parts=[Part(text=".")], role="user"),
                ],
                turn_complete=True,
            )
            await asyncio.sleep(0.1)
        except Exception as e:
            self.emit("error", f"Error sending message: {e}")
            self._session_should_close.set()

    async def send_text_message(self, message: str) -> None:
        """Send a text message for text-only communication"""
        retry_count = 0
        max_retries = 5
        while not self._session or not self._session.session:
            if retry_count >= max_retries:
                raise RuntimeError("No active Gemini session after maximum retries")
            self.emit("error", "No active session, waiting for connection...")
            await asyncio.sleep(1)
            retry_count += 1
        try:
            await self._session.session.send_client_content(
                turns=Content(parts=[Part(text=message)], role="user"),
                turn_complete=True,
            )
        except Exception as e:
            self.emit("error", f"Error sending text message: {e}")
            self._session_should_close.set()

    async def _cleanup_session(self, session: GeminiSession) -> None:
        """Clean up a session's resources"""
        for task in session.tasks:
            if not task.done():
                task.cancel()
        try:
            await session.session_cm.__aexit__(None, None, None)
        except Exception as e:
            self.emit("error", f"Error closing session: {e}")

    async def aclose(self) -> None:
        """Clean up all resources"""
        if self._closing:
            return
        self._closing = True
        self._session_should_close.set()
        if self._audio_processing_task and not self._audio_processing_task.done():
            self._audio_processing_task.cancel()
            try:
                await asyncio.wait_for(self._audio_processing_task, timeout=1.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass
        if self._main_task and not self._main_task.done():
            self._main_task.cancel()
            try:
                await asyncio.wait_for(self._main_task, timeout=2.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass
        if self._session:
            await self._cleanup_session(self._session)
            self._session = None
        if hasattr(self.audio_track, "cleanup") and self.audio_track:
            try:
                await self.audio_track.cleanup()
            except Exception as e:
                self.emit("error", f"Error cleaning up audio track: {e}")
        self._buffered_audio = bytearray()

    async def _reconnect(self) -> None:
        if self._session:
            await self._cleanup_session(self._session)
            self._session = None
        self._session = await self._create_session()

    async def send_tool_response(
        self, function_responses: List[FunctionResponse]
    ) -> None:
        """Send tool responses back to Gemini"""
        if not self._session or not self._session.session:
            return
        try:
            await self._session.session.send_tool_response(
                function_responses=function_responses
            )
        except Exception as e:
            self.emit("error", f"Error sending tool response: {e}")
            self._session_should_close.set()

    def _convert_tools_to_gemini_format(self, tools: List[FunctionTool]) -> List[Tool]:
        """Convert tool definitions to Gemini's Tool format"""
        function_declarations = []
        for tool in tools:
            if not is_function_tool(tool):
                continue
            try:
                function_declaration = build_gemini_schema(tool)
                function_declarations.append(function_declaration)
            except Exception as e:
                self.emit("error", f"Failed to format tool {tool}: {e}")
                continue
        return (
            [Tool(function_declarations=function_declarations)]
            if function_declarations
            else []
        )
Gemini's realtime model for audio-only communication
Initialize Gemini realtime model.
Args
api_key
- Gemini API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
service_account_path
- Path to Google service account JSON file.
model
- The Gemini model identifier to use (e.g. 'gemini-pro', 'gemini-pro-vision').
config
- Optional configuration object for customizing model behavior. Contains settings for:
- voice: Voice ID for audio output ('Puck', 'Charon', 'Kore', 'Fenrir', 'Aoede'). Defaults to 'Puck'
- language_code: Language code for speech synthesis. Defaults to 'en-US'
- temperature: Controls randomness in responses. Higher values (0.8) more random, lower (0.2) more focused
- top_p: Nucleus sampling parameter. Controls diversity via probability cutoff. Range 0-1
- top_k: Limits number of tokens considered for each generation step
- candidate_count: Number of response candidates to generate. Defaults to 1
- max_output_tokens: Maximum tokens allowed in model responses
- presence_penalty: Penalizes token presence in text. Range -2.0 to 2.0
- frequency_penalty: Penalizes token frequency in text. Range -2.0 to 2.0
- response_modalities: List of enabled response types ["TEXT", "AUDIO"]. Defaults to both
- output_audio_transcription: Configuration for audio transcription features
Raises
ValueError
- If neither api_key nor service_account_path is provided and no GOOGLE_API_KEY in env vars
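A minimal construction sketch, assuming GOOGLE_API_KEY is set in the environment; the model id is illustrative. In practice the agent pipeline assigns the event loop and audio track, calls set_agent() to supply instructions and tools, and then connect():

from videosdk.plugins.google import GeminiLiveConfig, GeminiRealtime

model = GeminiRealtime(
    model="gemini-2.0-flash-live-001",  # illustrative model id
    config=GeminiLiveConfig(voice="Puck", response_modalities=["AUDIO"]),
)
# Pipeline wiring (normally done for you):
#   model.loop = <running asyncio event loop>   # required for audio playback
#   model.set_agent(agent)                      # instructions + tools
#   await model.connect()
#   ...
#   await model.aclose()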
Ancestors
- videosdk.agents.realtime_base_model.RealtimeBaseModel
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
- abc.ABC
Methods
async def aclose(self) ‑> None
Clean up all resources
async def connect(self) ‑> None
Connect to the Gemini Live API
async def handle_audio_input(self, audio_data: bytes) ‑> None
Handle incoming audio data from the user
async def handle_video_input(self, video_data: av.VideoFrame) ‑> None
Improved video input handler with error prevention
async def interrupt(self) ‑> None
Interrupt current response
async def send_message(self, message: str) ‑> None
Send a text message to get audio response
async def send_text_message(self, message: str) ‑> None
Send a text message for text-only communication
async def send_tool_response(self, function_responses: List[FunctionResponse]) ‑> None
Send tool responses back to Gemini
def set_agent(self, agent: Agent) ‑> None
Attach an Agent: adopt its instructions and convert its tools to Gemini's Tool format
class GoogleLLM (*,
api_key: str | None = None,
model: str = 'gemini-2.0-flash-001',
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_output_tokens: int | None = None,
top_p: float | None = None,
top_k: int | None = None,
presence_penalty: float | None = None,
frequency_penalty: float | None = None)
class GoogleLLM(LLM):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "gemini-2.0-flash-001",
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_output_tokens: int | None = None,
        top_p: float | None = None,
        top_k: int | None = None,
        presence_penalty: float | None = None,
        frequency_penalty: float | None = None,
    ) -> None:
        """Initialize the Google LLM plugin

        Args:
            api_key (str): Google API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
            model (str): The model to use for the LLM plugin.
            temperature (float): The temperature to use for the LLM plugin
            tool_choice (ToolChoice): The tool choice to use for the LLM plugin
            max_output_tokens (int): The maximum output tokens to use for the LLM plugin
            top_p (float): The top P to use for the LLM plugin
            top_k (int): The top K to use for the LLM plugin
            presence_penalty (float): The presence penalty to use for the LLM plugin
            frequency_penalty (float): The frequency penalty to use for the LLM plugin
        """
        super().__init__()
        self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Google API key must be provided either through api_key parameter or GOOGLE_API_KEY environment variable"
            )
        self.model = model
        self.temperature = temperature
        self.tool_choice = tool_choice
        self.max_output_tokens = max_output_tokens
        self.top_p = top_p
        self.top_k = top_k
        self.presence_penalty = presence_penalty
        self.frequency_penalty = frequency_penalty
        self._cancelled = False
        self._client = Client(api_key=self.api_key)

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """
        Implement chat functionality using Google's Gemini API

        Args:
            messages: ChatContext containing conversation history
            tools: Optional list of function tools available to the model
            **kwargs: Additional arguments passed to the Google API

        Yields:
            LLMResponse objects containing the model's responses
        """
        self._cancelled = False
        try:
            (
                contents,
                system_instruction,
            ) = await self._convert_messages_to_contents_async(messages)
            config_params = {
                "temperature": self.temperature,
                **kwargs,
            }
            if system_instruction:
                config_params["system_instruction"] = [
                    types.Part(text=system_instruction)
                ]
            if self.max_output_tokens is not None:
                config_params["max_output_tokens"] = self.max_output_tokens
            if self.top_p is not None:
                config_params["top_p"] = self.top_p
            if self.top_k is not None:
                config_params["top_k"] = self.top_k
            if self.presence_penalty is not None:
                config_params["presence_penalty"] = self.presence_penalty
            if self.frequency_penalty is not None:
                config_params["frequency_penalty"] = self.frequency_penalty
            if tools:
                function_declarations = []
                for tool in tools:
                    if is_function_tool(tool):
                        try:
                            gemini_tool = build_gemini_schema(tool)
                            function_declarations.append(gemini_tool)
                        except Exception as e:
                            logger.error(f"Failed to format tool {tool}: {e}")
                            continue
                if function_declarations:
                    config_params["tools"] = [
                        types.Tool(function_declarations=function_declarations)
                    ]
                    if self.tool_choice == "required":
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(
                                mode=types.FunctionCallingConfigMode.ANY
                            )
                        )
                    elif self.tool_choice == "auto":
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(
                                mode=types.FunctionCallingConfigMode.AUTO
                            )
                        )
                    elif self.tool_choice == "none":
                        config_params["tool_config"] = types.ToolConfig(
                            function_calling_config=types.FunctionCallingConfig(
                                mode=types.FunctionCallingConfigMode.NONE
                            )
                        )
            config = types.GenerateContentConfig(**config_params)
            response_stream = await self._client.aio.models.generate_content_stream(
                model=self.model,
                contents=contents,
                config=config,
            )
            current_content = ""
            current_function_calls = []
            async for response in response_stream:
                if self._cancelled:
                    break
                if response.prompt_feedback:
                    error_msg = f"Prompt feedback error: {response.prompt_feedback}"
                    self.emit("error", Exception(error_msg))
                    raise Exception(error_msg)
                if not response.candidates or not response.candidates[0].content:
                    continue
                candidate = response.candidates[0]
                if not candidate.content.parts:
                    continue
                for part in candidate.content.parts:
                    if part.function_call:
                        function_call = {
                            "name": part.function_call.name,
                            "arguments": dict(part.function_call.args),
                        }
                        current_function_calls.append(function_call)
                        yield LLMResponse(
                            content="",
                            role=ChatRole.ASSISTANT,
                            metadata={"function_call": function_call},
                        )
                    elif part.text:
                        current_content = part.text
                        yield LLMResponse(
                            content=current_content,
                            role=ChatRole.ASSISTANT,
                        )
        except (ClientError, ServerError, APIError) as e:
            if not self._cancelled:
                error_msg = f"Google API error: {e}"
                self.emit("error", Exception(error_msg))
                raise Exception(error_msg) from e
        except Exception as e:
            if not self._cancelled:
                self.emit("error", e)
                raise

    async def cancel_current_generation(self) -> None:
        self._cancelled = True

    async def _convert_messages_to_contents_async(
        self, messages: ChatContext
    ) -> tuple[list[types.Content], str | None]:
        """Convert ChatContext to Google Content format"""

        async def _format_content_parts_async(
            content: Union[str, List[ChatContent]]
        ) -> List[types.Part]:
            if isinstance(content, str):
                return [types.Part(text=content)]
            if len(content) == 1 and isinstance(content[0], str):
                return [types.Part(text=content[0])]
            formatted_parts = []
            for part in content:
                if isinstance(part, str):
                    formatted_parts.append(types.Part(text=part))
                elif isinstance(part, ImageContent):
                    data_url = part.to_data_url()
                    if data_url.startswith("data:"):
                        header, b64_data = data_url.split(",", 1)
                        media_type = header.split(";")[0].split(":")[1]
                        image_bytes = base64.b64decode(b64_data)
                        formatted_parts.append(
                            types.Part(
                                inline_data=types.Blob(
                                    mime_type=media_type, data=image_bytes
                                )
                            )
                        )
                    else:
                        # Fetch image from URL
                        async with httpx.AsyncClient() as client:
                            try:
                                response = await client.get(data_url)
                                response.raise_for_status()
                                image_bytes = response.content
                                media_type = response.headers.get(
                                    "Content-Type", "image/jpeg"
                                )
                                formatted_parts.append(
                                    types.Part(
                                        inline_data=types.Blob(
                                            mime_type=media_type, data=image_bytes
                                        )
                                    )
                                )
                            except httpx.HTTPStatusError as e:
                                logger.error(
                                    f"Failed to fetch image from URL {data_url}: {e}"
                                )
                                continue
            return formatted_parts

        contents = []
        system_instruction = None
        for item in messages.items:
            if isinstance(item, ChatMessage):
                if item.role == ChatRole.SYSTEM:
                    if isinstance(item.content, list):
                        system_instruction = next(
                            (str(p) for p in item.content if isinstance(p, str)), ""
                        )
                    else:
                        system_instruction = str(item.content)
                    continue
                elif item.role == ChatRole.USER:
                    parts = await _format_content_parts_async(item.content)
                    contents.append(types.Content(role="user", parts=parts))
                elif item.role == ChatRole.ASSISTANT:
                    parts = await _format_content_parts_async(item.content)
                    contents.append(types.Content(role="model", parts=parts))
            elif isinstance(item, FunctionCall):
                function_call = types.FunctionCall(
                    name=item.name,
                    args=(
                        json.loads(item.arguments)
                        if isinstance(item.arguments, str)
                        else item.arguments
                    ),
                )
                contents.append(
                    types.Content(
                        role="model", parts=[types.Part(function_call=function_call)]
                    )
                )
            elif isinstance(item, FunctionCallOutput):
                function_response = types.FunctionResponse(
                    name=item.name, response={"output": item.output}
                )
                contents.append(
                    types.Content(
                        role="user",
                        parts=[types.Part(function_response=function_response)],
                    )
                )
        return contents, system_instruction

    async def aclose(self) -> None:
        await self.cancel_current_generation()
Base class for LLM implementations.
Initialize the Google LLM plugin
Args
api_key
:str
- Google API key. If not provided, will attempt to read from GOOGLE_API_KEY env var
model
:str
- The model to use for the LLM plugin.
temperature
:float
- The temperature to use for the LLM plugin
tool_choice
:ToolChoice
- The tool choice to use for the LLM plugin
max_output_tokens
:int
- The maximum output tokens to use for the LLM plugin
top_p
:float
- The top P to use for the LLM plugin
top_k
:int
- The top K to use for the LLM plugin
presence_penalty
:float
- The presence penalty to use for the LLM plugin
frequency_penalty
:float
- The frequency penalty to use for the LLM plugin
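A minimal construction sketch (the model id mirrors the default above; the key falls back to the GOOGLE_API_KEY environment variable when omitted):

from videosdk.plugins.google import GoogleLLM

llm = GoogleLLM(
    model="gemini-2.0-flash-001",
    temperature=0.7,
    tool_choice="auto",
)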
Ancestors
- videosdk.agents.llm.llm.LLM
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
Cleanup resources.
async def cancel_current_generation(self) ‑> None
Cancel the current LLM generation if active.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
Implement chat functionality using Google's Gemini API
Args
messages
- ChatContext containing conversation history
tools
- Optional list of function tools available to the model
**kwargs
- Additional arguments passed to the Google API
Yields
LLMResponse objects containing the model's responses
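A sketch of consuming the stream; the ChatContext import path is an assumption, and the metadata access is defensive since text-only responses may omit it:

from videosdk.agents import ChatContext  # assumed import path

async def ask(llm: GoogleLLM, chat_ctx: ChatContext) -> str:
    final = ""
    async for resp in llm.chat(messages=chat_ctx):
        metadata = getattr(resp, "metadata", None)
        if metadata and "function_call" in metadata:
            # Tool calls are surfaced through metadata rather than content.
            print("tool call:", metadata["function_call"]["name"])
        else:
            final += resp.content  # text arrives as streamed chunks
    return final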
class GoogleSTT (*,
api_key: Optional[str] = None,
languages: Union[str, list[str]] = 'en-US',
model: str = 'latest_long',
sample_rate: int = 16000,
interim_results: bool = True,
punctuate: bool = True,
min_confidence_threshold: float = 0.1,
location: str = 'global',
**kwargs: Any)
class GoogleSTT(BaseSTT):
    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        languages: Union[str, list[str]] = "en-US",
        model: str = "latest_long",
        sample_rate: int = 16000,
        interim_results: bool = True,
        punctuate: bool = True,
        min_confidence_threshold: float = 0.1,
        location: str = "global",
        **kwargs: Any,
    ) -> None:
        """Initialize the Google STT plugin.

        Args:
            api_key (Optional[str], optional): Google API key. Defaults to None.
            languages (Union[str, list[str]]): The languages to use for the STT plugin. Defaults to "en-US".
            model (str): The model to use for the STT plugin. Defaults to "latest_long".
            sample_rate (int): The sample rate to use for the STT plugin. Defaults to 16000.
            interim_results (bool): Whether to use interim results for the STT plugin. Defaults to True.
            punctuate (bool): Whether to use punctuation for the STT plugin. Defaults to True.
            min_confidence_threshold (float): The minimum confidence threshold for the STT plugin. Defaults to 0.1.
            location (str): The location to use for the STT plugin. Defaults to "global".
        """
        super().__init__()
        if not GOOGLE_V2_AVAILABLE:
            logger.error("Google Cloud Speech V2 is not available")
            raise ImportError("google-cloud-speech is not installed")
        if api_key:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key
        try:
            gauth_default()
        except DefaultCredentialsError as e:
            logger.error("Google credentials are not configured", exc_info=True)
            raise ValueError("Google credentials are not configured.") from e
        self.input_sample_rate = 48000
        self.target_sample_rate = sample_rate
        if isinstance(languages, str):
            languages = [languages]
        self._config = {
            "languages": languages,
            "model": model,
            "sample_rate": self.target_sample_rate,
            "interim_results": interim_results,
            "punctuate": punctuate,
            "min_confidence_threshold": min_confidence_threshold,
            "location": location,
        }
        self._client: Optional[SpeechAsyncClient] = None
        self._stream: Optional[SpeechStream] = None

    async def _ensure_client(self):
        if self._client:
            return
        try:
            opts = None
            if self._config["location"] != "global":
                opts = ClientOptions(
                    api_endpoint=f"{self._config['location']}-speech.googleapis.com"
                )
            self._client = SpeechAsyncClient(client_options=opts)
        except Exception as e:
            logger.error("Failed to create SpeechAsyncClient", exc_info=True)
            raise e

    async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
        try:
            if not self._stream:
                await self._start_stream()
            if self._stream:
                if SCIPY_AVAILABLE:
                    try:
                        audio_data = np.frombuffer(audio_frames, dtype=np.int16)
                        resampled_data = signal.resample(
                            audio_data,
                            int(
                                len(audio_data)
                                * self.target_sample_rate
                                / self.input_sample_rate
                            ),
                        )
                        resampled_bytes = resampled_data.astype(np.int16).tobytes()
                        await self._stream.push_audio(resampled_bytes)
                    except Exception as e:
                        logger.error("Error resampling audio", exc_info=True)
                        self.emit(
                            "error",
                            {"message": "Error resampling audio", "error": str(e)},
                        )
                else:
                    await self._stream.push_audio(audio_frames)
        except Exception as e:
            logger.error("process_audio failed", exc_info=True)
            if self._stream:
                self.emit(
                    "error",
                    {"message": "Failed to process audio", "error": str(e)},
                )

    async def _start_stream(self):
        await self._ensure_client()
        try:
            self._stream = SpeechStream(
                self._client, self._config, self._transcript_callback
            )
            await self._stream.start()
        except Exception as e:
            logger.error("Failed to start SpeechStream", exc_info=True)
            raise e

    async def aclose(self) -> None:
        try:
            if self._stream:
                await self._stream.close()
                self._stream = None
            self._client = None
        except Exception as e:
            logger.error("Error during aclose", exc_info=True)
Base class for Speech-to-Text implementations
Initialize the Google STT plugin.
Args
api_key
:Optional[str]
- Google API key. If provided, it is used as the GOOGLE_APPLICATION_CREDENTIALS path for Google Cloud authentication. Defaults to None.
languages
:Union[str, list[str]]
- The languages to use for the STT plugin. Defaults to "en-US".
model
:str
- The model to use for the STT plugin. Defaults to "latest_long".
sample_rate
:int
- The sample rate to use for the STT plugin. Defaults to 16000.
interim_results
:bool
- Whether to use interim results for the STT plugin. Defaults to True.
punctuate
:bool
- Whether to use punctuation for the STT plugin. Defaults to True.
min_confidence_threshold
:float
- The minimum confidence threshold for the STT plugin. Defaults to 0.1.
location
:str
- The location to use for the STT plugin. Defaults to "global".
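A minimal sketch, assuming application-default credentials are configured and that the surrounding pipeline feeds 48 kHz, 16-bit mono PCM into process_audio:

from videosdk.plugins.google import GoogleSTT

stt = GoogleSTT(
    languages="en-US",
    model="latest_long",
    sample_rate=16000,  # input frames are resampled from 48 kHz to this rate
    interim_results=True,
)
# Inside the pipeline: await stt.process_audio(pcm_bytes)
# When finished:      await stt.aclose()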
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
Cleanup resources
async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
Process audio frames and convert to text
Args
audio_frames
- Raw PCM audio bytes to process (48 kHz mono input; resampled to the configured sample rate when SciPy is available)
**kwargs
- Additional provider-specific arguments
Returns
None; transcripts are delivered asynchronously via the plugin's transcript callback
class GoogleTTS (*,
api_key: str | None = None,
speed: float = 1.0,
pitch: float = 0.0,
response_format: "Literal['pcm']" = 'pcm',
voice_config: GoogleVoiceConfig | None = None)
class GoogleTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        speed: float = 1.0,
        pitch: float = 0.0,
        response_format: Literal["pcm"] = "pcm",
        voice_config: GoogleVoiceConfig | None = None,
    ) -> None:
        """Initialize the Google TTS plugin.

        Args:
            api_key (Optional[str], optional): Google API key. Defaults to None.
            speed (float): The speed to use for the TTS plugin. Defaults to 1.0.
            pitch (float): The pitch to use for the TTS plugin. Defaults to 0.0.
            response_format (Literal["pcm"]): The response format to use for the TTS plugin. Defaults to "pcm".
            voice_config (GoogleVoiceConfig | None): The voice configuration to use for the TTS plugin. Defaults to None.
        """
        super().__init__(sample_rate=GOOGLE_SAMPLE_RATE, num_channels=GOOGLE_CHANNELS)
        self.speed = speed
        self.pitch = pitch
        self.response_format = response_format
        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False
        self.voice_config = voice_config or GoogleVoiceConfig()
        self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Google TTS API key required. Provide either:\n"
                "1. api_key parameter, OR\n"
                "2. GOOGLE_API_KEY environment variable"
            )
        self._http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0, write=5.0, pool=5.0),
            follow_redirects=True,
        )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        **kwargs: Any,
    ) -> None:
        try:
            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    await self._synthesize_audio(segment)
            else:
                await self._synthesize_audio(text)
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or loop not initialized")
                return
        except Exception as e:
            self.emit("error", f"Google TTS synthesis failed: {str(e)}")

    async def _synthesize_audio(self, text: str) -> None:
        """Synthesize text to speech using Google TTS REST API"""
        try:
            voice_config = {
                "languageCode": self.voice_config.languageCode,
                "name": self.voice_config.name,
            }
            if not self.voice_config.name.startswith("en-US-Studio"):
                voice_config["ssmlGender"] = self.voice_config.ssmlGender
            payload = {
                "input": {"text": text},
                "voice": voice_config,
                "audioConfig": {
                    "audioEncoding": "LINEAR16",
                    "speakingRate": self.speed,
                    "pitch": self.pitch,
                    "sampleRateHertz": GOOGLE_SAMPLE_RATE,
                },
            }
            response = await self._http_client.post(
                GOOGLE_TTS_ENDPOINT, params={"key": self.api_key}, json=payload
            )
            response.raise_for_status()
            response_data = response.json()
            audio_content = response_data.get("audioContent")
            if not audio_content:
                self.emit("error", "No audio content received from Google TTS")
                return
            audio_bytes = base64.b64decode(audio_content)
            if not audio_bytes:
                self.emit("error", "Decoded audio bytes are empty")
                return
            await self._stream_audio_chunks(audio_bytes)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 403:
                self.emit(
                    "error",
                    "Google TTS authentication failed. Please check your API key.",
                )
            elif e.response.status_code == 400:
                try:
                    error_data = e.response.json()
                    error_msg = error_data.get("error", {}).get(
                        "message", "Bad request"
                    )
                    self.emit("error", f"Google TTS request error: {error_msg}")
                except:
                    self.emit(
                        "error",
                        "Google TTS bad request. Please check your configuration.",
                    )
            else:
                self.emit("error", f"Google TTS HTTP error: {e.response.status_code}")
            raise

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Stream audio data in chunks to avoid beeps and ensure smooth playback"""
        chunk_size = 960
        audio_data = self._remove_wav_header(audio_bytes)
        for i in range(0, len(audio_data), chunk_size):
            chunk = audio_data[i:i + chunk_size]
            if len(chunk) < chunk_size and len(chunk) > 0:
                padding_needed = chunk_size - len(chunk)
                chunk += b'\x00' * padding_needed
            if len(chunk) == chunk_size:
                if not self._first_chunk_sent and self._first_audio_callback:
                    self._first_chunk_sent = True
                    await self._first_audio_callback()
                asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                await asyncio.sleep(0.001)

    def _remove_wav_header(self, audio_bytes: bytes) -> bytes:
        """Remove WAV header if present to get raw PCM data"""
        if audio_bytes.startswith(b"RIFF"):
            data_pos = audio_bytes.find(b"data")
            if data_pos != -1:
                return audio_bytes[data_pos + 8:]
        return audio_bytes

    async def aclose(self) -> None:
        if self._http_client:
            await self._http_client.aclose()
        await super().aclose()

    async def interrupt(self) -> None:
        if self.audio_track:
            self.audio_track.interrupt()
Base class for Text-to-Speech implementations
Initialize the Google TTS plugin.
Args
api_key
:Optional[str]
- Google API key. Defaults to None.
speed
:float
- The speed to use for the TTS plugin. Defaults to 1.0.
pitch
:float
- The pitch to use for the TTS plugin. Defaults to 0.0.
response_format
:Literal["pcm"]
- The response format to use for the TTS plugin. Defaults to "pcm".
voice_config
:GoogleVoiceConfig | None
- The voice configuration to use for the TTS plugin. Defaults to None.
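A minimal sketch, assuming GOOGLE_API_KEY is set and that the agent pipeline attaches audio_track and loop before synthesis:

from videosdk.plugins.google import GoogleTTS, GoogleVoiceConfig

tts = GoogleTTS(
    speed=1.0,
    pitch=0.0,
    voice_config=GoogleVoiceConfig(
        languageCode="en-US",
        name="en-US-Chirp3-HD-Aoede",
        ssmlGender="FEMALE",
    ),
)
# Inside the pipeline: await tts.synthesize("Hello from Google TTS")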
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
Cleanup resources
async def interrupt(self) ‑> None
Interrupt the TTS process
def reset_first_audio_tracking(self) ‑> None
Reset the first audio tracking state for next TTS task
async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None
Convert text to speech
Args
text
- Text to convert to speech (either string or async iterator of strings)
**kwargs
- Additional provider-specific arguments
Returns
None
class GoogleVoiceConfig (languageCode: str = 'en-US',
name: str = 'en-US-Chirp3-HD-Aoede',
ssmlGender: str = 'FEMALE')
@dataclass
class GoogleVoiceConfig:
    languageCode: str = "en-US"
    name: str = "en-US-Chirp3-HD-Aoede"
    ssmlGender: str = "FEMALE"
Voice selection settings for GoogleTTS: language code, voice name, and SSML gender.
Instance variables
var languageCode : str
var name : str
var ssmlGender : str