Module agents.avatar

Sub-modules

agents.avatar.avatar_audio_io
agents.avatar.avatar_auth
agents.avatar.avatar_controller
agents.avatar.avatar_schema
agents.avatar.avatar_synchronizer

Functions

def generate_avatar_credentials(api_key: str,
secret: str,
*,
participant_id: Optional[str] = None,
ttl_seconds: int = 3600) ‑> AvatarAuthCredentials
Expand source code
def generate_avatar_credentials(
    api_key: str,
    secret: str,
    *,
    participant_id: Optional[str] = None,
    ttl_seconds: int = 3600,
) -> AvatarAuthCredentials:
    """
    Generate a pre-signed VideoSDK token for an Avatar Server participant.

    Args:
        api_key: Your VideoSDK API key.
        secret: Your VideoSDK secret key.
        participant_id: Optional fixed participant ID. A random one is generated if omitted.
        ttl_seconds: Token validity in seconds (default 1 hour).

    Returns:
        AvatarAuthCredentials with participant_id and signed token.

    Raises:
        ImportError: If PyJWT is not installed.
    """
    try:
        import jwt
    except ImportError as exc:
        raise ImportError(
            "PyJWT is required for generate_avatar_credentials(). "
            "Install it with: pip install PyJWT"
        ) from exc

    # Fixed: previously f"{"avatar"}_..." — a same-quote nested f-string
    # expression, which is a SyntaxError on Python < 3.12 (PEP 701 only
    # allows it from 3.12) and redundant everywhere.
    pid = participant_id or f"avatar_{uuid.uuid4().hex[:8]}"
    now = int(time.time())
    payload = {
        "apikey": api_key,
        "permissions": ["allow_join"],
        "version": 2,
        "iat": now,
        "exp": now + ttl_seconds,
        "participantId": pid,
    }
    # PyJWT >= 2 returns str; PyJWT 1.x returned bytes — normalize to str.
    token = jwt.encode(payload, secret, algorithm="HS256")
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    return AvatarAuthCredentials(participant_id=pid, token=token)

Generate a pre-signed VideoSDK token for an Avatar Server participant.

Args

api_key
Your VideoSDK API key.
secret
Your VideoSDK secret key.
participant_id
Optional fixed participant ID. A random one is generated if omitted.
ttl_seconds
Token validity in seconds (default 1 hour).

Returns

AvatarAuthCredentials with participant_id and signed token.

Classes

class AudioSegmentEnd
Expand source code
class AudioSegmentEnd:
    """Marker object emitted by an AvatarRenderer once a TTS segment has been played to completion."""

Sentinel yielded by an AvatarRenderer to signal that a TTS segment has fully played.

