Module videosdk.plugins.simli.simli

Classes

class SimliAudioTrack (loop: asyncio.events.AbstractEventLoop, simli_client: simli.simli.SimliClient)
Expand source code
class SimliAudioTrack(CustomAudioTrack):
    def __init__(self, loop: asyncio.AbstractEventLoop, simli_client: SimliClient):
        super().__init__()
        self.kind = "audio"
        self.loop = loop
        self._timestamp = 0
        self.simli_client = simli_client
        self.queue = asyncio.Queue()
        self._readyState = "live"

    @property
    def readyState(self):
        return self._readyState

    def interrupt(self):
        asyncio.ensure_future(self.simli_client.clearBuffer())
        while not self.queue.empty():
            try:
                self.queue.get_nowait()
            except asyncio.QueueEmpty:
                break

    async def recv(self) -> AudioFrame:
        """Return next audio frame to VideoSDK."""
        try:
            if self.readyState != "live":
                raise Exception("Track not live")
            frame = await self.queue.get()
            return frame

        except Exception:
            traceback.print_exc()
            return self._create_silence_frame()

    async def cleanup(self):
        self.interrupt()
        self.stop()

    def add_frame(self, frame: AudioFrame):
        """Add frame from Simli stream - add AudioFrame directly to buffer with quality validation"""

        if frame is None:
            return
        try:
            for resampled_frame in videosdk_output_resampler.resample(frame):
                self.queue.put_nowait(resampled_frame)
        except asyncio.QueueEmpty:
            pass
        except Exception as e:
            logger.error(f"Error adding Simli audio frame: {e}")

A dummy audio track which reads silence.

Ancestors

  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Instance variables

prop readyState
Expand source code
@property
def readyState(self):
    return self._readyState

Methods

def add_frame(self, frame: av.audio.frame.AudioFrame)
Expand source code
def add_frame(self, frame: AudioFrame):
    """Add frame from Simli stream - add AudioFrame directly to buffer with quality validation"""

    if frame is None:
        return
    try:
        for resampled_frame in videosdk_output_resampler.resample(frame):
            self.queue.put_nowait(resampled_frame)
    except asyncio.QueueEmpty:
        pass
    except Exception as e:
        logger.error(f"Error adding Simli audio frame: {e}")

Add frame from Simli stream - add AudioFrame directly to buffer with quality validation

async def cleanup(self)
Expand source code
async def cleanup(self):
    self.interrupt()
    self.stop()
def interrupt(self)
Expand source code
def interrupt(self):
    asyncio.ensure_future(self.simli_client.clearBuffer())
    while not self.queue.empty():
        try:
            self.queue.get_nowait()
        except asyncio.QueueEmpty:
            break
async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """Return next audio frame to VideoSDK."""
    try:
        if self.readyState != "live":
            raise Exception("Track not live")
        frame = await self.queue.get()
        return frame

    except Exception:
        traceback.print_exc()
        return self._create_silence_frame()

Return next audio frame to VideoSDK.

