Package videosdk.plugins.cartesia

Sub-modules

videosdk.plugins.cartesia.stt
videosdk.plugins.cartesia.tts

Classes

class CartesiaSTT (*,
api_key: str | None = None,
model: str = 'ink-whisper',
language: str = 'en',
sample_rate: int = 48000,
base_url: str = 'wss://api.cartesia.ai/stt/websocket')
class CartesiaSTT(BaseSTT):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "ink-whisper",
        language: str = "en",
        sample_rate: int = 48000,
        base_url: str = "wss://api.cartesia.ai/stt/websocket",
    ) -> None:
        """Initialize the Cartesia STT plugin

        Args:
            api_key (str | None, optional): Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the STT plugin. Defaults to "ink-whisper".
            language (str): The language to use for the STT plugin, e.g. "en". Defaults to "en".
            sample_rate (int): The sample rate to use for the STT plugin. Defaults to 48000.
            base_url (str): The base URL to use for the STT plugin. Defaults to "wss://api.cartesia.ai/stt/websocket".
        """
        super().__init__()

        self.api_key = api_key or os.getenv("CARTESIA_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Cartesia API key must be provided either through api_key parameter or CARTESIA_API_KEY environment variable")

        self.model = model
        self.language = language
        self.sample_rate = sample_rate
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._last_interim_at = 0.0
        self.input_sample_rate = sample_rate
        self.target_sample_rate = 16000

    async def process_audio(
        self,
        audio_frames: bytes,
        language: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """Process audio frames and send to Cartesia's STT API"""

        if not self._ws:
            await self._connect_ws()

            self._ws_task = asyncio.create_task(self._listen_for_responses())

        try:

            audio_data = np.frombuffer(audio_frames, dtype=np.int16)
            if self.input_sample_rate != self.target_sample_rate:
                audio_data = signal.resample(
                    audio_data,
                    int(len(audio_data) * self.target_sample_rate /
                        self.input_sample_rate)
                )
            audio_bytes = audio_data.astype(np.int16).tobytes()
            await self._ws.send_bytes(audio_bytes)

        except Exception as e:
            self.emit("error", str(e))
            if self._ws:
                await self._ws.close()
                self._ws = None
                if self._ws_task:
                    self._ws_task.cancel()
                    self._ws_task = None

    async def _listen_for_responses(self) -> None:
        """Background task to listen for WebSocket responses"""
        if not self._ws:
            return
        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    responses = self._handle_ws_message(data)
                    for response in responses:
                        if self._transcript_callback:
                            await self._transcript_callback(response)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    error = f"WebSocket error: {self._ws.exception()}"
                    self.emit("error", error)
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    logger.info("WebSocket connection closed")
                    break
        except Exception as e:
            self.emit("error", f"Error listening for responses: {str(e)}")
        finally:
            if self._ws:
                await self._ws.close()
                self._ws = None

    async def _connect_ws(self) -> None:
        """Establish WebSocket connection with Cartesia's STT API"""

        if not self._session:
            self._session = aiohttp.ClientSession()

        query_params = {
            "model": self.model,
            "language": self.language,
            "encoding": "pcm_s16le",
            "sample_rate": str(self.target_sample_rate),
            "api_key": self.api_key,
        }

        headers = {
            "Cartesia-Version": "2024-11-13",
            "User-Agent": "VideoSDK-Cartesia-STT",
        }

        ws_url = f"{self.base_url}?{urlencode(query_params)}"

        try:
            self._ws = await self._session.ws_connect(ws_url, headers=headers)

        except Exception as e:
            logger.error(f"Error connecting to WebSocket: {str(e)}")
            if self._ws:
                await self._ws.close()
                self._ws = None
            raise

    def _handle_ws_message(self, msg: dict) -> list[STTResponse]:
        """Handle incoming WebSocket messages and generate STT responses"""
        responses = []
        try:
            msg_type = msg.get("type")

            if msg_type == "transcript":
                transcript = msg.get("text", "")
                is_final = msg.get("is_final", False)
                language = msg.get("language", self.language)
                duration = msg.get("duration", 0.0)

                if transcript:
                    current_time = time.time()

                    if is_final:
                        responses.append(STTResponse(
                            event_type=SpeechEventType.FINAL,
                            data=SpeechData(
                                text=transcript,
                                confidence=1.0,
                                language=language,
                                start_time=0.0,
                                end_time=duration,
                            ),
                            metadata={
                                "model": self.model,
                                "request_id": msg.get("request_id"),
                                "duration": duration,
                            }
                        ))
                    else:
                        if current_time - self._last_interim_at > 0.1:
                            responses.append(STTResponse(
                                event_type=SpeechEventType.INTERIM,
                                data=SpeechData(
                                    text=transcript,
                                    confidence=1.0,
                                    language=language,
                                    start_time=0.0,
                                    end_time=duration,
                                ),
                                metadata={
                                    "model": self.model,
                                    "request_id": msg.get("request_id"),
                                    "duration": duration,
                                }
                            ))
                            self._last_interim_at = current_time

            elif msg_type == "flush_done":
                logger.info("Cartesia STT: Flush completed")

            elif msg_type == "done":
                logger.info("Cartesia STT: Session ended")

            elif msg_type == "error":
                error_msg = msg.get("message", "Unknown error")
                error_code = msg.get("code", "unknown")
                self.emit("error", f"{error_code}: {error_msg}")

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {str(e)}")

        return responses

    async def aclose(self) -> None:
        """Cleanup resources"""
        if self._ws and not self._ws.closed:
            try:
                await self._ws.send_str("done")
                await asyncio.sleep(0.1)
            except Exception as e:
                logger.error(f"Error sending done command: {str(e)}")

        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        if self._ws:
            await self._ws.close()
            self._ws = None

        if self._session:
            await self._session.close()
            self._session = None

Base class for Speech-to-Text implementations

Initialize the Cartesia STT plugin

Args

api_key : str | None, optional
Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the STT plugin. Defaults to "ink-whisper".
language : str
The language to use for the STT plugin, e.g. "en". Defaults to "en".
sample_rate : int
The sample rate to use for the STT plugin. Defaults to 48000.
base_url : str
The base URL to use for the STT plugin. Defaults to "wss://api.cartesia.ai/stt/websocket".
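
A minimal construction sketch (not part of the generated docs): it assumes CARTESIA_API_KEY is exported in the environment; otherwise pass api_key explicitly.

from videosdk.plugins.cartesia import CartesiaSTT

stt = CartesiaSTT(
    model="ink-whisper",   # default transcription model
    language="en",
    sample_rate=48000,     # sample rate of the PCM audio you will feed in
)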

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Cleanup resources"""
    if self._ws and not self._ws.closed:
        try:
            await self._ws.send_str("done")
            await asyncio.sleep(0.1)
        except Exception as e:
            logger.error(f"Error sending done command: {str(e)}")

    if self._ws_task:
        self._ws_task.cancel()
        try:
            await self._ws_task
        except asyncio.CancelledError:
            pass
        self._ws_task = None

    if self._ws:
        await self._ws.close()
        self._ws = None

    if self._session:
        await self._session.close()
        self._session = None

Cleanup resources

async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None
async def process_audio(
    self,
    audio_frames: bytes,
    language: Optional[str] = None,
    **kwargs: Any
) -> None:
    """Process audio frames and send to Cartesia's STT API"""

    if not self._ws:
        await self._connect_ws()

        self._ws_task = asyncio.create_task(self._listen_for_responses())

    try:

        audio_data = np.frombuffer(audio_frames, dtype=np.int16)
        if self.input_sample_rate != self.target_sample_rate:
            audio_data = signal.resample(
                audio_data,
                int(len(audio_data) * self.target_sample_rate /
                    self.input_sample_rate)
            )
        audio_bytes = audio_data.astype(np.int16).tobytes()
        await self._ws.send_bytes(audio_bytes)

    except Exception as e:
        self.emit("error", str(e))
        if self._ws:
            await self._ws.close()
            self._ws = None
            if self._ws_task:
                self._ws_task.cancel()
                self._ws_task = None

Process audio frames and send to Cartesia's STT API
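
process_audio() expects raw mono 16-bit PCM at the configured sample_rate; frames are resampled to 16 kHz internally before being sent over the WebSocket, and transcripts flow through the transcript callback registered on the base STT class (typically by the agent pipeline). A rough call-pattern sketch, using an illustrative 20 ms silent frame:

import asyncio
import numpy as np
from videosdk.plugins.cartesia import CartesiaSTT

async def stream_silence(stt: CartesiaSTT) -> None:
    # 20 ms of int16 mono silence at the configured input rate
    frame = np.zeros(int(stt.input_sample_rate * 0.02), dtype=np.int16).tobytes()
    try:
        for _ in range(50):            # roughly one second of audio
            await stt.process_audio(frame)
            await asyncio.sleep(0.02)
    finally:
        await stt.aclose()             # sends "done" and closes the connection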

class CartesiaTTS (*,
api_key: str | None = None,
model: str = 'sonic-2',
voice_id: Union[str, List[float]] = '794f9389-aac1-45b6-b726-9d9369183238',
language: str = 'en',
base_url: str = 'https://api.cartesia.ai')
class CartesiaTTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        voice_id: Union[str, List[float]] = DEFAULT_VOICE_ID,
        language: str = "en",
        base_url: str = "https://api.cartesia.ai",
    ) -> None:
        """Initialize the Cartesia TTS plugin

        Args:
            api_key (str | None, optional): Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
            model (str): The model to use for the TTS plugin. Defaults to "sonic-2".
            voice_id (Union[str, List[float]]): The voice ID to use for the TTS plugin. Defaults to "794f9389-aac1-45b6-b726-9d9369183238".
            language (str): The language to use for the TTS plugin. Defaults to "en".
            base_url (str): The base URL to use for the TTS plugin. Defaults to "https://api.cartesia.ai".
        """
        super().__init__(sample_rate=CARTESIA_SAMPLE_RATE, num_channels=CARTESIA_CHANNELS)

        self.model = model
        self.language = language
        self.base_url = base_url
        self._voice = voice_id
        self._first_chunk_sent = False
        self._audio_buffer = bytearray()
        self._interrupted = False
        self._current_tasks: list[asyncio.Task] = []

        api_key = api_key or os.getenv("CARTESIA_API_KEY")
        if not api_key:
            raise ValueError("Cartesia API key must be provided")
        self._api_key = api_key

        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._connection_lock = asyncio.Lock()

    def reset_first_audio_tracking(self) -> None:
        self._first_chunk_sent = False
        self._audio_buffer.clear()
        self._interrupted = False

    async def _ensure_ws_connection(self) -> aiohttp.ClientWebSocketResponse:
        async with self._connection_lock:
            if self._ws_connection and not self._ws_connection.closed:
                return self._ws_connection

            if self._ws_session is None or self._ws_session.closed:
                self._ws_session = aiohttp.ClientSession()

            ws_url = self.base_url.replace('http', 'ws', 1)
            full_ws_url = f"{ws_url}/tts/websocket?api_key={self._api_key}&cartesia_version={API_VERSION}"

            try:
                self._ws_connection = await asyncio.wait_for(
                    self._ws_session.ws_connect(full_ws_url, heartbeat=30.0), timeout=5.0
                )
                return self._ws_connection
            except Exception as e:
                self.emit(
                    "error", f"Failed to establish WebSocket connection: {e}")
                raise

    async def _send_task(self, ws: aiohttp.ClientWebSocketResponse, text_iterator: AsyncIterator[str]):
        context_id = os.urandom(8).hex()

        voice_payload: dict[str, Any] = {}
        if isinstance(self._voice, str):
            voice_payload["mode"] = "id"
            voice_payload["id"] = self._voice
        else:
            voice_payload["mode"] = "embedding"
            voice_payload["embedding"] = self._voice

        base_payload = {
            "model_id": self.model, "language": self.language,
            "voice": voice_payload,
            "output_format": {"container": "raw", "encoding": "pcm_s16le", "sample_rate": self.sample_rate},
            "add_timestamps": True, "context_id": context_id,
        }

        delimiters = {'.', '!', '?', '\n'}
        buffer = ""

        async def send_sentence(sentence: str) -> None:
            if not sentence:
                return
            payload = {**base_payload,
                       "transcript": sentence + " ", "continue": True}
            await ws.send_str(json.dumps(payload))

        def first_delim_pos(buf: str) -> int:
            return min((p for p in (buf.find(d) for d in delimiters) if p != -1), default=-1)

        def over_thresholds(buf: str) -> bool:
            if len(buf) >= MIN_CHARS_FLUSH:
                return True
            if len(buf.split()) >= MIN_WORDS_FLUSH:
                return True
            return False

        aiter = text_iterator.__aiter__()
        while not self._interrupted:
            try:
                next_task = asyncio.create_task(aiter.__anext__())
                try:
                    text_chunk = await asyncio.wait_for(next_task, timeout=INACTIVITY_TIMEOUT_SEC)
                except asyncio.TimeoutError:
                    # Inactivity flush
                    next_task.cancel()
                    if buffer.strip() and not self._interrupted:
                        await send_sentence(buffer.strip())
                        buffer = ""
                    continue

                if not self._interrupted:
                    buffer += text_chunk

                # Greedy punctuation split first
                while True:
                    pos = first_delim_pos(buffer)
                    if pos == -1:
                        break
                    sentence = buffer[:pos + 1].strip()
                    buffer = buffer[pos + 1:]
                    if sentence:
                        await send_sentence(sentence)

                # Threshold-based flush
                if over_thresholds(buffer):
                    await send_sentence(buffer.strip())
                    buffer = ""

            except StopAsyncIteration:
                break

        if buffer.strip():
            await send_sentence(buffer.strip())
        final_payload = {**base_payload, "transcript": " ", "continue": False}
        await ws.send_str(json.dumps(final_payload))

    async def _receive_task(self, ws: aiohttp.ClientWebSocketResponse):
        while not self._interrupted:
            msg = await ws.receive()
            if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                break
            if msg.type != aiohttp.WSMsgType.TEXT:
                continue
            data = json.loads(msg.data)
            if data.get("type") == "error":
                error_details = json.dumps(data, indent=2)
                self.emit("error", f"Cartesia error: {error_details}")
                break
            if "data" in data and data["data"]:
                audio_chunk = base64.b64decode(data["data"])
                await self._stream_audio(audio_chunk)
            if data.get("done"):
                break
        await self._flush_audio_buffer()

    async def synthesize(
        self, text: AsyncIterator[str] | str, voice_id: Optional[Union[str, List[float]]] = None, **kwargs: Any,
    ) -> None:
        if voice_id:
            self._voice = voice_id

        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._interrupted = False
        self._current_tasks.clear()

        if isinstance(text, str):
            async def _string_iterator():
                yield text
            text_iterator = _string_iterator()
        else:
            text_iterator = text

        try:
            ws = await self._ensure_ws_connection()
            send_task = asyncio.create_task(self._send_task(ws, text_iterator))
            receive_task = asyncio.create_task(self._receive_task(ws))
            self._current_tasks.extend([send_task, receive_task])
            await asyncio.gather(send_task, receive_task)
        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            if self._ws_connection and not self._ws_connection.closed:
                await self._ws_connection.close()
            self._ws_connection = None

    async def _stream_audio(self, audio_chunk: bytes):
        if self._interrupted:
            return

        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()

        self._audio_buffer.extend(audio_chunk)
        while len(self._audio_buffer) >= PLAYBACK_CHUNK_SIZE and not self._interrupted:
            playback_chunk = self._audio_buffer[:PLAYBACK_CHUNK_SIZE]
            self._audio_buffer = self._audio_buffer[PLAYBACK_CHUNK_SIZE:]
            if self.audio_track:
                await self.audio_track.add_new_bytes(playback_chunk)

    async def _flush_audio_buffer(self):
        if self._audio_buffer:
            chunk = self._audio_buffer
            self._audio_buffer = bytearray()
            if len(chunk) < PLAYBACK_CHUNK_SIZE:
                chunk.extend(b'\x00' * (PLAYBACK_CHUNK_SIZE - len(chunk)))
            if not self._first_chunk_sent and self._first_audio_callback:
                self._first_chunk_sent = True
                await self._first_audio_callback()
            if self.audio_track:
                await self.audio_track.add_new_bytes(chunk)

    async def aclose(self) -> None:
        await super().aclose()
        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()

    async def interrupt(self) -> None:
        """Interrupt TTS synthesis"""
        self._interrupted = True

        for task in self._current_tasks:
            if not task.done():
                task.cancel()

        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
            self._ws_connection = None

        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the Cartesia TTS plugin

Args

api_key : str | None, optional
Cartesia API key. Uses CARTESIA_API_KEY environment variable if not provided. Defaults to None.
model : str
The model to use for the TTS plugin. Defaults to "sonic-2".
voice_id : Union[str, List[float]]
The voice ID to use for the TTS plugin. Defaults to "794f9389-aac1-45b6-b726-9d9369183238".
language : str
The language to use for the TTS plugin. Defaults to "en".
base_url : str
The base URL to use for the TTS plugin. Defaults to "https://api.cartesia.ai".
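
A minimal construction sketch, again assuming CARTESIA_API_KEY is exported. voice_id accepts either a Cartesia voice ID string or a voice embedding (a list of floats):

from videosdk.plugins.cartesia import CartesiaTTS

tts = CartesiaTTS(
    model="sonic-2",
    voice_id="794f9389-aac1-45b6-b726-9d9369183238",  # default voice ID
    language="en",
)
# Passing a list of floats as voice_id makes the plugin send the voice
# in "embedding" mode instead of "id" mode.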

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    await super().aclose()
    if self._ws_connection and not self._ws_connection.closed:
        await self._ws_connection.close()
    if self._ws_session and not self._ws_session.closed:
        await self._ws_session.close()

Cleanup resources

async def interrupt(self) ‑> None
async def interrupt(self) -> None:
    """Interrupt TTS synthesis"""
    self._interrupted = True

    for task in self._current_tasks:
        if not task.done():
            task.cancel()

    if self._ws_connection and not self._ws_connection.closed:
        await self._ws_connection.close()
        self._ws_connection = None

    if self.audio_track:
        self.audio_track.interrupt()

Interrupt TTS synthesis

def reset_first_audio_tracking(self) ‑> None
def reset_first_audio_tracking(self) -> None:
    self._first_chunk_sent = False
    self._audio_buffer.clear()
    self._interrupted = False

Reset the first audio tracking state for the next TTS task
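
A short barge-in sketch combining the two methods above: cancel the in-flight synthesis, then reset first-audio tracking before the next synthesize() call.

from videosdk.plugins.cartesia import CartesiaTTS

async def handle_barge_in(tts: CartesiaTTS) -> None:
    await tts.interrupt()             # cancels send/receive tasks and closes the WebSocket
    tts.reset_first_audio_tracking()  # clears buffers and state for the next turn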

async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[Union[str, List[float]]] = None,
**kwargs: Any) ‑> None
async def synthesize(
    self, text: AsyncIterator[str] | str, voice_id: Optional[Union[str, List[float]]] = None, **kwargs: Any,
) -> None:
    if voice_id:
        self._voice = voice_id

    if not self.audio_track or not self.loop:
        self.emit("error", "Audio track or event loop not set")
        return

    self._interrupted = False
    self._current_tasks.clear()

    if isinstance(text, str):
        async def _string_iterator():
            yield text
        text_iterator = _string_iterator()
    else:
        text_iterator = text

    try:
        ws = await self._ensure_ws_connection()
        send_task = asyncio.create_task(self._send_task(ws, text_iterator))
        receive_task = asyncio.create_task(self._receive_task(ws))
        self._current_tasks.extend([send_task, receive_task])
        await asyncio.gather(send_task, receive_task)
    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        self._ws_connection = None

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None
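
A streaming sketch: synthesize() emits an "error" event and returns unless an audio track and event loop have already been attached by the agent pipeline, so this only produces audio inside a configured VideoSDK agent session. The chunk strings below stand in for tokens arriving from an LLM; sentences are flushed to Cartesia as punctuation or size thresholds are reached.

from videosdk.plugins.cartesia import CartesiaTTS

async def speak(tts: CartesiaTTS) -> None:
    async def text_chunks():
        # Stand-in for an LLM token stream
        for part in ["Hello there. ", "This is a streaming ", "synthesis test."]:
            yield part

    await tts.synthesize(text_chunks())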