class AvatarAudioIn (meeting: Meeting | None, *, channels: int = 1, sample_rate: int = 24000)
Expand source code
class AvatarAudioIn(AvatarInput):
    """
    Avatar-worker-side receiver.

    Listens for data-channel messages from the agent and exposes them as an
    async iterator of AudioFrame / AudioSegmentEnd items. Control messages:
    - Raw bytes  → reconstruct AudioFrame and enqueue it.
    - INTERRUPT  → clear the queue and emit ``reset_stream``.
    - segment_end JSON → enqueue AudioSegmentEnd.

    Note: VideoSDK broadcasts data-channel messages to all participants, so
    every participant in the room sees every message.
    """

    # Seconds during which incoming audio bytes are dropped after an INTERRUPT,
    # so late-arriving (UNRELIABLE) chunks of the cancelled segment are discarded.
    _INTERRUPT_COOLDOWN = 0.3

    def __init__(
        self,
        meeting: Meeting | None,
        *,
        channels: int = 1,
        sample_rate: int = 24000,
    ):
        super().__init__()
        self._channels = channels
        self._sample_rate = sample_rate
        # Queue bridging the data-channel callback to the async iterator.
        self._data_ch: asyncio.Queue[AudioFrame | AudioSegmentEnd] = asyncio.Queue()
        self._handler: _AvatarDataHandler | None = None
        self._meeting: Meeting | None = None
        self._interrupt_until: float = 0.0  # monotonic deadline of the interrupt cooldown
        # Strong references to fire-and-forget tasks. asyncio only keeps weak
        # references to tasks, so an unreferenced task can be garbage-collected
        # before its coroutine finishes.
        self._pending_tasks: set[asyncio.Task] = set()
        if meeting:
            self.set_meeting(meeting)

    def set_meeting(self, meeting: Meeting) -> None:
        """Attach to *meeting*, detaching from any previously attached one first."""
        if self._meeting and self._handler:
            try:
                self._meeting.remove_event_listener(self._handler)
            except Exception:
                pass
            self._handler = None

        self._meeting = meeting
        self._handler = _AvatarDataHandler(callback=self._on_data)
        self._meeting.add_event_listener(self._handler)
        logger.info("AvatarAudioIn attached to meeting")

    def notify_stream_ended(self, playback_position: float, interrupted: bool) -> None:
        """Send a stream_ended ack back to the agent (fire-and-forget)."""
        # Fixed: the task returned by asyncio.create_task() was previously
        # discarded; without a strong reference it could be garbage-collected
        # before the ack was actually sent.
        task = asyncio.create_task(
            self._send_stream_ended(playback_position, interrupted)
        )
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    def __aiter__(self):
        return self

    async def __anext__(self) -> AudioFrame | AudioSegmentEnd:
        try:
            return await self._data_ch.get()
        except asyncio.CancelledError:
            # Translate task cancellation into a normal end-of-iteration.
            raise StopAsyncIteration

    async def aclose(self) -> None:
        """Detach the data-channel listener from the meeting."""
        if self._meeting and self._handler:
            try:
                self._meeting.remove_event_listener(self._handler)
            except Exception:
                pass
            self._handler = None

    def _on_data(self, data: dict) -> None:
        """Dispatch one raw data-channel message to the appropriate handler."""
        payload = data.get("payload", b"")
        try:
            if isinstance(payload, memoryview):
                payload = payload.tobytes()

            # INTERRUPT may arrive as bytes or str depending on the transport.
            if payload == MSG_INTERRUPT or payload == "INTERRUPT":
                self._handle_interrupt()
                return

            if isinstance(payload, (bytes, bytearray)):
                self._handle_audio_bytes(payload)
                return

            if isinstance(payload, str):
                self._handle_text_payload(payload)
        except Exception as e:
            logger.error("AvatarAudioIn: error processing message: %s", e)

    def _handle_audio_bytes(self, raw: bytes) -> None:
        """Rebuild an AudioFrame from raw s16 PCM bytes and enqueue it."""
        import time as _time
        if _time.monotonic() < self._interrupt_until:
            # Still within the post-interrupt cooldown: drop stale audio.
            return
        if len(raw) % 2 != 0:
            raw = raw + b"\x00"  # pad to a whole 16-bit sample
        array = np.frombuffer(raw, dtype=np.int16)
        mono = array.reshape(-1, 1)
        if self._channels == 2:
            # Duplicate the mono channel into both stereo channels.
            stereo = np.column_stack([mono[:, 0], mono[:, 0]])
            array_out = stereo
        else:
            array_out = mono
        frame = AudioFrame.from_ndarray(
            array_out.T,
            format="s16",
            layout="mono" if self._channels == 1 else "stereo",
        )
        frame.sample_rate = self._sample_rate
        self._data_ch.put_nowait(frame)

    def _handle_interrupt(self) -> None:
        """Drop queued audio, start the cooldown, and emit ``reset_stream``."""
        import time as _time
        self._interrupt_until = _time.monotonic() + self._INTERRUPT_COOLDOWN
        while not self._data_ch.empty():
            try:
                self._data_ch.get_nowait()
            except asyncio.QueueEmpty:
                break
        self.emit("reset_stream")
        logger.info("AvatarAudioIn: INTERRUPT received, buffer cleared, cooldown %.1fs", self._INTERRUPT_COOLDOWN)

    def _handle_text_payload(self, payload: str) -> None:
        """Handle JSON control messages (segment_end / stream_ended)."""
        if not payload:
            return
        try:
            msg = json.loads(payload)
        except (json.JSONDecodeError, ValueError):
            # Non-JSON text is not a control message we understand; ignore it.
            return

        if msg.get("type") == MSG_TYPE_SEGMENT_END:
            self._data_ch.put_nowait(AudioSegmentEnd())
            logger.debug("AvatarAudioIn: segment_end received")
        elif msg.get("type") == MSG_TYPE_STREAM_ENDED:
            # Presumably our own ack echoed back by the broadcast channel;
            # deliberately ignored.
            pass

    async def _send_stream_ended(self, playback_position: float, interrupted: bool) -> None:
        """Send the stream_ended ack over the data channel (RELIABLE)."""
        if not self._meeting:
            return
        payload = json.dumps(
            {
                "type": MSG_TYPE_STREAM_ENDED,
                "data": {
                    "playback_position": playback_position,
                    "interrupted": interrupted,
                },
            }
        )
        await self._meeting.send(
            payload, {"reliability": ReliabilityModes.RELIABLE.value}
        )
        logger.debug(
            "AvatarAudioIn: sent stream_ended (pos=%.3fs, interrupted=%s)",
            playback_position,
            interrupted,
        )

Avatar-worker-side receiver.

Listens for data-channel messages from the agent and exposes them as an async iterator of AudioFrame / AudioSegmentEnd items. Control messages: - Raw bytes → reconstruct AudioFrame and enqueue it. - INTERRUPT → clear the queue and emit reset_stream. - segment_end JSON → enqueue AudioSegmentEnd.

Note: VideoSDK broadcasts data-channel messages to all participants, so every participant in the room sees every message.

Ancestors

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    if self._meeting and self._handler:
        try:
            self._meeting.remove_event_listener(self._handler)
        except Exception:
            pass
        self._handler = None
def set_meeting(self, meeting: Meeting) ‑> None
Expand source code
def set_meeting(self, meeting: Meeting) -> None:
    if self._meeting and self._handler:
        try:
            self._meeting.remove_event_listener(self._handler)
        except Exception:
            pass
        self._handler = None

    self._meeting = meeting
    self._handler = _AvatarDataHandler(callback=self._on_data)
    self._meeting.add_event_listener(self._handler)
    logger.info("AvatarAudioIn attached to meeting")

Inherited members

