Module agents.avatar
Sub-modules
agents.avatar.avatar_audio_io
agents.avatar.avatar_auth
agents.avatar.avatar_controller
agents.avatar.avatar_schema
agents.avatar.avatar_synchronizer
Functions
def generate_avatar_credentials(api_key: str,
secret: str,
*,
participant_id: Optional[str] = None,
ttl_seconds: int = 3600) ‑> AvatarAuthCredentials-
Expand source code
def generate_avatar_credentials(
    api_key: str,
    secret: str,
    *,
    participant_id: Optional[str] = None,
    ttl_seconds: int = 3600,
) -> AvatarAuthCredentials:
    """
    Generate a pre-signed VideoSDK token for an Avatar Server participant.

    Args:
        api_key: Your VideoSDK API key.
        secret: Your VideoSDK secret key.
        participant_id: Optional fixed participant ID. A random one is
            generated if omitted.
        ttl_seconds: Token validity in seconds (default 1 hour).

    Returns:
        AvatarAuthCredentials with participant_id and signed token.

    Raises:
        ImportError: If the optional PyJWT dependency is not installed.
    """
    # Lazy import keeps PyJWT an optional dependency of the package.
    try:
        import jwt
    except ImportError as exc:
        raise ImportError(
            "PyJWT is required for generate_avatar_credentials(). "
            "Install it with: pip install PyJWT"
        ) from exc

    # FIX: original read f"{"avatar"}_{...}" — reusing the same quote
    # character inside a replacement field is a SyntaxError before
    # Python 3.12 (PEP 701), and the braces were pointless anyway.
    pid = participant_id or f"avatar_{uuid.uuid4().hex[:8]}"
    now = int(time.time())
    payload = {
        "apikey": api_key,
        "permissions": ["allow_join"],
        "version": 2,
        "iat": now,
        "exp": now + ttl_seconds,
        "participantId": pid,
    }
    token = jwt.encode(payload, secret, algorithm="HS256")
    # PyJWT < 2.0 returns bytes; normalize to str for callers.
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    return AvatarAuthCredentials(participant_id=pid, token=token)
Args
api_key- Your VideoSDK API key.
secret- Your VideoSDK secret key.
participant_id- Optional fixed participant ID. A random one is generated if omitted.
ttl_seconds- Token validity in seconds (default 1 hour).
Returns
AvatarAuthCredentials with participant_id and signed token.
Classes
class AudioSegmentEnd-
Expand source code
class AudioSegmentEnd:
    """Marker type: an AvatarRenderer yields an instance of this sentinel
    to signal that a TTS segment has fully played."""