class SimliAvatar (config: simli.simli.SimliConfig,
api_key: str | None = None,
transport_mode: str = 'P2P',
simli_url: str = 'https://api.simli.ai',
is_trinity_avatar: bool = False)
Expand source code
class SimliAvatar:
    def __init__(
        self,
        config: SimliConfig,
        api_key: str | None = None,
        transport_mode: str = "P2P",
        simli_url: str = DEFAULT_SIMLI_HTTP_URL,
        is_trinity_avatar: bool = False,
    ):
        """Initialize the Simli Avatar plugin.

        Args:
            config (SimliConfig): The configuration for the Simli avatar.
            api_key (str): The Simli API key.
            simli_url (str): The Simli API URL. Defaults to "https://api.simli.ai".
            is_trinity_avatar (bool): Specify if the face id in the simli config is a trinity avatar to ensure synchronization
        """
        super().__init__()
        self.config = config
        self.api_key = api_key or os.getenv("SIMLI_API_KEY")
        if isinstance(transport_mode, str):
            if transport_mode.upper() == "LIVEKIT":
                self.transport_mode = TransportMode.LIVEKIT
            elif transport_mode.upper() == "P2P":
                self.transport_mode = TransportMode.P2P
            else:
                logger.warning(
                    f"Unknown transport mode: {transport_mode}, defaulting to P2P"
                )
                self.transport_mode = TransportMode.P2P
        else:
            self.transport_mode = transport_mode
        self._stream_start_time = None
        self.video_track: SimliVideoTrack | None = None
        self.video_receiver = None
        self.audio_track: SimliAudioTrack | None = None
        self.audio_receiver = None
        self.run = True
        self._is_speaking = False
        self._avatar_speaking = False
        self._last_error = None
        self._stopping = False
        self._keep_alive_task = None
        self._last_audio_time = 0
        self._is_trinity_avatar = is_trinity_avatar
        self.simli_url = simli_url

    async def connect(self):
        await self._initialize_connection()
        if self._stream_start_time is None:
            self._stream_start_time = time.time()

        self._last_audio_time = time.time()
        self._keep_alive_task = asyncio.create_task(self._keep_alive_loop())
        return self.audio_track, self.video_track

    async def mark_silent(self):
        self._avatar_speaking = False

    async def mark_speaking(self):
        self._avatar_speaking = True

    async def _initialize_connection(self):
        """Initialize connection with retry logic"""
        self.simli_client = SimliClient(
            api_key=self.api_key,
            config=self.config,
            simliURL=self.simli_url,
            transport_mode=self.transport_mode
        )

        async def on_start():
            self.simli_client.ready.set()
            asyncio.create_task(self.simli_client.sendSilence(1.0))

        await self.simli_client.start()
        
        self.simli_client.registerEventCallback(SimliEvent.START, on_start)
        self.simli_client.registerEventCallback(SimliEvent.SILENT, self.mark_silent)
        self.simli_client.registerEventCallback(SimliEvent.SPEAK, self.mark_speaking)

        loop = asyncio.get_event_loop()
        self.audio_track = SimliAudioTrack(loop, self.simli_client)
        self.video_track = SimliVideoTrack(self._is_trinity_avatar)
        
        asyncio.ensure_future(self._process_video_frames())
        asyncio.ensure_future(self._process_audio_frames())

        logger.info("Waiting for Simli START event...")
        await self.simli_client.ready.wait()
        logger.info("Simli START event received, connection ready")

    async def _process_video_frames(self):
        """Simple video frame processing for real-time playback"""

        async for frame in self.simli_client.getVideoStreamIterator():
            if not self.run or self._stopping:
                break
            if frame is None:
                continue
            self.video_track.add_frame(frame)

    async def _process_audio_frames(self):
        """Simple audio frame processing for real-time playback"""

        async for frame in self.simli_client.getAudioStreamIterator():
            if not self.run or self._stopping:
                break
            if frame is None:
                logger.warning("Simli: Received None audio frame, continuing...")
                continue
            try:
                self.audio_track.add_frame(frame)
            except Exception as frame_error:
                logger.error(f"Simli: Error processing audio frame: {frame_error}")
                continue

    async def sendSilence(self, duration: float = 0.1875):
        """Send silence to bootstrap the connection"""
        await self.simli_client.ready.wait()
        await self.simli_client.sendSilence(duration)

    async def _speech_timeout_handler(self):
        try:
            await asyncio.sleep(0.2)
            if self._is_speaking:
                await self.simli_client.clearBuffer()
                self._is_speaking = False
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error in speech timeout handler: {e}")

    async def handle_audio_input(self, audio_data: bytes):
        if not self.run or self._stopping:
            return
        if self.simli_client.ready.is_set():
            try:
                if len(audio_data) % 2 != 0:
                    audio_data = audio_data + b"\x00"

                audio_array = np.frombuffer(audio_data, dtype=np.int16)
                input_frame = AudioFrame.from_ndarray(
                    audio_array.reshape(1, -1), format="s16", layout="mono"
                )
                input_frame.sample_rate = 24000

                resampled_frames = simli_input_resampler.resample(input_frame)
                for frame in resampled_frames:
                    resampled_data = frame.to_ndarray().tobytes()

                    await self.simli_client.send(resampled_data)

                    self._last_audio_time = time.time()

            except Exception as e:
                logger.error(f"Error processing/sending audio data: {e}")
        else:
            logger.error(
                f"Simli: Cannot send audio - ws available: {self.simli_client is not None}, ready: {self.simli_client.ready.is_set()}"
            )

    async def interrupt(self):
        if self.audio_track:
            self.audio_track.interrupt()

    async def aclose(self):
        if self._stopping:
            return
        self._stopping = True
        self.run = False

        if self._keep_alive_task and not self._keep_alive_task.done():
            self._keep_alive_task.cancel()

        try:
            await self.simli_client.stop()
        except Exception:
            pass

    async def _keep_alive_loop(self):
        """Send periodic keep-alive audio to maintain Simli session"""

        while self.run and not self._stopping:
            try:
                current_time = time.time()
                if current_time - self._last_audio_time > 5.0:
                    if self.simli_client.ready.is_set():
                        try:
                            self._last_audio_time = current_time
                            await self.simli_client.sendSilence()

                        except Exception as e:
                            print(f"Simli: Keep-alive send failed: {e}")

                await asyncio.sleep(3.0)

            except Exception:
                if not self.run or self._stopping:
                    break
                await asyncio.sleep(1.0)