class AvatarAudioOut (*,
credentials: AvatarAuthCredentials,
avatar_dispatcher_url: Optional[str] = None,
room_id: Optional[str] = None)
Expand source code
class AvatarAudioOut:
    """
    Agent-side handle for the avatar data channel.

    Responsibilities:
    - Spin up the Avatar Server via an HTTP dispatcher.
    - Stream raw PCM audio chunks to the worker (UNRELIABLE).
    - Send ``segment_end`` control messages (RELIABLE) so the worker knows
      when a TTS turn has finished — this is what allows ``notify_stream_ended``
      to fire on the worker side.
    - Send ``INTERRUPT`` (RELIABLE) when the agent interrupts its output.
    - Receive ``stream_ended`` acks from the worker via an on_data listener.
    """

    # Upper bound (bytes) for a single data-channel audio message.
    _MAX_CHUNK = 15_000

    def __init__(
        self,
        *,
        credentials: AvatarAuthCredentials,
        avatar_dispatcher_url: Optional[str] = None,
        room_id: Optional[str] = None,
    ):
        self._credentials = credentials
        self._avatar_dispatcher_url = avatar_dispatcher_url
        self._room_id = room_id
        self._meeting: Meeting | None = None
        self._ack_handler: _AvatarAckHandler | None = None
        self._participant_id: str = credentials.participant_id
        self.video_track = None
        self.audio_track = None

    @property
    def participant_id(self) -> str:
        """Participant ID the avatar credentials were issued for."""
        return self._participant_id

    def set_room_id(self, room_id: str) -> None:
        """Record the room the avatar worker should join."""
        self._room_id = room_id

    async def connect(self) -> None:
        """Call the avatar dispatcher so the worker process joins the room."""
        await self._avatar_spinup()

    def _set_meeting(self, meeting: Meeting) -> None:
        """
        Inject the live Meeting object (called by the framework once the agent
        has joined the room) and register the stream_ended ack listener.
        """
        self._meeting = meeting
        self._ack_handler = _AvatarAckHandler(on_stream_ended=self._on_stream_ended)
        meeting.add_event_listener(self._ack_handler)
        logger.info("AvatarAudioOut attached to meeting: %s", meeting.id)

    async def _avatar_spinup(self) -> None:
        """POST the join payload to the dispatcher, when one is configured."""
        if not self._avatar_dispatcher_url:
            logger.info("AvatarAudioOut: No dispatcher URL provided, skipping local avatar spinup.")
            return
        if not self._room_id:
            raise ValueError("room_id must be set before calling connect()")

        info = AvatarJoinInfo(
            room_name=self._room_id,
            token=self._credentials.token,
            participant_id=self._credentials.participant_id,
        )
        logger.info(
            "Sending connection info to avatar dispatcher %s (participant_id=%s)",
            self._avatar_dispatcher_url,
            self._credentials.participant_id,
        )
        async with httpx.AsyncClient() as client:
            resp = await client.post(self._avatar_dispatcher_url, json=asdict(info))
            resp.raise_for_status()
        logger.info("Avatar handshake completed")

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """
        Chunk and send raw PCM bytes to the Avatar Server via data channel.
        Uses UNRELIABLE mode for low-latency streaming.
        """
        if not self._meeting:
            return

        offset = 0
        total = len(audio_data)
        while offset < total:
            piece = audio_data[offset : offset + self._MAX_CHUNK]
            offset += self._MAX_CHUNK
            if not piece:
                continue
            if len(piece) % 2:
                piece = piece + b"\x00"  # keep 16-bit sample alignment
            try:
                await self._meeting.send(
                    piece, {"reliability": ReliabilityModes.UNRELIABLE.value}
                )
            except Exception:
                # Data channel closed (e.g. participant left) — stop sending
                logger.debug("AvatarAudioOut: data channel closed, dropping remaining audio")
                self._meeting = None
                return

    async def send_segment_end(self) -> None:
        """
        Notify the Avatar Server that the current TTS segment has finished.
        This causes the receiver to enqueue AudioSegmentEnd so the controller
        can call notify_stream_ended(interrupted=False).
        """
        if not self._meeting:
            return
        try:
            await self._meeting.send(
                json.dumps({"type": MSG_TYPE_SEGMENT_END}),
                {"reliability": ReliabilityModes.RELIABLE.value},
            )
            logger.debug("AvatarAudioOut: sent segment_end")
        except Exception:
            self._meeting = None

    async def interrupt(self) -> None:
        """Tell the Avatar Server to immediately stop playback."""
        if not self._meeting:
            return
        try:
            await self._meeting.send(
                MSG_INTERRUPT, {"reliability": ReliabilityModes.RELIABLE.value}
            )
            logger.info("AvatarAudioOut: sent INTERRUPT to Avatar Server")
        except Exception:
            self._meeting = None

    def _on_stream_ended(self, playback_position: float, interrupted: bool) -> None:
        """Log stream_ended acks arriving from the worker."""
        logger.info(
            "AvatarAudioOut: stream_ended ack received (pos=%.3fs, interrupted=%s)",
            playback_position,
            interrupted,
        )

    async def aclose(self) -> None:
        """Detach the ack listener from the meeting, if still attached."""
        if self._meeting and self._ack_handler:
            try:
                self._meeting.remove_event_listener(self._ack_handler)
            except Exception:
                pass
        self._ack_handler = None

Agent-side handle for the avatar data channel.

Responsibilities: - Spin up the Avatar Server via an HTTP dispatcher. - Stream raw PCM audio chunks to the worker (UNRELIABLE). - Send segment_end control messages (RELIABLE) so the worker knows when a TTS turn has finished — this is what allows notify_stream_ended to fire on the worker side. - Send INTERRUPT (RELIABLE) when the agent interrupts its output. - Receive stream_ended acks from the worker via an on_data listener.

Instance variables

prop participant_id : str
Expand source code
@property
def participant_id(self) -> str:
    return self._participant_id

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    if self._meeting and self._ack_handler:
        try:
            self._meeting.remove_event_listener(self._ack_handler)
        except Exception:
            pass
    self._ack_handler = None
async def connect(self) ‑> None
Expand source code
async def connect(self) -> None:
    """Call the avatar dispatcher so the worker process joins the room."""
    await self._avatar_spinup()

Call the avatar dispatcher so the worker process joins the room.

async def handle_audio_input(self, audio_data: bytes) ‑> None
Expand source code
async def handle_audio_input(self, audio_data: bytes) -> None:
    """
    Chunk and send raw PCM bytes to the Avatar Server via data channel.
    Uses UNRELIABLE mode for low-latency streaming.
    """
    if not self._meeting:
        return

    MAX_CHUNK = 15_000
    for i in range(0, len(audio_data), MAX_CHUNK):
        chunk = audio_data[i : i + MAX_CHUNK]
        if not chunk:
            continue
        if len(chunk) % 2 != 0:
            chunk = chunk + b"\x00"
        try:
            await self._meeting.send(
                chunk, {"reliability": ReliabilityModes.UNRELIABLE.value}
            )
        except Exception:
            # Data channel closed (e.g. participant left) — stop sending
            logger.debug("AvatarAudioOut: data channel closed, dropping remaining audio")
            self._meeting = None
            return