class AvatarAudioIn (meeting: Meeting | None, *, channels: int = 1, sample_rate: int = 24000)-
Expand source code
class AvatarAudioIn(AvatarInput):
    """
    Avatar-worker-side receiver.

    Listens for data-channel messages from the agent and exposes them as
    an async iterator of AudioFrame / AudioSegmentEnd items.

    Control messages:
    - Raw bytes → reconstruct AudioFrame and enqueue it.
    - INTERRUPT → clear the queue and emit ``reset_stream``.
    - segment_end JSON → enqueue AudioSegmentEnd.

    Note: VideoSDK broadcasts data-channel messages to all participants,
    so every participant in the room sees every message.
    """

    # Seconds during which incoming audio is dropped after an INTERRUPT,
    # so in-flight chunks from the interrupted turn are not replayed.
    _INTERRUPT_COOLDOWN = 0.3

    def __init__(
        self,
        meeting: Meeting | None,
        *,
        channels: int = 1,
        sample_rate: int = 24000,
    ):
        super().__init__()
        self._channels = channels
        self._sample_rate = sample_rate
        self._data_ch: asyncio.Queue[AudioFrame | AudioSegmentEnd] = asyncio.Queue()
        self._handler: _AvatarDataHandler | None = None
        self._meeting: Meeting | None = None
        self._interrupt_until: float = 0.0
        if meeting:
            self.set_meeting(meeting)

    def set_meeting(self, meeting: Meeting) -> None:
        # Detach from any previous meeting before re-attaching.
        if self._meeting and self._handler:
            try:
                self._meeting.remove_event_listener(self._handler)
            except Exception:
                pass
            self._handler = None
        self._meeting = meeting
        self._handler = _AvatarDataHandler(callback=self._on_data)
        self._meeting.add_event_listener(self._handler)
        logger.info("AvatarAudioIn attached to meeting")

    def notify_stream_ended(self, playback_position: float, interrupted: bool) -> None:
        # Fire-and-forget: the ack is sent on the running event loop.
        asyncio.create_task(self._send_stream_ended(playback_position, interrupted))

    def __aiter__(self):
        return self

    async def __anext__(self) -> AudioFrame | AudioSegmentEnd:
        try:
            return await self._data_ch.get()
        except asyncio.CancelledError:
            raise StopAsyncIteration

    async def aclose(self) -> None:
        if self._meeting and self._handler:
            try:
                self._meeting.remove_event_listener(self._handler)
            except Exception:
                pass
            self._handler = None

    def _on_data(self, data: dict) -> None:
        """Dispatch a raw data-channel message to the right handler."""
        payload = data.get("payload", b"")
        try:
            if isinstance(payload, memoryview):
                payload = payload.tobytes()
            # INTERRUPT may arrive as either bytes or str depending on sender.
            if payload == MSG_INTERRUPT or payload == "INTERRUPT":
                self._handle_interrupt()
                return
            if isinstance(payload, (bytes, bytearray)):
                self._handle_audio_bytes(payload)
                return
            if isinstance(payload, str):
                self._handle_text_payload(payload)
        except Exception as e:
            logger.error("AvatarAudioIn: error processing message: %s", e)

    def _handle_audio_bytes(self, raw: bytes) -> None:
        """Rebuild an AudioFrame from raw s16le PCM bytes and enqueue it."""
        import time as _time
        # Drop stale chunks that belong to an interrupted turn.
        if _time.monotonic() < self._interrupt_until:
            return
        # Pad to an even byte count so the int16 view is valid.
        if len(raw) % 2 != 0:
            raw = raw + b"\x00"
        array = np.frombuffer(raw, dtype=np.int16)
        mono = array.reshape(-1, 1)
        if self._channels == 2:
            # Duplicate the mono column to fake stereo.
            stereo = np.column_stack([mono[:, 0], mono[:, 0]])
            array_out = stereo
        else:
            array_out = mono
        frame = AudioFrame.from_ndarray(
            array_out.T,
            format="s16",
            layout="mono" if self._channels == 1 else "stereo",
        )
        frame.sample_rate = self._sample_rate
        self._data_ch.put_nowait(frame)

    def _handle_interrupt(self) -> None:
        """Flush queued audio, start the cooldown, and notify listeners."""
        import time as _time
        self._interrupt_until = _time.monotonic() + self._INTERRUPT_COOLDOWN
        while not self._data_ch.empty():
            try:
                self._data_ch.get_nowait()
            except asyncio.QueueEmpty:
                break
        self.emit("reset_stream")
        logger.info(
            "AvatarAudioIn: INTERRUPT received, buffer cleared, cooldown %.1fs",
            self._INTERRUPT_COOLDOWN,
        )

    def _handle_text_payload(self, payload: str) -> None:
        """Parse a JSON control message (segment_end / stream_ended)."""
        if not payload:
            return
        try:
            msg = json.loads(payload)
        except (json.JSONDecodeError, ValueError):
            return
        if msg.get("type") == MSG_TYPE_SEGMENT_END:
            self._data_ch.put_nowait(AudioSegmentEnd())
            logger.debug("AvatarAudioIn: segment_end received")
        elif msg.get("type") == MSG_TYPE_STREAM_ENDED:
            # stream_ended acks are consumed on the agent side; ignore here.
            pass

    async def _send_stream_ended(self, playback_position: float, interrupted: bool) -> None:
        """Send the stream_ended ack back to the agent (RELIABLE)."""
        if not self._meeting:
            return
        payload = json.dumps(
            {
                "type": MSG_TYPE_STREAM_ENDED,
                "data": {
                    "playback_position": playback_position,
                    "interrupted": interrupted,
                },
            }
        )
        await self._meeting.send(
            payload, {"reliability": ReliabilityModes.RELIABLE.value}
        )
        logger.debug(
            "AvatarAudioIn: sent stream_ended (pos=%.3fs, interrupted=%s)",
            playback_position,
            interrupted,
        )
