Package videosdk.plugins.simli

Sub-modules

videosdk.plugins.simli.simli

Classes

class SimliAvatar (config: SimliConfig,
simli_url: str = 'https://api.simli.ai')
Expand source code
class SimliAvatar:
    """Avatar plugin that streams agent audio to Simli and plays back its media.

    Agent audio is sent to Simli over a WebSocket (see ``handle_audio_input``).
    Simli's rendered audio/video arrives over a recv-only WebRTC peer
    connection and is forwarded into ``audio_track`` / ``video_track``.
    """

    def __init__(self, config: SimliConfig, simli_url: str = DEFAULT_SIMLI_HTTP_URL):
        """Initialize the Simli Avatar plugin.

        Args:
            config (SimliConfig): The configuration for the Simli avatar.
            simli_url (str): The Simli API URL. Defaults to "https://api.simli.ai".
                The WebSocket base URL is derived from it by swapping the
                ``http``/``https`` scheme for ``ws``/``wss``.
        """
        super().__init__()
        self.config = config
        self._stream_start_time = None
        self.video_track = SimliVideoTrack()
        self.audio_track = None
        self.pc: Optional[RTCPeerConnection] = None
        # Fall back to the default so an empty/None URL cannot produce
        # request URLs like "None/startAudioToVideoSession".
        self.simli_http_url = simli_url or DEFAULT_SIMLI_HTTP_URL
        self.simli_ws_url = (
            self._http_to_ws_url(simli_url) if simli_url else DEFAULT_SIMLI_WS_URL
        )
        self.run = True
        self._is_speaking = False
        self._speech_timeout_task = None
        self.ws: Optional[websockets.asyncio.client.WebSocketClientProtocol] = None
        self.dc = None
        self.ready = asyncio.Event()
        self._avatar_speaking = False
        self._last_reconnect_attempt = 0
        self._message_handler_task = None
        # Remaining connection attempts shared by every _initialize_connection call.
        self._retry_count = 3
        self._last_error = None
        self._stopping = False
        self._keep_alive_task = None
        self._last_audio_time = 0
        # Frame-pump tasks started from _on_track. Holding them here keeps a
        # strong reference (asyncio only holds weak refs to tasks) and lets a
        # reconnect reuse the running pump instead of starting a duplicate.
        self._video_pump_task = None
        self._audio_pump_task = None

    @staticmethod
    def _http_to_ws_url(url: str) -> str:
        """Swap only the URL scheme: ``https://`` -> ``wss://``, ``http://`` -> ``ws://``."""
        for http_scheme, ws_scheme in (("https://", "wss://"), ("http://", "ws://")):
            if url.startswith(http_scheme):
                return ws_scheme + url[len(http_scheme):]
        return url

    async def connect(self):
        """Create the output audio track, open the Simli session and start streaming.

        Raises:
            Exception: if no session could be established within the retry budget.
        """
        loop = asyncio.get_running_loop()
        self.audio_track = SimliAudioTrack(loop)

        if self._stream_start_time is None:
            self._stream_start_time = time.time()
            self.video_track._shared_start_time = self._stream_start_time
            self.audio_track._shared_start_time = self._stream_start_time

        await self._initialize_connection()

        if hasattr(self.video_track, 'start'):
            self.video_track.start()
        if hasattr(self.audio_track, 'start'):
            self.audio_track.start()

        self._last_audio_time = time.time()
        # Do not leave a previous keep-alive loop running if connect() is repeated.
        if self._keep_alive_task and not self._keep_alive_task.done():
            self._keep_alive_task.cancel()
        self._keep_alive_task = asyncio.create_task(self._keep_alive_loop())

    async def _initialize_connection(self):
        """Establish the session, retrying with a 2 s pause until ``_retry_count`` is spent.

        Raises:
            Exception: immediately if the retry budget is already exhausted, or
                the last attempt's exception once it runs out.
        """
        if self._retry_count <= 0:
            raise Exception(
                f"Failed to connect to Simli servers. Last error: {self._last_error}")

        while True:
            try:
                await self._start_session()
                await self._wait_for_data_channel()
                await self.sendSilence()
                return
            except Exception as e:
                self._last_error = e
                self._retry_count -= 1
                if self._retry_count <= 0:
                    raise
                logger.warning(
                    f"Simli: connection attempt failed ({e}); "
                    f"{self._retry_count} attempt(s) left")
                await asyncio.sleep(2)

    async def _wait_for_data_channel(self, max_wait: float = 10.0):
        """Poll every 100 ms until the WebRTC data channel reports ``open``.

        Raises:
            Exception: if the channel is not open within ``max_wait`` seconds.
        """
        loop = asyncio.get_running_loop()
        # Deadline-based so accumulated sleep overshoot cannot stretch the wait.
        deadline = loop.time() + max_wait
        while loop.time() < deadline:
            if self.dc and self.dc.readyState == "open":
                return
            await asyncio.sleep(0.1)
        raise Exception("Data channel did not open within timeout")

    async def _start_session(self):
        """Tear down any previous connection, then start a fresh Simli session."""
        try:
            await self._cleanup_connections()

            session_token = await self._http_start_session()
            await self._negotiate_webrtc_via_ws(session_token)
        except Exception as e:
            logger.error(f"Error starting Simli session: {e}")
            raise

    async def _http_start_session(self) -> str:
        """Sends a request to start a session and returns the session token."""
        config_json = self.config.__dict__
        async with AsyncClient() as client:
            resp = await client.post(
                f"{self.simli_http_url}/startAudioToVideoSession", json=config_json
            )
            resp.raise_for_status()
            return resp.json()["session_token"]

    async def _negotiate_webrtc_via_ws(self, session_token: str):
        """Set up the WebRTC peer connection and negotiate it through a WebSocket.

        Steps: fetch ICE servers over HTTP; create a recv-only audio/video peer
        connection plus a data channel; send the local offer; read the answer;
        send ``session_token``; wait for ``START``; apply the answer; mark the
        session ready; start the WebSocket message handler.

        Raises:
            RuntimeError: if the ICE-server request fails or returns no servers.
            Exception: if Simli replies with anything other than ``START``.
        """
        payload = {"apiKey": self.config.apiKey}
        headers = {"Content-Type": "application/json"}

        loop = asyncio.get_running_loop()

        def fetch_ice_servers():
            # `requests` is blocking, so this runs in the default executor.
            try:
                response = requests.post(
                    DEFAULT_SIMLI_ICE_SERVER_URL, json=payload, headers=headers, timeout=10)
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                raise RuntimeError(f"Failed to fetch ICE servers from Simli API: {e}") from e

        data = await loop.run_in_executor(None, fetch_ice_servers)

        if not isinstance(data, list):
            raise RuntimeError(f"Unexpected ICE server response format: {data}")

        ice_servers = [
            RTCIceServer(
                urls=server.get("urls"),
                username=server.get("username"),
                credential=server.get("credential"),
            )
            for server in data
        ]

        if not ice_servers:
            raise RuntimeError("No ICE servers returned from Simli API")

        self.pc = RTCPeerConnection(RTCConfiguration(iceServers=ice_servers))
        self.pc.addTransceiver("audio", direction="recvonly")
        self.pc.addTransceiver("video", direction="recvonly")
        self.pc.on("track", self._on_track)
        self.dc = self.pc.createDataChannel("datachannel", ordered=True)

        await self.pc.setLocalDescription(await self.pc.createOffer())
        while self.pc.iceGatheringState != "complete":
            await asyncio.sleep(0.01)

        json_offer = self.pc.localDescription.__dict__

        ws_url = f"{self.simli_ws_url}/StartWebRTCSession"
        self.ws = await websockets.asyncio.client.connect(ws_url)
        await self.ws.send(json.dumps(json_offer))
        await self.ws.recv()  # ACK

        answer_str = await self.ws.recv()
        await self.ws.send(session_token)
        await self.ws.recv()  # ACK

        ready_msg = await self.ws.recv()
        if ready_msg != "START":
            raise Exception(
                f"Failed to start Simli session. Expected START, got {ready_msg}")

        # Only report readiness once the answer has actually been applied.
        await self.pc.setRemoteDescription(RTCSessionDescription(**json.loads(answer_str)))
        self.ready.set()

        if self._message_handler_task:
            self._message_handler_task.cancel()
        self._message_handler_task = asyncio.create_task(
            self._handle_ws_messages())

    async def _cleanup_connections(self):
        """Clean up existing connections before creating new ones.

        Stops the WebSocket message handler, closes the WebSocket and the peer
        connection, clears ``ready`` and cancels any pending speech timeout.
        """
        handler = self._message_handler_task
        # aclose() can be reached from inside the handler itself (on STOP or
        # an error message). Cancelling the current task would abort this
        # cleanup at its next await, and awaiting it would make the task wait
        # on itself. So leave it alone; it exits on its own once run is False.
        if handler and handler is not asyncio.current_task() and not handler.done():
            handler.cancel()
            try:
                await handler
            except asyncio.CancelledError:
                pass

        if self.ws:
            try:
                await self.ws.close()
            except Exception as e:
                logger.debug(f"Simli: ignoring error while closing websocket: {e}")
            self.ws = None

        if self.pc:
            try:
                await self.pc.close()
            except Exception as e:
                logger.debug(f"Simli: ignoring error while closing peer connection: {e}")
            self.pc = None

        self.ready.clear()
        self._is_speaking = False
        if self._speech_timeout_task and not self._speech_timeout_task.done():
            self._speech_timeout_task.cancel()

    def _on_track(self, track: MediaStreamTrack):
        """aiortc ``track`` handler: remember the remote track and ensure a pump task for it.

        The pump loops re-read ``*_receiver_track`` on every iteration. After a
        reconnect, swapping the attribute is therefore enough; a second pump
        would only compete for frames from the same track.
        """
        if track.kind == "video":
            self.video_receiver_track = track
            if self._video_pump_task is None or self._video_pump_task.done():
                self._video_pump_task = asyncio.create_task(self._process_video_frames())
        elif track.kind == "audio":
            self.audio_receiver_track = track
            if self._audio_pump_task is None or self._audio_pump_task.done():
                self._audio_pump_task = asyncio.create_task(self._process_audio_frames())

    async def _process_video_frames(self):
        """Simple video frame processing for real-time playback"""
        while self.run and not self._stopping:
            try:
                if not hasattr(self, 'video_receiver_track'):
                    await asyncio.sleep(0.1)
                    continue

                # Timeout so the loop re-checks run/_stopping periodically.
                frame = await asyncio.wait_for(self.video_receiver_track.recv(), timeout=2.0)
                if frame is None:
                    continue

                self.video_track.add_frame(frame)

            except asyncio.TimeoutError:
                if self.run and not self._stopping:
                    continue
                else:
                    break
            except Exception as e:
                logger.error(f"Simli: Video processing error: {e}")
                if not self.run or self._stopping:
                    break
                await asyncio.sleep(0.1)
                continue

    async def _process_audio_frames(self):
        """Simple audio frame processing for real-time playback"""
        frame_count = 0
        while self.run and not self._stopping:
            try:
                if not hasattr(self, 'audio_receiver_track'):
                    await asyncio.sleep(0.1)
                    continue

                # Timeout so the loop re-checks run/_stopping periodically.
                frame = await asyncio.wait_for(self.audio_receiver_track.recv(), timeout=2.0)
                if frame is None:
                    logger.warning(
                        "Simli: Received None audio frame, continuing...")
                    continue

                frame_count += 1

                try:
                    if not hasattr(frame, 'sample_rate') or frame.sample_rate != AUDIO_SAMPLE_RATE:
                        frame.sample_rate = AUDIO_SAMPLE_RATE

                    self.audio_track.add_frame(frame)

                except Exception as frame_error:
                    logger.error(
                        f"Simli: Error processing audio frame #{frame_count}: {frame_error}")
                    continue

            except asyncio.TimeoutError:
                if self.run and not self._stopping:
                    continue
                else:
                    break
            except Exception as e:
                logger.error(f"Simli: Audio processing error: {e}")
                if not self.run or self._stopping:
                    break
                await asyncio.sleep(0.1)
                continue

    async def sendSilence(self, duration: float = 0.1875):
        """Send ``duration`` seconds of zero-valued samples to bootstrap the connection.

        Waits for ``ready`` first. The buffer holds two zero bytes per sample at
        16000 samples per second. A failed send is logged, not raised.
        """
        await self.ready.wait()
        sample_count = int(16000 * duration)
        # Multiplying a bytes literal by a negative count yields b"", as before.
        silence_data = b"\x00" * (2 * sample_count)
        try:
            await self._send_audio_data(silence_data)
        except Exception as e:
            logger.error(f"Error sending bootstrap silence: {e}")

    async def _handle_ws_messages(self):
        """Consume control messages from the Simli WebSocket until stopped.

        ``STOP`` or any message containing "error" closes the avatar.
        ``SPEAK``/``SILENT`` toggle ``_avatar_speaking``. ``ACK`` and unknown
        messages are ignored.
        """
        try:
            while self.run and not self._stopping:
                await self.ready.wait()
                message = await self.ws.recv()
                # Binary frames arrive as bytes; `"error" in bytes` would raise
                # TypeError, so normalise to str before comparing.
                if isinstance(message, (bytes, bytearray)):
                    message = message.decode("utf-8", errors="replace")

                if message == "STOP":
                    logger.info("Simli: server sent STOP, closing avatar")
                    self.run = False
                    await self.aclose()
                    break

                elif "error" in message.lower():
                    logger.error(f"Simli: server reported an error: {message}")
                    self.run = False
                    await self.aclose()
                    break

                elif message == "SPEAK":
                    self._avatar_speaking = True

                elif message == "SILENT":
                    self._avatar_speaking = False

                elif message != "ACK":
                    logger.debug(f"Simli: ignoring websocket message: {message!r}")

        except Exception as e:
            logger.error(f"Error in Simli websocket message handler: {e}")
            if not self._stopping:
                self.run = False
                await self.aclose()

    async def _speech_timeout_handler(self):
        """After 200 ms, send ``AUDIO_END`` if speech is still flagged as ongoing."""
        try:
            await asyncio.sleep(0.2)
            if self._is_speaking and self.ws:
                await self.ws.send("AUDIO_END")
                self._is_speaking = False
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error in speech timeout handler: {e}")

    async def handle_audio_input(self, audio_data: bytes):
        """Resample agent audio and stream it to Simli.

        Args:
            audio_data: raw signed 16-bit PCM samples. The input frame is
                labelled 24000 Hz mono; an odd trailing byte is zero-padded to
                complete the last sample.

        Errors are logged rather than raised. If the session is not connected
        and ready, the chunk is dropped with an error log.
        """
        if not self.run or self._stopping:
            return

        if not (self.ws and self.ready.is_set()):
            logger.error(
                f"Simli: Cannot send audio - ws available: {self.ws is not None}, ready: {self.ready.is_set()}")
            return

        try:
            if len(audio_data) % 2 != 0:
                audio_data = audio_data + b'\x00'

            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            input_frame = AudioFrame.from_ndarray(
                audio_array.reshape(1, -1), format="s16", layout="mono"
            )
            input_frame.sample_rate = 24000

            resampled_frames = audioResampler.resample(input_frame)
            if resampled_frames:
                # The resampler may emit several frames for one input; forward
                # all of them instead of dropping everything after the first.
                resampled_data = b"".join(
                    frame.to_ndarray().tobytes() for frame in resampled_frames
                )

                await self._send_audio_data(resampled_data)

                self._last_audio_time = time.time()

        except Exception as e:
            logger.error(f"Error processing/sending audio data: {e}")

    async def _send_audio_data(self, data: bytes):
        """Send ``data`` over the WebSocket in 6000-byte chunks; errors are logged."""
        try:
            for i in range(0, len(data), 6000):
                chunk = data[i:i + 6000]
                await self.ws.send(chunk)
        except Exception as e:
            logger.error(f"Error sending audio data via WebSocket: {e}")

    async def send_message(self, message: str):
        """Accept a text message; this plugin currently ignores it."""
        return None

    async def aclose(self):
        """Shut the avatar down once: stop loops, send ``DONE`` and release connections.

        Repeated calls return immediately.
        """
        if self._stopping:
            return
        self._stopping = True
        self.run = False

        if self._keep_alive_task and not self._keep_alive_task.done():
            self._keep_alive_task.cancel()

        if self._speech_timeout_task and not self._speech_timeout_task.done():
            self._speech_timeout_task.cancel()

        try:
            if self.ws:
                # Best-effort notification; the socket may already be gone.
                await self.ws.send(b"DONE")
        except Exception as e:
            logger.debug(f"Simli: failed to send DONE during close: {e}")

        await self._cleanup_connections()

    def set_agent(self, agent):
        """Accept the owning agent; this plugin keeps no reference to it."""
        return None

    async def _keep_alive_loop(self):
        """Send periodic keep-alive audio to maintain Simli session.

        Every 3 s, if no audio has been sent for more than 5 s and the session is
        ready, send 0.1875 s of zero-valued samples.
        """
        silence_duration = 0.1875
        silence_data = (0).to_bytes(2, "little") * \
            int(16000 * silence_duration)

        while self.run and not self._stopping:
            try:
                current_time = time.time()
                if current_time - self._last_audio_time > 5.0:
                    if self.ws and self.ready.is_set():
                        try:
                            await self._send_audio_data(silence_data)
                            self._last_audio_time = current_time
                        except Exception as e:
                            logger.warning(f"Simli: Keep-alive send failed: {e}")

                await asyncio.sleep(3.0)

            except Exception as e:
                if not self.run or self._stopping:
                    break
                logger.warning(f"Simli: keep-alive loop error: {e}")
                await asyncio.sleep(1.0)

