Package videosdk.plugins.cartesia
Sub-modules
videosdk.plugins.cartesia.stt
videosdk.plugins.cartesia.tts
Classes
class CartesiaSTT (*,
api_key: str | None = None,
model: str = 'ink-whisper',
language: str = 'en',
sample_rate: int = 48000,
base_url: str = 'wss://api.cartesia.ai/stt/websocket')-
Expand source code
class CartesiaSTT(BaseSTT):
    """Streaming speech-to-text plugin backed by Cartesia's WebSocket STT API.

    Audio passed to ``process_audio`` is converted to 16 kHz 16-bit PCM and
    sent over a lazily opened WebSocket. A background task reads transcript
    messages and forwards them to the transcript callback.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "ink-whisper",
        language: str = "en",
        sample_rate: int = 48000,
        base_url: str = "wss://api.cartesia.ai/stt/websocket",
    ) -> None:
        """Initialize the Cartesia STT plugin

        Args:
            api_key (str | None, optional): Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "ink-whisper".
            language (str): The language to use for the STT plugin, e.g. "en". Defaults to "en".
            sample_rate (int): The sample rate to use for the STT plugin. Defaults to 48000.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.cartesia.ai/stt/websocket".

        Raises:
            ValueError: If no API key is given and CARTESIA_API_KEY is unset.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("CARTESIA_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Cartesia API key must be provided either through api_key parameter or CARTESIA_API_KEY environment variable")

        self.model = model
        self.language = language
        self.sample_rate = sample_rate
        self.base_url = base_url

        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        # Timestamp of the last interim transcript that was emitted; used to
        # throttle interim results in _handle_ws_message.
        self._last_interim_at = 0.0

        self.input_sample_rate = sample_rate
        # Rate the audio is resampled to before it is sent to Cartesia.
        self.target_sample_rate = 16000

    async def process_audio(
        self,
        audio_frames: bytes,
        language: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Cartesia's STT API

        Args:
            audio_frames: Raw audio, interpreted as int16 samples.
            language: Accepted for interface compatibility; not used here.
            **kwargs: Ignored.

        Any failure is reported through the "error" event; the WebSocket and
        the listener task are then torn down so the next call reconnects.
        """
        if not self._ws:
            await self._connect_ws()
            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:
            audio_data = np.frombuffer(audio_frames, dtype=np.int16)

            # Averages adjacent sample pairs, presumably to downmix
            # interleaved stereo to mono.
            # NOTE(review): any even-length buffer is treated this way, and
            # input_sample_rate == sample_rate always holds after __init__ --
            # confirm the caller always delivers 2-channel audio.
            if (
                audio_data.ndim == 1
                and len(audio_data) % 2 == 0
                and self.input_sample_rate == self.sample_rate
                and self.input_sample_rate != self.target_sample_rate
            ):
                audio_data = audio_data.reshape(-1, 2).mean(axis=1).astype(np.int16)

            if self.input_sample_rate != self.target_sample_rate:
                audio_data = self._resample_audio(
                    audio_data, self.input_sample_rate, self.target_sample_rate
                )

            # Resampling yields floats that may overshoot the int16 range.
            audio_data = np.clip(audio_data, -32768, 32767).astype(np.int16)
            await self._ws.send_bytes(audio_data.tobytes())
        except Exception as e:
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

    async def flush(self) -> None:
        """Send `finalize` to Cartesia to force a final transcription now.

        Does nothing when there is no open WebSocket. Send failures are
        logged, not raised.
        """
        if not self._ws or self._ws.closed:
            return
        try:
            await self._ws.send_str("finalize")
        except Exception as e:
            logger.error(f"Error flushing Cartesia STT: {str(e)}")

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses"""
        if not self._ws:
            return

        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    error = f"WebSocket error: {self._ws.exception()}"
                    self.emit("error", error)
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    logger.info("WebSocket connection closed")
                    break
        except Exception as e:
            self.emit("error", f"Error listening for responses: {str(e)}")
        finally:
            # Clearing _ws makes the next process_audio() call reconnect.
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Cartesia's STT API

        Raises:
            Exception: Whatever ``ws_connect`` raised, after logging it.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        query_params = {
            "model": self.model,
            "language": self.language,
            "encoding": "pcm_s16le",
            "sample_rate": str(self.target_sample_rate),
        }
        # The API key is sent as a header (as CartesiaTTS does) rather than in
        # the query string, so it cannot end up in logged URLs or in the
        # handshake-error message logged below.
        headers = {
            "Cartesia-Version": "2024-11-13",
            "User-Agent": "VideoSDK-Cartesia-STT",
            "X-API-Key": self.api_key,
        }

        ws_url = f"{self.base_url}?{urlencode(query_params)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)
        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            if self._ws:
                await self._ws.close()
                self._ws = None
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses

        Args:
            msg: Decoded JSON message from the WebSocket.

        Returns:
            The STT responses produced by this message (possibly empty).
            Interim transcripts are emitted at most once every 0.1 seconds.
            "error" messages are reported through the "error" event.
        """
        responses = []

        try:
            msg_type = msg.get("type")

            if msg_type == "transcript":
                transcript = msg.get("text", "")
                is_final = msg.get("is_final", False)
                language = msg.get("language", self.language)
                duration = msg.get("duration", 0.0)

                if transcript:
                    current_time = time.time()
                    metadata = {
                        "model": self.model,
                        "request_id": msg.get("request_id"),
                        "duration": duration,
                    }

                    if is_final:
                        responses.append(STTResponse(
                            event_type=SpeechEventType.FINAL,
                            data=SpeechData(
                                text=transcript,
                                confidence=1.0,
                                language=language,
                                start_time=0.0,
                                end_time=duration,
                                duration=duration,
                            ),
                            metadata=metadata,
                        ))
                    elif current_time - self._last_interim_at > 0.1:
                        responses.append(STTResponse(
                            event_type=SpeechEventType.INTERIM,
                            data=SpeechData(
                                text=transcript,
                                confidence=1.0,
                                language=language,
                                start_time=0.0,
                                end_time=duration,
                            ),
                            metadata=metadata,
                        ))
                        self._last_interim_at = current_time

            elif msg_type == "flush_done":
                logger.info("Cartesia STT: Flush completed")

            elif msg_type == "done":
                logger.info("Cartesia STT: Session ended")

            elif msg_type == "error":
                error_msg = msg.get("message", "Unknown error")
                error_code = msg.get("code", "unknown")
                self.emit("error", f"{error_code}: {error_msg}")

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    def _resample_audio(self, audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
        """
        Use polyphase filtering for resampling, which is more accurate for integer-ratio conversions.
        Assumes input is np.int16.

        Returns the input unchanged when both rates are equal; otherwise the
        result of ``resample_poly`` (the caller clips and casts it back to
        int16).
        """
        if orig_sr == target_sr:
            return audio
        gcd = math.gcd(orig_sr, target_sr)
        up = target_sr // gcd
        down = orig_sr // gcd
        return resample_poly(audio, up, down)

    async def aclose(self) -> None:
        """Cleanup resources"""
        if self._ws and not self._ws.closed:
            try:
                await self._ws.send_str("done")
                # Short pause before the socket is closed below.
                await asyncio.sleep(0.1)
            except Exception as e:
                logger.error(f"Error sending done command: {str(e)}")

        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        if self._ws:
            await self._ws.close()
            self._ws = None

        if self._session:
            await self._session.close()
            self._session = None
        await super().aclose()
# Base class for Speech-to-Text implementations
Initialize the Cartesia STT plugin
Args
api_key:str | None, optional- Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
model:str- The model to use for the STT plugin. Defaults to "ink-whisper".
language:str- The language to use for the STT plugin, e.g. "en". Defaults to "en".
sample_rate:int- The sample rate to use for the STT plugin. Defaults to 48000.
base_url:str- The base URL to use for the STT plugin. Defaults to "wss://api.cartesia.ai/stt/websocket".
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources

    Sends "done" on an open WebSocket, stops the listener task, then closes
    the WebSocket and the HTTP session before delegating to the base class.
    """
    if self._ws and not self._ws.closed:
        try:
            await self._ws.send_str("done")
            await asyncio.sleep(0.1)
        except Exception as e:
            logger.error(f"Error sending done command: {str(e)}")

    listener = self._ws_task
    if listener:
        listener.cancel()
        try:
            await listener
        except asyncio.CancelledError:
            pass
        self._ws_task = None

    # Close the WebSocket first, then the session that owns it.
    for attr_name in ("_ws", "_session"):
        resource = getattr(self, attr_name)
        if resource:
            await resource.close()
            setattr(self, attr_name, None)

    await super().aclose()
# Cleanup resources
async def flush(self) ‑> None-
Expand source code
async def flush(self) -> None:
    """Send `finalize` to Cartesia to force a final transcription now.

    A missing or already closed WebSocket makes this a no-op; a failed send
    is logged rather than raised.
    """
    socket = self._ws
    if socket and not socket.closed:
        try:
            await socket.send_str("finalize")
        except Exception as e:
            logger.error(f"Error flushing Cartesia STT: {str(e)}")
# Send
`finalize` to Cartesia to force a final transcription now.
async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None-
Expand source code
async def process_audio(
    self,
    audio_frames: bytes,
    language: Optional[str] = None,
    **kwargs: Any
) -> None:
    """Process audio frames and send to Cartesia's STT API

    The bytes are read as int16 samples, optionally pair-averaged and
    resampled to ``target_sample_rate``, then sent over the WebSocket (which
    is opened on first use). On any failure an "error" event is emitted and
    the WebSocket and listener task are dropped.
    """
    if not self._ws:
        await self._connect_ws()
        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:
        samples = np.frombuffer(audio_frames, dtype=np.int16)
        src_rate = self.input_sample_rate
        dst_rate = self.target_sample_rate
        needs_resample = src_rate != dst_rate

        # Even-length buffers are averaged pairwise before resampling.
        pairable = (
            samples.ndim == 1
            and len(samples) % 2 == 0
            and src_rate == self.sample_rate
        )
        if pairable and needs_resample:
            samples = samples.reshape(-1, 2).mean(axis=1).astype(np.int16)

        if needs_resample:
            samples = self._resample_audio(samples, src_rate, dst_rate)

        payload = np.clip(samples, -32768, 32767).astype(np.int16).tobytes()
        await self._ws.send_bytes(payload)
    except Exception as e:
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None
        if self._ws_task:
            self._ws_task.cancel()
            self._ws_task = None
# Process audio frames and send to Cartesia's STT API
class CartesiaTTS (*,
api_key: str | None = None,
model: str = 'sonic-2',
voice_id: Union[str, List[float]] = 'f8f5f1b2-f02d-4d8e-a40d-fd850a487b3d',
language: str = 'en',
base_url: str = 'https://api.cartesia.ai',
generation_config: GenerationConfig | None = None,
pronunciation_dict_id: str | None = None,
max_buffer_delay_ms: int | None = None,
word_timestamps: bool = False,
max_connection_age_sec: float = 300.0)-
Expand source code
class CartesiaTTS(TTS):
    """Streaming text-to-speech plugin backed by Cartesia's WebSocket TTS API.

    One WebSocket is shared across syntheses. Each ``synthesize()`` call gets
    its own context id; a single receive loop routes incoming messages to the
    matching context and pushes decoded audio to the audio track.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice_id: Union[str, List[float]] = DEFAULT_VOICE_ID,
        language: str = "en",
        base_url: str = "https://api.cartesia.ai",
        generation_config: GenerationConfig | None = None,
        pronunciation_dict_id: str | None = None,
        max_buffer_delay_ms: int | None = None,
        word_timestamps: bool = False,
        max_connection_age_sec: float = DEFAULT_CONNECTION_MAX_AGE_SEC,
    ) -> None:
        """Initialize the Cartesia TTS plugin.

        Args:
            api_key: Cartesia API key. Falls back to CARTESIA_API_KEY env var.
            model: Cartesia model id. Defaults to "sonic-2".
            voice_id: Either a Cartesia voice id (str) or a voice embedding (list of floats).
            language: BCP-47 language tag.
            base_url: Cartesia base URL.
            generation_config: Voice generation params (sonic-3 only). Only fields you set
                are forwarded; defaults are not sent so they don't override Cartesia's
                model defaults on older models.
            pronunciation_dict_id: Custom pronunciation dictionary id.
            max_buffer_delay_ms: Deprecated. Sentence-paced flushing now drives buffer
                behavior; this value is no longer forwarded.
            word_timestamps: If True, request per-word timestamps for transcript sync.
            max_connection_age_sec: Refresh the WebSocket after this many seconds to
                avoid hitting Cartesia's idle/session limits.

        Raises:
            ValueError: If no API key is given and CARTESIA_API_KEY is unset.
        """
        super().__init__(
            sample_rate=CARTESIA_SAMPLE_RATE,
            num_channels=CARTESIA_CHANNELS,
            word_timestamps=word_timestamps,
        )

        self.model = model
        self.language = language
        self.base_url = base_url
        self._voice = voice_id
        self._first_chunk_sent = False
        self._interrupted = False
        self._generation_config = generation_config or GenerationConfig()
        self.pronunciation_dictionary_id = pronunciation_dict_id
        # Stored only; packets always send max_buffer_delay_ms = 0.
        self.max_buffer_delay_ms = max_buffer_delay_ms
        self._max_connection_age_sec = max_connection_age_sec

        api_key = api_key or os.getenv("CARTESIA_API_KEY")
        if not api_key:
            raise ValueError(
                "Cartesia API key must be provided either through api_key parameter or CARTESIA_API_KEY environment variable"
            )
        self._api_key = api_key

        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._ws_connect_time: float = 0.0
        self._ws_request_id: str | None = None
        self._connection_lock = asyncio.Lock()
        self._receive_task: asyncio.Task | None = None
        # context_id -> future resolved when that context is done or failed.
        self._context_futures: dict[str, asyncio.Future[None]] = {}
        self._current_context_id: str | None = None

        # Word-timestamp bookkeeping, reset by reset_first_audio_tracking().
        self._audio_start_time: Optional[float] = None
        self._spoken_words: List[str] = []
        self._word_schedule_tasks: List[asyncio.Task] = []
        self._last_scheduled_start_sec: float = -1.0

    def reset_first_audio_tracking(self) -> None:
        """Reset first-audio and word-timestamp state, cancelling pending word tasks."""
        self._first_chunk_sent = False
        self._audio_start_time = None
        self._spoken_words = []
        for task in self._word_schedule_tasks:
            if not task.done():
                task.cancel()
        self._word_schedule_tasks = []
        self._last_scheduled_start_sec = -1.0

    async def prewarm(self) -> None:
        """Pre-establish the Cartesia WebSocket so the first ``synthesize()`` call
        does not pay the TLS + auth + upgrade cost. Safe to call repeatedly."""
        try:
            await self._ensure_ws_connection()
        except Exception as e:
            logger.warning(f"Cartesia TTS prewarm failed: {e}")

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[Union[str, List[float]]] = None,
        **kwargs: Any,
    ) -> None:
        """Synthesize text to speech using Cartesia's streaming WebSocket API.

        Args:
            text: A string, or an async iterator of text chunks / FlushMarkers.
            voice_id: Optional voice override; it replaces the stored voice.
            **kwargs: Ignored.

        Raises:
            Exception: Any synthesis failure, after it is logged and emitted
                as an "error" event.
        """
        context_id = ""
        send_task: asyncio.Task | None = None
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            if voice_id:
                self._voice = voice_id

            self._interrupted = False

            await self._ensure_ws_connection()
            if not self._ws_connection:
                raise RuntimeError("WebSocket connection is not available.")

            context_id = os.urandom(8).hex()
            self._current_context_id = context_id
            done_future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
            self._context_futures[context_id] = done_future

            async def _string_iterator(s: str) -> AsyncIterator[str]:
                yield s

            text_iterator = _string_iterator(text) if isinstance(text, str) else text
            send_task = asyncio.create_task(self._send_task(text_iterator, context_id))

            try:
                await done_future
            except asyncio.CancelledError:
                # interrupt() cancels the context future; that is not an error.
                pass
            await send_task

        except Exception as e:
            logger.error(f"TTS synthesis failed (context_id={context_id}): {e}")
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise
        finally:
            # If we leave before the sender finished (context failed, or this
            # call was cancelled), stop it so it does not keep consuming the
            # text iterator and sending packets for a dead context.
            if send_task is not None and not send_task.done():
                send_task.cancel()
            if context_id and context_id in self._context_futures:
                del self._context_futures[context_id]
            if self._current_context_id == context_id:
                self._current_context_id = None

    def _build_voice_payload(self) -> dict[str, Any]:
        """Return the voice selector: by id for a string, by embedding otherwise."""
        if isinstance(self._voice, str):
            return {"mode": "id", "id": self._voice}
        return {"mode": "embedding", "embedding": self._voice}

    def _build_base_payload(self, context_id: str) -> dict[str, Any]:
        """Build the fields shared by every packet of one synthesis context."""
        payload: dict[str, Any] = {
            "model_id": self.model,
            "language": self.language,
            "voice": self._build_voice_payload(),
            "output_format": {
                "container": "raw",
                "encoding": "pcm_s16le",
                "sample_rate": self.sample_rate,
            },
            "add_timestamps": self.supports_word_timestamps,
            "context_id": context_id,
        }

        if _supports_generation_config(self.model, API_VERSION):
            gen_cfg = _build_generation_config_payload(self._generation_config)
            if gen_cfg:
                payload["generation_config"] = gen_cfg

        if self.pronunciation_dictionary_id:
            payload["pronunciation_dict_id"] = self.pronunciation_dictionary_id

        return payload

    async def _send_text_packet(self, base_payload: dict[str, Any], text: str) -> bool:
        """Send one text chunk; return False if it was blank or the socket is closed."""
        if not text or not text.strip():
            return False
        if not self._ws_connection or self._ws_connection.closed:
            return False
        payload = {
            **base_payload,
            "transcript": text + " ",
            "continue": True,
            "max_buffer_delay_ms": 0,
        }
        await self._ws_connection.send_str(json.dumps(payload))
        return True

    async def _send_task(self, text_iterator: AsyncIterator[Union[str, FlushMarker]], context_id: str) -> None:
        """Forward sentence-bounded chunks to Cartesia.

        Sentence segmentation is handled upstream by the framework
        (BasicSentenceTokenizer in pipeline_orchestrator) — each chunk
        arriving here is already one sentence, for both LLM streams and
        ``session.say()`` static text. Cartesia's server progressively
        streams audio with ``max_buffer_delay_ms: 0`` so we just forward
        each sentence verbatim. ``FlushMarker`` is a no-op since there is
        no client-side buffer to drain.
        """
        has_sent_transcript = False
        base_payload = self._build_base_payload(context_id)

        try:
            async for chunk in text_iterator:
                if self._interrupted:
                    break
                if isinstance(chunk, FlushMarker):
                    continue
                if not chunk:
                    continue
                if await self._send_text_packet(base_payload, chunk):
                    has_sent_transcript = True

        except Exception as e:
            logger.error(f"Error in send_task (context_id={context_id}): {e}")
            future = self._context_futures.get(context_id)
            if future and not future.done():
                future.set_exception(e)
        finally:
            if has_sent_transcript and not self._interrupted:
                # Close the context so Cartesia finishes and reports "done".
                final_payload = {**base_payload, "transcript": " ", "continue": False}
                if self._ws_connection and not self._ws_connection.closed:
                    try:
                        await self._ws_connection.send_str(json.dumps(final_payload))
                    except Exception as e:
                        logger.debug(
                            f"Error sending final continue:false (context_id={context_id}): {e}"
                        )
            elif not has_sent_transcript:
                # Nothing was sent for this context (empty or whitespace-only
                # text), so no "done" message will ever arrive. Resolve the
                # future here; otherwise synthesize() would wait forever.
                future = self._context_futures.get(context_id)
                if future and not future.done():
                    future.set_result(None)

    async def _receive_loop(self) -> None:
        """Read WebSocket messages and dispatch them to their context.

        When the loop ends for any reason, every unfinished context future is
        failed and the context table is cleared.
        """
        try:
            while self._ws_connection and not self._ws_connection.closed:
                msg = await self._ws_connection.receive()
                if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue

                data = json.loads(msg.data)
                context_id = data.get("context_id")
                future = self._context_futures.get(context_id)

                # Drop frames for cancelled / unknown contexts. A future that's
                # been cancelled (via interrupt()) has done()==True, so its
                # trailing audio bytes from Cartesia are silently dropped here
                # rather than bleeding into the next context's playback.
                if not future or future.done():
                    continue

                if data.get("type") == "error":
                    logger.error(
                        f"Cartesia API error (context_id={context_id}, request_id={self._ws_request_id}): {data}"
                    )
                    future.set_exception(RuntimeError(f"Cartesia API error: {json.dumps(data)}"))
                elif data.get("type") == "timestamps" and "word_timestamps" in data:
                    wt = data["word_timestamps"] or {}
                    words = wt.get("words", []) or []
                    starts = wt.get("start", []) or []
                    for word, start_sec in zip(words, starts):
                        start_sec_f = float(start_sec)
                        # Skip words at or before the last scheduled start.
                        if start_sec_f <= self._last_scheduled_start_sec:
                            continue
                        self._last_scheduled_start_sec = start_sec_f
                        task = asyncio.create_task(
                            self._schedule_word_emit(word, start_sec_f)
                        )
                        self._word_schedule_tasks.append(task)
                elif "data" in data and data["data"]:
                    await self._stream_audio(base64.b64decode(data["data"]))
                elif data.get("done"):
                    future.set_result(None)

        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"Cartesia receive loop error (request_id={self._ws_request_id}): {e}")
            for fut in self._context_futures.values():
                if not fut.done():
                    fut.set_exception(e)
        finally:
            for fut in self._context_futures.values():
                if not fut.done():
                    fut.set_exception(RuntimeError("Cartesia WebSocket closed"))
            self._context_futures.clear()

    async def _ensure_ws_connection(self) -> None:
        """Open the WebSocket if needed, replacing one that is closed or too old.

        Raises:
            Exception: The connection failure, after it is logged and emitted
                as an "error" event.
        """
        async with self._connection_lock:
            loop = asyncio.get_running_loop()
            now = loop.time()
            if self._ws_connection and not self._ws_connection.closed:
                age = now - self._ws_connect_time
                if age < self._max_connection_age_sec:
                    return
                logger.info(f"Refreshing Cartesia WebSocket (age={age:.1f}s)")
                await self._close_connection_locked()
            elif self._ws_connection or self._ws_session:
                # Leftovers from a dead connection: clean up before reconnecting.
                await self._close_connection_locked()

            try:
                self._ws_session = aiohttp.ClientSession()
                # http -> ws and https -> wss.
                ws_url = self.base_url.replace("http", "ws", 1)
                full_ws_url = f"{ws_url}/tts/websocket?cartesia_version={API_VERSION}"
                self._ws_connection = await asyncio.wait_for(
                    self._ws_session.ws_connect(
                        full_ws_url,
                        headers={"X-API-Key": self._api_key},
                        heartbeat=30.0,
                    ),
                    timeout=5.0,
                )
                # Stamp after the handshake so connect latency does not count
                # towards the connection's age.
                self._ws_connect_time = loop.time()
                try:
                    resp = getattr(self._ws_connection, "_response", None)
                    if resp is not None:
                        self._ws_request_id = resp.headers.get(CARTESIA_REQUEST_ID_HEADER)
                except Exception:
                    self._ws_request_id = None
                logger.debug(
                    f"Cartesia WebSocket established (request_id={self._ws_request_id})"
                )
                self._receive_task = asyncio.create_task(self._receive_loop())
            except Exception as e:
                logger.error(f"Failed to establish Cartesia WebSocket: {e}")
                self.emit("error", f"Failed to establish WebSocket connection: {e}")
                if self._ws_session and not self._ws_session.closed:
                    try:
                        await self._ws_session.close()
                    except Exception:
                        pass
                self._ws_session = None
                self._ws_connection = None
                raise

    async def _close_connection_locked(self) -> None:
        """Stop the receive task and close the socket and session.

        Callers in this class invoke it while holding ``_connection_lock``.
        Close errors are ignored (best effort).
        """
        if self._receive_task and not self._receive_task.done():
            self._receive_task.cancel()
            try:
                await self._receive_task
            except asyncio.CancelledError:
                pass
            except Exception:
                pass
        self._receive_task = None

        if self._ws_connection and not self._ws_connection.closed:
            try:
                await self._ws_connection.close()
            except Exception:
                pass
        self._ws_connection = None

        if self._ws_session and not self._ws_session.closed:
            try:
                await self._ws_session.close()
            except Exception:
                pass
        self._ws_session = None
        self._ws_request_id = None

    async def _stream_audio(self, audio_chunk: bytes) -> None:
        """Push one decoded audio chunk to the audio track unless interrupted."""
        if self._interrupted or not audio_chunk:
            return

        if not self._first_chunk_sent:
            self._first_chunk_sent = True
            # Word emission times are measured from this moment.
            self._audio_start_time = asyncio.get_running_loop().time()
            if self._first_audio_callback:
                await self._first_audio_callback()

        if self.audio_track:
            await self.audio_track.add_new_bytes(audio_chunk)

    async def _schedule_word_emit(self, word: str, start_sec: float) -> None:
        """Emit "word_spoken" for ``word`` at ``start_sec`` after audio start."""
        # Wait until the first audio chunk has set the reference time.
        while self._audio_start_time is None and not self._interrupted:
            await asyncio.sleep(0.01)
        if self._interrupted or self._audio_start_time is None:
            return
        target_time = self._audio_start_time + float(start_sec)
        now = asyncio.get_running_loop().time()
        delay = target_time - now
        if delay > 0:
            try:
                await asyncio.sleep(delay)
            except asyncio.CancelledError:
                return
        if self._interrupted:
            return
        self._spoken_words.append(word)
        cumulative = " ".join(self._spoken_words)
        try:
            self.emit("word_spoken", {"word": word, "cumulative_text": cumulative})
        except Exception as e:
            # Best effort: a failing listener must not break word scheduling,
            # but leave a trace instead of swallowing it silently.
            logger.debug(f"Error emitting word_spoken: {e}")

    async def interrupt(self) -> None:
        """Stop emitting audio for the current synthesis. Keeps the WebSocket
        open so the next turn does not pay reconnect cost; audio chunks for
        the cancelled context are dropped client-side via context_id filtering."""
        self._interrupted = True
        for task in self._word_schedule_tasks:
            if not task.done():
                task.cancel()
        self._word_schedule_tasks = []
        if self.audio_track:
            self.audio_track.interrupt()
        for fut in list(self._context_futures.values()):
            if not fut.done():
                fut.cancel()

    async def aclose(self) -> None:
        """Run base-class cleanup, then close the WebSocket connection."""
        await super().aclose()
        self._interrupted = True
        async with self._connection_lock:
            await self._close_connection_locked()
# Base class for Text-to-Speech implementations
Initialize the Cartesia TTS plugin.
Args
api_key- Cartesia API key. Falls back to CARTESIA_API_KEY env var.
model- Cartesia model id. Defaults to "sonic-2".
voice_id- Either a Cartesia voice id (str) or a voice embedding (list of floats).
language- BCP-47 language tag.
base_url- Cartesia base URL.
generation_config- Voice generation params (sonic-3 only). Only fields you set are forwarded; defaults are not sent so they don't override Cartesia's model defaults on older models.
pronunciation_dict_id- Custom pronunciation dictionary id.
max_buffer_delay_ms- Deprecated. Sentence-paced flushing now drives buffer behavior; this value is no longer forwarded.
word_timestamps- If True, request per-word timestamps for transcript sync.
max_connection_age_sec- Refresh the WebSocket after this many seconds to avoid hitting Cartesia's idle/session limits.
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Run the base-class cleanup, then close the Cartesia WebSocket."""
    # Base-class cleanup runs first.
    await super().aclose()
    # Flag the plugin as interrupted before the connection is torn down.
    self._interrupted = True
    # The connection is closed while holding the connection lock.
    async with self._connection_lock:
        await self._close_connection_locked()
# Cleanup resources
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None:
    """Stop emitting audio for the current synthesis.

    The WebSocket stays open so the next turn does not pay reconnect cost;
    audio chunks for the cancelled context are dropped client-side via
    context_id filtering.
    """
    self._interrupted = True

    # Drop every word emission that has not fired yet.
    for scheduled in self._word_schedule_tasks:
        if scheduled.done():
            continue
        scheduled.cancel()
    self._word_schedule_tasks = []

    track = self.audio_track
    if track:
        track.interrupt()

    # Snapshot the futures before cancelling them.
    for pending in tuple(self._context_futures.values()):
        if pending.done():
            continue
        pending.cancel()
# Stop emitting audio for the current synthesis. Keeps the WebSocket open so the next turn does not pay reconnect cost; audio chunks for the cancelled context are dropped client-side via context_id filtering.
async def prewarm(self) ‑> None-
Expand source code
async def prewarm(self) -> None:
    """Pre-establish the Cartesia WebSocket so the first ``synthesize()`` call
    does not pay the TLS + auth + upgrade cost.

    Safe to call repeatedly. A failed connection attempt is logged as a
    warning and not raised.
    """
    try:
        await self._ensure_ws_connection()
    except Exception as e:
        # Best effort: the failure is only logged, never propagated.
        logger.warning(f"Cartesia TTS prewarm failed: {e}")
# Pre-establish the Cartesia WebSocket so the first
`synthesize()` call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly.
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear first-audio and word-timestamp state ahead of the next TTS task.

    Word-emission tasks that have not finished are cancelled.
    """
    for scheduled in self._word_schedule_tasks:
        if scheduled.done():
            continue
        scheduled.cancel()

    self._word_schedule_tasks = []
    self._spoken_words = []
    self._last_scheduled_start_sec = -1.0
    self._audio_start_time = None
    self._first_chunk_sent = False
# Reset the first audio tracking state for next TTS task
async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[Union[str, List[float]]] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[Union[str, List[float]]] = None,
    **kwargs: Any,
) -> None:
    """Synthesize text to speech using Cartesia's streaming WebSocket API.

    Args:
        text: A string, or an async iterator of text chunks / FlushMarkers.
        voice_id: Optional voice override; it replaces the stored voice.
        **kwargs: Ignored.

    Raises:
        Exception: Any synthesis failure, after it is logged and emitted as
            an "error" event.
    """
    context_id = ""
    send_task: asyncio.Task | None = None
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        if voice_id:
            self._voice = voice_id

        self._interrupted = False

        await self._ensure_ws_connection()
        if not self._ws_connection:
            raise RuntimeError("WebSocket connection is not available.")

        context_id = os.urandom(8).hex()
        self._current_context_id = context_id
        done_future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
        self._context_futures[context_id] = done_future

        async def _string_iterator(s: str) -> AsyncIterator[str]:
            yield s

        text_iterator = _string_iterator(text) if isinstance(text, str) else text
        send_task = asyncio.create_task(self._send_task(text_iterator, context_id))

        try:
            await done_future
        except asyncio.CancelledError:
            # interrupt() cancels the context future; that is not an error.
            pass
        await send_task

    except Exception as e:
        logger.error(f"TTS synthesis failed (context_id={context_id}): {e}")
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise
    finally:
        # If we leave before the sender finished (context failed, or this
        # call was cancelled), stop it so it does not keep consuming the text
        # iterator and sending packets for a dead context.
        if send_task is not None and not send_task.done():
            send_task.cancel()
        if context_id and context_id in self._context_futures:
            del self._context_futures[context_id]
        if self._current_context_id == context_id:
            self._current_context_id = None
# Synthesize text to speech using Cartesia's streaming WebSocket API.
class GenerationConfig (speed: float | None = None,
emotion: str | None = None,
volume: float | None = None)-
Expand source code
@dataclass
class GenerationConfig:
    """Voice generation parameters.

    Only sent to Cartesia for sonic-3+ models on a sufficiently recent API
    version, and only for fields explicitly set. Every field defaults to
    ``None``, i.e. not set.
    """

    # Speed setting; None leaves it unset.
    speed: float | None = None
    # Emotion setting; None leaves it unset.
    emotion: str | None = None
    # Volume setting; None leaves it unset.
    volume: float | None = None
# Voice generation parameters. Only sent to Cartesia for sonic-3+ models on a sufficiently recent API version, and only for fields explicitly set.
Instance variables
var emotion : str | None
var speed : float | None
var volume : float | None