Listens for data-channel messages from the agent and exposes them as an async iterator of AudioFrame / AudioSegmentEnd items. Control messages: - Raw bytes → reconstruct AudioFrame and enqueue it. - INTERRUPT → clear the queue and emit
reset_stream. - segment_end JSON → enqueue AudioSegmentEnd. Note: VideoSDK broadcasts data-channel messages to all participants, so every participant in the room sees every message.
Ancestors
- AvatarInput
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: if self._meeting and self._handler: try: self._meeting.remove_event_listener(self._handler) except Exception: pass self._handler = None def set_meeting(self, meeting: Meeting) ‑> None-
Expand source code
def set_meeting(self, meeting: Meeting) -> None: if self._meeting and self._handler: try: self._meeting.remove_event_listener(self._handler) except Exception: pass self._handler = None self._meeting = meeting self._handler = _AvatarDataHandler(callback=self._on_data) self._meeting.add_event_listener(self._handler) logger.info("AvatarAudioIn attached to meeting")
Inherited members
class AvatarAudioOut (*,
credentials: AvatarAuthCredentials,
avatar_dispatcher_url: Optional[str] = None,
room_id: Optional[str] = None)-
Expand source code
class AvatarAudioOut:
    """
    Agent-side handle for the avatar data channel.

    Responsibilities:
    - Spin up the Avatar Server via an HTTP dispatcher.
    - Stream raw PCM audio chunks to the worker (UNRELIABLE).
    - Send ``segment_end`` control messages (RELIABLE) so the worker knows
      when a TTS turn has finished — this is what allows
      ``notify_stream_ended`` to fire on the worker side.
    - Send ``INTERRUPT`` (RELIABLE) when the agent interrupts its output.
    - Receive ``stream_ended`` acks from the worker via an on_data listener.
    """

    def __init__(
        self,
        *,
        credentials: AvatarAuthCredentials,
        avatar_dispatcher_url: Optional[str] = None,
        room_id: Optional[str] = None,
    ):
        self._credentials = credentials
        self._avatar_dispatcher_url = avatar_dispatcher_url
        self._room_id = room_id
        self._meeting: Meeting | None = None
        self._ack_handler: _AvatarAckHandler | None = None
        self._participant_id: str = credentials.participant_id
        # Populated elsewhere by the framework; kept public for callers.
        self.video_track = None
        self.audio_track = None

    def set_room_id(self, room_id: str) -> None:
        self._room_id = room_id

    @property
    def participant_id(self) -> str:
        return self._participant_id

    async def connect(self) -> None:
        """Call the avatar dispatcher so the worker process joins the room."""
        await self._avatar_spinup()

    def _set_meeting(self, meeting: Meeting) -> None:
        """
        Inject the live Meeting object.

        Called by the framework after the agent has joined the room.
        Also registers the ack listener.
        """
        self._meeting = meeting
        self._ack_handler = _AvatarAckHandler(on_stream_ended=self._on_stream_ended)
        self._meeting.add_event_listener(self._ack_handler)
        logger.info("AvatarAudioOut attached to meeting: %s", meeting.id)

    async def _avatar_spinup(self) -> None:
        """POST the join payload to the dispatcher, if one is configured."""
        if not self._avatar_dispatcher_url:
            logger.info("AvatarAudioOut: No dispatcher URL provided, skipping local avatar spinup.")
            return
        if not self._room_id:
            raise ValueError("room_id must be set before calling connect()")
        join_info = AvatarJoinInfo(
            room_name=self._room_id,
            token=self._credentials.token,
            participant_id=self._credentials.participant_id,
        )
        logger.info(
            "Sending connection info to avatar dispatcher %s (participant_id=%s)",
            self._avatar_dispatcher_url,
            self._credentials.participant_id,
        )
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self._avatar_dispatcher_url, json=asdict(join_info)
            )
            response.raise_for_status()
        logger.info("Avatar handshake completed")

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """
        Chunk and send raw PCM bytes to the Avatar Server via data channel.
        Uses UNRELIABLE mode for low-latency streaming.
        """
        if not self._meeting:
            return
        MAX_CHUNK = 15_000
        for i in range(0, len(audio_data), MAX_CHUNK):
            chunk = audio_data[i : i + MAX_CHUNK]
            if not chunk:
                continue
            # Keep each chunk an even byte count (s16le sample boundary).
            if len(chunk) % 2 != 0:
                chunk = chunk + b"\x00"
            try:
                await self._meeting.send(
                    chunk, {"reliability": ReliabilityModes.UNRELIABLE.value}
                )
            except Exception:
                # Data channel closed (e.g. participant left) — stop sending
                logger.debug("AvatarAudioOut: data channel closed, dropping remaining audio")
                self._meeting = None
                return

    async def send_segment_end(self) -> None:
        """
        Notify the Avatar Server that the current TTS segment has finished.

        This causes the receiver to enqueue AudioSegmentEnd so the
        controller can call notify_stream_ended(interrupted=False).
        """
        if not self._meeting:
            return
        try:
            payload = json.dumps({"type": MSG_TYPE_SEGMENT_END})
            await self._meeting.send(
                payload, {"reliability": ReliabilityModes.RELIABLE.value}
            )
            logger.debug("AvatarAudioOut: sent segment_end")
        except Exception:
            self._meeting = None

    async def interrupt(self) -> None:
        """Tell the Avatar Server to immediately stop playback."""
        if not self._meeting:
            return
        try:
            await self._meeting.send(
                MSG_INTERRUPT, {"reliability": ReliabilityModes.RELIABLE.value}
            )
            logger.info("AvatarAudioOut: sent INTERRUPT to Avatar Server")
        except Exception:
            self._meeting = None

    def _on_stream_ended(self, playback_position: float, interrupted: bool) -> None:
        logger.info(
            "AvatarAudioOut: stream_ended ack received (pos=%.3fs, interrupted=%s)",
            playback_position,
            interrupted,
        )

    async def aclose(self) -> None:
        if self._meeting and self._ack_handler:
            try:
                self._meeting.remove_event_listener(self._ack_handler)
            except Exception:
                pass
            self._ack_handler = None
