Package videosdk.plugins.murfai

Sub-modules

videosdk.plugins.murfai.tts

Classes

class MurfAITTS (*,
api_key: str | None = None,
region: str = 'GLOBAL',
model: str = 'Falcon',
voice: str = 'en-US-natalie',
voice_settings: MurfAIVoiceSettings | None = None,
enable_streaming: bool = True,
min_buffer_size: int = 3,
max_buffer_delay_in_ms: int = 0,
max_connection_age_sec: float = 300.0)
Expand source code
class MurfAITTS(TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        region: str = DEFAULT_REGION,
        model: str = DEFAULT_MODEL,
        voice: str = DEFAULT_VOICE_ID,
        voice_settings: MurfAIVoiceSettings | None = None,
        enable_streaming: bool = True,
        min_buffer_size: int = 3,
        max_buffer_delay_in_ms: int = 0,
        max_connection_age_sec: float = DEFAULT_CONNECTION_MAX_AGE_SEC,
    ) -> None:
        """Initialize the Murf.ai TTS plugin.

        Args:
            api_key (Optional[str]): Murf API key. Uses MURFAI_API_KEY env var if not provided.
            region (str): The region code (GLOBAL, US_EAST, UK, INDIA, etc.). Defaults to US_EAST.
            model (str): The model to use (GEN2, FALCON). Defaults to FALCON.
            voice (str): The voice ID to use.
            voice_settings (Optional[MurfAIVoiceSettings]): Advanced voice settings (pitch, rate, style).
            enable_streaming (bool): Whether to use WebSocket streaming (low latency) or HTTP chunks.
            min_buffer_size (int): WebSocket only. Server-side minimum character count
                before flushing. Lower → lower TTFB, higher → smoother prosody. Defaults to 3.
            max_buffer_delay_in_ms (int): WebSocket only. Server-side max wait time
                before flushing accumulated text. ``0`` means flush as soon as
                ``min_buffer_size`` is met. Defaults to 0.
            max_connection_age_sec (float): Refresh the WebSocket after this many seconds
                to avoid hitting Murf's idle/session limits.
        """
        super().__init__(
            sample_rate=MURFAI_SAMPLE_RATE, num_channels=MURFAI_CHANNELS
        )

        self.api_key = api_key or os.getenv("MURFAI_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Murf API key must be provided either through api_key parameter or MURFAI_API_KEY environment variable"
            )

        self.model = model
        self.voice = voice
        self.enable_streaming = enable_streaming
        self.voice_settings = voice_settings or MurfAIVoiceSettings()
        self.min_buffer_size = min_buffer_size
        self.max_buffer_delay_in_ms = max_buffer_delay_in_ms
        self._max_connection_age_sec = max_connection_age_sec
        self._ws_connect_time: float = 0.0

        base_domain = REGION_URLS.get(region.upper(), REGION_URLS["US_EAST"])
        self.http_base_url = f"https://{base_domain}/v1/speech"
        self.ws_base_url = f"wss://{base_domain}/v1/speech"

        self.audio_track = None
        self.loop = None
        self._first_chunk_sent = False

        self._session = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=15.0, read=30.0, write=5.0, pool=5.0),
            follow_redirects=True,
            limits=httpx.Limits(
                max_connections=50,
                max_keepalive_connections=50,
                keepalive_expiry=120,
            ),
        )

        self._ws_session = None
        self._ws_connection = None
        self._streams = weakref.WeakSet()
        self._send_task: asyncio.Task | None = None
        self._recv_task: asyncio.Task | None = None
        self._should_stop = False
        self._connection_lock = asyncio.Lock()
        
        self._active_contexts: set[str] = set()
        self._context_futures: dict[str, asyncio.Future[None]] = {}

    def reset_first_audio_tracking(self) -> None:
        self._first_chunk_sent = False

    async def prewarm(self) -> None:
        """Pre-establish the Murf WebSocket so the first ``synthesize()`` call
        does not pay the TLS + auth + upgrade cost. Safe to call repeatedly.
        Skipped automatically when ``enable_streaming=False``."""
        if not self.enable_streaming:
            return
        try:
            await self._ensure_connection()
        except Exception as e:
            logger.warning(f"Murf TTS prewarm failed (non-fatal): {e}")

    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            target_voice = voice_id or self.voice
            self._should_stop = False

            if self.enable_streaming:
                await self._stream_synthesis(text, target_voice)
            else:
                if isinstance(text, str):
                    await self._chunked_synthesis(text, target_voice)
                else:
                    async for segment in segment_text(text):
                        if self._should_stop:
                            break
                        await self._chunked_synthesis(segment, target_voice)

        except Exception as e:
            self.emit("error", f"TTS synthesis failed: {str(e)}")
            raise

    async def _chunked_synthesis(self, text: str, voice_id: str) -> None:
        """Non-streaming synthesis using the HTTP POST API"""
        url = f"{self.http_base_url}/stream"

        headers = {
            "api-key": self.api_key,
            "Content-Type": "application/json",
        }

        payload = {
            "text": text,
            "voiceId": voice_id,
            "model": self.model,
            "format": "PCM",  
            "sampleRate": MURFAI_SAMPLE_RATE,
            "style": self.voice_settings.style,
            "rate": self.voice_settings.rate,
            "pitch": self.voice_settings.pitch,
            "variation": self.voice_settings.variation
        }

        if self.voice_settings.multi_native_locale:
            payload["multiNativeLocale"] = self.voice_settings.multi_native_locale

        try:
            async with self._session.stream("POST", url, headers=headers, json=payload) as response:
                if response.status_code >= 400:
                    await response.aread()
                    response.raise_for_status()
                
                async for chunk in response.aiter_bytes():
                    if self._should_stop:
                        break
                    if chunk:
                        await self._stream_audio_chunks(chunk)

        except httpx.HTTPStatusError as e:
            self.emit("error", f"HTTP error {e.response.status_code}: {e.response.text}")
            raise
        except Exception as e:
            self.emit("error", f"Chunked synthesis failed: {str(e)}")
            raise

    async def _stream_synthesis(
        self,
        text: Union[AsyncIterator[Union[str, FlushMarker]], str],
        voice_id: str,
    ) -> None:
        """WebSocket-based streaming synthesis.

        Forwards each text chunk verbatim with ``end=false``, sending a final
        ``end=true`` packet at end-of-stream. ``FlushMarker`` is informational
        — Murf's server flushes naturally based on ``min_buffer_size`` /
        ``max_buffer_delay_in_ms``, so client-side flush packets aren't needed.
        """
        try:
            await self._ensure_connection()

            context_id = uuid.uuid4().hex[:12]
            done_future: asyncio.Future[None] = asyncio.get_event_loop().create_future()
            self.register_context(context_id, done_future)

            async def _send_chunks() -> None:
                try:
                    if isinstance(text, str):
                        if text.strip():
                            await self.send_text(
                                context_id, f"{text} ", voice_id,
                                send_voice_config=True, is_end=True,
                            )
                        else:
                            if not done_future.done():
                                done_future.set_result(None)
                        return

                    has_sent = False
                    async for chunk in text:
                        if self._should_stop:
                            break
                        if isinstance(chunk, FlushMarker):
                            # No-op: Murf's server flushes via min_buffer_size /
                            # max_buffer_delay_in_ms. Client-side flush packets
                            # would conflict with that buffering.
                            continue
                        if not chunk or not chunk.strip():
                            continue
                        await self.send_text(
                            context_id, f"{chunk} ", voice_id,
                            send_voice_config=False, is_end=False,
                        )
                        has_sent = True

                    if has_sent and not self._should_stop:
                        await self.send_end(context_id, voice_id)
                    elif not has_sent and not done_future.done():
                        done_future.set_result(None)

                except Exception as e:
                    if not done_future.done():
                        done_future.set_exception(e)

            sender = asyncio.create_task(_send_chunks())

            try:
                await done_future
            finally:
                if not sender.done():
                    sender.cancel()
                try:
                    await sender
                except (asyncio.CancelledError, Exception):
                    pass

        except Exception as e:
            self.emit("error", f"Streaming synthesis failed: {str(e)}")
            raise

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        if not audio_bytes or self._should_stop:
            return

        if not self._first_chunk_sent and hasattr(self, '_first_audio_callback') and self._first_audio_callback:
            self._first_chunk_sent = True
            asyncio.create_task(self._first_audio_callback())

        if self.audio_track and self.loop:
            await self.audio_track.add_new_bytes(audio_bytes)

    async def interrupt(self) -> None:
        """Stop emitting audio for the current synthesis. Keeps the WebSocket
        open so the next turn does not pay reconnect cost; in-flight audio
        chunks for cancelled contexts are dropped via the ``_context_futures``
        filter in the receive loop."""
        self._should_stop = True

        if self.audio_track:
            self.audio_track.interrupt()

        # Cancel pending futures so the receive loop drops further audio for
        # these contexts. Server-side the contexts are also released, freeing
        # provider compute.
        for fut in list(self._context_futures.values()):
            if not fut.done():
                fut.cancel()

        await self.close_all_contexts()

    async def aclose(self) -> None:
        self._should_stop = True
        
        for task in [self._send_task, self._recv_task]:
            if task and not task.done():
                task.cancel()

        for stream in list(self._streams):
            try:
                await stream.aclose()
            except Exception:
                pass
        self._streams.clear()

        if self._ws_connection and not self._ws_connection.closed:
            await self._ws_connection.close()
        
        if self._ws_session and not self._ws_session.closed:
            await self._ws_session.close()

        self._ws_connection = None
        self._ws_session = None
        
        if self._session:
            await self._session.aclose()
        
        await super().aclose()

    async def _ensure_connection(self) -> None:
        async with self._connection_lock:
            now = asyncio.get_event_loop().time()

            if self._ws_connection and not self._ws_connection.closed:
                age = now - self._ws_connect_time
                if age < self._max_connection_age_sec:
                    return
                logger.info(f"Refreshing Murf WebSocket (age={age:.1f}s)")
                try:
                    await self._ws_connection.close()
                except Exception:
                    pass

            if self._ws_session and not self._ws_session.closed:
                try:
                    await self._ws_session.close()
                except Exception:
                    pass

            self._ws_session = aiohttp.ClientSession()

            params = {
                "api_key": self.api_key,
                "model": self.model,
                "sample_rate": str(MURFAI_SAMPLE_RATE),
                "format": "PCM",
                "channel_type": "MONO"
            }
            
            param_string = "&".join([f"{k}={v}" for k, v in params.items()])
            full_ws_url = f"{self.ws_base_url}/stream-input?{param_string}"
            
            headers = {"api_key": self.api_key}

            self._ws_connection = await asyncio.wait_for(
                self._ws_session.ws_connect(
                    full_ws_url, headers=headers, heartbeat=30.0
                ),
                timeout=10.0
            )
            self._ws_connect_time = now

            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()
            self._recv_task = asyncio.create_task(self._recv_loop())

    def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
        self._context_futures[context_id] = done_future

    def _get_voice_config(self, voice_id: str) -> dict:
        config = {
            "voice_id": voice_id,
            "style": self.voice_settings.style,
            "rate": self.voice_settings.rate,
            "pitch": self.voice_settings.pitch,
            "variation": self.voice_settings.variation,
        }
        if self.voice_settings.multi_native_locale:
            config["multi_native_locale"] = self.voice_settings.multi_native_locale
        return config

    async def send_text(self, context_id: str, text: str, voice_id: str, send_voice_config: bool = False, is_end: bool = False) -> None:
        """Sends a text segment to Murf."""
        if not self._ws_connection or self._ws_connection.closed:
            raise RuntimeError("WebSocket connection is closed")

        if not text or not text.strip():
            return

        payload: dict[str, Any] = {
            "text": text,
            "context_id": context_id,
            "end": is_end,
            "min_buffer_size": self.min_buffer_size,
            "max_buffer_delay_in_ms": self.max_buffer_delay_in_ms,
        }

        if context_id not in self._active_contexts:
            payload["voice_config"] = self._get_voice_config(voice_id)
            self._active_contexts.add(context_id)

        await self._ws_connection.send_str(json.dumps(payload))
    
    async def send_end(self, context_id: str, voice_id: str) -> None:
        """Sends the end message to finalize the context."""
        if not self._ws_connection or self._ws_connection.closed:
            return
        
        payload = {
            "text": " ",
            "context_id": context_id,
            "end": True
        }
        
        await self._ws_connection.send_str(json.dumps(payload))

    async def close_context(self, context_id: str) -> None:
        """Clears a specific context."""
        if not self._ws_connection or self._ws_connection.closed:
            return
        
        try:
            payload = {
                "clear": True,
                "context_id": context_id
            }
            await self._ws_connection.send_str(json.dumps(payload))
            self._active_contexts.discard(context_id)
        except Exception:
            pass

    async def close_all_contexts(self) -> None:
        try:
            for context_id in list(self._active_contexts):
                await self.close_context(context_id)
        except Exception:
            pass

    async def _recv_loop(self) -> None:
        try:
            while self._ws_connection and not self._ws_connection.closed:
                msg = await self._ws_connection.receive()
                
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)

                    if "error" in data:
                        self.emit("error", f"WebSocket error: {data.get('error')}")
                        ctx_id = data.get("context_id")
                        if ctx_id:
                            fut = self._context_futures.get(ctx_id)
                            if fut and not fut.done():
                                fut.set_exception(RuntimeError(data.get("error", "Unknown error")))
                        continue

                    if "audio" in data and data["audio"]:
                        # Drop audio for cancelled contexts (interrupt() cancelled
                        # the future). This is how barge-in avoids reconnecting.
                        ctx_id = data.get("context_id")
                        fut = self._context_futures.get(ctx_id) if ctx_id else None
                        ctx_alive = fut is not None and not fut.done()
                        try:
                            audio_chunk = base64.b64decode(data["audio"])
                            if audio_chunk and not self._should_stop and ctx_alive:
                                await self._stream_audio_chunks(audio_chunk)
                        except Exception:
                            continue

                    if data.get("final") is True:
                        ctx_id = data.get("context_id")
                        if ctx_id:
                            fut = self._context_futures.pop(ctx_id, None)
                            self._active_contexts.discard(ctx_id)
                            if fut and not fut.done():
                                fut.set_result(None)

                elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                    break
                    
        except Exception as e:
            self.emit("error", f"WebSocket receive loop error: {str(e)}")
            for fut in self._context_futures.values():
                if not fut.done():
                    fut.set_exception(RuntimeError(f"WebSocket receive loop error: {e}"))
            self._context_futures.clear()

