Module videosdk.plugins.cartesia.tts
Classes
class CartesiaTTS (*,
api_key: str | None = None,
model: str = 'sonic-2',
voice_id: Union[str, List[float]] = 'f786b574-daa5-4673-aa0c-cbe3e8534c02',
language: str = 'en',
base_url: str = 'https://api.cartesia.ai',
generation_config: GenerationConfig = GenerationConfig(volume=1.0, speed=1.0, emotion='neutral'))-
Expand source code
class CartesiaTTS(TTS):
    """Streaming text-to-speech plugin backed by Cartesia's WebSocket API.

    A single WebSocket connection is reused across ``synthesize`` calls. Each
    call registers a future under a fresh ``context_id``; one background
    receive loop routes incoming messages to the matching future and forwards
    decoded audio to ``self.audio_track``.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice_id: Union[str, List[float]] = DEFAULT_VOICE_ID,
        language: str = "en",
        base_url: str = "https://api.cartesia.ai",
        generation_config: GenerationConfig = GenerationConfig(),
    ) -> None:
        """Initialize the Cartesia TTS plugin

        Args:
            api_key (str | None, optional): Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to "sonic-2".
            voice_id (Union[str, List[float]]): The voice to use for the TTS plugin: either a voice ID string, or a list of floats that is sent as a voice embedding. Defaults to "f786b574-daa5-4673-aa0c-cbe3e8534c02".
            language (str): The language to use for the TTS plugin. Defaults to "en".
            base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.cartesia.ai".
            generation_config (GenerationConfig): The generation config to use for the TTS plugin. Defaults to GenerationConfig().

        Raises:
            ValueError: If no API key is passed and CARTESIA_API_KEY is not set.
        """
        super().__init__(sample_rate=CARTESIA_SAMPLE_RATE, num_channels=CARTESIA_CHANNELS)

        self.model = model
        self.language = language
        self.base_url = base_url
        self._voice = voice_id
        self._first_chunk_sent = False
        self._interrupted = False
        # NOTE: the default GenerationConfig() is evaluated once at definition
        # time, so instances built without this argument share one object.
        self._generation_config = generation_config

        api_key = api_key or os.getenv("CARTESIA_API_KEY")
        if not api_key:
            raise ValueError(
                "Cartesia API key must be provided either through api_key parameter or CARTESIA_API_KEY environment variable")
        self._api_key = api_key

        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        # Serializes connection setup so concurrent callers share one socket.
        self._connection_lock = asyncio.Lock()
        self._receive_task: asyncio.Task | None = None
        # context_id -> future completed when that synthesis finishes or fails.
        self._context_futures: dict[str, asyncio.Future[None]] = {}

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for the next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[Union[str, List[float]]] = None,
        **kwargs: Any,
    ) -> None:
        """Synthesize text to speech using Cartesia's streaming WebSocket API.

        Args:
            text: A complete string, or an async iterator of text chunks.
            voice_id: Optional voice override. When truthy it replaces the
                stored voice for this and all later calls.
            **kwargs: Accepted for interface compatibility; unused here.

        Raises:
            Exception: Any failure is reported through an ``"error"`` event
                and then re-raised.
        """
        context_id = ""
        send_task: asyncio.Task | None = None
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            if voice_id:
                self._voice = voice_id

            self._interrupted = False
            await self._ensure_ws_connection()
            if not self._ws_connection:
                raise RuntimeError("WebSocket connection is not available.")

            context_id = os.urandom(8).hex()
            # We are inside a coroutine, so the running loop is the right one.
            done_future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
            self._context_futures[context_id] = done_future

            async def _string_iterator(s: str) -> AsyncIterator[str]:
                yield s

            text_iterator = _string_iterator(text) if isinstance(text, str) else text
            send_task = asyncio.create_task(self._send_task(text_iterator, context_id))

            await done_future
            await send_task
        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise
        finally:
            # Do not leave the sender running (and still writing to the
            # socket) once this call has failed or been cancelled.
            if send_task is not None and not send_task.done():
                send_task.cancel()
            if context_id:
                self._context_futures.pop(context_id, None)

    async def _send_task(self, text_iterator: AsyncIterator[str], context_id: str):
        """The dedicated task for sending text chunks over the WebSocket.

        Sends every non-blank chunk with ``"continue": True`` and, if at least
        one chunk went out and no interrupt happened, a final message with
        ``"continue": False``. Errors are delivered to the context's future
        rather than raised from the send loop.
        """
        has_sent_transcript = False
        try:
            voice_payload: dict[str, Any] = {}
            if isinstance(self._voice, str):
                voice_payload["mode"] = "id"
                voice_payload["id"] = self._voice
            else:
                voice_payload["mode"] = "embedding"
                voice_payload["embedding"] = self._voice

            base_payload = {
                "model_id": self.model,
                "language": self.language,
                "voice": voice_payload,
                "output_format": {"container": "raw", "encoding": "pcm_s16le", "sample_rate": self.sample_rate},
                "add_timestamps": True,
                "context_id": context_id,
                "generation_config": asdict(self._generation_config),
            }

            async for text_chunk in text_iterator:
                if self._interrupted:
                    break
                if text_chunk and text_chunk.strip():
                    payload = {**base_payload, "transcript": text_chunk + " ", "continue": True}
                    if self._ws_connection and not self._ws_connection.closed:
                        await self._ws_connection.send_str(json.dumps(payload))
                        has_sent_transcript = True
        except Exception as e:
            future = self._context_futures.get(context_id)
            if future and not future.done():
                future.set_exception(e)
        finally:
            if has_sent_transcript and not self._interrupted:
                final_payload = {**base_payload, "transcript": " ", "continue": False}
                if self._ws_connection and not self._ws_connection.closed:
                    await self._ws_connection.send_str(json.dumps(final_payload))
            else:
                # No closing message is sent in this branch, so no "done"
                # reply can be expected; release the waiter here instead of
                # letting synthesize() block forever.
                future = self._context_futures.get(context_id)
                if future and not future.done():
                    future.set_result(None)

    async def _receive_loop(self):
        """A single, long-running task that handles all incoming messages from the WebSocket.

        Text messages are JSON-decoded and dispatched by ``context_id``:
        error messages fail the future, messages carrying ``data`` are
        base64-decoded and streamed as audio, and ``done`` completes the
        future. When the loop ends, every still-pending future is resolved so
        no caller is left waiting.
        """
        try:
            while self._ws_connection and not self._ws_connection.closed:
                msg = await self._ws_connection.receive()
                if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue

                data = json.loads(msg.data)
                context_id = data.get("context_id")
                future = self._context_futures.get(context_id)
                if not future or future.done():
                    continue

                if data.get("type") == "error":
                    future.set_exception(RuntimeError(f"Cartesia API error: {json.dumps(data)}"))
                elif "data" in data and data["data"]:
                    await self._stream_audio(base64.b64decode(data["data"]))
                elif data.get("done"):
                    future.set_result(None)
        except Exception as e:
            for future in self._context_futures.values():
                if not future.done():
                    future.set_exception(e)
        finally:
            # The connection is gone, so nothing will ever complete these
            # futures. An interrupt is an expected stop; any other exit is
            # reported as a failure.
            for future in self._context_futures.values():
                if future.done():
                    continue
                if self._interrupted:
                    future.set_result(None)
                else:
                    future.set_exception(
                        ConnectionError("Cartesia WebSocket closed before synthesis completed"))
            self._context_futures.clear()

    async def _ensure_ws_connection(self) -> None:
        """Establishes or re-establishes the WebSocket connection if needed.

        Returns immediately when an open connection exists. Otherwise tears
        down the previous receive task, connection and session, then opens a
        new connection (5 second timeout) and starts a new receive loop.

        Raises:
            Exception: Connection failures are reported through an
                ``"error"`` event and re-raised.
        """
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed:
                return

            if self._receive_task and not self._receive_task.done():
                self._receive_task.cancel()
            if self._ws_connection:
                await self._ws_connection.close()
            if self._ws_session:
                await self._ws_session.close()

            try:
                self._ws_session = aiohttp.ClientSession()
                # Only the first "http" is replaced: http -> ws, https -> wss.
                ws_url = self.base_url.replace('http', 'ws', 1)
                full_ws_url = f"{ws_url}/tts/websocket?api_key={self._api_key}&cartesia_version={API_VERSION}"
                self._ws_connection = await asyncio.wait_for(
                    self._ws_session.ws_connect(full_ws_url, heartbeat=30.0), timeout=5.0
                )
                self._receive_task = asyncio.create_task(self._receive_loop())
            except Exception as e:
                # Do not leak the session that was just created for the
                # failed attempt.
                if self._ws_session and not self._ws_session.closed:
                    await self._ws_session.close()
                self._ws_session = None
                self._ws_connection = None
                self.emit("error", f"Failed to establish WebSocket connection: {e}")
                raise

    async def _stream_audio(self, audio_chunk: bytes):
        """Streams a chunk of audio to the audio track.

        Chunks are dropped when empty or after an interrupt. The first
        delivered chunk triggers ``_first_audio_callback`` if one is set.
        """
        if self._interrupted or not audio_chunk:
            return

        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()

        if self.audio_track:
            await self.audio_track.add_new_bytes(audio_chunk)

    async def interrupt(self) -> None:
        """Interrupts any ongoing TTS, stopping audio playback and network activity."""
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()
        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()

    async def aclose(self) -> None:
        """Gracefully cleans up all resources."""
        await super().aclose()
        self._interrupted = True
        if self._receive_task and not self._receive_task.done():
            self._receive_task.cancel()
        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()


# Rendered class description: Base class for Text-to-Speech implementations
Initialize the Cartesia TTS plugin
Args
api_key:str | None, optional- Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
model:str- The model to use for the TTS plugin. Defaults to "sonic-2".
voice_id:Union[str, List[float]]- The voice ID to use for the TTS plugin. Defaults to "f786b574-daa5-4673-aa0c-cbe3e8534c02".
language:str- The language to use for the TTS plugin. Defaults to "en".
base_url:str- The base URL to use for the TTS plugin. Defaults to "https://api.cartesia.ai".
generation_config:GenerationConfig- The generation config to use for the TTS plugin. Defaults to GenerationConfig().
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Shut the plugin down and release everything it holds.

    Runs the parent class cleanup first, marks the instance as interrupted,
    then cancels the receive task if it is still running and closes the
    WebSocket connection and the HTTP session if they are still open.
    """
    await super().aclose()
    self._interrupted = True

    receiver = self._receive_task
    if receiver and not receiver.done():
        receiver.cancel()

    # Close the socket before the session that owns it.
    for resource in (self._ws_connection, self._ws_session):
        if resource and not resource.closed:
            await resource.close()
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None:
    """Stop the current synthesis: halt playback and drop the connection.

    Sets the interrupted flag, tells the audio track (if any) to interrupt,
    and closes the WebSocket connection when one is open.
    """
    self._interrupted = True

    track = self.audio_track
    if track:
        track.interrupt()

    connection = self._ws_connection
    if not connection or connection.closed:
        return
    await connection.close()
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-chunk flag so the next TTS task starts fresh."""
    # Only the flag is touched; no other state is reset here.
    self._first_chunk_sent = False
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[Union[str, List[float]]] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[Union[str, List[float]]] = None,
    **kwargs: Any,
) -> None:
    """Synthesize text to speech using Cartesia's streaming WebSocket API.

    Registers a future under a fresh random ``context_id``, starts a task
    that sends the text, and waits for that future to complete.

    Args:
        text: A complete string, or an async iterator of text chunks.
        voice_id: Optional voice override. When truthy it replaces the stored
            voice for this and all later calls.
        **kwargs: Accepted for interface compatibility; unused here.

    Raises:
        Exception: Any failure is reported through an ``"error"`` event and
            then re-raised.
    """
    context_id = ""
    send_task = None
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        if voice_id:
            self._voice = voice_id

        self._interrupted = False
        await self._ensure_ws_connection()
        if not self._ws_connection:
            raise RuntimeError("WebSocket connection is not available.")

        context_id = os.urandom(8).hex()
        # We are inside a coroutine, so the running loop is the right one.
        done_future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
        self._context_futures[context_id] = done_future

        async def _string_iterator(s: str) -> AsyncIterator[str]:
            yield s

        text_iterator = _string_iterator(text) if isinstance(text, str) else text
        send_task = asyncio.create_task(self._send_task(text_iterator, context_id))

        await done_future
        await send_task
    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise
    finally:
        # Do not leave the sender running (and still writing to the socket)
        # once this call has failed or been cancelled.
        if send_task is not None and not send_task.done():
            send_task.cancel()
        if context_id:
            self._context_futures.pop(context_id, None)
class GenerationConfig (volume: float = 1.0, speed: float = 1.0, emotion: str = 'neutral')-
Expand source code
@dataclass
class GenerationConfig:
    """Generation settings included in every Cartesia synthesis request.

    The instance is converted with ``asdict`` and sent as the request's
    ``generation_config`` object.
    """

    # Field order is part of the positional constructor signature.
    volume: float = 1.0
    speed: float = 1.0
    emotion: str = "neutral"
Instance variables
var emotion : str
var speed : float
var volume : float