Responsibilities: - Spin up the Avatar Server via an HTTP dispatcher. - Stream raw PCM audio chunks to the worker (UNRELIABLE). - Send
segment_end control messages (RELIABLE) so the worker knows when a TTS turn has finished — this is what allows notify_stream_ended to fire on the worker side. - Send INTERRUPT (RELIABLE) when the agent interrupts its output. - Receive stream_ended acks from the worker via an on_data listener.
Instance variables
prop participant_id : str-
Expand source code
@property def participant_id(self) -> str: return self._participant_id
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: if self._meeting and self._ack_handler: try: self._meeting.remove_event_listener(self._ack_handler) except Exception: pass self._ack_handler = None async def connect(self) ‑> None-
Expand source code
async def connect(self) -> None: """Call the avatar dispatcher so the worker process joins the room.""" await self._avatar_spinup()Call the avatar dispatcher so the worker process joins the room.
async def handle_audio_input(self, audio_data: bytes) ‑> None-
Expand source code
async def handle_audio_input(self, audio_data: bytes) -> None: """ Chunk and send raw PCM bytes to the Avatar Server via data channel. Uses UNRELIABLE mode for low-latency streaming. """ if not self._meeting: return MAX_CHUNK = 15_000 for i in range(0, len(audio_data), MAX_CHUNK): chunk = audio_data[i : i + MAX_CHUNK] if not chunk: continue if len(chunk) % 2 != 0: chunk = chunk + b"\x00" try: await self._meeting.send( chunk, {"reliability": ReliabilityModes.UNRELIABLE.value} ) except Exception: # Data channel closed (e.g. participant left) — stop sending logger.debug("AvatarAudioOut: data channel closed, dropping remaining audio") self._meeting = None returnChunk and send raw PCM bytes to the Avatar Server via data channel. Uses UNRELIABLE mode for low-latency streaming.
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None: """Tell the Avatar Server to immediately stop playback.""" if not self._meeting: return try: await self._meeting.send( MSG_INTERRUPT, {"reliability": ReliabilityModes.RELIABLE.value} ) logger.info("AvatarAudioOut: sent INTERRUPT to Avatar Server") except Exception: self._meeting = NoneTell the Avatar Server to immediately stop playback.
async def send_segment_end(self) ‑> None-
Expand source code
async def send_segment_end(self) -> None: """ Notify the Avatar Server that the current TTS segment has finished. This causes the receiver to enqueue AudioSegmentEnd so the controller can call notify_stream_ended(interrupted=False). """ if not self._meeting: return try: payload = json.dumps({"type": MSG_TYPE_SEGMENT_END}) await self._meeting.send( payload, {"reliability": ReliabilityModes.RELIABLE.value} ) logger.debug("AvatarAudioOut: sent segment_end") except Exception: self._meeting = NoneNotify the Avatar Server that the current TTS segment has finished. This causes the receiver to enqueue AudioSegmentEnd so the controller can call notify_stream_ended(interrupted=False).
def set_room_id(self, room_id: str) ‑> None-
Expand source code
def set_room_id(self, room_id: str) -> None: self._room_id = room_id
class AvatarAuthCredentials (participant_id: str, token: str, attributes: Optional[dict[str, str]] = None)-
Expand source code
@dataclass(frozen=True)
class AvatarAuthCredentials:
    """Pre-signed credentials that allow an Avatar Server to join a VideoSDK room."""

    # Fixed participant identity the token was signed for.
    participant_id: str
    # Signed VideoSDK JWT granting room access.
    token: str
    # Optional free-form participant attributes.
    attributes: Optional[dict[str, str]] = None
Instance variables
var attributes : dict[str, str] | None
var participant_id : str
var token : str
class AvatarInput-
Expand source code
class AvatarInput(ABC, EventEmitter[Literal["reset_stream"]]):
    """
    Abstract base for the avatar-worker-side audio receiver.

    The Avatar Server iterates over an AvatarInput to receive AudioFrame
    objects from the agent. When the agent finishes a TTS segment it sends
    a ``segment_end`` control message which causes the receiver to enqueue
    an AudioSegmentEnd sentinel. The receiver also emits a ``reset_stream``
    event when an interrupt arrives from the agent.
    """

    def __init__(self):
        super().__init__()

    async def start_stream(self) -> None:
        """Optional hook called before the first frame is consumed."""
        pass

    @abstractmethod
    def notify_stream_ended(
        self, playback_position: float, interrupted: bool
    ) -> None | Coroutine[None, None, None]:
        """Send a stream_ended ack back to the agent."""

    @abstractmethod
    def __aiter__(self) -> AsyncIterator[AudioFrame | AudioSegmentEnd]:
        """Yield AudioFrame items, then AudioSegmentEnd when the segment is done."""

    async def aclose(self) -> None:
        """Release any resources; default implementation does nothing."""
        pass