Chunk and send raw PCM bytes to the Avatar Server via data channel. Uses UNRELIABLE mode for low-latency streaming.

async def interrupt(self) ‑> None
Expand source code
async def interrupt(self) -> None:
    """Tell the Avatar Server to immediately stop playback."""
    if not self._meeting:
        return
    try:
        await self._meeting.send(
            MSG_INTERRUPT, {"reliability": ReliabilityModes.RELIABLE.value}
        )
        logger.info("AvatarAudioOut: sent INTERRUPT to Avatar Server")
    except Exception:
        self._meeting = None

Tell the Avatar Server to immediately stop playback.

async def send_segment_end(self) ‑> None
Expand source code
async def send_segment_end(self) -> None:
    """
    Notify the Avatar Server that the current TTS segment has finished.
    This causes the receiver to enqueue AudioSegmentEnd so the controller
    can call notify_stream_ended(interrupted=False).
    """
    if not self._meeting:
        return
    try:
        payload = json.dumps({"type": MSG_TYPE_SEGMENT_END})
        await self._meeting.send(
            payload, {"reliability": ReliabilityModes.RELIABLE.value}
        )
        logger.debug("AvatarAudioOut: sent segment_end")
    except Exception:
        self._meeting = None

Notify the Avatar Server that the current TTS segment has finished. This causes the receiver to enqueue AudioSegmentEnd so the controller can call notify_stream_ended(interrupted=False).

def set_room_id(self, room_id: str) ‑> None
Expand source code
def set_room_id(self, room_id: str) -> None:
    self._room_id = room_id
class AvatarAuthCredentials (participant_id: str, token: str, attributes: Optional[dict[str, str]] = None)
Expand source code
@dataclass(frozen=True)
class AvatarAuthCredentials:
    """Pre-signed credentials that allow an Avatar Server to join a VideoSDK room."""

    # Participant ID the token was signed for.
    participant_id: str
    # Signed VideoSDK token (e.g. produced by generate_avatar_credentials()).
    token: str
    # Optional extra participant attributes; not consumed by code visible in this module.
    attributes: Optional[dict[str, str]] = None

Pre-signed credentials that allow an Avatar Server to join a VideoSDK room.

Instance variables

var attributes : dict[str, str] | None
var participant_id : str
var token : str
class AvatarInput
Expand source code
class AvatarInput(ABC, EventEmitter[Literal["reset_stream"]]):
    """
    Abstract base for the avatar-worker-side audio receiver.

    The Avatar Server iterates over an AvatarInput to receive AudioFrame objects
    from the agent. When the agent finishes a TTS segment it sends a
    ``segment_end`` control message which causes the receiver to enqueue an
    AudioSegmentEnd sentinel. The receiver also emits a ``reset_stream`` event
    when an interrupt arrives from the agent.
    """

    def __init__(self):
        # Initialize the EventEmitter machinery for the "reset_stream" event.
        super().__init__()

    async def start_stream(self) -> None:
        """Optional hook called before the first frame is consumed."""
        pass

    @abstractmethod
    def notify_stream_ended(
        self, playback_position: float, interrupted: bool
    ) -> None | Coroutine[None, None, None]:
        """Send a stream_ended ack back to the agent.

        Implementations may be plain functions or coroutines; callers must
        handle both forms (see the return annotation).
        """

    @abstractmethod
    def __aiter__(self) -> AsyncIterator[AudioFrame | AudioSegmentEnd]:
        """Yield AudioFrame items, then AudioSegmentEnd when the segment is done."""

    async def aclose(self) -> None:
        """Release resources; the default implementation does nothing."""
        pass

Abstract base for the avatar-worker-side audio receiver.

The Avatar Server iterates over an AvatarInput to receive AudioFrame objects from the agent. When the agent finishes a TTS segment it sends a segment_end control message which causes the receiver to enqueue an AudioSegmentEnd sentinel. The receiver also emits a reset_stream event when an interrupt arrives from the agent.

Ancestors

Subclasses

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    pass
def notify_stream_ended(self, playback_position: float, interrupted: bool) ‑> None | collections.abc.Coroutine[None, None, None]
Expand source code
@abstractmethod
def notify_stream_ended(
    self, playback_position: float, interrupted: bool
) -> None | Coroutine[None, None, None]:
    """Send a stream_ended ack back to the agent."""

Send a stream_ended ack back to the agent.

async def start_stream(self) ‑> None
Expand source code
async def start_stream(self) -> None:
    """Optional hook called before the first frame is consumed."""
    pass

Optional hook called before the first frame is consumed.

Inherited members

class AvatarJoinInfo (room_name: str,
token: str,
participant_id: Optional[str] = None,
signaling_base_url: Optional[str] = None)
Expand source code
@dataclass(frozen=True)
class AvatarJoinInfo:
    """Payload sent to the avatar dispatcher so it can join the room."""

    # VideoSDK room (meeting) identifier the worker should join.
    room_name: str
    # Pre-signed token authorizing the join.
    token: str
    # Optional fixed participant ID for the avatar worker.
    participant_id: Optional[str] = None
    # Optional signaling endpoint override — behavior when None is decided
    # by the dispatcher, not by code visible in this module.
    signaling_base_url: Optional[str] = None

Payload sent to the avatar dispatcher so it can join the room.

Instance variables

