Package videosdk.plugins.humeai

Sub-modules

videosdk.plugins.humeai.tts

Classes

class HumeAITTS (*,
api_key: Optional[str] = None,
voice: Optional[str] = 'Serene Assistant',
speed: float = 1.0,
response_format: "Literal['pcm', 'mp3', 'wav']" = 'pcm',
instant_mode: bool = True)
Expand source code
class HumeAITTS(TTS):
    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        voice: Optional[str] = "Serene Assistant",
        speed: float = 1.0,
        response_format: Literal["pcm", "mp3", "wav"] = "pcm",
        instant_mode: bool = True,
    ) -> None:
        """Initialize the HumeAI TTS plugin.

        Args:
            api_key (Optional[str], optional): HumeAI API key. Defaults to None.
            voice (Optional[str], optional): The voice to use for the TTS plugin. Defaults to "Serene Assistant".
            speed (float): The speed to use for the TTS plugin. Defaults to 1.0.
            response_format (Literal["pcm", "mp3", "wav"]): The response format to use for the TTS plugin. Defaults to "pcm".
            instant_mode (bool): Whether to use instant mode for the TTS plugin. Defaults to True.
        """
        super().__init__(sample_rate=24000, num_channels=1)

        self.voice = voice
        self.speed = speed
        self.response_format = response_format
        self.instant_mode = instant_mode
        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False

        if self.instant_mode and not self.voice:
            raise ValueError("Voice required for instant mode")

        self.api_key = api_key or os.getenv("HUMEAI_API_KEY")
        if not self.api_key:
            raise ValueError("HUMEAI_API_KEY required")

        self._session = httpx.AsyncClient(timeout=30.0)

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track not set")
                return

            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    await self._synthesize_audio(segment, voice_id, **kwargs)
            else:
                await self._synthesize_audio(text, voice_id, **kwargs)

        except Exception as e:
            self.emit("error", f"Synthesis failed: {str(e)}")

    async def _synthesize_audio(self, text: str, voice_id: Optional[str] = None, **kwargs: Any) -> None:
        """Synthesize a single text segment"""
        if not text.strip():
            return

        utterance = {
            "text": text,
            "speed": kwargs.get("speed", self.speed)
        }
        if self.instant_mode:
            utterance["voice"] = {
                "name": voice_id or self.voice, "provider": "HUME_AI"}

        payload = {
            "utterances": [utterance],
            "format": {"type": self.response_format},
            "instant_mode": self.instant_mode,
            "strip_headers": False,
        }

        await self._stream_synthesis(payload)

    async def _stream_synthesis(self, payload: dict) -> None:
        """Stream audio from Hume AI API"""
        url = f"{API_BASE_URL}/tts/stream/json"
        headers = {
            "X-Hume-Api-Key": self.api_key,
            "Content-Type": "application/json"
        }

        try:
            async with self._session.stream(
                "POST", url, headers=headers, json=payload
            ) as response:
                response.raise_for_status()

                buffer = b""
                async for chunk in response.aiter_bytes():
                    lines = (buffer + chunk).split(b"\n")
                    buffer = lines.pop()

                    for line in lines:
                        if line.strip():
                            try:
                                data = json.loads(line)
                                if "audio" in data and data["audio"]:
                                    audio_bytes = base64.b64decode(
                                        data["audio"])
                                    if self.response_format == "wav":
                                        audio_bytes = self._remove_wav_header(
                                            audio_bytes)
                                    await self._stream_audio_chunks(audio_bytes)
                            except json.JSONDecodeError:
                                continue

                if buffer.strip():
                    try:
                        data = json.loads(buffer)
                        if "audio" in data and data["audio"]:
                            audio_bytes = base64.b64decode(data["audio"])
                            if self.response_format == "wav":
                                audio_bytes = self._remove_wav_header(
                                    audio_bytes)
                            await self._stream_audio_chunks(audio_bytes)
                    except json.JSONDecodeError:
                        pass

        except Exception as e:
            self.emit("error", f"Streaming failed: {str(e)}")

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        """Stream audio with 48kHz->24kHz resampling"""
        if not audio_bytes:
            return

        try:
            audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
            if len(audio_array) == 0:
                return

            resampled_audio = audio_array[::2]
            audio_bytes = resampled_audio.tobytes()

            chunk_size = 960
            for i in range(0, len(audio_bytes), chunk_size):
                chunk = audio_bytes[i: i + chunk_size]
                if len(chunk) < chunk_size and len(chunk) > 0:
                    chunk += b"\x00" * (chunk_size - len(chunk))
                if chunk:
                    if not self._first_chunk_sent and self._first_audio_callback:
                        self._first_chunk_sent = True
                        await self._first_audio_callback()

                    asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                    await asyncio.sleep(0.001)
        except Exception as e:
            self.emit("error", f"Audio streaming failed: {str(e)}")

    def _remove_wav_header(self, audio_bytes: bytes) -> bytes:
        """Remove WAV header if present"""
        if audio_bytes.startswith(b"RIFF"):
            data_pos = audio_bytes.find(b"data")
            if data_pos != -1:
                return audio_bytes[data_pos + 8:]
        return audio_bytes

    async def aclose(self) -> None:
        """Cleanup resources"""
        if self._session:
            await self._session.aclose()
        await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt TTS"""
        if self.audio_track:
            self.audio_track.interrupt()

Base class for Text-to-Speech implementations

Initialize the HumeAI TTS plugin.

Args

api_key : Optional[str], optional
HumeAI API key. Defaults to None.
voice : Optional[str], optional
The voice to use for the TTS plugin. Defaults to "Serene Assistant".
speed : float
The speed to use for the TTS plugin. Defaults to 1.0.
response_format (Literal["pcm", "mp3", "wav"]): The response format to use for the TTS plugin. Defaults to "pcm".
instant_mode : bool
Whether to use instant mode for the TTS plugin. Defaults to True.

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    if self._session:
        await self._session.aclose()
    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Interrupt TTS"""
    if self.audio_track:
        self.audio_track.interrupt()

Interrupt TTS

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track not set")
            return

        if isinstance(text, AsyncIterator):
            async for segment in segment_text(text):
                await self._synthesize_audio(segment, voice_id, **kwargs)
        else:
            await self._synthesize_audio(text, voice_id, **kwargs)

    except Exception as e:
        self.emit("error", f"Synthesis failed: {str(e)}")

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None