Module agents.avatar.avatar_schema

Classes

class AudioSegmentEnd
Expand source code
class AudioSegmentEnd:
    """Sentinel yielded by an AvatarRenderer to signal that a TTS segment has fully played."""
    pass

Sentinel yielded by an AvatarRenderer to signal that a TTS segment has fully played.

class AvatarInput
Expand source code
class AvatarInput(ABC, EventEmitter[Literal["reset_stream"]]):
    """
    Abstract base for the avatar-worker-side audio receiver.

    The Avatar Server iterates over an AvatarInput to receive AudioFrame objects
    from the agent. When the agent finishes a TTS segment it sends a
    ``segment_end`` control message which causes the receiver to enqueue an
    AudioSegmentEnd sentinel. The receiver also emits a ``reset_stream`` event
    when an interrupt arrives from the agent.
    """

    def __init__(self):
        super().__init__()

    async def start_stream(self) -> None:
        """Optional hook called before the first frame is consumed."""
        pass

    @abstractmethod
    def notify_stream_ended(
        self, playback_position: float, interrupted: bool
    ) -> None | Coroutine[None, None, None]:
        """Send a stream_ended ack back to the agent."""

    @abstractmethod
    def __aiter__(self) -> AsyncIterator[AudioFrame | AudioSegmentEnd]:
        """Yield AudioFrame items, then AudioSegmentEnd when the segment is done."""

    async def aclose(self) -> None:
        pass

Abstract base for the avatar-worker-side audio receiver.

The Avatar Server iterates over an AvatarInput to receive AudioFrame objects from the agent. When the agent finishes a TTS segment it sends a segment_end control message which causes the receiver to enqueue an AudioSegmentEnd sentinel. The receiver also emits a reset_stream event when an interrupt arrives from the agent.

Ancestors

Subclasses

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    pass
def notify_stream_ended(self, playback_position: float, interrupted: bool) ‑> None | collections.abc.Coroutine[None, None, None]
Expand source code
@abstractmethod
def notify_stream_ended(
    self, playback_position: float, interrupted: bool
) -> None | Coroutine[None, None, None]:
    """Send a stream_ended ack back to the agent."""

Send a stream_ended ack back to the agent.

async def start_stream(self) ‑> None
Expand source code
async def start_stream(self) -> None:
    """Optional hook called before the first frame is consumed."""
    pass

Optional hook called before the first frame is consumed.

Inherited members

class AvatarRenderer
Expand source code
class AvatarRenderer(ABC):
    """
    Abstract base for the avatar-worker-side video/audio renderer.

    An AvatarRenderer receives audio frames via push_stream_chunk and yields
    interleaved VideoFrame / AudioFrame items plus an AudioSegmentEnd sentinel
    when the current segment has been fully rendered.
    """

    @abstractmethod
    async def push_stream_chunk(self, frame: AudioFrame | AudioSegmentEnd) -> None:
        """Receive an audio frame (or segment-end sentinel) from the controller."""

    @abstractmethod
    def reset_stream(self) -> None | Coroutine[None, None, None]:
        """Immediately discard buffered audio (called on interrupt)."""

    @abstractmethod
    def __aiter__(
        self,
    ) -> AsyncIterator[VideoFrame | AudioFrame | AudioSegmentEnd]:
        """Yield interleaved video+audio frames, then AudioSegmentEnd."""

Abstract base for the avatar-worker-side video/audio renderer.

An AvatarRenderer receives audio frames via push_stream_chunk and yields interleaved VideoFrame / AudioFrame items plus an AudioSegmentEnd sentinel when the current segment has been fully rendered.

Ancestors

  • abc.ABC

Methods

async def push_stream_chunk(self,
frame: AudioFrame | AudioSegmentEnd) ‑> None
Expand source code
@abstractmethod
async def push_stream_chunk(self, frame: AudioFrame | AudioSegmentEnd) -> None:
    """Receive an audio frame (or segment-end sentinel) from the controller."""

Receive an audio frame (or segment-end sentinel) from the controller.

def reset_stream(self) ‑> None | collections.abc.Coroutine[None, None, None]
Expand source code
@abstractmethod
def reset_stream(self) -> None | Coroutine[None, None, None]:
    """Immediately discard buffered audio (called on interrupt)."""

Immediately discard buffered audio (called on interrupt).