var participant_id : str | None
var room_name : str
var signaling_base_url : str | None
var token : str
class AvatarRenderer
Expand source code
class AvatarRenderer(ABC):
    """
    Abstract base for the avatar-worker-side video/audio renderer.

    An AvatarRenderer receives audio frames via push_stream_chunk and yields
    interleaved VideoFrame / AudioFrame items plus an AudioSegmentEnd sentinel
    when the current segment has been fully rendered.
    """

    @abstractmethod
    async def push_stream_chunk(self, frame: AudioFrame | AudioSegmentEnd) -> None:
        """Receive an audio frame (or segment-end sentinel) from the controller."""

    @abstractmethod
    def reset_stream(self) -> None | Coroutine[None, None, None]:
        """Immediately discard buffered audio (called on interrupt).

        May be implemented as a plain function or a coroutine; callers must
        handle both forms (see the return annotation).
        """

    @abstractmethod
    def __aiter__(
        self,
    ) -> AsyncIterator[VideoFrame | AudioFrame | AudioSegmentEnd]:
        """Yield interleaved video+audio frames, then AudioSegmentEnd."""

Abstract base for the avatar-worker-side video/audio renderer.

An AvatarRenderer receives audio frames via push_stream_chunk and yields interleaved VideoFrame / AudioFrame items plus an AudioSegmentEnd sentinel when the current segment has been fully rendered.

Ancestors

  • abc.ABC

Methods

async def push_stream_chunk(self,
frame: AudioFrame | AudioSegmentEnd) ‑> None
Expand source code
@abstractmethod
async def push_stream_chunk(self, frame: AudioFrame | AudioSegmentEnd) -> None:
    """Receive an audio frame (or segment-end sentinel) from the controller."""

Receive an audio frame (or segment-end sentinel) from the controller.

def reset_stream(self) ‑> None | collections.abc.Coroutine[None, None, None]
Expand source code
@abstractmethod
def reset_stream(self) -> None | Coroutine[None, None, None]:
    """Immediately discard buffered audio (called on interrupt)."""

Immediately discard buffered audio (called on interrupt).

class AvatarServer (meeting: Any | None,
*,
audio_recv: AvatarInput,
video_gen: AvatarRenderer,
options: AvatarSettings)
Expand source code
class AvatarServer:
    """
    Orchestrates the Avatar Server side.

    Reads audio from an AvatarInput, drives it through an AvatarRenderer, and
    forwards the resulting audio+video frames to the room via AvatarSynchronizer. Handles
    segment completion and interruption signals.
    """

    def __init__(
        self,
        meeting: Any | None,
        *,
        audio_recv: AvatarInput,
        video_gen: AvatarRenderer,
        options: AvatarSettings,
    ) -> None:
        self._meeting = meeting
        self._video_gen = video_gen
        self._options = options
        self._audio_recv = audio_recv
        # Seconds of audio forwarded since the current segment started.
        self._playback_position = 0.0
        # True between the first AudioFrame of a segment and its AudioSegmentEnd.
        self._audio_playing = False
        # Strong references to fire-and-forget tasks (asyncio only keeps weak refs).
        self._tasks: set[asyncio.Task[Any]] = set()

        self._audio_track = AvatarVoiceTrack(
            sample_rate=options.audio_sample_rate,
            num_channels=options.audio_channels,
        )
        self._video_track = AvatarVisualTrack(
            width=options.video_width,
            height=options.video_height,
            fps=options.video_fps,
        )
        # Paces audio/video frames into the two tracks at the configured FPS.
        self._av_sync = AvatarSynchronizer(
            audio_track=self._audio_track,
            video_track=self._video_track,
            video_fps=options.video_fps,
        )
        self._read_audio_atask: asyncio.Task[None] | None = None
        self._forward_video_atask: asyncio.Task[None] | None = None

    @property
    def av_sync(self) -> AvatarSynchronizer:
        # The synchronizer pacing this server's audio/video tracks.
        return self._av_sync

    async def start(self) -> None:
        """Start audio/video processing tasks."""
        await self._audio_recv.start_stream()
        self._audio_recv.on("reset_stream", self._on_reset_stream)
        self._read_audio_atask = asyncio.create_task(self._read_audio())
        self._forward_video_atask = asyncio.create_task(self._forward_video())

    async def wait_for_complete(self) -> None:
        # Blocks until both pipeline tasks finish; raises if start() wasn't called.
        if not self._read_audio_atask or not self._forward_video_atask:
            raise RuntimeError("AvatarServer not started")
        await asyncio.gather(self._read_audio_atask, self._forward_video_atask)

    async def _read_audio(self) -> None:
        # Pump frames (and AudioSegmentEnd sentinels) from the receiver into
        # the renderer, tracking when a segment actually starts playing.
        async for frame in self._audio_recv:
            if not self._audio_playing and isinstance(frame, AudioFrame):
                self._audio_playing = True
            await self._video_gen.push_stream_chunk(frame)

    async def _forward_video(self) -> None:
        # Pump rendered frames into the synchronizer; on AudioSegmentEnd,
        # ack the completed segment back to the agent.
        async for frame in self._video_gen:
            if isinstance(frame, AudioSegmentEnd):
                if self._audio_playing:
                    # notify_stream_ended may be sync or a coroutine (see
                    # AvatarInput); schedule coroutines and keep a strong ref.
                    notify_task = self._audio_recv.notify_stream_ended(
                        playback_position=self._playback_position,
                        interrupted=False,
                    )
                    self._audio_playing = False
                    self._playback_position = 0.0
                    if asyncio.iscoroutine(notify_task):
                        task = asyncio.create_task(notify_task)
                        self._tasks.add(task)
                        task.add_done_callback(self._tasks.discard)
                continue

            await self._av_sync.push(frame)
            if isinstance(frame, AudioFrame):
                # Advance playback position by this frame's duration.
                self._playback_position += frame.samples / frame.sample_rate

    def _on_reset_stream(self) -> None:
        # Interrupt handling: clear audio synchronously first so playback
        # stops immediately, then do the async renderer reset + ack.

        # NOTE(review): reaches into AvatarVoiceTrack private state
        # (_audio_buffer, _is_speaking) — consider a public clear() API.
        self._audio_track._audio_buffer.clear()
        self._audio_track._is_speaking = False
        logger.info("AvatarServer: reset_stream — audio buffer cleared (sync)")

        maybe_coro = self._video_gen.reset_stream()

        async def _handle_reset(audio_playing: bool) -> None:
            if asyncio.iscoroutine(maybe_coro):
                await maybe_coro

            # Clear again: the renderer reset may have flushed more audio.
            self._audio_track._audio_buffer.clear()
            logger.info("AvatarServer: reset_stream — audio buffer cleared (async follow-up)")

            if audio_playing:
                notify_task = self._audio_recv.notify_stream_ended(
                    playback_position=self._playback_position,
                    interrupted=True,
                )
                self._playback_position = 0.0
                if asyncio.iscoroutine(notify_task):
                    await notify_task

        # Snapshot _audio_playing before clearing it so the async follow-up
        # knows whether an interrupted-ack is owed.
        task = asyncio.create_task(_handle_reset(self._audio_playing))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        self._audio_playing = False

    async def aclose(self) -> None:
        # NOTE(review): cancelled tasks are not awaited, so cancellation may
        # still be in progress when aclose() returns — confirm callers tolerate this.
        await self._audio_recv.aclose()
        if self._forward_video_atask:
            self._forward_video_atask.cancel()
        if self._read_audio_atask:
            self._read_audio_atask.cancel()
        for task in list(self._tasks):
            task.cancel()
        await self._av_sync.aclose()

