Module videosdk.plugins.google.stt
Classes
class GoogleSTT (*,
api_key: Optional[str] = None,
languages: Union[str, list[str]] = 'en-US',
model: str = 'latest_long',
sample_rate: int = 16000,
interim_results: bool = True,
punctuate: bool = True,
min_confidence_threshold: float = 0.1,
location: str = 'global',
**kwargs: Any)-
Expand source code
class GoogleSTT(BaseSTT):
    """Google Cloud Speech-to-Text (V2) streaming plugin.

    Implements the base class for Speech-to-Text implementations (``BaseSTT``) on
    top of ``SpeechAsyncClient.streaming_recognize``. Audio handed to
    :meth:`process_audio` is resampled from 48 kHz to ``sample_rate`` (when SciPy
    is available) and forwarded to a lazily started ``SpeechStream``.
    """

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        languages: Union[str, list[str]] = "en-US",
        model: str = "latest_long",
        sample_rate: int = 16000,
        interim_results: bool = True,
        punctuate: bool = True,
        min_confidence_threshold: float = 0.1,
        location: str = "global",
        **kwargs: Any,
    ) -> None:
        """Initialize the Google STT plugin.

        Args:
            api_key (Optional[str], optional): Path to a Google credentials JSON file.
                When given it is exported as ``GOOGLE_APPLICATION_CREDENTIALS``
                (despite the name, it is not used as a plain API key). Defaults to
                None, in which case Application Default Credentials are used.
            languages (Union[str, list[str]]): Recognition language code(s). A single
                string is wrapped in a list. Defaults to "en-US".
            model (str): Google recognition model. Defaults to "latest_long".
            sample_rate (int): Rate (Hz) that incoming 48 kHz audio is resampled to
                before being sent to Google. Defaults to 16000. Without SciPy no
                resampling happens, and the 48 kHz input rate is declared instead.
            interim_results (bool): Request interim (non-final) results. Defaults to True.
            punctuate (bool): Enable automatic punctuation. Defaults to True.
            min_confidence_threshold (float): Minimum confidence a transcript must
                have to be emitted. Defaults to 0.1.
            location (str): Google Cloud region. A value other than "global" also
                selects the regional API endpoint. Defaults to "global".
            **kwargs: Accepted for interface compatibility; unused.

        Raises:
            ImportError: If the Google Cloud Speech V2 client is not installed.
            ValueError: If Google credentials cannot be resolved.
        """
        super().__init__()
        if not GOOGLE_V2_AVAILABLE:
            logger.error("Google Cloud Speech V2 is not available")
            raise ImportError("google-cloud-speech is not installed")

        if api_key:
            # google.auth reads this variable as the path to a credentials file.
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key
        try:
            gauth_default()
        except DefaultCredentialsError as e:
            logger.error("Google credentials are not configured", exc_info=True)
            raise ValueError("Google credentials are not configured.") from e

        self.input_sample_rate = 48000
        self.target_sample_rate = sample_rate

        # Without SciPy, process_audio forwards the raw 48 kHz frames unchanged, so
        # the recognizer must be told the input rate rather than the target rate.
        if SCIPY_AVAILABLE:
            stream_sample_rate = self.target_sample_rate
        else:
            stream_sample_rate = self.input_sample_rate
            logger.warning(
                "scipy is not available; audio will be sent at %d Hz without resampling",
                stream_sample_rate,
            )

        if isinstance(languages, str):
            languages = [languages]

        self._config = {
            "languages": languages,
            "model": model,
            "sample_rate": stream_sample_rate,
            "interim_results": interim_results,
            "punctuate": punctuate,
            "min_confidence_threshold": min_confidence_threshold,
            "location": location,
        }
        self._client: Optional[SpeechAsyncClient] = None
        self._stream: Optional[SpeechStream] = None

    async def _ensure_client(self):
        """Create the SpeechAsyncClient on first use (regional endpoint if configured)."""
        if self._client:
            return
        try:
            opts = None
            if self._config["location"] != "global":
                opts = ClientOptions(api_endpoint=f"{self._config['location']}-speech.googleapis.com")
            self._client = SpeechAsyncClient(client_options=opts)
        except Exception:
            logger.error("Failed to create SpeechAsyncClient", exc_info=True)
            raise

    async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
        """Process audio frames and convert to text.

        Starts the Google stream on first use, resamples the int16 PCM frames from
        ``input_sample_rate`` to ``target_sample_rate`` (when SciPy is available)
        and queues them on the stream. Transcripts are delivered asynchronously
        through the transcript callback; nothing is returned.

        Args:
            audio_frames: Raw 16-bit PCM audio bytes (assumed to be at 48 kHz).
            **kwargs: Accepted for interface compatibility; unused.
        """
        try:
            if not self._stream:
                await self._start_stream()
            if not self._stream:
                return
            if not SCIPY_AVAILABLE:
                await self._stream.push_audio(audio_frames)
                return
            try:
                audio_data = np.frombuffer(audio_frames, dtype=np.int16)
                target_len = int(len(audio_data) * self.target_sample_rate / self.input_sample_rate)
                if target_len <= 0:
                    # Nothing to resample (empty or tiny chunk); signal.resample would fail.
                    return
                resampled = signal.resample(audio_data, target_len)
                # FFT resampling rings past full-scale peaks; clip so samples
                # saturate instead of wrapping around when cast to int16.
                resampled_bytes = np.clip(resampled, -32768, 32767).astype(np.int16).tobytes()
                await self._stream.push_audio(resampled_bytes)
            except Exception as e:
                logger.error("Error resampling audio", exc_info=True)
                self.emit("error", {"message": "Error resampling audio", "error": str(e)})
        except Exception as e:
            logger.error("process_audio failed", exc_info=True)
            if self._stream:
                self.emit("error", {"message": "Failed to process audio", "error": str(e)})

    async def _start_stream(self):
        """Create and start a SpeechStream wired to this plugin's transcript callback."""
        await self._ensure_client()
        try:
            # _transcript_callback is presumably provided by BaseSTT; verify there.
            self._stream = SpeechStream(self._client, self._config, self._transcript_callback)
            await self._stream.start()
        except Exception:
            logger.error("Failed to start SpeechStream", exc_info=True)
            raise

    async def aclose(self) -> None:
        """Cleanup resources.

        Closes the active SpeechStream (if any) and drops the cached client, so the
        next process_audio call starts fresh. Errors while closing are logged, not raised.
        """
        try:
            if self._stream:
                await self._stream.close()
        except Exception:
            logger.error("Error during aclose", exc_info=True)
        finally:
            # Reset even if close() failed so a broken stream is never reused.
            self._stream = None
            self._client = None
Initialize the Google STT plugin.
Args
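api_key:Optional[str], optional- Path to a Google credentials JSON file. When set, it is exported as the GOOGLE_APPLICATION_CREDENTIALS environment variable (it is not used as a plain API key). Defaults to None, in which case Application Default Credentials are used.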
languages:Union[str, list[str]]- The languages to use for the STT plugin. Defaults to "en-US".
model:str- The model to use for the STT plugin. Defaults to "latest_long".
sample_rate:int- Target sample rate (Hz) for the audio sent to Google. Incoming 48 kHz audio is resampled to this rate when SciPy is available. Defaults to 16000.
interim_results:bool- Whether to use interim results for the STT plugin. Defaults to True.
punctuate:bool- Whether to use punctuation for the STT plugin. Defaults to True.
min_confidence_threshold:float- The minimum confidence threshold for the STT plugin. Defaults to 0.1.
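location:str- Google Cloud region for the recognizer. Any value other than "global" also switches the client to the regional endpoint {location}-speech.googleapis.com. Defaults to "global".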
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Closes the active SpeechStream (if any) and drops the cached client, so the
    next process_audio call starts fresh. Errors while closing are logged, not raised.
    """
    try:
        if self._stream:
            await self._stream.close()
    except Exception:
        logger.error("Error during aclose", exc_info=True)
    finally:
        # Reset even if close() failed so a broken stream is never reused.
        self._stream = None
        self._client = None
async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None-
Expand source code
async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
    """Process audio frames and convert to text.

    Starts the Google stream on first use, resamples the int16 PCM frames from
    ``input_sample_rate`` to ``target_sample_rate`` (when SciPy is available)
    and queues them on the stream. Transcripts are delivered asynchronously
    through the transcript callback; nothing is returned.

    Args:
        audio_frames: Raw 16-bit PCM audio bytes (assumed to be at 48 kHz).
        **kwargs: Accepted for interface compatibility; unused.
    """
    try:
        if not self._stream:
            await self._start_stream()
        if not self._stream:
            return
        if not SCIPY_AVAILABLE:
            await self._stream.push_audio(audio_frames)
            return
        try:
            audio_data = np.frombuffer(audio_frames, dtype=np.int16)
            target_len = int(len(audio_data) * self.target_sample_rate / self.input_sample_rate)
            if target_len <= 0:
                # Nothing to resample (empty or tiny chunk); signal.resample would fail.
                return
            resampled = signal.resample(audio_data, target_len)
            # FFT resampling rings past full-scale peaks; clip so samples
            # saturate instead of wrapping around when cast to int16.
            resampled_bytes = np.clip(resampled, -32768, 32767).astype(np.int16).tobytes()
            await self._stream.push_audio(resampled_bytes)
        except Exception as e:
            logger.error("Error resampling audio", exc_info=True)
            self.emit("error", {"message": "Error resampling audio", "error": str(e)})
    except Exception as e:
        logger.error("process_audio failed", exc_info=True)
        if self._stream:
            self.emit("error", {"message": "Failed to process audio", "error": str(e)})
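Args
audio_frames- Raw 16-bit PCM audio bytes (assumed to be 48 kHz) to forward to the recognizer
**kwargs- Additional provider-specific arguments (accepted but not used by this implementation). There is no separate language parameter here; recognition languages are set through the constructor's languages argument.
Returns
None. Transcripts are not returned; they are delivered asynchronously through the transcript callback.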
class SpeechStream (client: SpeechAsyncClient, config: dict, transcript_callback)-
Expand source code
class SpeechStream:
    """Google Speech V2 streaming-recognition pipeline fed from an asyncio queue.

    ``push_audio`` enqueues audio bytes. A background task started by ``start``
    opens ``streaming_recognize`` sessions, rotating them after
    ``_MAX_SESSION_DURATION`` seconds or after an error. Each usable transcript is
    handed to ``transcript_callback`` as an ``STTResponse``.
    """

    def __init__(self, client: SpeechAsyncClient, config: dict, transcript_callback):
        self._client = client
        self._config = config
        self._transcript_callback = transcript_callback
        self._audio_queue = asyncio.Queue()
        self._running = False
        self._stream_task: Optional[asyncio.Task] = None
        # Identifies the current streaming_recognize session; request generators
        # belonging to an earlier session stop pulling audio once it changes.
        self._session_id = 0
        # asyncio keeps only weak references to tasks, so callback tasks are held
        # here until they finish to prevent premature garbage collection.
        self._callback_tasks: set[asyncio.Task] = set()
        # Placeholder emitter: errors are only logged, not propagated to callers.
        self.emit = lambda event, payload: logger.warning(f"Emit: {event}, Payload: {payload}")  # mock

    async def start(self):
        """Launch the background streaming loop; a no-op if already running."""
        if self._running:
            return
        try:
            self._running = True
            self._stream_task = asyncio.create_task(self._stream_loop())
        except Exception as e:
            # Do not report "running" when no loop task exists.
            self._running = False
            self.emit("error", {"message": "Failed to start stream loop", "error": str(e)})

    async def push_audio(self, audio_frames: bytes):
        """Queue an audio chunk for the streaming loop (dropped with a warning when stopped)."""
        if not self._running:
            logger.warning("Tried to push audio when stream is not running")
            return
        try:
            await self._audio_queue.put(audio_frames)
        except Exception as e:
            self.emit("error", {"message": "Failed to push audio", "error": str(e)})

    async def _audio_generator(self, session_id: int) -> AsyncGenerator[speech_types.StreamingRecognizeRequest, None]:
        """Yield the config request, then queued audio while ``session_id`` is current."""
        try:
            _, project_id = gauth_default()
            recognizer = f"projects/{project_id}/locations/{self._config['location']}/recognizers/_"
        except Exception as e:
            self.emit("error", {"message": "Failed to get project id", "error": str(e)})
            return
        try:
            streaming_config = speech_types.StreamingRecognitionConfig(
                config=speech_types.RecognitionConfig(
                    explicit_decoding_config=speech_types.ExplicitDecodingConfig(
                        encoding='LINEAR16',
                        sample_rate_hertz=self._config["sample_rate"],
                        # NOTE(review): declares 2 channels, while GoogleSTT resamples the
                        # buffer as a single int16 stream — confirm the input layout.
                        audio_channel_count=2,
                    ),
                    language_codes=self._config["languages"],
                    model=self._config["model"],
                    features=speech_types.RecognitionFeatures(
                        enable_automatic_punctuation=self._config["punctuate"],
                    ),
                ),
                streaming_features=speech_types.StreamingRecognitionFeatures(
                    interim_results=self._config["interim_results"],
                ),
            )
            yield speech_types.StreamingRecognizeRequest(
                recognizer=recognizer, streaming_config=streaming_config
            )
        except Exception as e:
            self.emit("error", {"message": "Failed to configure streaming", "error": str(e)})
            return
        # Stop as soon as this session is retired so an abandoned call cannot keep
        # stealing chunks meant for the next session.
        while self._running and self._session_id == session_id:
            try:
                chunk = await asyncio.wait_for(self._audio_queue.get(), timeout=0.1)
                yield speech_types.StreamingRecognizeRequest(audio=chunk)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                self.emit("error", {"message": "Audio chunk error", "error": str(e)})

    async def _stream_loop(self):
        """Open recognition sessions back-to-back until the stream is closed."""
        while self._running:
            session_id = self._session_id
            try:
                session_started_at = time.time()
                stream = await self._client.streaming_recognize(requests=self._audio_generator(session_id))
                async for response in stream:
                    # Rotate the session proactively once it gets too old.
                    if time.time() - session_started_at > _MAX_SESSION_DURATION:
                        break
                    self._handle_response(response)
            except (DeadlineExceeded, asyncio.TimeoutError) as e:
                self.emit("error", {"message": "Streaming timeout", "error": str(e)})
            except GoogleAPICallError as e:
                self.emit("error", {"message": "Google API call error", "error": str(e)})
                await asyncio.sleep(2)
            except Exception as e:
                self.emit("error", {"message": "Google STT error", "error": str(e)})
                await asyncio.sleep(2)
            # Retire this session's request generator before starting the next one.
            self._session_id += 1
            # Drop audio buffered while the session was ending / backing off.
            while not self._audio_queue.empty():
                try:
                    self._audio_queue.get_nowait()
                except asyncio.QueueEmpty:
                    logger.warning("Failed to flush audio queue", exc_info=True)
                    break

    def _handle_response(self, response: speech_types.StreamingRecognizeResponse):
        """Convert the top alternative of a response into an STTResponse callback."""
        try:
            if not response.results or not response.results[0].alternatives:
                return
            result = response.results[0]
            alt = result.alternatives[0]
            transcript = alt.transcript.strip()
            if not transcript:
                return
            is_final = result.is_final
            confidence = alt.confidence
            # Google reports confidence 0.0 as a "not set" sentinel (always the case
            # for interim streaming results); only filter when a value was reported,
            # otherwise interim transcripts would never be emitted.
            if confidence > 0.0 and confidence < self._config["min_confidence_threshold"]:
                return
            if not self._transcript_callback:
                return
            event = STTResponse(
                event_type=SpeechEventType.FINAL if is_final else SpeechEventType.INTERIM,
                data=SpeechData(
                    text=transcript,
                    confidence=confidence,
                    language=result.language_code or self._config["languages"][0],
                ),
            )
            task = asyncio.create_task(self._transcript_callback(event))
            self._callback_tasks.add(task)
            task.add_done_callback(self._callback_tasks.discard)
        except Exception as e:
            self.emit("error", {"message": "Error handling response", "error": str(e)})

    async def close(self):
        """Stop the stream: clear the running flag and cancel/await the loop task."""
        self._running = False
        task, self._stream_task = self._stream_task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        # SpeechStream has no base class providing aclose(), so there is nothing
        # further to delegate to.
# Methods
async def close(self)-
Expand source code
async def close(self):
    """Stop the stream: clear the running flag and cancel/await the loop task.

    Safe to call when no loop task was ever started.
    """
    self._running = False
    task, self._stream_task = self._stream_task, None
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    # SpeechStream has no base class providing aclose(), so there is nothing
    # further to delegate to.
# async def push_audio(self, audio_frames: bytes)-
Expand source code
async def push_audio(self, audio_frames: bytes):
    """Hand an audio chunk to the streaming loop's queue.

    While the stream is stopped the chunk is discarded with a warning; any
    failure while enqueueing is reported through ``self.emit``.
    """
    if self._running:
        try:
            await self._audio_queue.put(audio_frames)
        except Exception as exc:
            self.emit("error", {"message": "Failed to push audio", "error": str(exc)})
    else:
        logger.warning("Tried to push audio when stream is not running")
# async def start(self)-
Expand source code
async def start(self):
    """Spawn the background streaming loop; repeated calls while running do nothing."""
    if not self._running:
        try:
            self._running = True
            self._stream_task = asyncio.create_task(self._stream_loop())
        except Exception as exc:
            self.emit("error", {"message": "Failed to start stream loop", "error": str(exc)})