Base class for Text-to-Speech implementations

Initialize the Murf.ai TTS plugin.

Args

api_key : Optional[str]
Murf API key. Uses MURFAI_API_KEY env var if not provided.
region : str
The region code (GLOBAL, US_EAST, UK, INDIA, etc.). Defaults to US_EAST.
model : str
The model to use (GEN2, FALCON). Defaults to FALCON.
voice : str
The voice ID to use.
voice_settings : Optional[MurfAIVoiceSettings]
Advanced voice settings (pitch, rate, style).
enable_streaming : bool
Whether to use WebSocket streaming (low latency) or HTTP chunks.
min_buffer_size : int
WebSocket only. Server-side minimum character count before flushing. Lower → lower TTFB, higher → smoother prosody. Defaults to 3.
max_buffer_delay_in_ms : int
WebSocket only. Server-side max wait time before flushing accumulated text. 0 means flush as soon as min_buffer_size is met. Defaults to 0.
max_connection_age_sec : float
Refresh the WebSocket after this many seconds to avoid hitting Murf's idle/session limits.

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    self._should_stop = True
    
    for task in [self._send_task, self._recv_task]:
        if task and not task.done():
            task.cancel()

    for stream in list(self._streams):
        try:
            await stream.aclose()
        except Exception:
            pass
    self._streams.clear()

    if self._ws_connection and not self._ws_connection.closed:
        await self._ws_connection.close()
    
    if self._ws_session and not self._ws_session.closed:
        await self._ws_session.close()

    self._ws_connection = None
    self._ws_session = None
    
    if self._session:
        await self._session.aclose()
    
    await super().aclose()