Orchestrates the Avatar Server side.

Reads audio from an AvatarInput, drives it through an AvatarRenderer, and forwards the resulting audio+video frames to the room via AvatarSynchronizer. Handles segment completion and interruption signals.

Instance variables

prop av_sync : AvatarSynchronizer
Expand source code
@property
def av_sync(self) -> AvatarSynchronizer:
    return self._av_sync

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    await self._audio_recv.aclose()
    if self._forward_video_atask:
        self._forward_video_atask.cancel()
    if self._read_audio_atask:
        self._read_audio_atask.cancel()
    for task in list(self._tasks):
        task.cancel()
    await self._av_sync.aclose()
async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Start audio/video processing tasks."""
    await self._audio_recv.start_stream()
    self._audio_recv.on("reset_stream", self._on_reset_stream)
    self._read_audio_atask = asyncio.create_task(self._read_audio())
    self._forward_video_atask = asyncio.create_task(self._forward_video())

Start audio/video processing tasks.

async def wait_for_complete(self) ‑> None
Expand source code
async def wait_for_complete(self) -> None:
    if not self._read_audio_atask or not self._forward_video_atask:
        raise RuntimeError("AvatarServer not started")
    await asyncio.gather(self._read_audio_atask, self._forward_video_atask)
class AvatarSettings (video_width: int,
video_height: int,
video_fps: float,
audio_sample_rate: int,
audio_channels: int)
Expand source code
@dataclass
class AvatarSettings:
    """Configuration for the Avatar Server's A/V output."""

    # Output video resolution in pixels.
    video_width: int
    video_height: int
    # Target frames per second for the rendered video track.
    video_fps: float
    # PCM sample rate (Hz) of the audio track.
    audio_sample_rate: int
    # Number of audio channels (1 = mono, 2 = stereo).
    audio_channels: int

Configuration for the Avatar Server's A/V output.

Instance variables

var audio_channels : int
var audio_sample_rate : int
var video_fps : float
var video_height : int
var video_width : int
class AvatarSynchronizer (audio_track: CustomAudioTrack, video_track: CustomVideoTrack, video_fps: float)
Expand source code
class AvatarSynchronizer:
    """
    Feeds audio and video frames into their respective custom tracks,
    throttling video to the configured FPS so the two streams stay aligned.
    """

    def __init__(
        self,
        audio_track: CustomAudioTrack,
        video_track: CustomVideoTrack,
        video_fps: float,
    ):
        self._audio_track = audio_track
        self._video_track = video_track
        self._video_fps = video_fps
        self._frame_interval = 1.0 / video_fps  # seconds between video frames
        self._last_frame_time = 0.0
        self._start_time = 0.0

    async def push(self, frame: Union[AudioFrame, VideoFrame, AudioSegmentEnd]) -> None:
        """Forward *frame* to its track; video is paced, other types are ignored."""
        if not self._start_time:
            self._start_time = time.monotonic()

        if isinstance(frame, AudioFrame):
            await self._audio_track.put_frame(frame)
            return
        if not isinstance(frame, VideoFrame):
            # AudioSegmentEnd (and anything unexpected) is not forwarded.
            return

        remaining = self._frame_interval - (time.monotonic() - self._last_frame_time)
        if remaining > 0:
            await asyncio.sleep(remaining)
        await self._video_track.put_frame(frame)
        self._last_frame_time = time.monotonic()

    async def aclose(self) -> None:
        """No resources to release."""
        pass

Paces audio and video frames into their respective custom tracks so that audio and video stay in sync at the configured FPS.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    pass
async def push(self,
frame: Union[AudioFrame, VideoFrame, AudioSegmentEnd]) ‑> None
Expand source code
async def push(self, frame: Union[AudioFrame, VideoFrame, AudioSegmentEnd]) -> None:
    """Route one frame: audio goes straight to the audio track; video is
    throttled to the configured frame interval. AudioSegmentEnd values
    match neither isinstance branch and are silently ignored."""
    if not self._start_time:
        self._start_time = time.monotonic()

    if isinstance(frame, AudioFrame):
        await self._audio_track.put_frame(frame)
    elif isinstance(frame, VideoFrame):
        now = time.monotonic()
        elapsed = now - self._last_frame_time
        if elapsed < self._frame_interval:
            # Too soon since the last video frame: wait out the remainder.
            await asyncio.sleep(self._frame_interval - elapsed)
        await self._video_track.put_frame(frame)
        self._last_frame_time = time.monotonic()
