Package videosdk.plugins.cartesia

Sub-modules

videosdk.plugins.cartesia.stt
videosdk.plugins.cartesia.tts

Classes

class CartesiaSTT (*,
api_key: str | None = None,
model: str = 'ink-whisper',
language: str = 'en',
sample_rate: int = 48000,
base_url: str = 'wss://api.cartesia.ai/stt/websocket')
class CartesiaSTT(BaseSTT):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "ink-whisper",
        language: str = "en",
        sample_rate: int = 48000,
        base_url: str = "wss://api.cartesia.ai/stt/websocket",
    ) -> None:
        """Initialize the Cartesia STT plugin

        Args:
            api_key (str | None, optional): Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "ink-whisper".
            language (str): The language to use for the STT plugin, e.g. "en". Defaults to "en".
            sample_rate (int): The sample rate of the incoming audio in Hz; frames are resampled to 16 kHz before being sent to Cartesia. Defaults to 48000.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.cartesia.ai/stt/websocket".
        """
        super().__init__()

        self.api_key = api_key or os.getenv("CARTESIA_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Cartesia API key must be provided either through api_key parameter or CARTESIA_API_KEY environment variable")

        self.model = model
        self.language = language
        self.sample_rate = sample_rate
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._last_interim_at = 0.0
        self.input_sample_rate = sample_rate
        self.target_sample_rate = 16000

    async def process_audio(
        self,
        audio_frames: bytes,
        language: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Cartesia's STT API"""

        if not self._ws:
            await self._connect_ws()

            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:

            audio_data = np.frombuffer(audio_frames, dtype=np.int16)

            # Heuristic stereo-to-mono downmix: average interleaved sample pairs before resampling.
            if audio_data.ndim == 1 and len(audio_data) % 2 == 0 and self.input_sample_rate == self.sample_rate and self.input_sample_rate != self.target_sample_rate:
                audio_data = audio_data.reshape(-1, 2).mean(axis=1).astype(np.int16)

            if self.input_sample_rate != self.target_sample_rate:
                audio_data = self._resample_audio(audio_data, self.input_sample_rate, self.target_sample_rate)
                audio_data = np.clip(audio_data, -32768, 32767)
                audio_data = audio_data.astype(np.int16)

            audio_bytes = audio_data.astype(np.int16).tobytes()
            await self._ws.send_bytes(audio_bytes)

        except Exception as e:
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
                if self._ws_task:
                    self._ws_task.cancel()
                    self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses"""
        if not self._ws:
            return
        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    error = f"WebSocket error: {self._ws.exception()}"
                    self.emit("error", error)
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    logger.info("WebSocket connection closed")
                    break
        except Exception as e:
            self.emit("error", f"Error listening for responses: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Cartesia's STT API"""

        if not self._session:
            self._session = aiohttp.ClientSession()

        query_params = {
            "model": self.model,
            "language": self.language,
            "encoding": "pcm_s16le",
            "sample_rate": str(self.target_sample_rate),
            "api_key": self.api_key,
        }

        headers = {
            "Cartesia-Version": "2024-11-13",
            "User-Agent": "VideoSDK-Cartesia-STT",
        }

        ws_url = f"{self.base_url}?{urlencode(query_params)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)

        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            if self._ws:
                await self._ws.close()
                self._ws = None
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses"""
        responses = []
        try:
            msg_type = msg.get("type")

            if msg_type == "transcript":
                transcript = msg.get("text", "")
                is_final = msg.get("is_final", False)
                language = msg.get("language", self.language)
                duration = msg.get("duration", 0.0)

                if transcript:
                    current_time = time.time()

                    if is_final:
                        responses.append(STTResponse(
                            event_type=SpeechEventType.FINAL,
                            data=SpeechData(
                                text=transcript,
                                confidence=1.0,
                                language=language,
                                start_time=0.0,
                                end_time=duration,
                            ),
                            metadata={
                                "model": self.model,
                                "request_id": msg.get("request_id"),
                                "duration": duration,
                            }
                        ))
                    else:
                        if current_time - self._last_interim_at > 0.1:
                            responses.append(STTResponse(
                                event_type=SpeechEventType.INTERIM,
                                data=SpeechData(
                                    text=transcript,
                                    confidence=1.0,
                                    language=language,
                                    start_time=0.0,
                                    end_time=duration,
                                ),
                                metadata={
                                    "model": self.model,
                                    "request_id": msg.get("request_id"),
                                    "duration": duration,
                                }
                            ))
                            self._last_interim_at = current_time

            elif msg_type == "flush_done":
                logger.info("Cartesia STT: Flush completed")

            elif msg_type == "done":
                logger.info("Cartesia STT: Session ended")

            elif msg_type == "error":
                error_msg = msg.get("message", "Unknown error")
                error_code = msg.get("code", "unknown")
                self.emit("error", f"{error_code}: {error_msg}")

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    def _resample_audio(self, audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
        """
        Use polyphase filtering for resampling, which is more accurate for integer-ratio conversions.
        Assumes input is np.int16.
        """
        if orig_sr == target_sr:
            return audio
        
        gcd = math.gcd(orig_sr, target_sr)
        up = target_sr // gcd
        down = orig_sr // gcd
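        # e.g. 48000 Hz -> 16000 Hz: gcd = 16000, so up = 1 and down = 3 (decimate by 3 with polyphase anti-aliasing)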
        
        return resample_poly(audio, up, down)

    async def aclose(self) -> None:
        """Cleanup resources"""
        if self._ws and not self._ws.closed:
            try:
                await self._ws.send_str("done")
                await asyncio.sleep(0.1)
            except Exception as e:
                logger.error(f"Error sending done command: {str(e)}")

        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        if self._ws:
            await self._ws.close()
            self._ws = None

        if self._session:
            await self._session.close()
            self._session = None

        await super().aclose()

Base class for Speech-to-Text implementations

Initialize the Cartesia STT plugin

Args

api_key : str | None, optional
Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the STT plugin. Defaults to "ink-whisper".
language : str
The language to use for the STT plugin, e.g. "en". Defaults to "en".
sample_rate : int
The sample rate of the incoming audio in Hz; frames are resampled to 16 kHz before being sent to Cartesia. Defaults to 48000.
base_url : str
The base URL to use for the STT plugin. Defaults to "wss://api.cartesia.ai/stt/websocket".
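
A minimal construction sketch, assuming CARTESIA_API_KEY is set in the environment; the audio feed and transcript callback are wired up by the surrounding agent pipeline and are only hinted at here.

import asyncio

from videosdk.plugins.cartesia import CartesiaSTT

async def main() -> None:
    # Relies on CARTESIA_API_KEY; pass api_key="..." to override.
    stt = CartesiaSTT(
        model="ink-whisper",
        language="en",
        sample_rate=48000,  # rate of the PCM frames you will pass to process_audio()
    )
    try:
        ...  # hand 16-bit PCM frames to stt.process_audio(frames)
    finally:
        await stt.aclose()  # closes the WebSocket and HTTP session

asyncio.run(main())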

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Cleanup resources"""
    if self._ws and not self._ws.closed:
        try:
            await self._ws.send_str("done")
            await asyncio.sleep(0.1)
        except Exception as e:
            logger.error(f"Error sending done command: {str(e)}")

    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None

    if self._ws:
        await self._ws.close()
        self._ws = None

    if self._session:
        await self._session.close()
        self._session = None

    await super().aclose()

Cleanup resources

async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None
async def process_audio(
    self,
    audio_frames: bytes,
    language: Optional[str] = None,
    **kwargs: Any
) -> None:
    """Process audio frames and send to Cartesia's STT API"""

    if not self._ws:
        await self._connect_ws()

        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:

        audio_data = np.frombuffer(audio_frames, dtype=np.int16)

        # Heuristic stereo-to-mono downmix: average interleaved sample pairs before resampling.
        if audio_data.ndim == 1 and len(audio_data) % 2 == 0 and self.input_sample_rate == self.sample_rate and self.input_sample_rate != self.target_sample_rate:
            audio_data = audio_data.reshape(-1, 2).mean(axis=1).astype(np.int16)

        if self.input_sample_rate != self.target_sample_rate:
            audio_data = self._resample_audio(audio_data, self.input_sample_rate, self.target_sample_rate)
            audio_data = np.clip(audio_data, -32768, 32767)
            audio_data = audio_data.astype(np.int16)

        audio_bytes = audio_data.astype(np.int16).tobytes()
        await self._ws.send_bytes(audio_bytes)

    except Exception as e:
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

Process audio frames and send to Cartesia's STT API
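
A hedged end-to-end sketch of the audio path: 16-bit PCM chunks at the configured sample rate go in, are downmixed and resampled to 16 kHz, and interim/final STTResponse objects come back through the transcript callback. Assigning the private _transcript_callback attribute is for illustration only; in practice the agent pipeline registers this callback.

import asyncio

from videosdk.plugins.cartesia import CartesiaSTT

async def transcribe(pcm_chunks: list[bytes]) -> None:
    stt = CartesiaSTT(sample_rate=48000)

    async def on_transcript(response) -> None:
        # STTResponse carries event_type (INTERIM/FINAL) and data.text
        print(response.event_type, response.data.text)

    stt._transcript_callback = on_transcript  # illustration only; normally set by the pipeline

    try:
        for chunk in pcm_chunks:           # raw 16-bit PCM at the configured sample_rate
            await stt.process_audio(chunk)
        await asyncio.sleep(0.5)           # give the WebSocket listener time to deliver final results
    finally:
        await stt.aclose()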

class CartesiaTTS (*,
api_key: str | None = None,
model: str = 'sonic-2',
voice_id: Union[str, List[float]] = 'f786b574-daa5-4673-aa0c-cbe3e8534c02',
language: str = 'en',
base_url: str = 'https://api.cartesia.ai')
class CartesiaTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice_id: Union[str, List[float]] = DEFAULT_VOICE_ID,
        language: str = "en",
        base_url: str = "https://api.cartesia.ai",
    ) -> None:
        """Initialize the Cartesia TTS plugin
        Args:
            api_key (str | None, optional): Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to "sonic-2".
            voice_id (Union[str, List[float]]): The voice ID to use for the TTS plugin, or a voice embedding as a list of floats. Defaults to "f786b574-daa5-4673-aa0c-cbe3e8534c02".
            language (str): The language to use for the TTS plugin. Defaults to "en".
            base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.cartesia.ai".
        """
        super().__init__(sample_rate=CARTESIA_SAMPLE_RATE, num_channels=CARTESIA_CHANNELS)

        self.model = model
        self.language = language
        self.base_url = base_url
        self._voice = voice_id
        self._first_chunk_sent = False
        self._interrupted = False

        api_key = api_key or os.getenv("CARTESIA_API_KEY")
        if not api_key:
            raise ValueError(
                "Cartesia API key must be provided either through api_key parameter or CARTESIA_API_KEY environment variable")
        self._api_key = api_key

        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._connection_lock = asyncio.Lock()
        self._receive_task: asyncio.Task | None = None
        self._context_futures: dict[str, asyncio.Future[None]] = {}

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for the next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self, text: AsyncIterator[str] | str, voice_id: Optional[Union[str, List[float]]] = None, **kwargs: Any,
    ) -> None:
        """Synthesize text to speech using Cartesia's streaming WebSocket API."""
        context_id = ""
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            if voice_id:
                self._voice = voice_id

            self._interrupted = False

            await self._ensure_ws_connection()
            if not self._ws_connection:
                raise RuntimeError("WebSocket connection is not available.")

            context_id = os.urandom(8).hex()
            done_future: asyncio.Future[None] = asyncio.get_event_loop().create_future()
            self._context_futures[context_id] = done_future

            async def _string_iterator(s: str) -> AsyncIterator[str]:
                yield s

            text_iterator = _string_iterator(text) if isinstance(text, str) else text
            send_task = asyncio.create_task(self._send_task(text_iterator, context_id))

            await done_future
            await send_task

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
        finally:
            if context_id and context_id in self._context_futures:
                del self._context_futures[context_id]

    async def _send_task(self, text_iterator: AsyncIterator[str], context_id: str):
        """The dedicated task for sending text chunks over the WebSocket."""
        has_sent_transcript = False
        try:
            voice_payload: dict[str, Any] = {}
            if isinstance(self._voice, str):
                voice_payload["mode"] = "id"
                voice_payload["id"] = self._voice
            else:
                voice_payload["mode"] = "embedding"
                voice_payload["embedding"] = self._voice

            base_payload = {
                "model_id": self.model, "language": self.language,
                "voice": voice_payload,
                "output_format": {"container": "raw", "encoding": "pcm_s16le", "sample_rate": self.sample_rate},
                "add_timestamps": True, "context_id": context_id,
            }

            async for text_chunk in text_iterator:
                if self._interrupted: break
                if text_chunk and text_chunk.strip():
                    if not has_sent_transcript:
                        pass

                    payload = {**base_payload, "transcript": text_chunk + " ", "continue": True}
                    if self._ws_connection and not self._ws_connection.closed:
                        await self._ws_connection.send_str(json.dumps(payload))
                    has_sent_transcript = True

        except Exception as e:
            future = self._context_futures.get(context_id)
            if future and not future.done():
                future.set_exception(e)
        finally:
            if has_sent_transcript and not self._interrupted:
                final_payload = {**base_payload, "transcript": " ", "continue": False}
                if self._ws_connection and not self._ws_connection.closed:
                    await self._ws_connection.send_str(json.dumps(final_payload))

    async def _receive_loop(self):
        """A single, long-running task that handles all incoming messages from the WebSocket."""
        try:
            while self._ws_connection and not self._ws_connection.closed:
                msg = await self._ws_connection.receive()
                if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): break
                if msg.type != aiohttp.WSMsgType.TEXT: continue

                data = json.loads(msg.data)
                context_id = data.get("context_id")
                future = self._context_futures.get(context_id)

                if not future or future.done(): continue

                if data.get("type") == "error":
                    future.set_exception(RuntimeError(f"Cartesia API error: {json.dumps(data)}"))
                elif "data" in data and data["data"]:
                    await self._stream_audio(base64.b64decode(data["data"]))
                elif data.get("done"):
                    future.set_result(None)
        except Exception as e:
            for future in self._context_futures.values():
                if not future.done(): future.set_exception(e)
        finally:
            self._context_futures.clear()

    async def _ensure_ws_connection(self) -> None:
        """Establishes or re-establishes the WebSocket connection if needed."""
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed: return

            if self._receive_task and not self._receive_task.done(): self._receive_task.cancel()
            if self._ws_connection: await self._ws_connection.close()
            if self._ws_session: await self._ws_session.close()

            try:
                self._ws_session = aiohttp.ClientSession()
                ws_url = self.base_url.replace('http', 'ws', 1)
                full_ws_url = f"{ws_url}/tts/websocket?api_key={self._api_key}&cartesia_version={API_VERSION}"

                self._ws_connection = await asyncio.wait_for(
                    self._ws_session.ws_connect(full_ws_url, heartbeat=30.0), timeout=5.0
                )
                self._receive_task = asyncio.create_task(self._receive_loop())
            except Exception as e:
                self.emit("error", f"Failed to establish WebSocket connection: {e}")
                raise

    async def _stream_audio(self, audio_chunk: bytes):
        """Streams a chunk of audio to the audio track."""
        if self._interrupted or not audio_chunk: return

        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()

        if self.audio_track:
            await self.audio_track.add_new_bytes(audio_chunk)

    async def interrupt(self) -> None:
        """Interrupts any ongoing TTS, stopping audio playback and network activity."""
        self._interrupted = True
        if self.audio_track: self.audio_track.interrupt()
        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()

    async def aclose(self) -> None:
        """Gracefully cleans up all resources."""
        await super().aclose()
        self._interrupted = True
        if self._receive_task and not self._receive_task.done(): self._receive_task.cancel()
        if self._ws_connection and not self._ws_connection.closed: await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed: await self._ws_session.close()

Base class for Text-to-Speech implementations

Initialize the Cartesia TTS plugin

Args

api_key : str | None, optional
Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the TTS plugin. Defaults to "sonic-2".
voice_id : Union[str, List[float]]
The voice ID to use for the TTS plugin, or a voice embedding as a list of floats. Defaults to "f786b574-daa5-4673-aa0c-cbe3e8534c02".
language : str
The language to use for the TTS plugin. Defaults to "en".
base_url : str
The base URL to use for the TTS plugin. Defaults to "https://api.cartesia.ai".
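
A minimal construction sketch, assuming CARTESIA_API_KEY is set; the agent pipeline must attach an audio track and event loop before synthesize() will produce audio (the source above emits an error otherwise).

from videosdk.plugins.cartesia import CartesiaTTS

# Relies on CARTESIA_API_KEY; pass api_key="..." to override.
tts = CartesiaTTS(
    model="sonic-2",
    voice_id="f786b574-daa5-4673-aa0c-cbe3e8534c02",  # a voice ID string, or a List[float] embedding
    language="en",
)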

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Gracefully cleans up all resources."""
    await super().aclose()
    self._interrupted = True
    if self._receive_task and not self._receive_task.done(): self._receive_task.cancel()
    if self._ws_connection and not self._ws_connection.closed: await self._ws_connection.close()
    if self._ws_session and not self._ws_session.closed: await self._ws_session.close()

Gracefully cleans up all resources.

async def interrupt(self) ‑> None
async def interrupt(self) -> None:
    """Interrupts any ongoing TTS, stopping audio playback and network activity."""
    self._interrupted = True
    if self.audio_track: self.audio_track.interrupt()
    if self._ws_connection and not self._ws_connection.closed:
        await self._ws_connection.close()

Interrupts any ongoing TTS, stopping audio playback and network activity.
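
A hedged barge-in sketch, assuming a configured tts instance with its audio track already attached: synthesis runs as a task and interrupt() is called when the user starts speaking.

import asyncio

async def speak_with_barge_in(tts, text: str) -> None:
    speak = asyncio.create_task(tts.synthesize(text))
    await asyncio.sleep(1.0)   # stand-in for detecting that the user started talking
    await tts.interrupt()      # stops playback and closes the TTS WebSocket
    speak.cancel()             # drop the pending synthesis task rather than waiting on it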

def reset_first_audio_tracking(self) ‑> None
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for the next TTS task"""
    self._first_chunk_sent = False

Reset the first audio tracking state for the next TTS task

async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[Union[str, List[float]]] = None,
**kwargs: Any) ‑> None
async def synthesize(
    self, text: AsyncIterator[str] | str, voice_id: Optional[Union[str, List[float]]] = None, **kwargs: Any,
) -> None:
    """Synthesize text to speech using Cartesia's streaming WebSocket API."""
    context_id = ""
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        if voice_id:
            self._voice = voice_id

        self._interrupted = False

        await self._ensure_ws_connection()
        if not self._ws_connection:
            raise RuntimeError("WebSocket connection is not available.")

        context_id = os.urandom(8).hex()
        done_future: asyncio.Future[None] = asyncio.get_event_loop().create_future()
        self._context_futures[context_id] = done_future

        async def _string_iterator(s: str) -> AsyncIterator[str]:
            yield s

        text_iterator = _string_iterator(text) if isinstance(text, str) else text
        send_task = asyncio.create_task(self._send_task(text_iterator, context_id))

        await done_future
        await send_task

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
    finally:
        if context_id and context_id in self._context_futures:
            del self._context_futures[context_id]

Synthesize text to speech using Cartesia's streaming WebSocket API.
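
A sketch of streaming synthesis from a token stream, assuming the audio track and event loop are already set; any async iterator of strings works, and a plain string is wrapped in a single-item iterator internally (see _string_iterator above).

from typing import AsyncIterator

async def token_stream() -> AsyncIterator[str]:
    # Stand-in for tokens arriving from an upstream LLM.
    for chunk in ["Hello ", "from ", "Cartesia."]:
        yield chunk

async def speak(tts) -> None:
    await tts.synthesize(token_stream())   # each chunk is sent on one WebSocket context
    # or, equivalently for a fixed sentence:
    # await tts.synthesize("Hello from Cartesia.")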