Initialize the Simli Avatar plugin.

Args

config : SimliConfig
The configuration for the Simli avatar.
api_key : str
The Simli API key.
simli_url : str
The Simli API URL. Defaults to "https://api.simli.ai".
is_trinity_avatar : bool
Specify if the face id in the simli config is a trinity avatar to ensure synchronization

Methods

async def aclose(self)
Expand source code
async def aclose(self):
    if self._stopping:
        return
    self._stopping = True
    self.run = False

    if self._keep_alive_task and not self._keep_alive_task.done():
        self._keep_alive_task.cancel()

    try:
        await self.simli_client.stop()
    except Exception:
        pass
async def connect(self)
Expand source code
async def connect(self):
    await self._initialize_connection()
    if self._stream_start_time is None:
        self._stream_start_time = time.time()

    self._last_audio_time = time.time()
    self._keep_alive_task = asyncio.create_task(self._keep_alive_loop())
    return self.audio_track, self.video_track
async def handle_audio_input(self, audio_data: bytes)
Expand source code
async def handle_audio_input(self, audio_data: bytes):
    if not self.run or self._stopping:
        return
    if self.simli_client.ready.is_set():
        try:
            if len(audio_data) % 2 != 0:
                audio_data = audio_data + b"\x00"

            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            input_frame = AudioFrame.from_ndarray(
                audio_array.reshape(1, -1), format="s16", layout="mono"
            )
            input_frame.sample_rate = 24000

            resampled_frames = simli_input_resampler.resample(input_frame)
            for frame in resampled_frames:
                resampled_data = frame.to_ndarray().tobytes()

                await self.simli_client.send(resampled_data)

                self._last_audio_time = time.time()

        except Exception as e:
            logger.error(f"Error processing/sending audio data: {e}")
    else:
        logger.error(
            f"Simli: Cannot send audio - ws available: {self.simli_client is not None}, ready: {self.simli_client.ready.is_set()}"
        )
async def interrupt(self)
Expand source code
async def interrupt(self):
    if self.audio_track:
        self.audio_track.interrupt()
async def mark_silent(self)
Expand source code
async def mark_silent(self):
    self._avatar_speaking = False
async def mark_speaking(self)
Expand source code
async def mark_speaking(self):
    self._avatar_speaking = True
async def sendSilence(self, duration: float = 0.1875)
Expand source code
async def sendSilence(self, duration: float = 0.1875):
    """Send silence to bootstrap the connection"""
    await self.simli_client.ready.wait()
    await self.simli_client.sendSilence(duration)

Send silence to bootstrap the connection

class SimliVideoTrack (is_trinity_avatar=True)
Expand source code
class SimliVideoTrack(CustomVideoTrack):
    def __init__(self, is_trinity_avatar=True):
        super().__init__()
        self.kind = "video"
        self.queue = asyncio.Queue()
        self._readyState = "live"
        self._frame_rate = 25 if is_trinity_avatar else 30

        self._shared_start_time = None

    @property
    def readyState(self):
        return self._readyState

    async def recv(self) -> VideoFrame:
        return await self.queue.get()

    def add_frame(self, frame: VideoFrame):
        self.queue.put_nowait(frame)

A dummy video track which reads green frames.

Ancestors

  • videosdk.custom_video_track.CustomVideoTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Instance variables

prop readyState
Expand source code
@property
def readyState(self):
    return self._readyState

Methods

def add_frame(self, frame: av.video.frame.VideoFrame)
Expand source code
def add_frame(self, frame: VideoFrame):
    self.queue.put_nowait(frame)
async def recv(self) ‑> av.video.frame.VideoFrame
Expand source code
async def recv(self) -> VideoFrame:
    return await self.queue.get()

Receive the next :class:~av.video.frame.VideoFrame.

The base implementation just reads a 640x480 green frame at 30fps, subclass :class:VideoStreamTrack to provide a useful implementation.