Package videosdk.plugins.anam
Sub-modules
videosdk.plugins.anam.anam
Classes
class AnamAvatar (api_key: str,
avatar_id: str | None = None,
persona_config: anam.types.PersonaConfig | None = None)
Expand source code
class AnamAvatar:
    """VideoSDK avatar plugin backed by the Anam real-time avatar service.

    TTS audio from the VideoSDK pipeline is resampled and streamed to an
    Anam session; the avatar's rendered audio/video frames are pushed onto
    local ``AnamAudioTrack`` / ``AnamVideoTrack`` media tracks.
    """

    def __init__(
        self,
        api_key: str,
        avatar_id: Optional[str] = None,
        persona_config: Optional[PersonaConfig] = None,
    ):
        """Initialize the Anam Avatar plugin.

        Args:
            api_key (str): The Anam API key.
            avatar_id (str, optional): The ID of the avatar to use.
                Ignored when ``persona_config`` is supplied.
            persona_config (PersonaConfig, optional): Full persona
                configuration (must include avatar_id).
        """
        self.api_key = api_key
        # An explicit persona config wins; otherwise build a minimal one
        # around the requested (or default) avatar id.
        if persona_config is not None:
            self.persona_config = persona_config
        else:
            self.persona_config = PersonaConfig(
                avatar_id=avatar_id or DEFAULT_AVATAR_ID,
                enable_audio_passthrough=True,
            )
        self.client: Optional[AnamClient] = None
        self.session = None
        self.audio_track: Optional[AnamAudioTrack] = None
        self.video_track: Optional[AnamVideoTrack] = None
        self.run = True
        self._stopping = False
        self._input_stream = None
        self._tasks = []
        self._end_sequence_task: Optional[asyncio.Task] = None
        # Seconds of silence after the last TTS chunk before end_sequence()
        # fires (debounced so gaps between chunks don't end the utterance).
        self._end_sequence_debounce = 0.3

    async def connect(self):
        """Connect to Anam and start processing streams.

        Creates the Anam client/session, sets up the local media tracks,
        opens the agent audio input stream, and spawns the frame pumps.

        Raises:
            Exception: propagated from the Anam client when the
                connection cannot be established.
        """
        try:
            client_options = ClientOptions()
            self.client = AnamClient(
                api_key=self.api_key,
                persona_config=self.persona_config,
                options=client_options,
            )
            self.session = await self.client.connect_async()
            self.audio_track = AnamAudioTrack()
            self.video_track = AnamVideoTrack()
            input_config = AgentAudioInputConfig(
                sample_rate=ANAM_INPUT_SAMPLE_RATE,
                channels=1,
                encoding="pcm_s16le",
            )
            self._input_stream = self.session.create_agent_audio_input_stream(input_config)
            self._tasks.append(asyncio.create_task(self._process_video_frames()))
            self._tasks.append(asyncio.create_task(self._process_audio_frames()))
            logger.info("Connected to Anam Avatar")
        except Exception as e:
            logger.error(f"Failed to connect to Anam: {e}")
            # Bare ``raise`` preserves the original traceback; ``raise e``
            # would re-anchor it at this frame.
            raise

    async def _process_video_frames(self):
        """Pump video frames from the Anam session onto the video track."""
        if not self.session:
            return
        try:
            async for frame in self.session.video_frames():
                if not self.run or self._stopping:
                    break
                if frame:
                    self.video_track.add_frame(frame)
        except Exception as e:
            logger.error(f"Anam: Video processing error: {e}")
        finally:
            logger.info("Anam video processing stopped")

    async def _process_audio_frames(self):
        """Pump audio frames from the Anam session onto the audio track."""
        if not self.session:
            return
        try:
            async for frame in self.session.audio_frames():
                if not self.run or self._stopping:
                    break
                if frame:
                    self.audio_track.add_frame(frame)
        except Exception as e:
            logger.error(f"Anam: Audio processing error: {e}")
        finally:
            logger.info("Anam audio processing stopped")

    async def _schedule_end_sequence(self):
        """Debounced call to end the current TTS sequence.

        Anam expects end_sequence() once all TTS audio has been sent so it
        can move the avatar back into "listening" mode. We debounce this so
        short gaps between audio chunks don't prematurely end the sequence.
        """
        try:
            await asyncio.sleep(self._end_sequence_debounce)
            if not self.run or self._stopping or self._input_stream is None:
                return
            await self._input_stream.end_sequence()
        except asyncio.CancelledError:
            # Cancelled because more audio arrived — nothing to do.
            pass
        except Exception as e:
            logger.error(f"Error sending end_sequence to Anam: {e}")

    async def handle_audio_input(self, audio_data: bytes):
        """Handle audio input from VideoSDK pipeline and send to Anam.

        Args:
            audio_data (bytes): Raw 16-bit mono PCM from the pipeline
                (tagged as 24 kHz before resampling).
        """
        if not self.run or self._stopping or not self._input_stream:
            return
        try:
            # s16 samples are two bytes wide; pad odd-length payloads.
            if len(audio_data) % 2 != 0:
                audio_data = audio_data + b"\x00"
            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            input_frame = AudioFrame.from_ndarray(
                audio_array.reshape(1, -1), format="s16", layout="mono"
            )
            input_frame.sample_rate = 24000  # pipeline rate; resampled below
            resampled_frames = anam_input_resampler.resample(input_frame)
            for frame in resampled_frames:
                resampled_data = frame.to_ndarray().tobytes()
                await self._input_stream.send_audio_chunk(resampled_data)
            # Re-arm the debounce: more audio arrived, so the sequence is
            # not over yet.
            if self._end_sequence_task is not None:
                self._end_sequence_task.cancel()
            self._end_sequence_task = asyncio.create_task(self._schedule_end_sequence())
        except Exception as e:
            logger.error(f"Error processing/sending Anam audio data: {e}")

    async def interrupt(self):
        """Interrupt the avatar mid-utterance and return it to listening."""
        if self.session:
            try:
                logger.info("Interrupting Anam Avatar")
                if self._end_sequence_task is not None:
                    self._end_sequence_task.cancel()
                if self.audio_track:
                    self.audio_track.interrupt()
                await self.session.interrupt()
                if self._input_stream is not None:
                    await self._input_stream.end_sequence()
            except Exception as e:
                logger.error(f"Error interrupting Anam: {e}")

    async def aclose(self):
        """Close the avatar plugin. Safe to call more than once."""
        if self._stopping:
            return
        self._stopping = True
        self.run = False
        for task in self._tasks:
            task.cancel()
        if self._end_sequence_task is not None:
            self._end_sequence_task.cancel()
        # Await the cancelled pumps so they don't die with
        # "Task was destroyed but it is pending" warnings at shutdown.
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
        if self.session:
            try:
                await self.session.close()
            except Exception:
                # Best-effort: the session may already be torn down.
                pass
        if self.audio_track:
            self.audio_track.stop()
        if self.video_track:
            self.video_track.stop()
