Package videosdk.plugins.cartesia
Sub-modules
videosdk.plugins.cartesia.stt
videosdk.plugins.cartesia.tts
Classes
class CartesiaSTT (*,
api_key: str | None = None,
model: str = 'ink-whisper',
language: str = 'en',
sample_rate: int = 48000,
base_url: str = 'wss://api.cartesia.ai/stt/websocket')
Expand source code
class CartesiaSTT(BaseSTT):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "ink-whisper",
        language: str = "en",
        sample_rate: int = 48000,
        base_url: str = "wss://api.cartesia.ai/stt/websocket",
    ) -> None:
        """Initialize the Cartesia STT plugin

        Args:
            api_key (str | None, optional): Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "ink-whisper".
            language (str): The language to use for the STT plugin, e.g. "en". Defaults to "en".
            sample_rate (int): The sample rate to use for the STT plugin. Defaults to 48000.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.cartesia.ai/stt/websocket".
        """
        super().__init__()
        self.api_key = api_key or os.getenv("CARTESIA_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Cartesia API key must be provided either through api_key parameter or CARTESIA_API_KEY environment variable")
        self.model = model
        self.language = language
        self.sample_rate = sample_rate
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._last_interim_at = 0.0
        self.input_sample_rate = sample_rate
        self.target_sample_rate = 16000

    async def process_audio(
        self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any
    ) -> None:
        """Process audio frames and send to Cartesia's STT API"""
        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())
        try:
            audio_data = np.frombuffer(audio_frames, dtype=np.int16)
            if self.input_sample_rate != self.target_sample_rate:
                audio_data = signal.resample(
                    audio_data,
                    int(len(audio_data) * self.target_sample_rate / self.input_sample_rate)
                )
            audio_bytes = audio_data.astype(np.int16).tobytes()
            await self._ws.send_bytes(audio_bytes)
        except Exception as e:
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses"""
        if not self._ws:
            return
        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    error = f"WebSocket error: {self._ws.exception()}"
                    self.emit("error", error)
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    logger.info("WebSocket connection closed")
                    break
        except Exception as e:
            self.emit("error", f"Error listening for responses: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Cartesia's STT API"""
        if not self._session:
            self._session = aiohttp.ClientSession()
        query_params = {
            "model": self.model,
            "language": self.language,
            "encoding": "pcm_s16le",
            "sample_rate": str(self.target_sample_rate),
            "api_key": self.api_key,
        }
        headers = {
            "Cartesia-Version": "2024-11-13",
            "User-Agent": "VideoSDK-Cartesia-STT",
        }
        ws_url = f"{self.base_url}?{urlencode(query_params)}"
        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            if self._ws:
                await self._ws.close()
                self._ws = None
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses"""
        responses = []
        try:
            msg_type = msg.get("type")
            if msg_type == "transcript":
                transcript = msg.get("text", "")
                is_final = msg.get("is_final", False)
                language = msg.get("language", self.language)
                duration = msg.get("duration", 0.0)
                if transcript:
                    current_time = time.time()
                    if is_final:
                        responses.append(STTResponse(
                            event_type=SpeechEventType.FINAL,
                            data=SpeechData(
                                text=transcript,
                                confidence=1.0,
                                language=language,
                                start_time=0.0,
                                end_time=duration,
                            ),
                            metadata={
                                "model": self.model,
                                "request_id": msg.get("request_id"),
                                "duration": duration,
                            }
                        ))
                    else:
                        if current_time - self._last_interim_at > 0.1:
                            responses.append(STTResponse(
                                event_type=SpeechEventType.INTERIM,
                                data=SpeechData(
                                    text=transcript,
                                    confidence=1.0,
                                    language=language,
                                    start_time=0.0,
                                    end_time=duration,
                                ),
                                metadata={
                                    "model": self.model,
                                    "request_id": msg.get("request_id"),
                                    "duration": duration,
                                }
                            ))
                            self._last_interim_at = current_time
            elif msg_type == "flush_done":
                logger.info("Cartesia STT: Flush completed")
            elif msg_type == "done":
                logger.info("Cartesia STT: Session ended")
            elif msg_type == "error":
                error_msg = msg.get("message", "Unknown error")
                error_code = msg.get("code", "unknown")
                self.emit("error", f"{error_code}: {error_msg}")
        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")
        return responses

    async def aclose(self) -> None:
        """Cleanup resources"""
        if self._ws and not self._ws.closed:
            try:
                await self._ws.send_str("done")
                await asyncio.sleep(0.1)
            except Exception as e:
                logger.error(f"Error sending done command: {str(e)}")
        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None
        if self._ws:
            await self._ws.close()
            self._ws = None
        if self._session:
            await self._session.close()
            self._session = None
Base class for Speech-to-Text implementations
Initialize the Cartesia STT plugin
Args
api_key : str | None, optional
    Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
model : str
    The model to use for the STT plugin. Defaults to "ink-whisper".
language : str
    The language to use for the STT plugin, e.g. "en". Defaults to "en".
sample_rate : int
    The sample rate to use for the STT plugin. Defaults to 48000.
base_url : str
    The base URL to use for the STT plugin. Defaults to "wss://api.cartesia.ai/stt/websocket".
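A minimal construction sketch, assuming CARTESIA_API_KEY is exported and that the class is importable from the package root as this page suggests; the agent-pipeline wiring is only hinted at here:

import asyncio
from videosdk.plugins.cartesia import CartesiaSTT

async def main() -> None:
    # api_key is omitted, so the plugin falls back to the CARTESIA_API_KEY
    # environment variable and raises ValueError if neither is available.
    stt = CartesiaSTT(
        model="ink-whisper",
        language="en",
        sample_rate=48000,  # input rate; frames are resampled to 16 kHz internally
    )
    try:
        pass  # hand `stt` to your VideoSDK agent pipeline here
    finally:
        await stt.aclose()

asyncio.run(main())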
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) -> None
Cleanup resources
async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) -> None
Process audio frames and send to Cartesia's STT API
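In an agent pipeline the framework normally calls process_audio for you. The sketch below is only an illustration of the expected input format: raw 16-bit PCM frames at the configured input sample rate (48 kHz by default); the helper name and the use of silence are hypothetical.

import numpy as np

async def feed_pcm(stt, seconds: float = 1.0, sample_rate: int = 48000) -> None:
    # Send int16 PCM silence in 20 ms chunks; real code would pass microphone frames.
    chunk = np.zeros(int(sample_rate * 0.02), dtype=np.int16).tobytes()
    for _ in range(int(seconds / 0.02)):
        await stt.process_audio(chunk)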
class CartesiaTTS (*,
api_key: str | None = None,
model: str = 'sonic-2',
voice_id: Union[str, List[float]] = '794f9389-aac1-45b6-b726-9d9369183238',
language: str = 'en',
base_url: str = 'https://api.cartesia.ai')
Expand source code
class CartesiaTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice_id: Union[str, List[float]] = DEFAULT_VOICE_ID,
        language: str = "en",
        base_url: str = "https://api.cartesia.ai",
    ) -> None:
        """Initialize the Cartesia TTS plugin

        Args:
            api_key (str | None, optional): Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to "sonic-2".
            voice_id (Union[str, List[float]]): The voice ID to use for the TTS plugin. Defaults to "794f9389-aac1-45b6-b726-9d9369183238".
            language (str): The language to use for the TTS plugin. Defaults to "en".
            base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.cartesia.ai".
        """
        super().__init__(sample_rate=CARTESIA_SAMPLE_RATE, num_channels=CARTESIA_CHANNELS)
        self.model = model
        self.language = language
        self.base_url = base_url
        self._voice = voice_id
        self._first_chunk_sent = False
        self._audio_buffer = bytearray()
        self._interrupted = False
        self._current_tasks: list[asyncio.Task] = []
        api_key = api_key or os.getenv("CARTESIA_API_KEY")
        if not api_key:
            raise ValueError("Cartesia API key must be provided")
        self._api_key = api_key
        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._connection_lock = asyncio.Lock()

    def reset_first_audio_tracking(self) -> None:
        self._first_chunk_sent = False
        self._audio_buffer.clear()
        self._interrupted = False

    async def _ensure_ws_connection(self) -> aiohttp.ClientWebSocketResponse:
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed:
                return self._ws_connection
            if self._ws_session is None or self._ws_session.closed:
                self._ws_session = aiohttp.ClientSession()
            ws_url = self.base_url.replace('http', 'ws', 1)
            full_ws_url = f"{ws_url}/tts/websocket?api_key={self._api_key}&cartesia_version={API_VERSION}"
            try:
                self._ws_connection = await asyncio.wait_for(
                    self._ws_session.ws_connect(full_ws_url, heartbeat=30.0),
                    timeout=5.0
                )
                return self._ws_connection
            except Exception as e:
                self.emit(
                    "error", f"Failed to establish WebSocket connection: {e}")
                raise

    async def _send_task(self, ws: aiohttp.ClientWebSocketResponse, text_iterator: AsyncIterator[str]):
        context_id = os.urandom(8).hex()
        voice_payload: dict[str, Any] = {}
        if isinstance(self._voice, str):
            voice_payload["mode"] = "id"
            voice_payload["id"] = self._voice
        else:
            voice_payload["mode"] = "embedding"
            voice_payload["embedding"] = self._voice
        base_payload = {
            "model_id": self.model,
            "language": self.language,
            "voice": voice_payload,
            "output_format": {"container": "raw", "encoding": "pcm_s16le", "sample_rate": self.sample_rate},
            "add_timestamps": True,
            "context_id": context_id,
        }
        delimiters = {'.', '!', '?', '\n'}
        buffer = ""

        async def send_sentence(sentence: str) -> None:
            if not sentence:
                return
            payload = {**base_payload, "transcript": sentence + " ", "continue": True}
            await ws.send_str(json.dumps(payload))

        def first_delim_pos(buf: str) -> int:
            return min((p for p in (buf.find(d) for d in delimiters) if p != -1), default=-1)

        def over_thresholds(buf: str) -> bool:
            if len(buf) >= MIN_CHARS_FLUSH:
                return True
            if len(buf.split()) >= MIN_WORDS_FLUSH:
                return True
            return False

        aiter = text_iterator.__aiter__()
        while not self._interrupted:
            try:
                next_task = asyncio.create_task(aiter.__anext__())
                try:
                    text_chunk = await asyncio.wait_for(next_task, timeout=INACTIVITY_TIMEOUT_SEC)
                except asyncio.TimeoutError:
                    # Inactivity flush
                    next_task.cancel()
                    if buffer.strip() and not self._interrupted:
                        await send_sentence(buffer.strip())
                        buffer = ""
                    continue
                if not self._interrupted:
                    buffer += text_chunk
                    # Greedy punctuation split first
                    while True:
                        pos = first_delim_pos(buffer)
                        if pos == -1:
                            break
                        sentence = buffer[:pos + 1].strip()
                        buffer = buffer[pos + 1:]
                        if sentence:
                            await send_sentence(sentence)
                    # Threshold-based flush
                    if over_thresholds(buffer):
                        await send_sentence(buffer.strip())
                        buffer = ""
            except StopAsyncIteration:
                break
        if buffer.strip():
            await send_sentence(buffer.strip())
        final_payload = {**base_payload, "transcript": " ", "continue": False}
        await ws.send_str(json.dumps(final_payload))

    async def _receive_task(self, ws: aiohttp.ClientWebSocketResponse):
        while not self._interrupted:
            msg = await ws.receive()
            if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                break
            if msg.type != aiohttp.WSMsgType.TEXT:
                continue
            data = json.loads(msg.data)
            if data.get("type") == "error":
                error_details = json.dumps(data, indent=2)
                self.emit("error", f"Cartesia error: {error_details}")
                break
            if "data" in data and data["data"]:
                audio_chunk = base64.b64decode(data["data"])
                await self._stream_audio(audio_chunk)
            if data.get("done"):
                break
        await self._flush_audio_buffer()

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[Union[str, List[float]]] = None,
        **kwargs: Any,
    ) -> None:
        if voice_id:
            self._voice = voice_id
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return
        self._interrupted = False
        self._current_tasks.clear()
        if isinstance(text, str):
            async def _string_iterator():
                yield text
            text_iterator = _string_iterator()
        else:
            text_iterator = text
        try:
            ws = await self._ensure_ws_connection()
            send_task = asyncio.create_task(self._send_task(ws, text_iterator))
            receive_task = asyncio.create_task(self._receive_task(ws))
            self._current_tasks.extend([send_task, receive_task])
            await asyncio.gather(send_task, receive_task)
        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            if self._ws_connection and not self._ws_connection.closed:
                await self._ws_connection.close()
            self._ws_connection = None

    async def _stream_audio(self, audio_chunk: bytes):
        if self._interrupted:
            return
        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()
        self._audio_buffer.extend(audio_chunk)
        while len(self._audio_buffer) >= PLAYBACK_CHUNK_SIZE and not self._interrupted:
            playback_chunk = self._audio_buffer[:PLAYBACK_CHUNK_SIZE]
            self._audio_buffer = self._audio_buffer[PLAYBACK_CHUNK_SIZE:]
            if self.audio_track:
                await self.audio_track.add_new_bytes(playback_chunk)

    async def _flush_audio_buffer(self):
        if self._audio_buffer:
            chunk = self._audio_buffer
            self._audio_buffer = bytearray()
            if len(chunk) < PLAYBACK_CHUNK_SIZE:
                chunk.extend(b'\x00' * (PLAYBACK_CHUNK_SIZE - len(chunk)))
            if not self._first_chunk_sent and self._first_audio_callback:
                self._first_chunk_sent = True
                await self._first_audio_callback()
            if self.audio_track:
                await self.audio_track.add_new_bytes(chunk)

    async def aclose(self) -> None:
        await super().aclose()
        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()

    async def interrupt(self) -> None:
        """Interrupt TTS synthesis"""
        self._interrupted = True
        for task in self._current_tasks:
            if not task.done():
                task.cancel()
        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        self._ws_connection = None
        if self.audio_track:
            self.audio_track.interrupt()
Base class for Text-to-Speech implementations
Initialize the Cartesia TTS plugin
Args
api_key : str | None, optional
    Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
model : str
    The model to use for the TTS plugin. Defaults to "sonic-2".
voice_id : Union[str, List[float]]
    The voice ID to use for the TTS plugin. Defaults to "794f9389-aac1-45b6-b726-9d9369183238".
language : str
    The language to use for the TTS plugin. Defaults to "en".
base_url : str
    The base URL to use for the TTS plugin. Defaults to "https://api.cartesia.ai".
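A construction sketch (assumes CARTESIA_API_KEY is set). As the source shows, voice_id accepts either a Cartesia voice ID string (sent with mode "id") or an embedding vector of floats (sent with mode "embedding"):

from videosdk.plugins.cartesia import CartesiaTTS

# Select a voice by ID (the default from the signature above) ...
tts = CartesiaTTS(
    model="sonic-2",
    voice_id="794f9389-aac1-45b6-b726-9d9369183238",
    language="en",
)

# ... or by embedding vector; the values below are placeholders, not a real voice.
# tts = CartesiaTTS(voice_id=[0.12, -0.03, 0.45])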
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) -> None
Cleanup resources
async def interrupt(self) -> None
Interrupt TTS synthesis
def reset_first_audio_tracking(self) -> None
Reset the first audio tracking state for next TTS task
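A barge-in sketch combining interrupt() and reset_first_audio_tracking() (illustrative only; an agent framework may already invoke these on your behalf):

async def barge_in(tts) -> None:
    # Stop the in-flight synthesis: cancels the send/receive tasks, closes the
    # WebSocket connection, and interrupts the audio track.
    await tts.interrupt()
    # Clear buffered audio and first-chunk tracking before the next synthesize() call.
    tts.reset_first_audio_tracking()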
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[Union[str, List[float]]] = None,
**kwargs: Any) -> None
Convert text to speech
Args
text
    Text to convert to speech (either string or async iterator of strings)
voice_id
    Optional voice identifier
**kwargs
    Additional provider-specific arguments
Returns
None
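Two usage sketches. Note that synthesize() emits an "error" event and returns early unless audio_track and loop are set on the instance, which the agent pipeline normally provides; the helper names below are illustrative:

async def speak(tts) -> None:
    # Plain string input.
    await tts.synthesize("Hello from Cartesia!")

async def speak_stream(tts, llm_pieces) -> None:
    # Async-iterator input, e.g. token chunks streamed from an LLM. Sentences
    # are flushed to the WebSocket on punctuation or length thresholds.
    async def chunks():
        for piece in llm_pieces:
            yield piece
    await tts.synthesize(chunks(), voice_id="794f9389-aac1-45b6-b726-9d9369183238")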