Initialize the Simli Avatar plugin.

Args

config : SimliConfig
The configuration for the Simli avatar.
simli_url : str
The Simli API URL. Defaults to "https://api.simli.ai".

Methods

async def aclose(self)
Expand source code
async def aclose(self):
    """Shut the avatar down once: stop loops, send ``DONE`` and release connections.

    Repeated calls return immediately.
    """
    if self._stopping:
        return
    self._stopping = True
    self.run = False

    if self._keep_alive_task and not self._keep_alive_task.done():
        self._keep_alive_task.cancel()

    if self._speech_timeout_task and not self._speech_timeout_task.done():
        self._speech_timeout_task.cancel()

    try:
        if self.ws:
            # Best-effort notification; the socket may already be gone.
            await self.ws.send(b"DONE")
    except Exception as e:
        logger.debug(f"Simli: failed to send DONE during close: {e}")

    await self._cleanup_connections()
async def connect(self)
Expand source code
async def connect(self):
    """Create the output audio track, open the Simli session and start streaming.

    Raises:
        Exception: if no session could be established within the retry budget.
    """
    loop = asyncio.get_running_loop()
    self.audio_track = SimliAudioTrack(loop)

    if self._stream_start_time is None:
        self._stream_start_time = time.time()
        self.video_track._shared_start_time = self._stream_start_time
        self.audio_track._shared_start_time = self._stream_start_time

    await self._initialize_connection()

    if hasattr(self.video_track, 'start'):
        self.video_track.start()
    if hasattr(self.audio_track, 'start'):
        self.audio_track.start()

    self._last_audio_time = time.time()
    # Do not leave a previous keep-alive loop running if connect() is repeated.
    if self._keep_alive_task and not self._keep_alive_task.done():
        self._keep_alive_task.cancel()
    self._keep_alive_task = asyncio.create_task(self._keep_alive_loop())