The Avatar Server iterates over an AvatarInput to receive AudioFrame objects from the agent. When the agent finishes a TTS segment it sends a
segment_end control message which causes the receiver to enqueue an AudioSegmentEnd sentinel. The receiver also emits a reset_stream event when an interrupt arrives from the agent.
Ancestors
- abc.ABC
- EventEmitter
- typing.Generic
Subclasses
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: pass def notify_stream_ended(self, playback_position: float, interrupted: bool) ‑> None | collections.abc.Coroutine[None, None, None]-
Expand source code
@abstractmethod def notify_stream_ended( self, playback_position: float, interrupted: bool ) -> None | Coroutine[None, None, None]: """Send a stream_ended ack back to the agent."""Send a stream_ended ack back to the agent.
async def start_stream(self) ‑> None-
Expand source code
async def start_stream(self) -> None: """Optional hook called before the first frame is consumed.""" passOptional hook called before the first frame is consumed.
Inherited members
class AvatarJoinInfo (room_name: str,
token: str,
participant_id: Optional[str] = None,
signaling_base_url: Optional[str] = None)-
Expand source code
@dataclass(frozen=True)
class AvatarJoinInfo:
    """Payload sent to the avatar dispatcher so it can join the room."""

    # Target room identifier.
    room_name: str
    # Signed VideoSDK token the worker joins with.
    token: str
    # Optional fixed participant ID for the worker.
    participant_id: Optional[str] = None
    # Optional override for the signaling endpoint.
    signaling_base_url: Optional[str] = None
Instance variables
var participant_id : str | None
var room_name : str
var signaling_base_url : str | None
var token : str
class AvatarRenderer-
Expand source code
class AvatarRenderer(ABC):
    """
    Abstract base for the avatar-worker-side video/audio renderer.

    An AvatarRenderer receives audio frames via push_stream_chunk and
    yields interleaved VideoFrame / AudioFrame items plus an
    AudioSegmentEnd sentinel when the current segment has been fully
    rendered.
    """

    @abstractmethod
    async def push_stream_chunk(self, frame: AudioFrame | AudioSegmentEnd) -> None:
        """Receive an audio frame (or segment-end sentinel) from the controller."""

    @abstractmethod
    def reset_stream(self) -> None | Coroutine[None, None, None]:
        """Immediately discard buffered audio (called on interrupt)."""

    @abstractmethod
    def __aiter__(
        self,
    ) -> AsyncIterator[VideoFrame | AudioFrame | AudioSegmentEnd]:
        """Yield interleaved video+audio frames, then AudioSegmentEnd."""
