Package videosdk.plugins.inworldai
Sub-modules
videosdk.plugins.inworldai.tts
Classes
class InworldAITTS (*,
api_key: str | None = None,
model_id: str = 'inworld-tts-1',
voice_id: str = 'Hades',
temperature: float = 0.8,
audio_encoding: str = 'LINEAR16',
sample_rate: int = 24000)-
Expand source code
class InworldAITTS(TTS):
    """Text-to-speech plugin that streams audio from InworldAI's TTS endpoint.

    Text is POSTed to ``INWORLD_TTS_STREAMING_ENDPOINT``; each non-empty line
    of the response is parsed as JSON, its base64 ``result.audioContent`` is
    decoded, any WAV header is stripped, and the bytes are handed to
    ``self.audio_track``. Failures are reported through ``emit("error", ...)``.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_id: str = DEFAULT_MODEL,
        voice_id: str = DEFAULT_VOICE,
        temperature: float = DEFAULT_TEMPERATURE,
        audio_encoding: str = "LINEAR16",
        sample_rate: int = INWORLD_SAMPLE_RATE,
    ) -> None:
        """Initialize the InworldAI TTS plugin.

        Args:
            api_key (Optional[str], optional): InworldAI API key. Defaults to None,
                in which case the INWORLD_API_KEY environment variable is used.
            model_id (str): The model ID to use for the TTS plugin. Defaults to "inworld-tts-1".
            voice_id (str): The voice ID to use for the TTS plugin. Defaults to "Hades".
            temperature (float): The temperature to use for the TTS plugin. Defaults to 0.8.
            audio_encoding (str): The audio encoding to use for the TTS plugin. Defaults to "LINEAR16".
            sample_rate (int): The sample rate to use for the TTS plugin. Defaults to 24000.

        Raises:
            ValueError: If no API key is given and INWORLD_API_KEY is unset or empty.
        """
        super().__init__(sample_rate=sample_rate, num_channels=INWORLD_CHANNELS)

        self.model_id = model_id
        self.voice_id = voice_id
        self.temperature = temperature
        self.audio_encoding = audio_encoding
        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False
        # Strong references to in-flight audio tasks: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._pending_audio_tasks: set = set()

        self.api_key = api_key or os.getenv("INWORLD_API_KEY")
        if not self.api_key:
            raise ValueError(
                "InworldAI API key must be provided either through:\n"
                "1. api_key parameter, OR\n"
                "2. INWORLD_API_KEY environment variable"
            )

        self._auth_header = f"Basic {self.api_key}"
        self._http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0, write=5.0, pool=5.0),
            follow_redirects=True,
            limits=httpx.Limits(
                max_connections=50,
                max_keepalive_connections=50,
                keepalive_expiry=120,
            ),
        )

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Convert text to speech using InworldAI's streaming TTS API

        Any exception raised while synthesizing is caught here and reported
        as an "error" event rather than propagated to the caller.

        Args:
            text: Text to convert to speech
            voice_id: Optional voice override
            **kwargs: Additional provider-specific arguments
        """
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    await self._synthesize_streaming(segment, voice_id)
            else:
                await self._synthesize_streaming(text, voice_id)
        except Exception as e:
            self.emit("error", f"InworldAI TTS synthesis failed: {str(e)}")

    async def _synthesize_streaming(
        self, text: str, voice_id: Optional[str] = None
    ) -> None:
        """Synthesize text using the streaming endpoint

        Args:
            text: Text to send in the request payload.
            voice_id: Voice to use; falls back to ``self.voice_id`` when falsy.

        Raises:
            httpx.HTTPStatusError: Re-raised after an "error" event has been
                emitted for a non-success HTTP status.
        """
        try:
            payload = {
                "text": text,
                "voiceId": voice_id or self.voice_id,
                "modelId": self.model_id,
                "audioConfig": {
                    "temperature": self.temperature,
                    "audioEncoding": self.audio_encoding,
                    "sampleRateHertz": self._sample_rate,
                },
            }
            headers = {
                "Authorization": self._auth_header,
                "Content-Type": "application/json",
                "Accept": "application/json",
            }

            async with self._http_client.stream(
                "POST",
                INWORLD_TTS_STREAMING_ENDPOINT,
                headers=headers,
                json=payload,
            ) as response:
                if response.is_error:
                    # A streamed body is not loaded automatically. Buffer it
                    # while the stream is still open so the handler below can
                    # parse the API's error message after the context exits.
                    await response.aread()
                response.raise_for_status()

                async for line in response.aiter_lines():
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                        if "error" in data:
                            error = data["error"]
                            self.emit(
                                "error",
                                f"InworldAI API error: {error.get('message', 'Unknown error')}",
                            )
                            return
                        if "result" in data and "audioContent" in data["result"]:
                            audio_content_b64 = data["result"]["audioContent"]
                            if audio_content_b64:
                                audio_bytes = base64.b64decode(audio_content_b64)
                                await self._stream_audio_chunk(audio_bytes)
                    except json.JSONDecodeError:
                        # Skip lines that are not valid JSON.
                        continue
                    except Exception as e:
                        # Report the bad chunk but keep consuming the stream.
                        self.emit("error", f"Error processing stream chunk: {str(e)}")
        except httpx.HTTPStatusError as e:
            status_code = e.response.status_code
            if status_code == 401:
                self.emit(
                    "error",
                    "InworldAI authentication failed. Please check your API key.",
                )
            elif status_code == 400:
                try:
                    error_data = e.response.json()
                    error_msg = error_data.get("error", {}).get(
                        "message", "Bad request"
                    )
                    self.emit("error", f"InworldAI request error: {error_msg}")
                except Exception:
                    # Best effort: the body may be missing, not JSON, or not
                    # shaped as expected; fall back to a generic message.
                    self.emit(
                        "error",
                        "InworldAI bad request. Please check your parameters.",
                    )
            else:
                self.emit("error", f"InworldAI HTTP error: {status_code}")
            raise

    async def _stream_audio_chunk(self, audio_bytes: bytes) -> None:
        """Stream a single audio chunk, removing WAV header if present

        On the first non-empty chunk since the last reset, awaits
        ``self._first_audio_callback`` (when set) before queueing the audio.
        """
        if not audio_bytes:
            return

        audio_data = self._remove_wav_header(audio_bytes)
        if audio_data:
            if not self._first_chunk_sent and self._first_audio_callback:
                self._first_chunk_sent = True
                await self._first_audio_callback()

            # Keep a reference until the task completes, then drop it.
            task = asyncio.create_task(self.audio_track.add_new_bytes(audio_data))
            self._pending_audio_tasks.add(task)
            task.add_done_callback(self._pending_audio_tasks.discard)
            # Yield briefly so the scheduled task gets a chance to run.
            await asyncio.sleep(0.001)

    def _remove_wav_header(self, audio_bytes: bytes) -> bytes:
        """Remove WAV header if present to get raw PCM data

        If the bytes start with ``RIFF`` and contain a ``data`` marker,
        returns everything after that marker plus the 4 bytes following it;
        otherwise returns the input unchanged.
        """
        if audio_bytes.startswith(b"RIFF"):
            data_pos = audio_bytes.find(b"data")
            if data_pos != -1:
                # Skip the 4-byte "data" tag and the 4 bytes after it.
                return audio_bytes[data_pos + 8:]
        return audio_bytes

    async def aclose(self) -> None:
        """Cleanup resources"""
        if self._http_client:
            await self._http_client.aclose()
        await super().aclose()

    async def interrupt(self) -> None:
        """Interrupt the TTS process"""
        if self.audio_track:
            self.audio_track.interrupt()
Base class for Text-to-Speech implementations
Initialize the InworldAI TTS plugin.
Args
api_key
:Optional[str]
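, optional - InworldAI API key. Defaults to None, in which case the INWORLD_API_KEY environment variable is used; a ValueError is raised if neither is set.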
model_id
:str
- The model ID to use for the TTS plugin. Defaults to "inworld-tts-1".
voice_id
:str
- The voice ID to use for the TTS plugin. Defaults to "Hades".
temperature
:float
- The temperature to use for the TTS plugin. Defaults to 0.8.
audio_encoding
:str
- The audio encoding to use for the TTS plugin. Defaults to "LINEAR16".
sample_rate
:int
- The sample rate to use for the TTS plugin. Defaults to 24000.
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources

    Closes the HTTP client if one is set, then awaits the parent
    class's ``aclose``.
    """
    client = self._http_client
    if client:
        await client.aclose()
    await super().aclose()
Cleanup resources
async def interrupt(self) ‑> None
-
Expand source code
async def interrupt(self) -> None:
    """Interrupt the TTS process

    Calls ``interrupt()`` on the audio track when one is set; otherwise
    does nothing.
    """
    track = self.audio_track
    if not track:
        return
    track.interrupt()
Interrupt the TTS process
def reset_first_audio_tracking(self) ‑> None
-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task

    Sets the ``_first_chunk_sent`` flag back to ``False``.
    """
    self._first_chunk_sent = False
Reset the first audio tracking state for next TTS task
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize( self, text: AsyncIterator[str] | str, voice_id: Optional[str] = None, **kwargs: Any, ) -> None: """ Convert text to speech using InworldAI's streaming TTS API Args: text: Text to convert to speech voice_id: Optional voice override **kwargs: Additional provider-specific arguments """ try: if not self.audio_track or not self.loop: self.emit("error", "Audio track or event loop not set") return if isinstance(text, AsyncIterator): async for segment in segment_text(text): await self._synthesize_streaming(segment, voice_id) else: await self._synthesize_streaming(text, voice_id) except Exception as e: self.emit("error", f"InworldAI TTS synthesis failed: {str(e)}")
Convert text to speech using InworldAI's streaming TTS API
Args
text
- Text to convert to speech
voice_id
- Optional voice override
**kwargs
- Additional provider-specific arguments