Module videosdk.plugins.inworldai.tts
Classes
class InworldAITTS (*,
api_key: str | None = None,
model_id: str = 'inworld-tts-1.5-max',
voice_id: str = 'Sarah',
temperature: float = 0.8,
sample_rate: int = 24000,
enable_streaming: bool = True,
auto_mode: bool = True,
max_buffer_delay_ms: int | None = None,
buffer_char_threshold: int | None = None,
apply_text_normalization: str | None = None,
speaking_rate: float | None = None,
max_connection_age_sec: float = 300.0)-
Expand source code
class InworldAITTS(TTS):
    """
    Inworld AI Text-to-Speech plugin.

    Supports two transports:

    - WebSocket (default, ``enable_streaming=True``) — uses the Inworld
      bidirectional streaming endpoint with per-call contexts. Server-side
      ``autoMode`` handles intelligent flushing for low-latency, full-quality
      prosody. Best fit for the agent pipeline because the server combines
      sentence chunks into one continuous synthesis context.
    - HTTP (``enable_streaming=False``) — uses ``voice:stream`` with chunked
      transfer encoding. One POST per ``synthesize()`` call.

    Both transports forward incoming text chunks verbatim and never re-segment
    client-side: the upstream tokenizer / text filter already delivers
    sentence-sized, verbalized segments, and re-tokenizing here would split
    currency / multi-digit tokens (``$50,000``, ``₹50,00,000``) and break
    prosody at chunk boundaries.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_id: str = DEFAULT_MODEL,
        voice_id: str = DEFAULT_VOICE,
        temperature: float = DEFAULT_TEMPERATURE,
        sample_rate: int = INWORLD_SAMPLE_RATE,
        enable_streaming: bool = True,
        auto_mode: bool = True,
        max_buffer_delay_ms: int | None = None,
        buffer_char_threshold: int | None = None,
        apply_text_normalization: str | None = None,
        speaking_rate: float | None = None,
        max_connection_age_sec: float = DEFAULT_CONNECTION_MAX_AGE_SEC,
    ) -> None:
        """Initialize the InworldAI TTS plugin.

        Audio is always requested as raw 16-bit signed LE PCM (Inworld
        ``"PCM"`` encoding) so it can be forwarded directly to the agent's
        audio track without header parsing.

        Args:
            api_key: Inworld API key. Falls back to ``INWORLD_API_KEY`` env var.
            model_id: TTS model id (e.g. ``"inworld-tts-1.5-max"``,
                ``"inworld-tts-1"``).
            voice_id: Voice id (e.g. ``"Sarah"``, ``"Hades"``, ``"Ashley"``).
            temperature: Sampling temperature, 0.0–2.0. Defaults to 0.8.
            sample_rate: Output sample rate in Hz. Defaults to 24000.
            enable_streaming: ``True`` (default) → WebSocket bidirectional
                streaming. ``False`` → HTTP streaming POST.
            auto_mode: WSS only — when ``True`` (default), the server controls
                buffer flushing for minimal latency. Recommended when text
                arrives as full sentences/phrases (which is what the agent
                pipeline produces).
            max_buffer_delay_ms: WSS only — server-side max wait time before
                flushing accumulated text. ``None`` = unbounded.
            buffer_char_threshold: WSS only — server-side character count that
                auto-triggers flushing. Defaults to 1000 server-side; cannot
                exceed 1000.
            apply_text_normalization: ``"ON"``, ``"OFF"``, or ``None`` (server
                decides). Sent on both transports. When on, ``Dr. Smith`` →
                ``Doctor Smith`` and ``3/10/25`` → ``March tenth, twenty
                twenty-five``.
            speaking_rate: Speed multiplier in the range [0.5, 1.5]. ``None``
                uses the voice's natural rate (1.0).
            max_connection_age_sec: WSS only — a WebSocket older than this many
                seconds is closed and re-opened by the next ``synthesize()`` /
                ``prewarm()`` call.

        Raises:
            ValueError: If no API key is passed and ``INWORLD_API_KEY`` is unset.
        """
        super().__init__(sample_rate=sample_rate, num_channels=INWORLD_CHANNELS)

        self.api_key = api_key or os.getenv("INWORLD_API_KEY")
        if not self.api_key:
            raise ValueError(
                "InworldAI API key must be provided either through:\n"
                "1. api_key parameter, OR\n"
                "2. INWORLD_API_KEY environment variable"
            )

        self.model_id = model_id
        self.voice_id = voice_id
        self.temperature = temperature
        self.enable_streaming = enable_streaming
        self.auto_mode = auto_mode
        self.max_buffer_delay_ms = max_buffer_delay_ms
        self.buffer_char_threshold = buffer_char_threshold
        self.apply_text_normalization = apply_text_normalization
        self.speaking_rate = speaking_rate
        self._max_connection_age_sec = max_connection_age_sec

        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False
        self._interrupted = False
        self._ws_connect_time: float = 0.0
        self._auth_header = f"Basic {self.api_key}"

        # HTTP transport
        self._http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0, write=5.0, pool=5.0),
            follow_redirects=True,
            limits=httpx.Limits(
                max_connections=50,
                max_keepalive_connections=50,
                keepalive_expiry=120,
            ),
        )

        # WSS transport
        self._ws_session: aiohttp.ClientSession | None = None
        self._ws_connection: aiohttp.ClientWebSocketResponse | None = None
        self._connection_lock = asyncio.Lock()
        self._receive_task: asyncio.Task | None = None
        self._context_futures: dict[str, asyncio.Future[None]] = {}
        # Context ids whose audio must be discarded because their turn was
        # interrupted; entries are removed on ``contextClosed`` or reconnect.
        self._cancelled_contexts: set[str] = set()
        # Strong references to in-flight audio push tasks; asyncio only keeps
        # weak references to tasks, so unreferenced ones may be collected.
        self._audio_tasks: set[asyncio.Task] = set()

    def reset_first_audio_tracking(self) -> None:
        """Clear the first-audio flag so the next synthesis fires the first-audio callback again."""
        self._first_chunk_sent = False

    async def prewarm(self) -> None:
        """Open the Inworld WebSocket ahead of time.

        Lets the first ``synthesize()`` call skip the TLS + auth + upgrade
        cost. Safe to call repeatedly; does nothing when
        ``enable_streaming=False``. Connection failures are logged as
        warnings, never raised.
        """
        if self.enable_streaming:
            try:
                await self._ensure_ws_connection()
            except Exception as exc:
                logger.warning(f"Inworld TTS prewarm failed (non-fatal): {exc}")

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Turn *text* into speech and stream the PCM frames to the audio track.

        An ``AsyncIterator`` input is forwarded chunk-by-chunk into one
        synthesis context (WSS) or concatenated into one request (HTTP); text
        is never re-segmented client-side, so currency tokens, comma-grouped
        digits, and Devanagari verbalizations stay within one synthesis call.

        Failures are reported through an ``"error"`` event rather than raised.
        """
        try:
            if self.audio_track and self.loop:
                self._interrupted = False
                runner = self._stream_synthesis if self.enable_streaming else self._http_synthesis
                await runner(text, voice_id)
            else:
                self.emit("error", "Audio track or event loop not set")
        except Exception as exc:
            self.emit("error", f"InworldAI TTS synthesis failed: {str(exc)}")

    async def aclose(self) -> None:
        """Release all transport resources and shut the plugin down.

        Marks the plugin as interrupted, tears down the WebSocket session and
        receive task, closes the HTTP client if it is still open, then defers
        to the base class.
        """
        self._interrupted = True
        await self._close_ws_resources()
        client = self._http_client
        if client and not client.is_closed:
            await client.aclose()
        await super().aclose()

    async def interrupt(self) -> None:
        """Stop emitting audio for the current synthesis.

        Sets the interrupted flag (``_stream_audio_chunk`` discards audio while
        it is set), interrupts the audio track, records every in-flight
        context id in ``_cancelled_contexts`` so the receive loop can discard
        audio the server still sends for those contexts after the next
        ``synthesize()`` clears the flag, and cancels the pending context
        futures so a waiting ``synthesize()`` returns. The WebSocket stays
        open so the next turn does not pay reconnect cost.
        """
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()
        self._cancelled_contexts.update(self._context_futures)
        for fut in list(self._context_futures.values()):
            if not fut.done():
                fut.cancel()

    # ── WSS path ───────────────────────────────────────────────────────────

    async def _stream_synthesis(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str],
    ) -> None:
        """WebSocket-based synthesis using the Inworld bidirectional endpoint.

        Per call: open a fresh context with audio config, forward text chunks
        verbatim, dispatch a ``flush_context`` on every ``FlushMarker`` (mid-
        stream sentence boundaries) plus once at end-of-stream, await
        ``flushCompleted``, and finally send ``close_context``.

        The context is always closed (best-effort), including after an
        interruption, so it is not left open server-side. When interrupted,
        the context id is added to ``_cancelled_contexts`` so the receive
        loop drops any audio still arriving for it.

        Raises:
            Exception: Any connection/send/server error, after emitting an
                ``"error"`` event.
        """
        context_id = ""
        try:
            await self._ensure_ws_connection()
            if not self._ws_connection:
                raise RuntimeError("WebSocket connection is not available.")

            context_id = os.urandom(8).hex()
            done_future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
            self._context_futures[context_id] = done_future
            await self._send_create_context(context_id, voice_id)

            sent_text = False
            if isinstance(text, str):
                if text and text.strip():
                    await self._send_text(context_id, text)
                    sent_text = True
            else:
                async for chunk in text:
                    if self._interrupted:
                        break
                    if isinstance(chunk, FlushMarker):
                        await self._send_flush(context_id)
                        continue
                    if not chunk or not chunk.strip():
                        continue
                    await self._send_text(context_id, chunk)
                    sent_text = True

            # Nothing to wait for. When interrupted, no final flush is sent and
            # interrupt() may have run before this context's future existed (so
            # it was never cancelled) — awaiting it could hang forever. When no
            # text was sent there is no audio to wait for.
            if self._interrupted or not sent_text:
                return

            await self._send_flush(context_id)
            # NOTE(review): done_future resolves on the first flushCompleted for
            # this context; when FlushMarkers triggered mid-stream flushes, that
            # may come before the final flush completes — confirm against the
            # Inworld WebSocket semantics.
            try:
                await done_future
            except asyncio.CancelledError:
                # interrupt() / _close_ws_resources() cancelled the future.
                return
        except Exception as e:
            future = self._context_futures.get(context_id)
            if future and not future.done():
                # Nothing awaits this future any more; cancelling (instead of
                # set_exception) avoids a "Future exception was never
                # retrieved" log from asyncio.
                future.cancel()
            self.emit("error", f"InworldAI WSS synthesis failed: {str(e)}")
            raise
        finally:
            if context_id:
                if self._interrupted:
                    self._cancelled_contexts.add(context_id)
                await self._safe_send_close(context_id)
                self._context_futures.pop(context_id, None)

    async def _ensure_ws_connection(self) -> None:
        """Open a WebSocket connection if one isn't already alive, refreshing
        it if older than ``max_connection_age_sec``.

        Raises:
            Exception: Whatever the handshake raises (``asyncio.TimeoutError``
                after 10 s), after emitting an ``"error"`` event.
        """
        async with self._connection_lock:
            now = asyncio.get_running_loop().time()
            if self._ws_connection and not self._ws_connection.closed:
                age = now - self._ws_connect_time
                if age < self._max_connection_age_sec:
                    return
                logger.info(f"Refreshing Inworld WebSocket (age={age:.1f}s)")

            if self._receive_task and not self._receive_task.done():
                self._receive_task.cancel()
            if self._ws_connection:
                try:
                    await self._ws_connection.close()
                except Exception:
                    pass
            if self._ws_session:
                try:
                    await self._ws_session.close()
                except Exception:
                    pass
            self._receive_task = None
            self._ws_connection = None
            self._ws_session = None
            # Context ids only exist on the connection that created them.
            self._cancelled_contexts.clear()

            session = aiohttp.ClientSession()
            try:
                ws = await asyncio.wait_for(
                    session.ws_connect(
                        INWORLD_TTS_WSS_ENDPOINT,
                        headers={"Authorization": self._auth_header},
                        heartbeat=30.0,
                    ),
                    timeout=10.0,
                )
            except BaseException as e:
                # Don't leak the freshly created session on a failed handshake.
                await session.close()
                if isinstance(e, Exception):
                    self.emit("error", f"Failed to connect to Inworld WSS: {e}")
                raise

            self._ws_session = session
            self._ws_connection = ws
            self._ws_connect_time = now
            self._receive_task = asyncio.create_task(self._recv_loop(ws))

    def _build_audio_config(self) -> dict[str, Any]:
        """Build the ``audioConfig`` object shared by both transports."""
        # Always request raw PCM — no header parsing, direct push to track.
        cfg: dict[str, Any] = {
            "audioEncoding": "PCM",
            "sampleRateHertz": self._sample_rate,
        }
        if self.speaking_rate is not None:
            cfg["speakingRate"] = self.speaking_rate
        return cfg

    async def _send_create_context(
        self,
        context_id: str,
        voice_id: Optional[str],
    ) -> None:
        """Send the ``create`` message that opens *context_id* with the configured voice/model/audio settings."""
        create_payload: dict[str, Any] = {
            "voiceId": voice_id or self.voice_id,
            "modelId": self.model_id,
            "audioConfig": self._build_audio_config(),
            "temperature": self.temperature,
            "autoMode": self.auto_mode,
        }
        if self.max_buffer_delay_ms is not None:
            create_payload["maxBufferDelayMs"] = self.max_buffer_delay_ms
        if self.buffer_char_threshold is not None:
            create_payload["bufferCharThreshold"] = self.buffer_char_threshold
        if self.apply_text_normalization is not None:
            create_payload["applyTextNormalization"] = self.apply_text_normalization
        await self._ws_send({"create": create_payload, "contextId": context_id})

    async def _send_text(self, context_id: str, text: str) -> None:
        """Send *text* to *context_id*, split into pieces of at most 1000 characters."""
        for piece in self._slice_for_inworld(text, max_len=1000):
            await self._ws_send({
                "send_text": {"text": piece},
                "contextId": context_id,
            })

    async def _send_flush(self, context_id: str) -> None:
        """Ask the server to synthesize everything buffered for *context_id*."""
        await self._ws_send({"flush_context": {}, "contextId": context_id})

    async def _send_close(self, context_id: str) -> None:
        """Send ``close_context`` for *context_id*."""
        await self._ws_send({"close_context": {}, "contextId": context_id})

    async def _safe_send_close(self, context_id: str) -> None:
        """Best-effort ``close_context``: log failures instead of raising."""
        try:
            await self._send_close(context_id)
        except Exception as e:
            logger.debug(f"Inworld close_context for {context_id} failed: {e}")

    async def _ws_send(self, payload: dict[str, Any]) -> None:
        """Serialize *payload* as JSON and send it on the current WebSocket.

        Raises:
            ConnectionError: If no open connection exists. Raising (rather than
                silently dropping the message) makes ``synthesize()`` fail
                fast instead of awaiting a ``flushCompleted`` that never comes.
        """
        ws = self._ws_connection
        if ws is None or ws.closed:
            raise ConnectionError("Inworld WebSocket is not connected")
        await ws.send_str(json.dumps(payload))

    @staticmethod
    def _slice_for_inworld(text: str, max_len: int) -> list[str]:
        """Split a string into ≤ max_len pieces without breaking words.

        Almost always returns a single piece (sentence chunks from the
        framework are well under 1000 chars); the splitter only kicks in for
        the rare case where a single ``send_text`` would exceed Inworld's
        per-message cap. A word longer than *max_len* is hard-cut.
        """
        if len(text) <= max_len:
            return [text]
        pieces: list[str] = []
        remaining = text
        while len(remaining) > max_len:
            cut = remaining.rfind(" ", 0, max_len)
            if cut <= 0:
                cut = max_len
            pieces.append(remaining[:cut])
            remaining = remaining[cut:].lstrip()
        if remaining:
            pieces.append(remaining)
        return pieces

    async def _recv_loop(
        self, ws: aiohttp.ClientWebSocketResponse | None = None
    ) -> None:
        """Long-running receive task for one WebSocket connection.

        Parses every server message, routes audio bytes to the audio track
        (except for contexts in ``_cancelled_contexts``), and signals flush
        completion / errors on the per-context futures. When the loop ends
        for any reason, every still-pending future is failed so no
        ``synthesize()`` hangs.

        Args:
            ws: The connection to read from; defaults to the current one.
        """
        if ws is None:
            ws = self._ws_connection
        try:
            while ws is not None and not ws.closed:
                msg = await ws.receive()
                # CLOSE/CLOSING are included so a closing socket cannot make
                # the loop spin on immediately-returned messages.
                if msg.type in (
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.ERROR,
                ):
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    continue
                if not isinstance(data, dict):
                    continue

                result = data.get("result") or {}
                if not isinstance(result, dict):
                    result = {}
                context_id = result.get("contextId")

                # Top-level error (e.g. auth, malformed payload) — fail any
                # in-flight contexts so synthesize() doesn't hang.
                if "error" in data:
                    err = data["error"]
                    msg_str = err.get("message", "Unknown error") if isinstance(err, dict) else str(err)
                    self._fail_all_pending(RuntimeError(f"Inworld API error: {msg_str}"))
                    continue

                # Per-context status with non-zero gRPC code → error
                status = result.get("status") or {}
                if isinstance(status, dict) and status and status.get("code", 0) != 0:
                    err_msg = status.get("message", f"gRPC code {status.get('code')}")
                    fut = self._context_futures.get(context_id)
                    if fut and not fut.done():
                        fut.set_exception(RuntimeError(f"Inworld error: {err_msg}"))
                    continue

                if "audioChunk" in result:
                    # Audio for an interrupted turn must not leak into the
                    # next turn once synthesize() has cleared _interrupted.
                    if context_id in self._cancelled_contexts:
                        continue
                    chunk = result["audioChunk"] or {}
                    audio_b64 = chunk.get("audioContent")
                    if audio_b64:
                        audio_bytes = base64.b64decode(audio_b64)
                        await self._stream_audio_chunk(audio_bytes)
                elif "flushCompleted" in result:
                    fut = self._context_futures.get(context_id)
                    if fut and not fut.done():
                        fut.set_result(None)
                elif "contextClosed" in result:
                    # Presumably the last event for this context; stop
                    # tracking it so the cancelled set does not grow.
                    self._cancelled_contexts.discard(context_id)
                # "contextCreated" and other lifecycle events: nothing to do.
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.warning(f"Inworld WSS receive loop error: {e}")
            self._fail_all_pending(e)
        finally:
            self._fail_all_pending(RuntimeError("Inworld WSS connection closed"))

    def _fail_all_pending(self, exc: BaseException) -> None:
        """Set *exc* on every context future that has not completed yet."""
        for fut in list(self._context_futures.values()):
            if not fut.done():
                fut.set_exception(exc)

    async def _close_ws_resources(self) -> None:
        """Tear down the receive task, WebSocket and session, and release waiters."""
        if self._receive_task and not self._receive_task.done():
            self._receive_task.cancel()
        # Resolve waiters before dropping them: the cancelled receive task may
        # not get to run its cleanup before the dict is cleared below.
        for fut in list(self._context_futures.values()):
            if not fut.done():
                fut.cancel()
        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()
        self._receive_task = None
        self._ws_connection = None
        self._ws_session = None
        self._context_futures.clear()
        self._cancelled_contexts.clear()

    # ── HTTP path ──────────────────────────────────────────────────────────

    async def _http_synthesis(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str],
    ) -> None:
        """HTTP streaming synthesis.

        For AsyncIterator inputs, accumulates all chunks into a single POST so
        currency / number patterns span a single request — same fix pattern as
        Sarvam HTTP and OpenAI. ``FlushMarker`` segment markers are silently
        dropped (HTTP path has no per-segment flush primitive)."""
        if isinstance(text, str):
            if text.strip():
                await self._http_post(text, voice_id)
            return

        parts: list[str] = []
        async for chunk in text:
            if self._interrupted:
                break
            if isinstance(chunk, FlushMarker):
                continue
            if chunk and chunk.strip():
                parts.append(chunk)

        if parts and not self._interrupted:
            combined = "".join(parts)
            if combined.strip():
                await self._http_post(combined, voice_id)

    async def _http_post(self, text: str, voice_id: Optional[str]) -> None:
        """POST *text* to the Inworld streaming HTTP endpoint and push each
        decoded PCM chunk to the audio track.

        API errors embedded in the stream emit an ``"error"`` event and stop
        the read. HTTP status errors emit an ``"error"`` event and re-raise
        ``httpx.HTTPStatusError``.
        """
        payload: dict[str, Any] = {
            "text": text,
            "voiceId": voice_id or self.voice_id,
            "modelId": self.model_id,
            "audioConfig": self._build_audio_config(),
            # Request-level field, as in the WSS ``create`` payload (it is not
            # part of the audio encoding config).
            "temperature": self.temperature,
        }
        if self.apply_text_normalization is not None:
            payload["applyTextNormalization"] = self.apply_text_normalization
        headers = {
            "Authorization": self._auth_header,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

        try:
            async with self._http_client.stream(
                "POST",
                INWORLD_TTS_HTTP_ENDPOINT,
                headers=headers,
                json=payload,
            ) as response:
                if response.is_error:
                    # Streaming bodies are not read automatically; read it now
                    # so the handler below can parse the JSON error message.
                    await response.aread()
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if self._interrupted:
                        break
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(data, dict):
                        continue
                    if "error" in data:
                        err = data["error"]
                        msg = err.get("message", "Unknown error") if isinstance(err, dict) else str(err)
                        self.emit("error", f"InworldAI API error: {msg}")
                        return
                    result = data.get("result") or {}
                    audio_b64 = (
                        result.get("audioContent")
                        or (result.get("audioChunk") or {}).get("audioContent")
                    )
                    if audio_b64:
                        await self._stream_audio_chunk(base64.b64decode(audio_b64))
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401:
                self.emit("error", "InworldAI authentication failed. Please check your API key.")
            elif e.response.status_code == 400:
                try:
                    err = e.response.json().get("error", {}).get("message", "Bad request")
                except Exception:
                    err = "Bad request"
                self.emit("error", f"InworldAI request error: {err}")
            else:
                self.emit("error", f"InworldAI HTTP error: {e.response.status_code}")
            raise

    # ── Audio output ───────────────────────────────────────────────────────

    async def _stream_audio_chunk(self, audio_bytes: bytes) -> None:
        """Push raw PCM bytes straight to the audio track.

        Drops the bytes while interrupted. Fires the first-audio callback once
        per turn (see ``reset_first_audio_tracking``).
        """
        if not audio_bytes or self._interrupted:
            return
        if not self._first_chunk_sent and self._first_audio_callback:
            self._first_chunk_sent = True
            await self._first_audio_callback()
        if self.audio_track:
            task = asyncio.create_task(self.audio_track.add_new_bytes(audio_bytes))
            # Keep a strong reference until the task finishes.
            self._audio_tasks.add(task)
            task.add_done_callback(self._audio_tasks.discard)
        await asyncio.sleep(0.001)
Supports two transports:
- WebSocket (default, ``enable_streaming=True``) — uses the Inworld bidirectional streaming endpoint with per-call contexts. Server-side ``autoMode`` handles intelligent flushing for low-latency, full-quality prosody. Best fit for the agent pipeline because the server combines sentence chunks into one continuous synthesis context.
- HTTP (``enable_streaming=False``) — uses ``voice:stream`` with chunked transfer encoding. One POST per ``synthesize()`` call.
Both transports forward incoming text chunks verbatim and never re-segment client-side: the upstream tokenizer / text filter already delivers sentence-sized, verbalized segments, and re-tokenizing here would split currency / multi-digit tokens (``$50,000``, ``₹50,00,000``) and break prosody at chunk boundaries.
Initialize the InworldAI TTS plugin.
Audio is always requested as raw 16-bit signed LE PCM (Inworld ``"PCM"`` encoding) so it can be forwarded directly to the agent's audio track without header parsing.
Args
- api_key – Inworld API key. Falls back to the ``INWORLD_API_KEY`` environment variable.
- model_id – TTS model id (e.g. ``"inworld-tts-1.5-max"``, ``"inworld-tts-1"``).
- voice_id – Voice id (e.g. ``"Sarah"``, ``"Hades"``, ``"Ashley"``).
- temperature – Sampling temperature, 0.0–2.0. Defaults to 0.8.
- sample_rate – Output sample rate in Hz. Defaults to 24000.
- enable_streaming – ``True`` (default) → WebSocket bidirectional streaming; ``False`` → HTTP streaming POST.
- auto_mode – WSS only. When ``True`` (default), the server controls buffer flushing for minimal latency. Recommended when text arrives as full sentences/phrases (which is what the agent pipeline produces).
- max_buffer_delay_ms – WSS only. Server-side maximum wait time before flushing accumulated text. ``None`` = unbounded.
- buffer_char_threshold – WSS only. Server-side character count that auto-triggers flushing. Defaults to 1000 server-side; cannot exceed 1000.
- apply_text_normalization – ``"ON"``, ``"OFF"``, or ``None`` (server decides). When on, ``Dr. Smith`` → ``Doctor Smith`` and ``3/10/25`` → ``March tenth, twenty twenty-five``.
- speaking_rate – Speed multiplier in the range [0.5, 1.5]. ``None`` uses the voice's natural rate (1.0).
- max_connection_age_sec – WSS only. A WebSocket connection older than this many seconds (default 300.0) is closed and re-opened before the next synthesis.
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Release all transport resources and shut the plugin down.

    Marks the plugin as interrupted, tears down the WebSocket session and
    receive task, closes the HTTP client if it is still open, then defers
    to the base class.
    """
    self._interrupted = True
    await self._close_ws_resources()
    client = self._http_client
    if client and not client.is_closed:
        await client.aclose()
    await super().aclose()
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None:
    """Stop emitting audio for the current synthesis.

    Sets the interrupted flag (``_stream_audio_chunk`` discards audio while
    it is set), interrupts the audio track, records every in-flight
    context id in ``_cancelled_contexts`` so the receive loop can discard
    audio the server still sends for those contexts after the next
    ``synthesize()`` clears the flag, and cancels the pending context
    futures so a waiting ``synthesize()`` returns. The WebSocket stays
    open so the next turn does not pay reconnect cost.
    """
    self._interrupted = True
    if self.audio_track:
        self.audio_track.interrupt()
    self._cancelled_contexts.update(self._context_futures)
    for fut in list(self._context_futures.values()):
        if not fut.done():
            fut.cancel()
async def prewarm(self) ‑> None-
Expand source code
async def prewarm(self) -> None:
    """Open the Inworld WebSocket ahead of time.

    Lets the first ``synthesize()`` call skip the TLS + auth + upgrade
    cost. Safe to call repeatedly; does nothing when
    ``enable_streaming=False``. Connection failures are logged as
    warnings, never raised.
    """
    if self.enable_streaming:
        try:
            await self._ensure_ws_connection()
        except Exception as exc:
            logger.warning(f"Inworld TTS prewarm failed (non-fatal): {exc}")
``synthesize()`` call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly. Skipped automatically when ``enable_streaming=False``.
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Clear the first-audio flag so the next synthesis fires the first-audio callback again."""
    self._first_chunk_sent = False
async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Turn *text* into speech and stream the PCM frames to the audio track.

    An ``AsyncIterator`` input is forwarded chunk-by-chunk into one
    synthesis context (WSS) or concatenated into one request (HTTP); text
    is never re-segmented client-side, so currency tokens, comma-grouped
    digits, and Devanagari verbalizations stay within one synthesis call.

    Failures are reported through an ``"error"`` event rather than raised.
    """
    try:
        if self.audio_track and self.loop:
            self._interrupted = False
            runner = self._stream_synthesis if self.enable_streaming else self._http_synthesis
            await runner(text, voice_id)
        else:
            self.emit("error", "Audio track or event loop not set")
    except Exception as exc:
        self.emit("error", f"InworldAI TTS synthesis failed: {str(exc)}")
For ``AsyncIterator`` inputs, chunks are forwarded verbatim into a single synthesis context (WSS) or accumulated into a single request (HTTP). Either way, no client-side re-segmentation happens — currency tokens, comma-grouped digits, and Devanagari verbalizations span a single coherent synthesis call.
-