Package videosdk.plugins.anam

Sub-modules

videosdk.plugins.anam.anam

Classes

class AnamAvatar (api_key: str,
avatar_id: str | None = None,
persona_config: anam.types.PersonaConfig | None = None)
Expand source code
class AnamAvatar:
    """Avatar plugin that forwards pipeline audio to Anam and relays its media.

    After ``connect()``, frames yielded by the Anam session are pushed into
    ``audio_track`` and ``video_track`` by two background tasks, while
    ``handle_audio_input()`` feeds audio to the session's agent audio input
    stream.
    """

    def __init__(
        self,
        api_key: str,
        avatar_id: Optional[str] = None,
        persona_config: Optional[PersonaConfig] = None,
    ):
        """Initialize the Anam Avatar plugin.

        Args:
            api_key (str): The Anam API key.
            avatar_id (str, optional): The ID of the avatar to use. Only
                consulted when ``persona_config`` is not given; falls back to
                ``DEFAULT_AVATAR_ID`` when omitted or empty.
            persona_config (PersonaConfig, optional): Full persona
                configuration (must include avatar_id). Used as-is when
                provided, in which case ``avatar_id`` is ignored.
        """
        self.api_key = api_key

        if persona_config is not None:
            self.persona_config = persona_config
        else:
            # No explicit persona: build a minimal one with audio passthrough
            # enabled for the selected (or default) avatar.
            self.persona_config = PersonaConfig(
                avatar_id=avatar_id or DEFAULT_AVATAR_ID,
                enable_audio_passthrough=True,
            )

        # Populated by connect().
        self.client: Optional[AnamClient] = None
        self.session = None
        self.audio_track: Optional[AnamAudioTrack] = None
        self.video_track: Optional[AnamVideoTrack] = None

        # Lifecycle flags checked by the frame loops and the input handler.
        self.run = True
        self._stopping = False
        self._input_stream = None
        # Background frame-pump tasks started by connect().
        self._tasks: list = []
        # Pending debounced end_sequence() call, if any.
        self._end_sequence_task: Optional[asyncio.Task] = None
        # Seconds of input silence before end_sequence() is sent.
        self._end_sequence_debounce = 0.3

    async def connect(self):
        """Connect to Anam and start processing streams.

        Builds the client from the stored API key and persona config, opens a
        session, creates the audio/video tracks and the agent audio input
        stream, then starts the two background frame-processing tasks.

        Raises:
            Exception: Any error raised during setup is logged and re-raised
                unchanged.
        """
        try:
            client_options = ClientOptions()

            self.client = AnamClient(
                api_key=self.api_key,
                persona_config=self.persona_config,
                options=client_options,
            )

            self.session = await self.client.connect_async()

            self.audio_track = AnamAudioTrack()
            self.video_track = AnamVideoTrack()

            input_config = AgentAudioInputConfig(
                sample_rate=ANAM_INPUT_SAMPLE_RATE,
                channels=1,
                encoding="pcm_s16le",
            )
            self._input_stream = self.session.create_agent_audio_input_stream(input_config)

            self._tasks.append(asyncio.create_task(self._process_video_frames()))
            self._tasks.append(asyncio.create_task(self._process_audio_frames()))

            logger.info("Connected to Anam Avatar")

        except Exception as e:
            logger.error(f"Failed to connect to Anam: {e}")
            # Bare raise re-raises the active exception with its original
            # traceback instead of re-raising through the bound name.
            raise

    async def _process_video_frames(self):
        """Process video frames from Anam.

        Pushes every truthy frame from the session into ``video_track`` until
        the plugin is stopped or the session's iterator ends. Errors are
        logged, not raised.
        """
        if not self.session:
            return

        try:
            async for frame in self.session.video_frames():
                if not self.run or self._stopping:
                    break
                if frame:
                    self.video_track.add_frame(frame)
        except Exception as e:
            logger.error(f"Anam: Video processing error: {e}")
        finally:
            logger.info("Anam video processing stopped")

    async def _process_audio_frames(self):
        """Process audio frames from Anam.

        Pushes every truthy frame from the session into ``audio_track`` until
        the plugin is stopped or the session's iterator ends. Errors are
        logged, not raised.
        """
        if not self.session:
            return

        try:
            async for frame in self.session.audio_frames():
                if not self.run or self._stopping:
                    break
                if frame:
                    self.audio_track.add_frame(frame)
        except Exception as e:
            logger.error(f"Anam: Audio processing error: {e}")
        finally:
            logger.info("Anam audio processing stopped")

    async def _schedule_end_sequence(self):
        """
        Debounced call to end the current TTS sequence.

        Anam expects end_sequence() once all TTS audio has been sent so it can
        move the avatar back into "listening" mode. We debounce this so short
        gaps between audio chunks don't prematurely end the sequence.
        """
        try:
            await asyncio.sleep(self._end_sequence_debounce)
            if not self.run or self._stopping or self._input_stream is None:
                return
            await self._input_stream.end_sequence()
        except asyncio.CancelledError:
            # Cancellation is the normal "more audio arrived" path.
            pass
        except Exception as e:
            logger.error(f"Error sending end_sequence to Anam: {e}")

    async def handle_audio_input(self, audio_data: bytes):
        """Handle audio input from VideoSDK pipeline and send to Anam.

        The bytes are interpreted as 16-bit mono PCM tagged as 24 kHz, passed
        through ``anam_input_resampler`` and sent chunk by chunk to the input
        stream. After a successful send the debounced end_sequence() timer is
        restarted. Errors are logged, not raised.

        Args:
            audio_data (bytes): Raw PCM audio from the pipeline.
        """
        if not self.run or self._stopping or not self._input_stream:
            return

        try:
            # int16 samples need an even byte count; pad a trailing odd byte.
            if len(audio_data) % 2 != 0:
                audio_data = audio_data + b"\x00"

            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            input_frame = AudioFrame.from_ndarray(
                audio_array.reshape(1, -1), format="s16", layout="mono"
            )

            input_frame.sample_rate = 24000
            resampled_frames = anam_input_resampler.resample(input_frame)

            for frame in resampled_frames:
                resampled_data = frame.to_ndarray().tobytes()
                await self._input_stream.send_audio_chunk(resampled_data)

            # Restart the debounce window now that fresh audio has been sent.
            if self._end_sequence_task is not None:
                self._end_sequence_task.cancel()
            self._end_sequence_task = asyncio.create_task(self._schedule_end_sequence())

        except Exception as e:
            logger.error(f"Error processing/sending Anam audio data: {e}")

    async def interrupt(self):
        """Interrupt the avatar.

        Cancels any pending debounced end_sequence(), interrupts the local
        audio track and the session, then ends the current input sequence
        immediately. Does nothing without a session; errors are logged.
        """
        if self.session:
            try:
                logger.info("Interrupting Anam Avatar")
                if self._end_sequence_task is not None:
                    self._end_sequence_task.cancel()
                if self.audio_track:
                    self.audio_track.interrupt()

                await self.session.interrupt()

                if self._input_stream is not None:
                    await self._input_stream.end_sequence()

            except Exception as e:
                logger.error(f"Error interrupting Anam: {e}")

    async def aclose(self):
        """Close the avatar plugin.

        Idempotent: calls after the first return immediately. Cancels the
        background tasks and any pending end_sequence() timer, closes the
        session (a failure is logged and teardown continues), stops both
        tracks, and finally waits for the cancelled tasks to finish so none
        is left pending when this coroutine returns.
        """
        if self._stopping:
            return
        self._stopping = True
        self.run = False

        cancelled = list(self._tasks)
        if self._end_sequence_task is not None:
            cancelled.append(self._end_sequence_task)
        for task in cancelled:
            task.cancel()

        if self.session:
            try:
                await self.session.close()
            except Exception as e:
                # Best-effort close: report the failure but keep tearing down.
                logger.error(f"Error closing Anam session: {e}")

        if self.audio_track:
            self.audio_track.stop()
        if self.video_track:
            self.video_track.stop()

        # cancel() only requests cancellation; await the tasks so they have
        # actually unwound. return_exceptions keeps their CancelledError (or
        # any late failure) from propagating out of aclose().
        if cancelled:
            await asyncio.gather(*cancelled, return_exceptions=True)
        self._tasks.clear()
        self._end_sequence_task = None

Initialize the Anam Avatar plugin.

Args

api_key : str
The Anam API key.
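avatar_id : str, optional
The ID of the avatar to use. Ignored when persona_config is provided; a default avatar is used when both are omitted.
persona_config : PersonaConfig, optional
Full persona configuration (must include avatar_id). When provided, it takes precedence over avatar_id.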

Methods

async def aclose(self)
Expand source code
async def aclose(self):
    """Close the avatar plugin.

    Idempotent: calls after the first return immediately. Cancels the
    background tasks and any pending end_sequence() timer, closes the
    session (a failure is logged and teardown continues), stops both
    tracks, and finally waits for the cancelled tasks to finish so none is
    left pending when this coroutine returns.
    """
    if self._stopping:
        return
    self._stopping = True
    self.run = False

    cancelled = list(self._tasks)
    if self._end_sequence_task is not None:
        cancelled.append(self._end_sequence_task)
    for task in cancelled:
        task.cancel()

    if self.session:
        try:
            await self.session.close()
        except Exception as e:
            # Best-effort close: report the failure but keep tearing down.
            logger.error(f"Error closing Anam session: {e}")

    if self.audio_track:
        self.audio_track.stop()
    if self.video_track:
        self.video_track.stop()

    # cancel() only requests cancellation; await the tasks so they have
    # actually unwound. return_exceptions keeps their CancelledError (or any
    # late failure) from propagating out of aclose().
    if cancelled:
        await asyncio.gather(*cancelled, return_exceptions=True)
    self._tasks.clear()
    self._end_sequence_task = None

Close the avatar plugin.

async def connect(self)
Expand source code
async def connect(self):
    """Connect to Anam and start processing streams.

    Builds the client from the stored API key and persona config, opens a
    session, creates the audio/video tracks and the agent audio input
    stream, then starts the two background frame-processing tasks.

    Raises:
        Exception: Any error raised during setup is logged and re-raised
            unchanged.
    """
    try:
        client_options = ClientOptions()

        self.client = AnamClient(
            api_key=self.api_key,
            persona_config=self.persona_config,
            options=client_options,
        )

        self.session = await self.client.connect_async()

        self.audio_track = AnamAudioTrack()
        self.video_track = AnamVideoTrack()

        input_config = AgentAudioInputConfig(
            sample_rate=ANAM_INPUT_SAMPLE_RATE,
            channels=1,
            encoding="pcm_s16le",
        )
        self._input_stream = self.session.create_agent_audio_input_stream(input_config)

        self._tasks.append(asyncio.create_task(self._process_video_frames()))
        self._tasks.append(asyncio.create_task(self._process_audio_frames()))

        logger.info("Connected to Anam Avatar")

    except Exception as e:
        logger.error(f"Failed to connect to Anam: {e}")
        # Bare raise re-raises the active exception with its original
        # traceback instead of re-raising through the bound name.
        raise

Connect to Anam and start processing streams.

async def handle_audio_input(self, audio_data: bytes)
Expand source code
async def handle_audio_input(self, audio_data: bytes):
    """Forward one chunk of VideoSDK pipeline audio to Anam.

    The chunk is dropped when the plugin is stopped/stopping or no input
    stream exists. Otherwise the bytes are read as 16-bit mono PCM, tagged
    as 24 kHz, run through ``anam_input_resampler`` and sent to the input
    stream frame by frame; the debounced end_sequence() timer is then
    restarted. Any failure is logged rather than raised.
    """
    accepting = self.run and not self._stopping and self._input_stream
    if not accepting:
        return

    try:
        # int16 samples need an even number of bytes; pad a stray last byte.
        odd_length = len(audio_data) % 2 != 0
        pcm = audio_data + b"\x00" if odd_length else audio_data

        samples = np.frombuffer(pcm, dtype=np.int16).reshape(1, -1)
        source_frame = AudioFrame.from_ndarray(samples, format="s16", layout="mono")
        source_frame.sample_rate = 24000

        for out_frame in anam_input_resampler.resample(source_frame):
            payload = out_frame.to_ndarray().tobytes()
            await self._input_stream.send_audio_chunk(payload)

        # New audio went out: restart the end-of-sequence debounce window.
        pending = self._end_sequence_task
        if pending is not None:
            pending.cancel()
        self._end_sequence_task = asyncio.create_task(self._schedule_end_sequence())

    except Exception as e:
        logger.error(f"Error processing/sending Anam audio data: {e}")

Handle audio input from VideoSDK pipeline and send to Anam.

async def interrupt(self)
Expand source code
async def interrupt(self):
    """Interrupt the avatar.

    Without a session this does nothing. Otherwise it cancels any pending
    debounced end_sequence(), interrupts the local audio track and the
    session, and then ends the current input sequence immediately. Any
    failure is logged rather than raised.
    """
    if not self.session:
        return

    try:
        logger.info("Interrupting Anam Avatar")

        pending = self._end_sequence_task
        if pending is not None:
            pending.cancel()

        track = self.audio_track
        if track:
            track.interrupt()

        await self.session.interrupt()

        stream = self._input_stream
        if stream is not None:
            await stream.end_sequence()

    except Exception as e:
        logger.error(f"Error interrupting Anam: {e}")

Interrupt the avatar.