An AvatarRenderer receives audio frames via push_stream_chunk and yields interleaved VideoFrame / AudioFrame items plus an AudioSegmentEnd sentinel when the current segment has been fully rendered.
Ancestors
- abc.ABC
Methods
async def push_stream_chunk(self,
frame: AudioFrame | AudioSegmentEnd) ‑> None-
Expand source code
@abstractmethod async def push_stream_chunk(self, frame: AudioFrame | AudioSegmentEnd) -> None: """Receive an audio frame (or segment-end sentinel) from the controller."""Receive an audio frame (or segment-end sentinel) from the controller.
def reset_stream(self) ‑> None | collections.abc.Coroutine[None, None, None]-
Expand source code
@abstractmethod def reset_stream(self) -> None | Coroutine[None, None, None]: """Immediately discard buffered audio (called on interrupt)."""Immediately discard buffered audio (called on interrupt).
class AvatarServer (meeting: Any | None,
*,
audio_recv: AvatarInput,
video_gen: AvatarRenderer,
options: AvatarSettings)-
Expand source code
class AvatarServer:
    """
    Orchestrates the Avatar Server side.

    Reads audio from an AvatarInput, drives it through an AvatarRenderer,
    and forwards the resulting audio+video frames to the room via
    AvatarSynchronizer. Handles segment completion and interruption
    signals.
    """

    def __init__(
        self,
        meeting: Any | None,
        *,
        audio_recv: AvatarInput,
        video_gen: AvatarRenderer,
        options: AvatarSettings,
    ) -> None:
        self._meeting = meeting
        self._video_gen = video_gen
        self._options = options
        self._audio_recv = audio_recv
        self._playback_position = 0.0
        self._audio_playing = False
        # Keep strong references to fire-and-forget notify tasks.
        self._tasks: set[asyncio.Task[Any]] = set()
        self._audio_track = AvatarVoiceTrack(
            sample_rate=options.audio_sample_rate,
            num_channels=options.audio_channels,
        )
        self._video_track = AvatarVisualTrack(
            width=options.video_width,
            height=options.video_height,
            fps=options.video_fps,
        )
        self._av_sync = AvatarSynchronizer(
            audio_track=self._audio_track,
            video_track=self._video_track,
            video_fps=options.video_fps,
        )
        self._read_audio_atask: asyncio.Task[None] | None = None
        self._forward_video_atask: asyncio.Task[None] | None = None

    @property
    def av_sync(self) -> AvatarSynchronizer:
        return self._av_sync

    async def start(self) -> None:
        """Start audio/video processing tasks."""
        await self._audio_recv.start_stream()
        self._audio_recv.on("reset_stream", self._on_reset_stream)
        self._read_audio_atask = asyncio.create_task(self._read_audio())
        self._forward_video_atask = asyncio.create_task(self._forward_video())

    async def wait_for_complete(self) -> None:
        """Block until both processing tasks finish. Raises if not started."""
        if not self._read_audio_atask or not self._forward_video_atask:
            raise RuntimeError("AvatarServer not started")
        await asyncio.gather(self._read_audio_atask, self._forward_video_atask)

    async def _read_audio(self) -> None:
        """Pump frames from the receiver into the renderer."""
        async for frame in self._audio_recv:
            if not self._audio_playing and isinstance(frame, AudioFrame):
                self._audio_playing = True
            await self._video_gen.push_stream_chunk(frame)

    async def _forward_video(self) -> None:
        """Forward rendered frames to the sync layer; handle segment ends."""
        async for frame in self._video_gen:
            if isinstance(frame, AudioSegmentEnd):
                if self._audio_playing:
                    notify_task = self._audio_recv.notify_stream_ended(
                        playback_position=self._playback_position,
                        interrupted=False,
                    )
                    self._audio_playing = False
                    self._playback_position = 0.0
                    # notify_stream_ended may be sync or async.
                    if asyncio.iscoroutine(notify_task):
                        task = asyncio.create_task(notify_task)
                        self._tasks.add(task)
                        task.add_done_callback(self._tasks.discard)
                continue
            await self._av_sync.push(frame)
            if isinstance(frame, AudioFrame):
                self._playback_position += frame.samples / frame.sample_rate

    def _on_reset_stream(self) -> None:
        """Interrupt handler: flush buffers and ack with interrupted=True."""
        # Clear synchronously first so stale audio stops immediately.
        self._audio_track._audio_buffer.clear()
        self._audio_track._is_speaking = False
        logger.info("AvatarServer: reset_stream — audio buffer cleared (sync)")
        maybe_coro = self._video_gen.reset_stream()

        async def _handle_reset(audio_playing: bool) -> None:
            if asyncio.iscoroutine(maybe_coro):
                await maybe_coro
            # Clear again in case the renderer flushed more audio into the
            # track while its own reset was awaited.
            self._audio_track._audio_buffer.clear()
            logger.info("AvatarServer: reset_stream — audio buffer cleared (async follow-up)")
            if audio_playing:
                notify_task = self._audio_recv.notify_stream_ended(
                    playback_position=self._playback_position,
                    interrupted=True,
                )
                self._playback_position = 0.0
                if asyncio.iscoroutine(notify_task):
                    await notify_task

        task = asyncio.create_task(_handle_reset(self._audio_playing))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        self._audio_playing = False

    async def aclose(self) -> None:
        await self._audio_recv.aclose()
        if self._forward_video_atask:
            self._forward_video_atask.cancel()
        if self._read_audio_atask:
            self._read_audio_atask.cancel()
        for task in list(self._tasks):
            task.cancel()
        await self._av_sync.aclose()
Reads audio from an AvatarInput, drives it through an AvatarRenderer, and forwards the resulting audio+video frames to the room via AvatarSynchronizer. Handles segment completion and interruption signals.
Instance variables
prop av_sync : AvatarSynchronizer-
Expand source code
@property def av_sync(self) -> AvatarSynchronizer: return self._av_sync
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: await self._audio_recv.aclose() if self._forward_video_atask: self._forward_video_atask.cancel() if self._read_audio_atask: self._read_audio_atask.cancel() for task in list(self._tasks): task.cancel() await self._av_sync.aclose() async def start(self) ‑> None-
Expand source code
async def start(self) -> None: """Start audio/video processing tasks.""" await self._audio_recv.start_stream() self._audio_recv.on("reset_stream", self._on_reset_stream) self._read_audio_atask = asyncio.create_task(self._read_audio()) self._forward_video_atask = asyncio.create_task(self._forward_video())Start audio/video processing tasks.
async def wait_for_complete(self) ‑> None-
Expand source code
async def wait_for_complete(self) -> None: if not self._read_audio_atask or not self._forward_video_atask: raise RuntimeError("AvatarServer not started") await asyncio.gather(self._read_audio_atask, self._forward_video_atask)
class AvatarSettings (video_width: int,
video_height: int,
video_fps: float,
audio_sample_rate: int,
audio_channels: int)-
Expand source code
@dataclass
class AvatarSettings:
    """Configuration for the Avatar Server's A/V output."""

    # Output video geometry and rate.
    video_width: int
    video_height: int
    video_fps: float
    # Output audio format.
    audio_sample_rate: int
    audio_channels: int
