Package videosdk.plugins.simli
Sub-modules
videosdk.plugins.simli.simli
Classes
class SimliAvatar (config: SimliConfig,
simli_url: str = 'https://api.simli.ai')-
Expand source code
class SimliAvatar:
    """Avatar plugin that streams audio to Simli and relays the media it returns.

    A session is started over HTTP, ICE servers are fetched, and a WebRTC
    peer connection is negotiated through a WebSocket. Frames received on
    the remote audio/video tracks are pushed into ``audio_track`` and
    ``video_track``; outgoing audio is sent as binary chunks on the
    WebSocket.
    """

    def __init__(self, config: SimliConfig, simli_url: str = DEFAULT_SIMLI_HTTP_URL):
        """Initialize the Simli Avatar plugin.

        Args:
            config (SimliConfig): The configuration for the Simli avatar.
            simli_url (str): The Simli API URL. Defaults to "https://api.simli.ai".
        """
        super().__init__()
        self.config = config
        self._stream_start_time = None
        self.video_track = SimliVideoTrack()
        self.audio_track = None
        self.pc: Optional[RTCPeerConnection] = None
        self.simli_http_url = simli_url
        # Rewrite only the leading scheme (http -> ws, https -> wss); an
        # unbounded replace would also mangle "http" appearing later in the URL.
        self.simli_ws_url = (
            simli_url.replace("http", "ws", 1) if simli_url else DEFAULT_SIMLI_WS_URL
        )
        self.run = True
        self._is_speaking = False
        self._speech_timeout_task = None
        self.ws: Optional[websockets.asyncio.client.WebSocketClientProtocol] = None
        self.dc = None
        self.ready = asyncio.Event()
        self._avatar_speaking = False
        self._last_reconnect_attempt = 0
        self._message_handler_task = None
        self._retry_count = 3
        self._last_error = None
        self._stopping = False
        self._keep_alive_task = None
        self._last_audio_time = 0
        # Strong references to the per-track reader tasks so they cannot be
        # garbage-collected mid-flight and can be cancelled on cleanup.
        self._track_tasks = set()

    async def connect(self):
        """Create the audio track, establish the Simli connection and start keep-alive.

        Both tracks share one start timestamp, set on the first call.
        """
        loop = asyncio.get_running_loop()
        self.audio_track = SimliAudioTrack(loop)

        if self._stream_start_time is None:
            self._stream_start_time = time.time()
            self.video_track._shared_start_time = self._stream_start_time
            self.audio_track._shared_start_time = self._stream_start_time

        await self._initialize_connection()

        if hasattr(self.video_track, 'start'):
            self.video_track.start()
        if hasattr(self.audio_track, 'start'):
            self.audio_track.start()

        self._last_audio_time = time.time()
        self._keep_alive_task = asyncio.create_task(self._keep_alive_loop())

    async def _initialize_connection(self):
        """Initialize the connection, retrying while ``_retry_count`` allows.

        Each failed attempt decrements ``_retry_count`` and is followed by a
        2 second pause. The budget is stored on the instance and is not reset
        here.

        Raises:
            Exception: If the retry budget is already exhausted on entry, or
                the last permitted attempt fails (that attempt's error is
                re-raised).
        """
        while True:
            if self._retry_count == 0:
                raise Exception(
                    f"Failed to connect to Simli servers. Last error: {self._last_error}")
            try:
                await self._start_session()
                await self._wait_for_data_channel()
                await self.sendSilence()
                return
            except Exception as e:
                self._last_error = e
                self._retry_count -= 1
                if self._retry_count <= 0:
                    raise
            await asyncio.sleep(2)

    async def _wait_for_data_channel(self):
        """Wait for the data channel to be open.

        Raises:
            Exception: If the channel is not open after roughly 10 seconds.
        """
        max_wait = 10
        poll_interval = 0.1
        # Count polls instead of accumulating floats to avoid drift.
        for _ in range(int(max_wait / poll_interval)):
            if self.dc and self.dc.readyState == "open":
                return
            await asyncio.sleep(poll_interval)
        raise Exception("Data channel did not open within timeout")

    async def _start_session(self):
        """Tear down any previous connection, then start and negotiate a new session."""
        try:
            await self._cleanup_connections()
            session_token = await self._http_start_session()
            await self._negotiate_webrtc_via_ws(session_token)
        except Exception as e:
            logger.error(f"Error starting Simli session: {e}")
            raise

    async def _http_start_session(self) -> str:
        """Sends a request to start a session and returns the session token."""
        config_json = self.config.__dict__
        async with AsyncClient() as client:
            resp = await client.post(
                f"{self.simli_http_url}/startAudioToVideoSession", json=config_json
            )
            resp.raise_for_status()
            return resp.json()["session_token"]

    async def _negotiate_webrtc_via_ws(self, session_token: str):
        """Sets up WebRTC connection and negotiates it through WebSocket.

        Args:
            session_token: Token returned by ``_http_start_session``.

        Raises:
            RuntimeError: If the ICE server request fails, returns a non-list
                payload, or returns no servers.
            Exception: If the server does not answer with "START".
        """
        payload = {"apiKey": self.config.apiKey}
        headers = {"Content-Type": "application/json"}
        loop = asyncio.get_running_loop()

        def fetch_ice_servers():
            try:
                response = requests.post(
                    DEFAULT_SIMLI_ICE_SERVER_URL, json=payload, headers=headers, timeout=10)
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                raise RuntimeError(
                    f"Failed to fetch ICE servers from Simli API: {e}") from e

        # requests is blocking, so run it off the event loop thread.
        data = await loop.run_in_executor(None, fetch_ice_servers)
        if not isinstance(data, list):
            raise RuntimeError(f"Unexpected ICE server response format: {data}")

        ice_servers = [
            RTCIceServer(
                urls=server.get("urls"),
                username=server.get("username"),
                credential=server.get("credential"),
            )
            for server in data
        ]
        if not ice_servers:
            raise RuntimeError("No ICE servers returned from Simli API")

        self.pc = RTCPeerConnection(RTCConfiguration(iceServers=ice_servers))
        self.pc.addTransceiver("audio", direction="recvonly")
        self.pc.addTransceiver("video", direction="recvonly")
        self.pc.on("track", self._on_track)
        self.dc = self.pc.createDataChannel("datachannel", ordered=True)

        await self.pc.setLocalDescription(await self.pc.createOffer())
        while self.pc.iceGatheringState != "complete":
            await asyncio.sleep(0.01)

        json_offer = self.pc.localDescription.__dict__
        ws_url = f"{self.simli_ws_url}/StartWebRTCSession"
        self.ws = await websockets.asyncio.client.connect(ws_url)

        # Handshake order: offer -> ACK, answer; token -> ACK, ready message.
        await self.ws.send(json.dumps(json_offer))
        await self.ws.recv()  # ACK
        answer_str = await self.ws.recv()
        await self.ws.send(session_token)
        await self.ws.recv()  # ACK
        ready_msg = await self.ws.recv()
        if ready_msg != "START":
            raise Exception(
                f"Failed to start Simli session. Expected START, got {ready_msg}")

        self.ready.set()
        await self.pc.setRemoteDescription(RTCSessionDescription(**json.loads(answer_str)))

        if self._message_handler_task:
            self._message_handler_task.cancel()
        self._message_handler_task = asyncio.create_task(
            self._handle_ws_messages())

    async def _cleanup_connections(self):
        """Clean up existing connections before creating new ones."""
        handler = self._message_handler_task
        # The handler itself may be the caller (via aclose); a task must not
        # cancel and await itself.
        if handler and not handler.done() and handler is not asyncio.current_task():
            handler.cancel()
            try:
                await handler
            except asyncio.CancelledError:
                pass

        # Stop readers bound to the peer connection that is about to close.
        for task in list(self._track_tasks):
            task.cancel()
        self._track_tasks.clear()

        if self.ws:
            try:
                await self.ws.close()
            except Exception:
                # Best-effort close; the socket may already be gone.
                pass
            self.ws = None

        if self.pc:
            try:
                await self.pc.close()
            except Exception:
                # Best-effort close; the peer connection may already be gone.
                pass
            self.pc = None

        self.ready.clear()
        self._is_speaking = False
        if self._speech_timeout_task and not self._speech_timeout_task.done():
            self._speech_timeout_task.cancel()

    def _spawn_track_task(self, coro):
        """Schedule a track reader coroutine and keep a reference to its task."""
        task = asyncio.create_task(coro)
        self._track_tasks.add(task)
        task.add_done_callback(self._track_tasks.discard)
        return task

    def _on_track(self, track: MediaStreamTrack):
        """Store an incoming remote track and start the matching reader task."""
        if track.kind == "video":
            self.video_receiver_track = track
            self._spawn_track_task(self._process_video_frames())
        elif track.kind == "audio":
            self.audio_receiver_track = track
            self._spawn_track_task(self._process_audio_frames())

    async def _process_video_frames(self):
        """Simple video frame processing for real-time playback."""
        while self.run and not self._stopping:
            try:
                if not hasattr(self, 'video_receiver_track'):
                    await asyncio.sleep(0.1)
                    continue
                frame = await asyncio.wait_for(self.video_receiver_track.recv(), timeout=2.0)
                if frame is None:
                    continue
                self.video_track.add_frame(frame)
            except asyncio.TimeoutError:
                # The loop condition decides whether to keep waiting.
                continue
            except Exception as e:
                logger.error(f"Simli: Video processing error: {e}")
                if not self.run or self._stopping:
                    break
                await asyncio.sleep(0.1)

    async def _process_audio_frames(self):
        """Simple audio frame processing for real-time playback."""
        frame_count = 0
        while self.run and not self._stopping:
            try:
                if not hasattr(self, 'audio_receiver_track'):
                    await asyncio.sleep(0.1)
                    continue
                frame = await asyncio.wait_for(self.audio_receiver_track.recv(), timeout=2.0)
                if frame is None:
                    logger.warning(
                        "Simli: Received None audio frame, continuing...")
                    continue
                frame_count += 1
                try:
                    # Force the frame's sample rate to AUDIO_SAMPLE_RATE before queuing.
                    if not hasattr(frame, 'sample_rate') or frame.sample_rate != AUDIO_SAMPLE_RATE:
                        frame.sample_rate = AUDIO_SAMPLE_RATE
                    self.audio_track.add_frame(frame)
                except Exception as frame_error:
                    logger.error(
                        f"Simli: Error processing audio frame #{frame_count}: {frame_error}")
                    continue
            except asyncio.TimeoutError:
                # The loop condition decides whether to keep waiting.
                continue
            except Exception as e:
                logger.error(f"Simli: Audio processing error: {e}")
                if not self.run or self._stopping:
                    break
                await asyncio.sleep(0.1)

    async def sendSilence(self, duration: float = 0.1875):
        """Send silence to bootstrap the connection.

        Args:
            duration: Length of silence in seconds; two zero bytes are sent
                per sample at 16000 samples per second.
        """
        await self.ready.wait()
        silence_data = (0).to_bytes(2, "little") * int(16000 * duration)
        try:
            await self._send_audio_data(silence_data)
        except Exception as e:
            logger.error(f"Error sending bootstrap silence: {e}")

    async def _handle_ws_messages(self):
        """Handle WebSocket messages.

        "STOP" or any message containing "error" closes the avatar; "SPEAK"
        and "SILENT" toggle ``_avatar_speaking``; everything else is ignored.
        """
        try:
            while self.run and not self._stopping:
                await self.ready.wait()
                message = await self.ws.recv()
                if message == "STOP" or "error" in message.lower():
                    self.run = False
                    await self.aclose()
                    break
                if message == "SPEAK":
                    self._avatar_speaking = True
                elif message == "SILENT":
                    self._avatar_speaking = False
        except Exception as e:
            logger.error(f"Error in Simli websocket message handler: {e}")
            if not self._stopping:
                self.run = False
                await self.aclose()

    async def _speech_timeout_handler(self):
        """After 0.2 s, send "AUDIO_END" if still marked as speaking."""
        try:
            await asyncio.sleep(0.2)
            if self._is_speaking:
                await self.ws.send("AUDIO_END")
                self._is_speaking = False
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error in speech timeout handler: {e}")

    async def handle_audio_input(self, audio_data: bytes):
        """Resample incoming PCM audio and forward it to Simli.

        Args:
            audio_data: Raw bytes interpreted as 16-bit mono samples tagged
                as 24000 Hz. An odd-length buffer is padded with one zero byte.
        """
        if not self.run or self._stopping:
            return
        if self.ws and self.ready.is_set():
            try:
                if len(audio_data) % 2 != 0:
                    audio_data = audio_data + b'\x00'
                audio_array = np.frombuffer(audio_data, dtype=np.int16)
                input_frame = AudioFrame.from_ndarray(
                    audio_array.reshape(1, -1), format="s16", layout="mono"
                )
                input_frame.sample_rate = 24000
                # NOTE(review): target rate is set where audioResampler is
                # defined; presumably 16 kHz to match sendSilence - confirm.
                resampled_frames = audioResampler.resample(input_frame)
                if resampled_frames:
                    # The resampler returns a list; forward every frame so no
                    # audio is dropped when it yields more than one.
                    for resampled in resampled_frames:
                        await self._send_audio_data(resampled.to_ndarray().tobytes())
                    self._last_audio_time = time.time()
            except Exception as e:
                logger.error(f"Error processing/sending audio data: {e}")
        else:
            logger.error(
                f"Simli: Cannot send audio - ws available: {self.ws is not None}, ready: {self.ready.is_set()}")

    async def _send_audio_data(self, data: bytes):
        """Send audio data via WebSocket to simli in chunks of at most 6000 bytes.

        Errors are logged and not propagated to the caller.
        """
        try:
            for i in range(0, len(data), 6000):
                chunk = data[i:i + 6000]
                await self.ws.send(chunk)
        except Exception as e:
            logger.error(f"Error sending audio data via WebSocket: {e}")

    async def send_message(self, message: str):
        """Accept a text message; currently a no-op."""
        pass

    async def aclose(self):
        """Stop background tasks, notify the server and close all connections.

        Subsequent calls return immediately.
        """
        if self._stopping:
            return
        self._stopping = True
        self.run = False

        if self._keep_alive_task and not self._keep_alive_task.done():
            self._keep_alive_task.cancel()
        if self._speech_timeout_task and not self._speech_timeout_task.done():
            self._speech_timeout_task.cancel()

        try:
            if self.ws:
                await self.ws.send(b"DONE")
        except Exception:
            # Best-effort farewell; the socket may already be closed.
            pass

        await self._cleanup_connections()

    def set_agent(self, agent):
        """Accept an agent reference; currently a no-op."""
        pass

    async def _keep_alive_loop(self):
        """Send periodic keep-alive audio to maintain Simli session.

        Wakes every 3 seconds and sends a short burst of silence when no
        audio has been sent for more than 5 seconds.
        """
        silence_duration = 0.1875
        silence_data = (0).to_bytes(2, "little") * \
            int(16000 * silence_duration)

        while self.run and not self._stopping:
            try:
                current_time = time.time()
                if current_time - self._last_audio_time > 5.0:
                    if self.ws and self.ready.is_set():
                        try:
                            await self._send_audio_data(silence_data)
                            self._last_audio_time = current_time
                        except Exception as e:
                            logger.error(f"Simli: Keep-alive send failed: {e}")
                await asyncio.sleep(3.0)
            except Exception:
                if not self.run or self._stopping:
                    break
                await asyncio.sleep(1.0)