Args
api_key : str — The Anam API key.
avatar_id : str, optional — The ID of the avatar to use.
persona_config : PersonaConfig, optional — Full persona configuration (must include avatar_id).
Methods
async def aclose(self)
Expand source code
async def aclose(self):
    """Close the avatar plugin and release all resources.

    Idempotent: a second call is a no-op. Cancels the frame pump tasks
    and the pending end-sequence timer, closes the Anam session, and
    stops both local media tracks.
    """
    if self._stopping:
        return
    self._stopping = True
    self.run = False
    for task in self._tasks:
        task.cancel()
    if self._end_sequence_task is not None:
        self._end_sequence_task.cancel()
    # Await the cancelled pumps so they don't die with
    # "Task was destroyed but it is pending" warnings at shutdown.
    if self._tasks:
        await asyncio.gather(*self._tasks, return_exceptions=True)
    if self.session:
        try:
            await self.session.close()
        except Exception:
            # Best-effort: the session may already be torn down.
            pass
    if self.audio_track:
        self.audio_track.stop()
    if self.video_track:
        self.video_track.stop()
async def connect(self)
Expand source code
async def connect(self):
    """Connect to Anam and start processing streams.

    Creates the Anam client/session, sets up the local media tracks,
    opens the agent audio input stream, and spawns the frame pump tasks.

    Raises:
        Exception: propagated from the Anam client when the connection
            cannot be established.
    """
    try:
        client_options = ClientOptions()
        self.client = AnamClient(
            api_key=self.api_key,
            persona_config=self.persona_config,
            options=client_options,
        )
        self.session = await self.client.connect_async()
        self.audio_track = AnamAudioTrack()
        self.video_track = AnamVideoTrack()
        input_config = AgentAudioInputConfig(
            sample_rate=ANAM_INPUT_SAMPLE_RATE,
            channels=1,
            encoding="pcm_s16le",
        )
        self._input_stream = self.session.create_agent_audio_input_stream(input_config)
        self._tasks.append(asyncio.create_task(self._process_video_frames()))
        self._tasks.append(asyncio.create_task(self._process_audio_frames()))
        logger.info("Connected to Anam Avatar")
    except Exception as e:
        logger.error(f"Failed to connect to Anam: {e}")
        # Bare ``raise`` preserves the original traceback; ``raise e``
        # would re-anchor it at this frame.
        raise
async def handle_audio_input(self, audio_data: bytes)
Expand source code
async def handle_audio_input(self, audio_data: bytes):
    """Forward pipeline TTS audio to Anam.

    Pads the PCM payload to an even byte count, wraps it in a mono s16
    frame tagged at 24 kHz, resamples it to Anam's input rate, and
    streams the result chunk-by-chunk. Each call also re-arms the
    debounced end-of-sequence timer.
    """
    inactive = not self.run or self._stopping or not self._input_stream
    if inactive:
        return
    try:
        # s16 samples are two bytes wide; pad odd-length payloads.
        if len(audio_data) % 2 != 0:
            audio_data = audio_data + b"\x00"
        samples = np.frombuffer(audio_data, dtype=np.int16)
        frame = AudioFrame.from_ndarray(
            samples.reshape(1, -1), format="s16", layout="mono"
        )
        frame.sample_rate = 24000
        for out_frame in anam_input_resampler.resample(frame):
            chunk = out_frame.to_ndarray().tobytes()
            await self._input_stream.send_audio_chunk(chunk)
        # More audio arrived: cancel any pending end-sequence timer and
        # start a fresh one.
        pending = self._end_sequence_task
        if pending is not None:
            pending.cancel()
        self._end_sequence_task = asyncio.create_task(self._schedule_end_sequence())
    except Exception as e:
        logger.error(f"Error processing/sending Anam audio data: {e}")
async def interrupt(self)
Expand source code
async def interrupt(self):
    """Cut off the avatar's current utterance.

    No-op when no session is active. Cancels the pending end-sequence
    timer, flushes the local audio track, interrupts the remote session,
    and then signals end-of-sequence on the input stream.
    """
    if not self.session:
        return
    try:
        logger.info("Interrupting Anam Avatar")
        pending = self._end_sequence_task
        if pending is not None:
            pending.cancel()
        track = self.audio_track
        if track:
            track.interrupt()
        await self.session.interrupt()
        stream = self._input_stream
        if stream is not None:
            await stream.end_sequence()
    except Exception as e:
        logger.error(f"Error interrupting Anam: {e}")