async def handle_audio_input(self, audio_data: bytes)
Expand source code
async def handle_audio_input(self, audio_data: bytes):
    """Resample agent audio and stream it to Simli.

    Args:
        audio_data: raw signed 16-bit PCM samples. The input frame is labelled
            24000 Hz mono; an odd trailing byte is zero-padded to complete the
            last sample.

    Errors are logged rather than raised. If the session is not connected and
    ready, the chunk is dropped with an error log.
    """
    if not self.run or self._stopping:
        return

    if not (self.ws and self.ready.is_set()):
        logger.error(
            f"Simli: Cannot send audio - ws available: {self.ws is not None}, ready: {self.ready.is_set()}")
        return

    try:
        if len(audio_data) % 2 != 0:
            audio_data = audio_data + b'\x00'

        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        input_frame = AudioFrame.from_ndarray(
            audio_array.reshape(1, -1), format="s16", layout="mono"
        )
        input_frame.sample_rate = 24000

        resampled_frames = audioResampler.resample(input_frame)
        if resampled_frames:
            # The resampler may emit several frames for one input; forward all
            # of them instead of dropping everything after the first.
            resampled_data = b"".join(
                frame.to_ndarray().tobytes() for frame in resampled_frames
            )

            await self._send_audio_data(resampled_data)

            self._last_audio_time = time.time()

    except Exception as e:
        logger.error(f"Error processing/sending audio data: {e}")