Instance variables
var audio_channels : int
var audio_sample_rate : int
var video_fps : float
var video_height : int
var video_width : int
class AvatarSynchronizer (audio_track: CustomAudioTrack, video_track: CustomVideoTrack, video_fps: float)-
Expand source code
class AvatarSynchronizer:
    """
    Paces audio and video frames into their respective custom tracks so
    that audio and video stay in sync at the configured FPS.
    """

    def __init__(
        self,
        audio_track: CustomAudioTrack,
        video_track: CustomVideoTrack,
        video_fps: float,
    ):
        self._audio_track = audio_track
        self._video_track = video_track
        self._video_fps = video_fps
        self._frame_interval = 1.0 / video_fps
        self._last_frame_time = 0.0
        self._start_time = 0.0

    async def push(self, frame: Union[AudioFrame, VideoFrame, AudioSegmentEnd]) -> None:
        """Route one frame to its track, throttling video to the target FPS."""
        if not self._start_time:
            # First frame ever pushed: record the epoch.
            self._start_time = time.monotonic()
        if isinstance(frame, AudioFrame):
            await self._audio_track.put_frame(frame)
        elif isinstance(frame, VideoFrame):
            since_last = time.monotonic() - self._last_frame_time
            if since_last < self._frame_interval:
                # Too early for the next video frame — sleep off the gap.
                await asyncio.sleep(self._frame_interval - since_last)
            await self._video_track.put_frame(frame)
            self._last_frame_time = time.monotonic()
        # AudioSegmentEnd sentinels are intentionally dropped here.

    async def aclose(self) -> None:
        """No resources to release; present for interface symmetry."""
        pass
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: pass async def push(self,
frame: Union[AudioFrame, VideoFrame, AudioSegmentEnd]) ‑> None-
Expand source code
async def push(self, frame: Union[AudioFrame, VideoFrame, AudioSegmentEnd]) -> None: if not self._start_time: self._start_time = time.monotonic() if isinstance(frame, AudioFrame): await self._audio_track.put_frame(frame) elif isinstance(frame, VideoFrame): now = time.monotonic() elapsed = now - self._last_frame_time if elapsed < self._frame_interval: await asyncio.sleep(self._frame_interval - elapsed) await self._video_track.put_frame(frame) self._last_frame_time = time.monotonic()
class AvatarVisualTrack (width: int, height: int, fps: float)-
Expand source code
class AvatarVisualTrack(CustomVideoTrack):
    """Custom video track that drains frames from an asyncio.Queue."""

    def __init__(self, width: int, height: int, fps: float):
        super().__init__()
        self.kind = "video"
        self._width = width
        self._height = height
        self._fps = fps
        # None in the queue is the end-of-track sentinel (see stop()).
        self._queue: asyncio.Queue[VideoFrame | None] = asyncio.Queue()
        self._start: float | None = None
        self._timestamp = 0

    async def put_frame(self, frame: VideoFrame) -> None:
        await self._queue.put(frame)

    async def recv(self) -> VideoFrame:
        """Return the next queued frame, stamped with a 90 kHz RTP timestamp."""
        if self.readyState != "live":
            raise MediaStreamError
        if self._start is None:
            self._start = time.time()
        frame = await self._queue.get()
        if frame is None:
            raise MediaStreamError("Track ended")
        VIDEO_CLOCK_RATE = 90000
        VIDEO_PTIME = 1 / self._fps
        # Advance the PTS by one frame period per delivered frame.
        self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
        frame.pts = self._timestamp
        frame.time_base = fractions.Fraction(1, VIDEO_CLOCK_RATE)
        return frame

    async def stop(self) -> None:
        # Enqueue the sentinel so a pending recv() wakes up and ends.
        await self._queue.put(None)