Initialize the Simli Avatar plugin.
Args
config
:SimliConfig
- The configuration for the Simli avatar.
simli_url
:str
- The Simli API URL. Defaults to "https://api.simli.ai".
Methods
async def aclose(self)
-
Expand source code
async def aclose(self):
    """Stop background tasks, notify the server and close all connections.

    Marks the instance as stopping, cancels the keep-alive and speech
    timeout tasks if they are still running, sends ``b"DONE"`` on the
    WebSocket when one exists, then awaits ``_cleanup_connections``.
    Subsequent calls return immediately.
    """
    if self._stopping:
        return
    self._stopping = True
    self.run = False

    if self._keep_alive_task and not self._keep_alive_task.done():
        self._keep_alive_task.cancel()
    if self._speech_timeout_task and not self._speech_timeout_task.done():
        self._speech_timeout_task.cancel()

    try:
        if self.ws:
            await self.ws.send(b"DONE")
    except Exception:
        # Best-effort farewell: the socket may already be closed. Catching
        # Exception (not a bare except) lets task cancellation propagate.
        pass

    await self._cleanup_connections()
async def connect(self)
-
Expand source code
async def connect(self):
    """Create the audio track, establish the Simli connection and start keep-alive.

    On the first call a shared start timestamp is recorded and copied onto
    both tracks. After ``_initialize_connection`` succeeds, each track's
    ``start`` method is called if it has one, and the keep-alive task is
    scheduled.
    """
    # Inside a coroutine the running loop is the one to hand to the track.
    loop = asyncio.get_running_loop()
    self.audio_track = SimliAudioTrack(loop)

    if self._stream_start_time is None:
        self._stream_start_time = time.time()
        self.video_track._shared_start_time = self._stream_start_time
        self.audio_track._shared_start_time = self._stream_start_time

    await self._initialize_connection()

    if hasattr(self.video_track, 'start'):
        self.video_track.start()
    if hasattr(self.audio_track, 'start'):
        self.audio_track.start()

    self._last_audio_time = time.time()
    self._keep_alive_task = asyncio.create_task(self._keep_alive_loop())
