Package videosdk.plugins.neuphonic

Sub-modules

videosdk.plugins.neuphonic.tts

Classes

class NeuphonicTTS (*,
api_key: str | None = None,
lang_code: str = 'en',
voice_id: Optional[str] = None,
speed: float = 0.8,
sampling_rate: int = 22050,
encoding: "Literal['pcm_linear', 'pcm_mulaw']" = 'pcm_linear',
base_url: str = 'wss://eu-west-1.api.neuphonic.com')
Expand source code
class NeuphonicTTS(TTS):
    """Text-to-speech plugin backed by the Neuphonic WebSocket API.

    Decoded audio is re-framed into fixed 20 ms frames and queued on
    ``self.audio_track``. Both ``self.audio_track`` and ``self.loop`` must be set
    by the caller before ``synthesize`` is invoked.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        lang_code: str = "en",
        voice_id: Optional[str] = None,
        speed: float = 0.8,
        sampling_rate: int = NEUPHONIC_DEFAULT_SAMPLE_RATE,
        encoding: Literal["pcm_linear", "pcm_mulaw"] = "pcm_linear",
        base_url: str = NEUPHONIC_BASE_URL,
    ) -> None:
        """Initialize the Neuphonic TTS plugin.

        Args:
            api_key (Optional[str], optional): Neuphonic API key. Defaults to None.
            lang_code (str): The language code to use for the TTS plugin. Defaults to "en".
            voice_id (Optional[str], optional): The voice ID to use for the TTS plugin. Defaults to None.
            speed (float): The speed to use for the TTS plugin. Must be between 0.7 and 2.0. Defaults to 0.8.
            sampling_rate (int): The sampling rate to use for the TTS plugin. Must be one of: 8000, 16000, 22050. Defaults to 22050.
            encoding (Literal["pcm_linear", "pcm_mulaw"]): The encoding to use for the TTS plugin. Defaults to "pcm_linear".
            base_url (str): The base URL to use for the TTS plugin. Defaults to "wss://eu-west-1.api.neuphonic.com".

        Raises:
            ValueError: If no API key is available (argument or NEUPHONIC_API_KEY),
                or if speed, sampling_rate or encoding is out of range.
        """
        super().__init__(sample_rate=sampling_rate, num_channels=NEUPHONIC_CHANNELS)

        self.lang_code = lang_code
        self.voice_id = voice_id
        self.speed = speed
        self.encoding = encoding
        self.base_url = base_url
        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False
        self._interrupted = False
        self._current_tasks: list[asyncio.Task] = []
        # Strong references to in-flight audio pushes: the event loop only keeps
        # weak references to tasks, so unreferenced ones may be collected early.
        self._audio_tasks: set[asyncio.Task] = set()
        # Tail of decoded audio that did not fill a whole frame. It is prepended
        # to the next payload so frames stay contiguous across server messages.
        self._audio_remainder = b""

        self.api_key = api_key or os.getenv("NEUPHONIC_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Neuphonic API key must be provided either through api_key parameter "
                "or NEUPHONIC_API_KEY environment variable"
            )

        if not 0.7 <= self.speed <= 2.0:
            raise ValueError(
                f"Speed must be between 0.7 and 2.0, got {self.speed}")

        if sampling_rate not in [8000, 16000, 22050]:
            raise ValueError(
                f"Sampling rate must be one of 8000, 16000, 22050, got {sampling_rate}")

        if encoding not in ("pcm_linear", "pcm_mulaw"):
            raise ValueError(
                f"Encoding must be 'pcm_linear' or 'pcm_mulaw', got {encoding!r}")

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False
        self._interrupted = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        **kwargs: Any,
    ) -> None:
        """Convert text to speech and queue the audio on ``self.audio_track``.

        Args:
            text: A complete string (sent as one request), or an async iterator of
                text fragments that is segmented and streamed over one connection.
            **kwargs: Accepted for interface compatibility; unused here.

        Exceptions are reported through ``self.emit("error", ...)`` rather than raised.
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            self._interrupted = False
            self._current_tasks.clear()
            self._audio_remainder = b""

            if isinstance(text, AsyncIterator):
                await self._streaming_websocket_synthesis(text)
            else:
                await self._websocket_synthesis(text)

            # Emit the final partial frame that payload re-framing held back.
            await self._flush_audio_remainder()

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")

    def _build_ws_url(self) -> str:
        """Build the ``/speak`` WebSocket URL with synthesis options as query parameters."""
        params = {
            "api_key": self.api_key,
            "speed": self.speed,
            "sampling_rate": self._sample_rate,
            "encoding": self.encoding,
        }
        if self.voice_id:
            params["voice_id"] = self.voice_id
        return f"{self.base_url}/speak/{self.lang_code}?{urlencode(params)}"

    async def _streaming_websocket_synthesis(self, text: AsyncIterator[str]) -> None:
        """Stream text segments over one WebSocket while a listener task plays the audio.

        Each non-blank segment from ``segment_text`` is sent terminated by
        ``<STOP>``. Responses are consumed concurrently by ``_listen_to_ws_messages``.
        """
        ws_url = self._build_ws_url()

        try:
            async with aiohttp.ClientSession() as session:
                async with session.ws_connect(ws_url) as ws:
                    listener_task = asyncio.create_task(
                        self._listen_to_ws_messages(ws))
                    self._current_tasks.append(listener_task)

                    try:
                        async for segment in segment_text(text):
                            if self._interrupted:
                                break
                            if segment.strip():
                                await ws.send_str(f"{segment} <STOP>")
                                # Brief pause between sends (presumably to let the
                                # listener task run).
                                await asyncio.sleep(0.01)

                        if not self._interrupted:
                            # Unlike `await listener_task`, asyncio.wait does not
                            # re-raise CancelledError when interrupt() cancels the
                            # listener. Only a real cancellation of this coroutine
                            # propagates.
                            await asyncio.wait({listener_task})
                    finally:
                        # Never leave the listener running once we stop waiting on it.
                        if not listener_task.done():
                            listener_task.cancel()

        except aiohttp.ClientError as e:
            self.emit("error", f"WebSocket connection failed: {str(e)}")
        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")

    async def _listen_to_ws_messages(self, ws) -> None:
        """Consume WebSocket responses until the socket closes or errors, or synthesis is interrupted."""
        try:
            async for msg in ws:
                if self._interrupted:
                    break
                if not await self._handle_ws_message(ws, msg):
                    break

        except Exception as e:
            self.emit("error", f"WebSocket message listening failed: {str(e)}")

    async def _handle_ws_message(self, ws, msg) -> bool:
        """Process one WebSocket message; return False when the receive loop should stop.

        TEXT frames are parsed as JSON, and any base64 ``data.audio`` payload is
        queued for playback. ERROR frames are reported. ERROR and CLOSED frames
        end the loop.
        """
        if msg.type == aiohttp.WSMsgType.TEXT:
            try:
                data = json.loads(msg.data)
            except json.JSONDecodeError:
                self.emit("error", f"Invalid JSON response: {msg.data}")
                return True
            if "data" in data and "audio" in data["data"]:
                # pcm_linear and pcm_mulaw payloads are forwarded identically.
                await self._stream_audio_chunks(
                    base64.b64decode(data["data"]["audio"]))
            return True

        if msg.type == aiohttp.WSMsgType.ERROR:
            self.emit(
                "error", f"WebSocket connection error: {ws.exception()}")
            return False

        return msg.type != aiohttp.WSMsgType.CLOSED

    async def _websocket_synthesis(self, text: str) -> None:
        """Synthesize a complete string over a fresh WebSocket connection."""
        ws_url = self._build_ws_url()

        try:
            async with aiohttp.ClientSession() as session:
                async with session.ws_connect(ws_url) as ws:
                    await ws.send_str(f"{text} <STOP>")

                    async for msg in ws:
                        if self._interrupted:
                            break
                        if not await self._handle_ws_message(ws, msg):
                            break

        except aiohttp.ClientError as e:
            self.emit("error", f"WebSocket connection failed: {str(e)}")
        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")

    def _frame_size(self) -> int:
        """Return the byte length of one 20 ms frame, assuming 2 bytes per sample."""
        chunk_duration_ms = 20
        # NOTE(review): pcm_mulaw is 1 byte/sample; 2 is used for every encoding.
        # Confirm what audio_track expects before changing this.
        bytes_per_sample = 2
        size = int(self._sample_rate * NEUPHONIC_CHANNELS *
                   bytes_per_sample * chunk_duration_ms / 1000)
        # Round up to an even length (presumably to keep 16-bit samples whole).
        return size + 1 if size % 2 else size

    async def _push_frame(self, frame: bytes) -> None:
        """Fire the first-audio callback once, then hand one frame to the audio track."""
        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()

        task = asyncio.create_task(self.audio_track.add_new_bytes(frame))
        self._audio_tasks.add(task)
        task.add_done_callback(self._audio_tasks.discard)
        await asyncio.sleep(0.001)

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Split decoded audio into fixed-size frames and queue them for playback.

        Bytes that do not fill a whole frame are kept in ``self._audio_remainder``
        and prepended to the next payload. This avoids inserting zero padding
        between server messages. ``_flush_audio_remainder`` emits the last tail.
        """
        if self._interrupted:
            return

        frame_size = self._frame_size()
        buffer = self._audio_remainder + audio_bytes
        whole = len(buffer) - len(buffer) % frame_size
        self._audio_remainder = buffer[whole:]

        for start in range(0, whole, frame_size):
            if self._interrupted:
                self._audio_remainder = b""
                return
            await self._push_frame(buffer[start:start + frame_size])

    async def _flush_audio_remainder(self) -> None:
        """Zero-pad and queue any buffered partial frame; discard it if interrupted."""
        tail, self._audio_remainder = self._audio_remainder, b""
        if not tail or self._interrupted:
            return
        frame_size = self._frame_size()
        await self._push_frame(tail + b"\x00" * (frame_size - len(tail)))

    async def _sse_synthesis(self, text: str) -> None:
        """SSE-based synthesis (alternative to WebSocket)"""
        url = f"{NEUPHONIC_SSE_BASE_URL}/sse/speak/{self.lang_code}"

        headers = {
            "X-API-KEY": self.api_key,
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
        }

        payload = {
            "text": text,
            "speed": self.speed,
            "sampling_rate": self._sample_rate,
            "encoding": self.encoding,
        }

        if self.voice_id:
            payload["voice_id"] = self.voice_id

        # Do not let a tail left over from an earlier run leak into this one.
        self._audio_remainder = b""

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=payload) as response:
                    response.raise_for_status()

                    async for line in response.content:
                        line_str = line.decode('utf-8').strip()

                        if line_str.startswith("data: "):
                            try:
                                json_data = json.loads(line_str[6:])
                                if "data" in json_data and "audio" in json_data["data"]:
                                    audio_data = base64.b64decode(
                                        json_data["data"]["audio"])
                                    await self._stream_audio_chunks(audio_data)
                            except json.JSONDecodeError:
                                continue

                    await self._flush_audio_remainder()

        except aiohttp.ClientResponseError as e:
            if e.status == 403:
                self.emit(
                    "error", "Neuphonic authentication failed. Please check your API key.")
            else:
                self.emit("error", f"Neuphonic HTTP error: {e.status}")
        except Exception as e:
            self.emit("error", f"SSE synthesis failed: {str(e)}")

    async def aclose(self) -> None:
        """Cleanup resources"""
        await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt the TTS process.

        Sets the interrupted flag, drops buffered audio, cancels tracked
        synthesis tasks and pending audio pushes, then interrupts the track.
        Pending pushes are cancelled first so they cannot enqueue audio after
        the track was interrupted.
        """
        self._interrupted = True
        self._audio_remainder = b""

        for task in self._current_tasks:
            if not task.done():
                task.cancel()

        for task in list(self._audio_tasks):
            if not task.done():
                task.cancel()

        if self.audio_track:
            self.audio_track.interrupt()

Text-to-Speech plugin backed by the Neuphonic API (a subclass of the base TTS class for Text-to-Speech implementations).

Initialize the Neuphonic TTS plugin.

Args

api_key : Optional[str], optional
Neuphonic API key. Defaults to None.
lang_code : str
The language code to use for the TTS plugin. Defaults to "en".
voice_id : Optional[str], optional
The voice ID to use for the TTS plugin. Defaults to None.
speed : float
The speed to use for the TTS plugin. Must be between 0.7 and 2.0. Defaults to 0.8.
sampling_rate : int
The sampling rate to use for the TTS plugin. Must be one of: 8000, 16000, 22050. Defaults to 22050.
encoding : Literal["pcm_linear", "pcm_mulaw"]
The encoding to use for the TTS plugin. Defaults to "pcm_linear".
base_url : str
The base URL to use for the TTS plugin. Defaults to "wss://eu-west-1.api.neuphonic.com".

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Release resources by delegating to the base TTS ``aclose``.

    This override adds no plugin-specific cleanup of its own.
    """
    await super().aclose()

Clean up resources.

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt the TTS process.

    Flags the instance as interrupted, cancels every tracked task that has
    not finished yet, and then interrupts the audio track if one is attached.
    """
    self._interrupted = True

    still_running = [t for t in self._current_tasks if not t.done()]
    for t in still_running:
        t.cancel()

    track = self.audio_track
    if track:
        track.interrupt()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-audio and interruption flags before the next TTS task starts."""
    self._first_chunk_sent = self._interrupted = False

Reset the first audio tracking state for next TTS task

async def synthesize(self, text: AsyncIterator[str] | str, **kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._interrupted = False
        self._current_tasks.clear()

        if isinstance(text, AsyncIterator):
            await self._streaming_websocket_synthesis(text)
        else:
            await self._websocket_synthesis(text)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
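voice_id
Optional voice identifier (documented by the base class). This implementation ignores it if passed as a keyword argument; set the voice through the constructor's voice_id instead.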
**kwargs
Additional provider-specific arguments

Returns

None