Cleanup resources

async def close_all_contexts(self) ‑> None
Expand source code
async def close_all_contexts(self) -> None:
    try:
        for context_id in list(self._active_contexts):
            await self.close_context(context_id)
    except Exception:
        pass
async def close_context(self, context_id: str) ‑> None
Expand source code
async def close_context(self, context_id: str) -> None:
    """Clears a specific context."""
    if not self._ws_connection or self._ws_connection.closed:
        return
    
    try:
        payload = {
            "clear": True,
            "context_id": context_id
        }
        await self._ws_connection.send_str(json.dumps(payload))
        self._active_contexts.discard(context_id)
    except Exception:
        pass

Clears a specific context.

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Stop emitting audio for the current synthesis. Keeps the WebSocket
    open so the next turn does not pay reconnect cost; in-flight audio
    chunks for cancelled contexts are dropped via the ``_context_futures``
    filter in the receive loop."""
    self._should_stop = True

    if self.audio_track:
        self.audio_track.interrupt()

    # Cancel pending futures so the receive loop drops further audio for
    # these contexts. Server-side the contexts are also released, freeing
    # provider compute.
    for fut in list(self._context_futures.values()):
        if not fut.done():
            fut.cancel()

    await self.close_all_contexts()

Stop emitting audio for the current synthesis. Keeps the WebSocket open so the next turn does not pay reconnect cost; in-flight audio chunks for cancelled contexts are dropped via the _context_futures filter in the receive loop.

async def prewarm(self) ‑> None
Expand source code
async def prewarm(self) -> None:
    """Pre-establish the Murf WebSocket so the first ``synthesize()`` call
    does not pay the TLS + auth + upgrade cost. Safe to call repeatedly.
    Skipped automatically when ``enable_streaming=False``."""
    if not self.enable_streaming:
        return
    try:
        await self._ensure_connection()
    except Exception as e:
        logger.warning(f"Murf TTS prewarm failed (non-fatal): {e}")

Pre-establish the Murf WebSocket so the first synthesize() call does not pay the TLS + auth + upgrade cost. Safe to call repeatedly. Skipped automatically when enable_streaming=False.

def register_context(self, context_id: str, done_future: asyncio.Future[None]) ‑> None
Expand source code
def register_context(self, context_id: str, done_future: asyncio.Future[None]) -> None:
    self._context_futures[context_id] = done_future
def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def send_end(self, context_id: str, voice_id: str) ‑> None
Expand source code
async def send_end(self, context_id: str, voice_id: str) -> None:
    """Sends the end message to finalize the context."""
    if not self._ws_connection or self._ws_connection.closed:
        return
    
    payload = {
        "text": " ",
        "context_id": context_id,
        "end": True
    }
    
    await self._ws_connection.send_str(json.dumps(payload))

Sends the end message to finalize the context.

async def send_text(self,
context_id: str,
text: str,
voice_id: str,
send_voice_config: bool = False,
is_end: bool = False) ‑> None
Expand source code
async def send_text(self, context_id: str, text: str, voice_id: str, send_voice_config: bool = False, is_end: bool = False) -> None:
    """Sends a text segment to Murf."""
    if not self._ws_connection or self._ws_connection.closed:
        raise RuntimeError("WebSocket connection is closed")

    if not text or not text.strip():
        return

    payload: dict[str, Any] = {
        "text": text,
        "context_id": context_id,
        "end": is_end,
        "min_buffer_size": self.min_buffer_size,
        "max_buffer_delay_in_ms": self.max_buffer_delay_in_ms,
    }

    if context_id not in self._active_contexts:
        payload["voice_config"] = self._get_voice_config(voice_id)
        self._active_contexts.add(context_id)

    await self._ws_connection.send_str(json.dumps(payload))

Sends a text segment to Murf.

async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        target_voice = voice_id or self.voice
        self._should_stop = False

        if self.enable_streaming:
            await self._stream_synthesis(text, target_voice)
        else:
            if isinstance(text, str):
                await self._chunked_synthesis(text, target_voice)
            else:
                async for segment in segment_text(text):
                    if self._should_stop:
                        break
                    await self._chunked_synthesis(segment, target_voice)

    except Exception as e:
        self.emit("error", f"TTS synthesis failed: {str(e)}")
        raise

Convert text to speech

Args

text
Text to convert to speech. Either a plain string or an async iterator that may yield str chunks and FlushMarker segment-boundary markers. Plugins that don't support per-segment flushing should drop the markers with an inline isinstance check (or rely on segment_text which already drops them).
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None

class MurfAIVoiceSettings (pitch: int = 0,
rate: int = 0,
style: str = 'Conversational',
variation: int = 1,
multi_native_locale: Optional[str] = None)
Expand source code
@dataclass
class MurfAIVoiceSettings:
    """Settings specific to Murf.ai voice generation."""
    pitch: int = 0
    rate: int = 0
    style: str = "Conversational"
    variation: int = 1
    multi_native_locale: Optional[str] = None

Settings specific to Murf.ai voice generation.

Instance variables

var multi_native_locale : str | None
var pitch : int
var rate : int
var style : str
var variation : int