async def sendSilence(self, duration: float = 0.1875)
Expand source code
async def sendSilence(self, duration: float = 0.1875):
    """Send ``duration`` seconds of zero-valued samples to bootstrap the connection.

    Waits for ``ready`` first. The buffer holds two zero bytes per sample at
    16000 samples per second. A failed send is logged, not raised.
    """
    await self.ready.wait()
    sample_count = int(16000 * duration)
    # Multiplying a bytes literal by a negative count yields b"", as before.
    silence_data = b"\x00" * (2 * sample_count)
    try:
        await self._send_audio_data(silence_data)
    except Exception as e:
        logger.error(f"Error sending bootstrap silence: {e}")

Send silence to bootstrap the connection

async def send_message(self, message: str)
Expand source code
async def send_message(self, message: str):
    """Accept a text message; this plugin currently ignores it."""
    return None
def set_agent(self, agent)
Expand source code
def set_agent(self, agent):
    """Accept the owning agent; this plugin keeps no reference to it."""
    return None
class SimliConfig (apiKey: str,
faceId: str = '0c2b8b04-5274-41f1-a21c-d5c98322efa9',
syncAudio: bool = True,
handleSilence: bool = True,
maxSessionLength: int = 1800,
maxIdleTime: int = 300)
Expand source code
@dataclass
class SimliConfig:
    """Options for a Simli avatar session.

    Field names are camelCase because the avatar posts this instance's
    ``__dict__`` verbatim as the JSON body of the session-start request.
    """

    apiKey: str
    faceId: str = "0c2b8b04-5274-41f1-a21c-d5c98322efa9"
    syncAudio: bool = True
    handleSilence: bool = True
    maxSessionLength: int = 1800  # seconds (30 minutes)
    maxIdleTime: int = 300       # seconds (5 minutes)

    def __repr__(self) -> str:
        # Hand-written so the API key never ends up in logs or tracebacks.
        # @dataclass does not replace a __repr__ the class defines itself.
        return (
            f"{type(self).__name__}(apiKey='***', faceId={self.faceId!r}, "
            f"syncAudio={self.syncAudio!r}, handleSilence={self.handleSilence!r}, "
            f"maxSessionLength={self.maxSessionLength!r}, "
            f"maxIdleTime={self.maxIdleTime!r})"
        )

SimliConfig(apiKey: str, faceId: str = '0c2b8b04-5274-41f1-a21c-d5c98322efa9', syncAudio: bool = True, handleSilence: bool = True, maxSessionLength: int = 1800, maxIdleTime: int = 300)

Instance variables

var apiKey : str
var faceId : str
var handleSilence : bool
var maxIdleTime : int
var maxSessionLength : int
var syncAudio : bool