class AvatarVisualTrack (width: int, height: int, fps: float)
Expand source code
class AvatarVisualTrack(CustomVideoTrack):
    """Custom video track that drains frames from an asyncio.Queue.

    Frames are supplied via put_frame() and handed to the transport from
    recv(), which stamps each one with a monotonically increasing pts on
    the standard 90 kHz video clock. A ``None`` sentinel in the queue
    marks end-of-track.
    """

    def __init__(self, width: int, height: int, fps: float):
        super().__init__()
        self.kind = "video"
        self._width = width
        self._height = height
        self._fps = fps
        self._queue: asyncio.Queue[VideoFrame | None] = asyncio.Queue()
        self._start: float | None = None  # wall-clock time of first recv()
        self._timestamp = 0  # pts of the most recently emitted frame
        # Frames emitted so far; pts is derived from this absolute count
        # instead of accumulating a per-frame increment, so truncation error
        # does not build up for frame rates that do not divide the clock
        # rate evenly (e.g. 29.97 fps).
        self._frame_count = 0

    async def put_frame(self, frame: VideoFrame) -> None:
        """Enqueue one frame for later delivery by recv()."""
        await self._queue.put(frame)

    async def recv(self) -> VideoFrame:
        """Return the next queued frame, stamped with pts/time_base.

        Raises:
            MediaStreamError: if the track is not live, or the end-of-track
                sentinel has been reached.
        """
        if self.readyState != "live":
            raise MediaStreamError

        if self._start is None:
            self._start = time.time()

        frame = await self._queue.get()
        if frame is None:
            raise MediaStreamError("Track ended")

        VIDEO_CLOCK_RATE = 90000  # standard 90 kHz RTP video clock
        self._frame_count += 1
        # Previous code did `self._timestamp += int((1/fps) * clock)`,
        # which drops the fractional part on every frame and drifts over
        # time; computing from the frame index keeps pts exact.
        self._timestamp = int(self._frame_count * VIDEO_CLOCK_RATE / self._fps)
        frame.pts = self._timestamp
        frame.time_base = fractions.Fraction(1, VIDEO_CLOCK_RATE)
        return frame

    async def stop(self) -> None:
        """Signal end-of-track; recv() raises once the sentinel is reached."""
        await self._queue.put(None)

Custom video track that drains frames from an asyncio.Queue.

Ancestors

  • videosdk.custom_video_track.CustomVideoTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Methods

async def put_frame(self, frame: VideoFrame) ‑> None
Expand source code
async def put_frame(self, frame: VideoFrame) -> None:
    """Enqueue one video frame for delivery by recv()."""
    await self._queue.put(frame)
async def recv(self) ‑> av.video.frame.VideoFrame
Expand source code
async def recv(self) -> VideoFrame:
    """Return the next queued frame, stamped with pts on a 90 kHz clock.

    Raises MediaStreamError when the track is not live or a ``None``
    end-of-track sentinel is dequeued.
    """
    if self.readyState != "live":
        raise MediaStreamError

    if self._start is None:
        self._start = time.time()

    frame = await self._queue.get()
    if frame is None:
        raise MediaStreamError("Track ended")

    VIDEO_CLOCK_RATE = 90000
    VIDEO_PTIME = 1 / self._fps
    # NOTE(review): int() truncates the per-frame increment, so pts drifts
    # slowly for fps values that do not divide 90000 (e.g. 29.97) — confirm
    # whether that matters for the target frame rates.
    self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
    frame.pts = self._timestamp
    frame.time_base = fractions.Fraction(1, VIDEO_CLOCK_RATE)
    return frame

Receive the next :class:~av.video.frame.VideoFrame.

The base implementation just reads a 640x480 green frame at 30fps, subclass :class:VideoStreamTrack to provide a useful implementation.

async def stop(self) ‑> None
Expand source code
async def stop(self) -> None:
    # Enqueue the end-of-track sentinel; recv() raises when it dequeues None.
    await self._queue.put(None)