Ancestors
- videosdk.custom_video_track.CustomVideoTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Methods
async def put_frame(self, frame: VideoFrame) ‑> None-
Expand source code
async def put_frame(self, frame: VideoFrame) -> None: await self._queue.put(frame) async def recv(self) ‑> av.video.frame.VideoFrame-
Expand source code
async def recv(self) -> VideoFrame: if self.readyState != "live": raise MediaStreamError if self._start is None: self._start = time.time() frame = await self._queue.get() if frame is None: raise MediaStreamError("Track ended") VIDEO_CLOCK_RATE = 90000 VIDEO_PTIME = 1 / self._fps self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE) frame.pts = self._timestamp frame.time_base = fractions.Fraction(1, VIDEO_CLOCK_RATE) return frameReceive the next :class:
~av.video.frame.VideoFrame.The base implementation just reads a 640x480 green frame at 30fps, subclass :class:
VideoStreamTrackto provide a useful implementation. async def stop(self) ‑> None-
Expand source code
async def stop(self) -> None: await self._queue.put(None)
class AvatarVoiceTrack (sample_rate: int, num_channels: int)-
Expand source code
class AvatarVoiceTrack(CustomAudioTrack):
    """
    Custom audio track that reconstructs steady 20 ms PCM frames from
    incoming audio pushed by AvatarSynchronizer. Produces silence when the
    buffer is empty.
    """

    AUDIO_PTIME = 0.02
    MAX_BUFFER_DURATION = 2.0

    def __init__(self, sample_rate: int, num_channels: int):
        super().__init__()
        self.kind = "audio"
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._start: float | None = None
        self._timestamp = 0
        self._time_base = fractions.Fraction(1, self._sample_rate)
        self._default_samples = int(self._sample_rate * self.AUDIO_PTIME)
        self._sample_width = 2  # s16le
        self._chunk_size = self._default_samples * self._num_channels * self._sample_width
        self._audio_buffer = bytearray()
        # Cap buffered audio at MAX_BUFFER_DURATION (minimum five chunks).
        max_chunks = int(self.MAX_BUFFER_DURATION / self.AUDIO_PTIME)
        self._max_buffer_bytes = max(self._chunk_size * max_chunks, self._chunk_size * 5)
        self._stopped = False
        self._is_speaking = False

    async def put_frame(self, frame: AudioFrame) -> None:
        """Append a frame's PCM bytes to the buffer, dropping oldest overflow."""
        if self._stopped:
            return
        try:
            pcm_bytes = bytes(frame.planes[0])
        except Exception:
            pcm_bytes = frame.to_ndarray().tobytes()
        if not pcm_bytes:
            return
        self._audio_buffer.extend(pcm_bytes)
        if len(self._audio_buffer) > self._max_buffer_bytes:
            overflow = len(self._audio_buffer) - self._max_buffer_bytes
            del self._audio_buffer[:overflow]

    def _build_audio_frame(self, chunk: bytes) -> AudioFrame:
        """Wrap one zero-padded s16 chunk in an AudioFrame."""
        if len(chunk) < self._chunk_size:
            chunk = chunk + bytes(self._chunk_size - len(chunk))
        data = np.frombuffer(chunk, dtype=np.int16).reshape(-1, self._num_channels)
        layout = "mono" if self._num_channels == 1 else "stereo"
        frame = AudioFrame.from_ndarray(data.T, format="s16", layout=layout)
        frame.sample_rate = self._sample_rate
        return frame

    def _build_silence_frame(self) -> AudioFrame:
        """Produce one 20 ms frame of zeroed samples."""
        frame = AudioFrame(
            format="s16",
            layout="mono" if self._num_channels == 1 else "stereo",
            samples=self._default_samples,
        )
        for plane in frame.planes:
            plane.update(bytes(plane.buffer_size))
        frame.sample_rate = self._sample_rate
        return frame

    async def recv(self) -> AudioFrame:
        """Emit the next 20 ms frame — buffered audio if available, else silence —
        paced in real time against the track's start timestamp."""
        if self.readyState != "live":
            raise MediaStreamError
        if self._start is None:
            self._start = time.time()
        if len(self._audio_buffer) >= self._chunk_size:
            chunk = self._audio_buffer[: self._chunk_size]
            del self._audio_buffer[: self._chunk_size]
            frame = self._build_audio_frame(bytes(chunk))
            self._is_speaking = True
        else:
            if self._stopped:
                raise MediaStreamError("Track ended")
            frame = self._build_silence_frame()
            self._is_speaking = False
        samples = frame.samples or self._default_samples
        pts = self._timestamp
        # Sleep until this frame's wall-clock deadline so output stays real-time.
        wait = self._start + (pts / self._sample_rate) - time.time()
        if wait > 0:
            await asyncio.sleep(wait)
        frame.pts = pts
        frame.time_base = self._time_base
        self._timestamp += samples
        return frame

    async def stop(self) -> None:
        self._stopped = True
        self._audio_buffer.clear()
Ancestors
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Class variables
var AUDIO_PTIME
var MAX_BUFFER_DURATION
Methods
async def put_frame(self, frame: AudioFrame) ‑> None-
Expand source code
async def put_frame(self, frame: AudioFrame) -> None: if self._stopped: return try: pcm_bytes = bytes(frame.planes[0]) except Exception: pcm_bytes = frame.to_ndarray().tobytes() if not pcm_bytes: return self._audio_buffer.extend(pcm_bytes) if len(self._audio_buffer) > self._max_buffer_bytes: overflow = len(self._audio_buffer) - self._max_buffer_bytes del self._audio_buffer[:overflow] async def recv(self) ‑> av.audio.frame.AudioFrame-
Expand source code
async def recv(self) -> AudioFrame: if self.readyState != "live": raise MediaStreamError if self._start is None: self._start = time.time() if len(self._audio_buffer) >= self._chunk_size: chunk = self._audio_buffer[: self._chunk_size] del self._audio_buffer[: self._chunk_size] frame = self._build_audio_frame(bytes(chunk)) self._is_speaking = True else: if self._stopped: raise MediaStreamError("Track ended") frame = self._build_silence_frame() self._is_speaking = False samples = frame.samples or self._default_samples pts = self._timestamp wait = self._start + (pts / self._sample_rate) - time.time() if wait > 0: await asyncio.sleep(wait) frame.pts = pts frame.time_base = self._time_base self._timestamp += samples return frameReceive the next :class:
~av.audio.frame.AudioFrame.The base implementation just reads silence, subclass :class:
AudioStreamTrackto provide a useful implementation. async def stop(self) ‑> None-
Expand source code
async def stop(self) -> None: self._stopped = True self._audio_buffer.clear()