Module agents.avatar.avatar_audio_io
Classes
class AvatarAudioIn (meeting: Meeting | None, *, channels: int = 1, sample_rate: int = 24000)-
Expand source code
class AvatarAudioIn(AvatarInput): """ Avatar-worker-side receiver. Listens for data-channel messages from the agent and exposes them as an async iterator of AudioFrame / AudioSegmentEnd items. Control messages: - Raw bytes → reconstruct AudioFrame and enqueue it. - INTERRUPT → clear the queue and emit ``reset_stream``. - segment_end JSON → enqueue AudioSegmentEnd. Note: VideoSDK broadcasts data-channel messages to all participants, so every participant in the room sees every message. """ _INTERRUPT_COOLDOWN = 0.3 def __init__( self, meeting: Meeting | None, *, channels: int = 1, sample_rate: int = 24000, ): super().__init__() self._channels = channels self._sample_rate = sample_rate self._data_ch: asyncio.Queue[AudioFrame | AudioSegmentEnd] = asyncio.Queue() self._handler: _AvatarDataHandler | None = None self._meeting: Meeting | None = None self._interrupt_until: float = 0.0 if meeting: self.set_meeting(meeting) def set_meeting(self, meeting: Meeting) -> None: if self._meeting and self._handler: try: self._meeting.remove_event_listener(self._handler) except Exception: pass self._handler = None self._meeting = meeting self._handler = _AvatarDataHandler(callback=self._on_data) self._meeting.add_event_listener(self._handler) logger.info("AvatarAudioIn attached to meeting") def notify_stream_ended(self, playback_position: float, interrupted: bool) -> None: asyncio.create_task(self._send_stream_ended(playback_position, interrupted)) def __aiter__(self): return self async def __anext__(self) -> AudioFrame | AudioSegmentEnd: try: return await self._data_ch.get() except asyncio.CancelledError: raise StopAsyncIteration async def aclose(self) -> None: if self._meeting and self._handler: try: self._meeting.remove_event_listener(self._handler) except Exception: pass self._handler = None def _on_data(self, data: dict) -> None: payload = data.get("payload", b"") try: if isinstance(payload, memoryview): payload = payload.tobytes() if payload == MSG_INTERRUPT or payload == "INTERRUPT": self._handle_interrupt() return if isinstance(payload, (bytes, bytearray)): self._handle_audio_bytes(payload) return if isinstance(payload, str): self._handle_text_payload(payload) except Exception as e: logger.error("AvatarAudioIn: error processing message: %s", e) def _handle_audio_bytes(self, raw: bytes) -> None: import time as _time if _time.monotonic() < self._interrupt_until: return if len(raw) % 2 != 0: raw = raw + b"\x00" array = np.frombuffer(raw, dtype=np.int16) mono = array.reshape(-1, 1) if self._channels == 2: stereo = np.column_stack([mono[:, 0], mono[:, 0]]) array_out = stereo else: array_out = mono frame = AudioFrame.from_ndarray( array_out.T, format="s16", layout="mono" if self._channels == 1 else "stereo", ) frame.sample_rate = self._sample_rate self._data_ch.put_nowait(frame) def _handle_interrupt(self) -> None: import time as _time self._interrupt_until = _time.monotonic() + self._INTERRUPT_COOLDOWN while not self._data_ch.empty(): try: self._data_ch.get_nowait() except asyncio.QueueEmpty: break self.emit("reset_stream") logger.info("AvatarAudioIn: INTERRUPT received, buffer cleared, cooldown %.1fs", self._INTERRUPT_COOLDOWN) def _handle_text_payload(self, payload: str) -> None: if not payload: return try: msg = json.loads(payload) except (json.JSONDecodeError, ValueError): return if msg.get("type") == MSG_TYPE_SEGMENT_END: self._data_ch.put_nowait(AudioSegmentEnd()) logger.debug("AvatarAudioIn: segment_end received") elif msg.get("type") == MSG_TYPE_STREAM_ENDED: pass async def _send_stream_ended(self, playback_position: float, interrupted: bool) -> None: if not self._meeting: return payload = json.dumps( { "type": MSG_TYPE_STREAM_ENDED, "data": { "playback_position": playback_position, "interrupted": interrupted, }, } ) await self._meeting.send( payload, {"reliability": ReliabilityModes.RELIABLE.value} ) logger.debug( "AvatarAudioIn: sent stream_ended (pos=%.3fs, interrupted=%s)", playback_position, interrupted, )Avatar-worker-side receiver.
Listens for data-channel messages from the agent and exposes them as an async iterator of AudioFrame / AudioSegmentEnd items. Control messages: - Raw bytes → reconstruct AudioFrame and enqueue it. - INTERRUPT → clear the queue and emit
reset_stream. - segment_end JSON → enqueue AudioSegmentEnd.Note: VideoSDK broadcasts data-channel messages to all participants, so every participant in the room sees every message.
Ancestors
- AvatarInput
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: if self._meeting and self._handler: try: self._meeting.remove_event_listener(self._handler) except Exception: pass self._handler = None def set_meeting(self, meeting: Meeting) ‑> None-
Expand source code
def set_meeting(self, meeting: Meeting) -> None: if self._meeting and self._handler: try: self._meeting.remove_event_listener(self._handler) except Exception: pass self._handler = None self._meeting = meeting self._handler = _AvatarDataHandler(callback=self._on_data) self._meeting.add_event_listener(self._handler) logger.info("AvatarAudioIn attached to meeting")
Inherited members
class AvatarAudioOut (*,
credentials: AvatarAuthCredentials,
avatar_dispatcher_url: Optional[str] = None,
room_id: Optional[str] = None)-
Expand source code
class AvatarAudioOut: """ Agent-side handle for the avatar data channel. Responsibilities: - Spin up the Avatar Server via an HTTP dispatcher. - Stream raw PCM audio chunks to the worker (UNRELIABLE). - Send ``segment_end`` control messages (RELIABLE) so the worker knows when a TTS turn has finished — this is what allows ``notify_stream_ended`` to fire on the worker side. - Send ``INTERRUPT`` (RELIABLE) when the agent interrupts its output. - Receive ``stream_ended`` acks from the worker via an on_data listener. """ def __init__( self, *, credentials: AvatarAuthCredentials, avatar_dispatcher_url: Optional[str] = None, room_id: Optional[str] = None, ): self._credentials = credentials self._avatar_dispatcher_url = avatar_dispatcher_url self._room_id = room_id self._meeting: Meeting | None = None self._ack_handler: _AvatarAckHandler | None = None self._participant_id: str = credentials.participant_id self.video_track = None self.audio_track = None def set_room_id(self, room_id: str) -> None: self._room_id = room_id @property def participant_id(self) -> str: return self._participant_id async def connect(self) -> None: """Call the avatar dispatcher so the worker process joins the room.""" await self._avatar_spinup() def _set_meeting(self, meeting: Meeting) -> None: """ Inject the live Meeting object. Called by the framework after the agent has joined the room. Also registers the ack listener. """ self._meeting = meeting self._ack_handler = _AvatarAckHandler(on_stream_ended=self._on_stream_ended) self._meeting.add_event_listener(self._ack_handler) logger.info("AvatarAudioOut attached to meeting: %s", meeting.id) async def _avatar_spinup(self) -> None: if not self._avatar_dispatcher_url: logger.info("AvatarAudioOut: No dispatcher URL provided, skipping local avatar spinup.") return if not self._room_id: raise ValueError("room_id must be set before calling connect()") join_info = AvatarJoinInfo( room_name=self._room_id, token=self._credentials.token, participant_id=self._credentials.participant_id, ) logger.info( "Sending connection info to avatar dispatcher %s (participant_id=%s)", self._avatar_dispatcher_url, self._credentials.participant_id, ) async with httpx.AsyncClient() as client: response = await client.post( self._avatar_dispatcher_url, json=asdict(join_info) ) response.raise_for_status() logger.info("Avatar handshake completed") async def handle_audio_input(self, audio_data: bytes) -> None: """ Chunk and send raw PCM bytes to the Avatar Server via data channel. Uses UNRELIABLE mode for low-latency streaming. """ if not self._meeting: return MAX_CHUNK = 15_000 for i in range(0, len(audio_data), MAX_CHUNK): chunk = audio_data[i : i + MAX_CHUNK] if not chunk: continue if len(chunk) % 2 != 0: chunk = chunk + b"\x00" try: await self._meeting.send( chunk, {"reliability": ReliabilityModes.UNRELIABLE.value} ) except Exception: # Data channel closed (e.g. participant left) — stop sending logger.debug("AvatarAudioOut: data channel closed, dropping remaining audio") self._meeting = None return async def send_segment_end(self) -> None: """ Notify the Avatar Server that the current TTS segment has finished. This causes the receiver to enqueue AudioSegmentEnd so the controller can call notify_stream_ended(interrupted=False). """ if not self._meeting: return try: payload = json.dumps({"type": MSG_TYPE_SEGMENT_END}) await self._meeting.send( payload, {"reliability": ReliabilityModes.RELIABLE.value} ) logger.debug("AvatarAudioOut: sent segment_end") except Exception: self._meeting = None async def interrupt(self) -> None: """Tell the Avatar Server to immediately stop playback.""" if not self._meeting: return try: await self._meeting.send( MSG_INTERRUPT, {"reliability": ReliabilityModes.RELIABLE.value} ) logger.info("AvatarAudioOut: sent INTERRUPT to Avatar Server") except Exception: self._meeting = None def _on_stream_ended(self, playback_position: float, interrupted: bool) -> None: logger.info( "AvatarAudioOut: stream_ended ack received (pos=%.3fs, interrupted=%s)", playback_position, interrupted, ) async def aclose(self) -> None: if self._meeting and self._ack_handler: try: self._meeting.remove_event_listener(self._ack_handler) except Exception: pass self._ack_handler = NoneAgent-side handle for the avatar data channel.
Responsibilities: - Spin up the Avatar Server via an HTTP dispatcher. - Stream raw PCM audio chunks to the worker (UNRELIABLE). - Send
segment_endcontrol messages (RELIABLE) so the worker knows when a TTS turn has finished — this is what allowsnotify_stream_endedto fire on the worker side. - SendINTERRUPT(RELIABLE) when the agent interrupts its output. - Receivestream_endedacks from the worker via an on_data listener.Instance variables
prop participant_id : str-
Expand source code
@property def participant_id(self) -> str: return self._participant_id
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: if self._meeting and self._ack_handler: try: self._meeting.remove_event_listener(self._ack_handler) except Exception: pass self._ack_handler = None async def connect(self) ‑> None-
Expand source code
async def connect(self) -> None: """Call the avatar dispatcher so the worker process joins the room.""" await self._avatar_spinup()Call the avatar dispatcher so the worker process joins the room.
async def handle_audio_input(self, audio_data: bytes) ‑> None-
Expand source code
async def handle_audio_input(self, audio_data: bytes) -> None: """ Chunk and send raw PCM bytes to the Avatar Server via data channel. Uses UNRELIABLE mode for low-latency streaming. """ if not self._meeting: return MAX_CHUNK = 15_000 for i in range(0, len(audio_data), MAX_CHUNK): chunk = audio_data[i : i + MAX_CHUNK] if not chunk: continue if len(chunk) % 2 != 0: chunk = chunk + b"\x00" try: await self._meeting.send( chunk, {"reliability": ReliabilityModes.UNRELIABLE.value} ) except Exception: # Data channel closed (e.g. participant left) — stop sending logger.debug("AvatarAudioOut: data channel closed, dropping remaining audio") self._meeting = None returnChunk and send raw PCM bytes to the Avatar Server via data channel. Uses UNRELIABLE mode for low-latency streaming.
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None: """Tell the Avatar Server to immediately stop playback.""" if not self._meeting: return try: await self._meeting.send( MSG_INTERRUPT, {"reliability": ReliabilityModes.RELIABLE.value} ) logger.info("AvatarAudioOut: sent INTERRUPT to Avatar Server") except Exception: self._meeting = NoneTell the Avatar Server to immediately stop playback.
async def send_segment_end(self) ‑> None-
Expand source code
async def send_segment_end(self) -> None: """ Notify the Avatar Server that the current TTS segment has finished. This causes the receiver to enqueue AudioSegmentEnd so the controller can call notify_stream_ended(interrupted=False). """ if not self._meeting: return try: payload = json.dumps({"type": MSG_TYPE_SEGMENT_END}) await self._meeting.send( payload, {"reliability": ReliabilityModes.RELIABLE.value} ) logger.debug("AvatarAudioOut: sent segment_end") except Exception: self._meeting = NoneNotify the Avatar Server that the current TTS segment has finished. This causes the receiver to enqueue AudioSegmentEnd so the controller can call notify_stream_ended(interrupted=False).
def set_room_id(self, room_id: str) ‑> None-
Expand source code
def set_room_id(self, room_id: str) -> None: self._room_id = room_id