async def handle_audio_input(self, audio_data: bytes)
-
Expand source code
async def handle_audio_input(self, audio_data: bytes):
    """Resample incoming PCM audio and forward it to Simli.

    Does nothing when the avatar is stopped or stopping. When the WebSocket
    is missing or the session is not ready, an error is logged instead.

    Args:
        audio_data: Raw bytes interpreted as 16-bit mono samples tagged as
            24000 Hz. An odd-length buffer is padded with one zero byte.
    """
    if not self.run or self._stopping:
        return
    if self.ws and self.ready.is_set():
        try:
            if len(audio_data) % 2 != 0:
                audio_data = audio_data + b'\x00'
            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            input_frame = AudioFrame.from_ndarray(
                audio_array.reshape(1, -1), format="s16", layout="mono"
            )
            input_frame.sample_rate = 24000
            # NOTE(review): the target rate is configured where
            # audioResampler is defined - confirm it matches Simli's input.
            resampled_frames = audioResampler.resample(input_frame)
            if resampled_frames:
                # The resampler returns a list; forward every frame so no
                # audio is dropped when it yields more than one.
                for resampled in resampled_frames:
                    await self._send_audio_data(resampled.to_ndarray().tobytes())
                self._last_audio_time = time.time()
        except Exception as e:
            logger.error(f"Error processing/sending audio data: {e}")
    else:
        logger.error(
            f"Simli: Cannot send audio - ws available: {self.ws is not None}, ready: {self.ready.is_set()}")