class AvatarVoiceTrack (sample_rate: int, num_channels: int)
Expand source code
class AvatarVoiceTrack(CustomAudioTrack):
    """
    Custom audio track that reconstructs steady 20 ms PCM frames from incoming
    audio pushed by AvatarSynchronizer. Produces silence when the buffer is empty.
    """

    AUDIO_PTIME = 0.02  # seconds of audio per output frame (20 ms)
    MAX_BUFFER_DURATION = 2.0  # max seconds buffered before oldest audio is dropped

    def __init__(self, sample_rate: int, num_channels: int):
        super().__init__()
        self.kind = "audio"
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._start: float | None = None  # wall-clock time of first recv()
        self._timestamp = 0  # pts in samples since start
        self._time_base = fractions.Fraction(1, self._sample_rate)
        self._default_samples = int(self._sample_rate * self.AUDIO_PTIME)
        self._sample_width = 2  # s16le
        self._chunk_size = self._default_samples * self._num_channels * self._sample_width
        self._audio_buffer = bytearray()
        max_chunks = int(self.MAX_BUFFER_DURATION / self.AUDIO_PTIME)
        # Keep at least 5 chunks of headroom even for tiny MAX_BUFFER_DURATION.
        self._max_buffer_bytes = max(self._chunk_size * max_chunks, self._chunk_size * 5)
        self._stopped = False
        self._is_speaking = False

    async def put_frame(self, frame: AudioFrame) -> None:
        """Append a frame's PCM bytes to the buffer (oldest dropped on overflow)."""
        if self._stopped:
            return
        try:
            # NOTE(review): planes[0] may include allocator padding beyond
            # frame.samples; assumes upstream frames are tightly packed — confirm.
            pcm_bytes = bytes(frame.planes[0])
        except Exception:
            pcm_bytes = frame.to_ndarray().tobytes()
        if not pcm_bytes:
            return
        self._audio_buffer.extend(pcm_bytes)
        if len(self._audio_buffer) > self._max_buffer_bytes:
            # Bounded buffer: discard the oldest audio rather than grow forever.
            overflow = len(self._audio_buffer) - self._max_buffer_bytes
            del self._audio_buffer[:overflow]

    def _build_audio_frame(self, chunk: bytes) -> AudioFrame:
        """Wrap one chunk of interleaved s16le PCM in an AudioFrame."""
        if len(chunk) < self._chunk_size:
            # Defensive zero-padding; recv() only passes full chunks today.
            chunk = chunk + bytes(self._chunk_size - len(chunk))
        # "s16" is a *packed* (interleaved) sample format, so from_ndarray
        # expects a single plane shaped (1, samples * channels). The former
        # (channels, samples) transpose only happened to work for mono and
        # raised ValueError for stereo input.
        data = np.frombuffer(chunk, dtype=np.int16).reshape(1, -1)
        layout = "mono" if self._num_channels == 1 else "stereo"
        frame = AudioFrame.from_ndarray(data, format="s16", layout=layout)
        frame.sample_rate = self._sample_rate
        return frame

    def _build_silence_frame(self) -> AudioFrame:
        """Return a 20 ms frame of zeros, used while the buffer has run dry."""
        frame = AudioFrame(
            format="s16",
            layout="mono" if self._num_channels == 1 else "stereo",
            samples=self._default_samples,
        )
        for plane in frame.planes:
            plane.update(bytes(plane.buffer_size))
        frame.sample_rate = self._sample_rate
        return frame

    async def recv(self) -> AudioFrame:
        """Emit the next 20 ms frame (real audio if buffered, else silence),
        paced against wall-clock so output stays real-time.

        Raises:
            MediaStreamError: if the track is not live, or has been stopped
                and its buffer drained.
        """
        if self.readyState != "live":
            raise MediaStreamError

        if self._start is None:
            self._start = time.time()

        if len(self._audio_buffer) >= self._chunk_size:
            chunk = self._audio_buffer[: self._chunk_size]
            del self._audio_buffer[: self._chunk_size]
            frame = self._build_audio_frame(bytes(chunk))
            self._is_speaking = True
        else:
            if self._stopped:
                raise MediaStreamError("Track ended")
            frame = self._build_silence_frame()
            self._is_speaking = False

        samples = frame.samples or self._default_samples
        pts = self._timestamp
        # Sleep until this frame's scheduled wall-clock time; scheduling off
        # the absolute pts avoids cumulative drift.
        wait = self._start + (pts / self._sample_rate) - time.time()
        if wait > 0:
            await asyncio.sleep(wait)

        frame.pts = pts
        frame.time_base = self._time_base
        self._timestamp += samples
        return frame

    async def stop(self) -> None:
        """Stop the track and drop any still-buffered audio immediately."""
        self._stopped = True
        self._audio_buffer.clear()

Custom audio track that reconstructs steady 20 ms PCM frames from incoming audio pushed by AvatarSynchronizer. Produces silence when the buffer is empty.

Ancestors

  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Class variables

var AUDIO_PTIME
var MAX_BUFFER_DURATION

Methods

async def put_frame(self, frame: AudioFrame) ‑> None
Expand source code
async def put_frame(self, frame: AudioFrame) -> None:
    """Append the frame's PCM bytes to the buffer; drop oldest on overflow."""
    if self._stopped:
        return
    try:
        # NOTE(review): planes[0] may include allocator padding beyond
        # frame.samples; assumes upstream frames are tightly packed — confirm.
        pcm_bytes = bytes(frame.planes[0])
    except Exception:
        pcm_bytes = frame.to_ndarray().tobytes()
    if not pcm_bytes:
        return
    self._audio_buffer.extend(pcm_bytes)
    if len(self._audio_buffer) > self._max_buffer_bytes:
        # Bounded buffer: discard the oldest audio rather than grow forever.
        overflow = len(self._audio_buffer) - self._max_buffer_bytes
        del self._audio_buffer[:overflow]
async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """Emit the next 20 ms frame (buffered audio or silence), paced to
    wall-clock time; raises MediaStreamError once stopped and drained."""
    if self.readyState != "live":
        raise MediaStreamError

    if self._start is None:
        self._start = time.time()

    if len(self._audio_buffer) >= self._chunk_size:
        chunk = self._audio_buffer[: self._chunk_size]
        del self._audio_buffer[: self._chunk_size]
        frame = self._build_audio_frame(bytes(chunk))
        self._is_speaking = True
    else:
        if self._stopped:
            raise MediaStreamError("Track ended")
        frame = self._build_silence_frame()
        self._is_speaking = False

    samples = frame.samples or self._default_samples
    pts = self._timestamp
    # Sleep until this frame's scheduled wall-clock slot (pts-based, drift-free).
    wait = self._start + (pts / self._sample_rate) - time.time()
    if wait > 0:
        await asyncio.sleep(wait)

    frame.pts = pts
    frame.time_base = self._time_base
    self._timestamp += samples
    return frame

Receive the next :class:~av.audio.frame.AudioFrame.

The base implementation just reads silence, subclass :class:AudioStreamTrack to provide a useful implementation.

async def stop(self) ‑> None
Expand source code
async def stop(self) -> None:
    # Mark stopped and drop buffered audio; recv() raises once the buffer is empty.
    self._stopped = True
    self._audio_buffer.clear()