Module videosdk.plugins.elevenlabs.stt
Classes
class ElevenLabsSTT (*,
api_key: str | None = None,
model_id: str = 'scribe_v2_realtime',
language_code: str = 'en',
sample_rate: int = 48000,
commit_strategy: str = 'vad',
vad_silence_threshold_secs: float = 0.8,
vad_threshold: float = 0.4,
min_speech_duration_ms: int = 50,
min_silence_duration_ms: int = 50,
base_url: str = 'wss://api.elevenlabs.io/v1/speech-to-text/realtime')-
Expand source code
class ElevenLabsSTT(BaseSTT): """ ElevenLabs Realtime Speech-to-Text (STT) client. """ def __init__( self, *, api_key: str | None = None, model_id: str = "scribe_v2_realtime", language_code: str = "en", sample_rate: int = 48000, commit_strategy: str = "vad", vad_silence_threshold_secs: float = 0.8, vad_threshold: float = 0.4, min_speech_duration_ms: int = 50, min_silence_duration_ms: int = 50, base_url: str = "wss://api.elevenlabs.io/v1/speech-to-text/realtime", ) -> None: """ Initialize the ElevenLabs STT client. Args: api_key: ElevenLabs API key for authentication. Defaults to env variable ELEVENLABS_API_KEY. model_id: STT model identifier. language_code: Language code for transcription. sample_rate: Sample rate of input audio in Hz. commit_strategy: Strategy for committing transcripts ('vad' is by default). vad_silence_threshold_secs: Duration of silence to detect end-of-speech. vad_threshold: Threshold for detecting voice activity. min_speech_duration_ms: Minimum duration in milliseconds for a speech segment. min_silence_duration_ms: Minimum duration in milliseconds of silence to consider end-of-speech. base_url: WebSocket endpoint for ElevenLabs STT. Raises: ValueError: If required parameters are missing or invalid. """ super().__init__() self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY") if not self.api_key: raise ValueError("ElevenLabs API key must be provided via api_key or ELEVENLABS_API_KEY env var") self.model_id = model_id self.language_code = language_code self.commit_strategy = commit_strategy self.base_url = base_url self.sample_rate = sample_rate if self.sample_rate not in SUPPORTED_SAMPLE_RATES: raise ValueError(f"Unsupported sample_rate: {self.sample_rate}. Supported rates: {SUPPORTED_SAMPLE_RATES}") self.vad_silence_threshold_secs = vad_silence_threshold_secs self.vad_threshold = vad_threshold self.min_speech_duration_ms = min_speech_duration_ms self.min_silence_duration_ms = min_silence_duration_ms self._last_final_text = "" self._last_final_time = 0.0 self._duplicate_suppression_window = 0.75 self._stream_buffer = bytearray() self._target_chunk_size = int(0.1 * self.sample_rate * 2) self.heartbeat = 15.0 self._session: Optional[aiohttp.ClientSession] = None self._ws: Optional[aiohttp.ClientWebSocketResponse] = None self._ws_task: Optional[asyncio.Task] = None async def process_audio( self, audio_frames: bytes, **kwargs: Any ) -> None: """ Process and send audio frames. Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead. """ if not self._ws or self._ws.closed: await self._connect_ws() if not self._ws_task or self._ws_task.done(): self._ws_task = asyncio.create_task(self._listen_for_responses()) elif self._ws_task and self._ws_task.done(): logger.warning("WebSocket listener stopped unexpectedly, restarting") self._ws_task = asyncio.create_task(self._listen_for_responses()) try: mono_audio = self._convert_to_mono(audio_frames) if not mono_audio: return self._stream_buffer.extend(mono_audio) while len(self._stream_buffer) >= self._target_chunk_size: chunk = self._stream_buffer[:self._target_chunk_size] await self._send_audio(chunk) self._stream_buffer = self._stream_buffer[self._target_chunk_size:] except Exception as e: logger.exception("Error in process_audio: %s", e) self.emit("error", str(e)) if self._ws: await self._ws.close() self._ws = None async def _connect_ws(self) -> None: if not self._session: self._session = aiohttp.ClientSession() query_params = { "model_id": str(self.model_id), "language_code": str(self.language_code), "audio_format": f"pcm_{self.sample_rate}", "commit_strategy": str(self.commit_strategy), "vad_silence_threshold_secs": self.vad_silence_threshold_secs, "vad_threshold": self.vad_threshold, "min_speech_duration_ms": self.min_speech_duration_ms, "min_silence_duration_ms": self.min_silence_duration_ms, "include_timestamps": True, } ws_url = f"{self.base_url}?{urlencode(query_params)}" headers = {"xi-api-key": self.api_key} try: self._ws = await self._session.ws_connect(ws_url, headers=headers, heartbeat=self.heartbeat) logger.info("Connected to ElevenLabs Realtime STT WebSocket.") except Exception as e: logger.exception("Error connecting to ElevenLabs WebSocket: %s", e) raise async def _send_audio(self, audio_bytes: bytes) -> None: if not self._ws: return payload = { "message_type": "input_audio_chunk", "audio_base_64": base64.b64encode(audio_bytes).decode(), "sample_rate": self.sample_rate, } try: await self._ws.send_str(json.dumps(payload)) except Exception as e: logger.exception("Error sending audio chunk: %s", e) self.emit("error", str(e)) await self.aclose() def _convert_to_mono(self, audio_bytes: bytes) -> bytes: """ Convert input audio bytes to mono. """ if not audio_bytes: return b"" try: raw_audio = np.frombuffer(audio_bytes, dtype=np.int16) if raw_audio.size == 0: return b"" if raw_audio.size % 2 == 0: try: stereo = raw_audio.reshape(-1, 2).astype(np.float32) mono = stereo.mean(axis=1) return mono.astype(np.int16).tobytes() except ValueError: pass return audio_bytes except Exception as e: logger.error("Error converting to mono: %s", e) return b"" async def _listen_for_responses(self) -> None: """ Listen for incoming WebSocket messages from ElevenLabs STT. """ if not self._ws: return try: async for msg in self._ws: if msg.type == aiohttp.WSMsgType.TEXT: data = None try: data = msg.json() except Exception: try: data = json.loads(msg.data) except Exception: logger.debug("Received non-json ws text message") continue responses = await self._handle_ws_event(data) if responses: for r in responses: if self._transcript_callback: try: await self._transcript_callback(r) except Exception: logger.exception("Error in transcript callback") elif msg.type == aiohttp.WSMsgType.ERROR: logger.error("WebSocket error: %s", self._ws.exception()) self.emit("error", f"WebSocket error: {self._ws.exception()}") break elif msg.type == aiohttp.WSMsgType.CLOSED: logger.info("WebSocket closed by server.") break except asyncio.CancelledError: logger.debug("WebSocket listener cancelled") except Exception as e: logger.exception("Error in WebSocket listener: %s", e) self.emit("error", str(e)) finally: if self._ws: try: await self._ws.close() except Exception: pass self._ws = None self._ws_task = None async def _handle_ws_event(self, data: dict) -> List[STTResponse]: """ Process a single WebSocket event from ElevenLabs STT. Args: data: JSON-decoded WebSocket message. Returns: List of STTResponse objects for this event. """ responses: List[STTResponse] = [] message_type = data.get("message_type") logger.debug("Received WS event: %s", message_type) if message_type in STT_ERROR_MSGS: logger.error("ElevenLabs STT error: %s", data) self.emit("error", data) return responses if message_type == "session_started": global_event_emitter.emit("speech_session_started") return responses if message_type == "committed_transcript_with_timestamps": logger.debug("==== Received final transcript event: %s", data) text = data.get("text", "") clean_text = text.strip() avg_conf, duration = self.get_avg_confidence_and_duration(data) now = time.time() if clean_text == "": global_event_emitter.emit("speech_stopped") self._last_final_text = "" self._last_final_time = now return responses resp = STTResponse( event_type=SpeechEventType.FINAL, data=SpeechData( text=clean_text, confidence=avg_conf, duration=duration, ), metadata={"model": self.model_id, "raw_event": data}, ) responses.append(resp) global_event_emitter.emit("speech_stopped") self._last_final_text = clean_text self._last_final_time = now return responses if message_type == "partial_transcript": text = data.get("text", "") clean_text = text.strip() if ( self._last_final_text and clean_text and clean_text == self._last_final_text and (time.time() - self._last_final_time) < self._duplicate_suppression_window ): logger.debug("Dropping duplicate partial matching recent final transcript") return responses resp = STTResponse( event_type=SpeechEventType.INTERIM, data=SpeechData( text=text, confidence=float(data.get("confidence", 0.0)), ), metadata={"model": self.model_id, "raw_event": data}, ) responses.append(resp) if clean_text: global_event_emitter.emit("speech_started") return responses logger.debug("Ignoring unrecognized message_type: %s", message_type) return responses async def aclose(self) -> None: """ Close the WebSocket connection and cleanup session resources. Cancels the listener task, closes WebSocket and HTTP session, and calls the parent class cleanup. """ if self._ws_task: self._ws_task.cancel() try: await self._ws_task except asyncio.CancelledError: pass self._ws_task = None if self._ws: try: await self._ws.close() except Exception: pass self._ws = None if self._session: try: await self._session.close() except Exception: pass finally: self._session = None await super().aclose() def get_avg_confidence_and_duration(self,payload: Dict) -> Tuple[float, float]: """ Computes: - Average confidence from log probabilities - Total transcript duration from last word end timestamp Args: payload (dict): Transcript JSON message Returns: (avg_confidence, duration) """ words = payload.get("words", []) total_confidence = 0.0 word_count = 0 last_end_time = 0.0 for w in words: if w.get("type") != "word": continue logprob = w.get("logprob") if logprob is not None: total_confidence += math.exp(logprob) word_count += 1 last_end_time = max(last_end_time, w.get("end", 0.0)) avg_confidence = total_confidence / word_count if word_count > 0 else 0.0 return avg_confidence, last_end_timeElevenLabs Realtime Speech-to-Text (STT) client.
Initialize the ElevenLabs STT client.
Args
api_key- ElevenLabs API key for authentication. Defaults to env variable ELEVENLABS_API_KEY.
model_id- STT model identifier.
language_code- Language code for transcription.
sample_rate- Sample rate of input audio in Hz.
commit_strategy- Strategy for committing transcripts ('vad' is by default).
vad_silence_threshold_secs- Duration of silence to detect end-of-speech.
vad_threshold- Threshold for detecting voice activity.
min_speech_duration_ms- Minimum duration in milliseconds for a speech segment.
min_silence_duration_ms- Minimum duration in milliseconds of silence to consider end-of-speech.
base_url- WebSocket endpoint for ElevenLabs STT.
Raises
ValueError- If required parameters are missing or invalid.
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """ Close the WebSocket connection and cleanup session resources. Cancels the listener task, closes WebSocket and HTTP session, and calls the parent class cleanup. """ if self._ws_task: self._ws_task.cancel() try: await self._ws_task except asyncio.CancelledError: pass self._ws_task = None if self._ws: try: await self._ws.close() except Exception: pass self._ws = None if self._session: try: await self._session.close() except Exception: pass finally: self._session = None await super().aclose()Close the WebSocket connection and cleanup session resources.
Cancels the listener task, closes WebSocket and HTTP session, and calls the parent class cleanup.
def get_avg_confidence_and_duration(self, payload: Dict) ‑> Tuple[float, float]-
Expand source code
def get_avg_confidence_and_duration(self,payload: Dict) -> Tuple[float, float]: """ Computes: - Average confidence from log probabilities - Total transcript duration from last word end timestamp Args: payload (dict): Transcript JSON message Returns: (avg_confidence, duration) """ words = payload.get("words", []) total_confidence = 0.0 word_count = 0 last_end_time = 0.0 for w in words: if w.get("type") != "word": continue logprob = w.get("logprob") if logprob is not None: total_confidence += math.exp(logprob) word_count += 1 last_end_time = max(last_end_time, w.get("end", 0.0)) avg_confidence = total_confidence / word_count if word_count > 0 else 0.0 return avg_confidence, last_end_timeComputes: - Average confidence from log probabilities - Total transcript duration from last word end timestamp
Args
payload:dict- Transcript JSON message
Returns
(avg_confidence, duration)
async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None-
Expand source code
async def process_audio( self, audio_frames: bytes, **kwargs: Any ) -> None: """ Process and send audio frames. Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead. """ if not self._ws or self._ws.closed: await self._connect_ws() if not self._ws_task or self._ws_task.done(): self._ws_task = asyncio.create_task(self._listen_for_responses()) elif self._ws_task and self._ws_task.done(): logger.warning("WebSocket listener stopped unexpectedly, restarting") self._ws_task = asyncio.create_task(self._listen_for_responses()) try: mono_audio = self._convert_to_mono(audio_frames) if not mono_audio: return self._stream_buffer.extend(mono_audio) while len(self._stream_buffer) >= self._target_chunk_size: chunk = self._stream_buffer[:self._target_chunk_size] await self._send_audio(chunk) self._stream_buffer = self._stream_buffer[self._target_chunk_size:] except Exception as e: logger.exception("Error in process_audio: %s", e) self.emit("error", str(e)) if self._ws: await self._ws.close() self._ws = NoneProcess and send audio frames. Converts to mono (required by ElevenLabs) and buffers 100ms chunks to reduce overhead.