async def sendSilence(self, duration: float = 0.1875)
-
Expand source code
async def sendSilence(self, duration: float = 0.1875):
    """Send silence to bootstrap the connection.

    Waits until the session is ready, then sends ``duration`` seconds of
    zeroed 16-bit samples (16000 samples per second) through
    ``_send_audio_data``. Any exception raised by the send is logged.
    """
    await self.ready.wait()

    sample_count = int(16000 * duration)
    zero_sample = (0).to_bytes(2, "little")
    payload = zero_sample * sample_count

    try:
        await self._send_audio_data(payload)
    except Exception as e:
        logger.error(f"Error sending bootstrap silence: {e}")
Send silence to bootstrap the connection
async def send_message(self, message: str)
-
Expand source code
async def send_message(self, message: str):
    """Accept a text message; this implementation does nothing with it."""
    return None
def set_agent(self, agent)
-
Expand source code
def set_agent(self, agent):
    """Accept an agent reference; this implementation does nothing with it."""
    return None
class SimliConfig (apiKey: str,
faceId: str = '0c2b8b04-5274-41f1-a21c-d5c98322efa9',
syncAudio: bool = True,
handleSilence: bool = True,
maxSessionLength: int = 1800,
maxIdleTime: int = 300)-
Expand source code
@dataclass
class SimliConfig:
    """Settings for a Simli avatar session.

    Only ``apiKey`` is required; every other field has a default.
    """

    apiKey: str
    faceId: str = "0c2b8b04-5274-41f1-a21c-d5c98322efa9"
    syncAudio: bool = True
    handleSilence: bool = True
    # 30 minutes, expressed in seconds.
    maxSessionLength: int = 30 * 60
    # 5 minutes, expressed in seconds.
    maxIdleTime: int = 5 * 60
SimliConfig(apiKey: str, faceId: str = '0c2b8b04-5274-41f1-a21c-d5c98322efa9', syncAudio: bool = True, handleSilence: bool = True, maxSessionLength: int = 1800, maxIdleTime: int = 300)
Instance variables
var apiKey : str
var faceId : str
var handleSilence : bool
var maxIdleTime : int
var maxSessionLength : int
var syncAudio : bool