Module agents.room.audio_stream

Classes

class CustomAudioStreamTrack (loop)
Expand source code
class CustomAudioStreamTrack(CustomAudioTrack):
    """
    Base audio track implementation using a frame buffer.

    Raw PCM bytes pushed via add_new_bytes() are sliced into fixed-size
    chunks and converted to AudioFrames immediately; recv() then delivers
    them paced to real time, emitting silence when the buffer runs dry.
    """
    def __init__(self, loop):
        super().__init__()
        self.loop = loop                      # asyncio event loop owning this track
        self._start = None                    # wall-clock time of first recv(); set lazily
        self._timestamp = 0                   # samples delivered so far, used for pacing
        self.frame_buffer = []                # FIFO of ready-to-send AudioFrames
        self.audio_data_buffer = bytearray()  # raw PCM bytes not yet framed
        self.frame_time = 0                   # pts counter, in samples
        self.sample_rate = 24000              # Hz, s16 PCM
        self.channels = 1
        self.sample_width = 2                 # bytes per sample (16-bit)
        self.time_base_fraction = Fraction(1, self.sample_rate)
        self.samples = int(AUDIO_PTIME * self.sample_rate)  # samples per frame
        self.chunk_size = int(self.samples * self.channels * self.sample_width)
        self._is_speaking = False             # True while frames are being drained
        # Fix: initialize explicitly so recv() does not need hasattr()/getattr()
        # guards before invoking the callback.
        self._last_audio_callback = None

    def interrupt(self):
        """Drop all pending audio: clears both the frame queue and the raw byte buffer."""
        self.frame_buffer.clear()
        self.audio_data_buffer.clear()

    def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when the final audio byte of synthesis is produced"""
        logger.info("on last audio callback")
        self._last_audio_callback = callback

    async def add_new_bytes(self, audio_data: bytes):
        """Append raw PCM bytes and frame every complete chunk immediately.

        Args:
            audio_data: interleaved s16 PCM at self.sample_rate.
        """
        global_event_emitter.emit("ON_SPEECH_OUT", {"audio_data": audio_data})
        self.audio_data_buffer += audio_data

        while len(self.audio_data_buffer) >= self.chunk_size:
            chunk = self.audio_data_buffer[: self.chunk_size]
            self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
            try:
                audio_frame = self.buildAudioFrames(chunk)
                self.frame_buffer.append(audio_frame)
                logger.debug(
                    f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}"
                )
            except Exception as e:
                logger.error(f"Error building audio frame: {e}")
                break

    def buildAudioFrames(self, chunk: bytes) -> AudioFrame:
        """Convert one chunk of interleaved s16 PCM bytes into an AudioFrame."""
        if len(chunk) != self.chunk_size:
            logger.warning(
                f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}"
            )

        data = np.frombuffer(chunk, dtype=np.int16)
        expected_samples = self.samples * self.channels
        if len(data) != expected_samples:
            logger.warning(
                f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}"
            )

        # from_ndarray expects (channels, samples), hence reshape + transpose.
        data = data.reshape(-1, self.channels)
        layout = "mono" if self.channels == 1 else "stereo"

        audio_frame = AudioFrame.from_ndarray(data.T, format="s16", layout=layout)
        return audio_frame

    def next_timestamp(self):
        """Return (pts, time_base) for the next frame and advance the pts counter."""
        pts = int(self.frame_time)
        time_base = self.time_base_fraction
        self.frame_time += self.samples
        return pts, time_base

    async def recv(self) -> AudioFrame:
        """Deliver the next frame paced to real time.

        Pops a buffered frame when available; otherwise emits silence and,
        on the speaking-to-silent transition, schedules _last_audio_callback.
        Returns None if an unexpected exception is caught (logged).
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError

            # First call anchors the wall-clock start; subsequent calls
            # advance the sample counter so delivery stays real-time.
            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples

            wait = self._start + (self._timestamp / self.sample_rate) - time()

            if wait > 0:
                await asyncio.sleep(wait)

            pts, time_base = self.next_timestamp()

            if self.frame_buffer:
                frame = self.frame_buffer.pop(0)
                self._is_speaking = True
            else:
                # No audio data available — silence
                if self._is_speaking:
                    logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                    self._is_speaking = False

                    if self._last_audio_callback is not None:
                        asyncio.create_task(self._last_audio_callback())

                # Produce silence frame (zero-filled planes)
                frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
                for p in frame.planes:
                    p.update(bytes(p.buffer_size))

            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")

    async def cleanup(self):
        """Flush all buffered audio and stop the underlying media track."""
        self.interrupt()
        self.stop()

Base audio track implementation using a frame buffer. Audio frames are created as soon as audio data is received.

Ancestors

  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Subclasses

Methods

async def add_new_bytes(self, audio_data: bytes)
Expand source code
async def add_new_bytes(self, audio_data: bytes):
    """Accumulate raw PCM bytes and frame every complete chunk immediately."""
    global_event_emitter.emit("ON_SPEECH_OUT", {"audio_data": audio_data})
    self.audio_data_buffer += audio_data

    while len(self.audio_data_buffer) >= self.chunk_size:
        pending = self.audio_data_buffer
        frame_bytes, self.audio_data_buffer = (
            pending[: self.chunk_size],
            pending[self.chunk_size :],
        )
        try:
            self.frame_buffer.append(self.buildAudioFrames(frame_bytes))
            logger.debug(
                f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}"
            )
        except Exception as e:
            logger.error(f"Error building audio frame: {e}")
            break
def buildAudioFrames(self, chunk: bytes) ‑> av.audio.frame.AudioFrame
Expand source code
def buildAudioFrames(self, chunk: bytes) -> AudioFrame:
    """Turn one chunk of interleaved s16 PCM bytes into an AudioFrame."""
    if len(chunk) != self.chunk_size:
        logger.warning(
            f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}"
        )

    pcm = np.frombuffer(chunk, dtype=np.int16)
    expected_samples = self.samples * self.channels
    if len(pcm) != expected_samples:
        logger.warning(
            f"Incorrect number of samples in chunk {len(pcm)}, expected {expected_samples}"
        )

    # from_ndarray wants channel-major data: (channels, samples).
    layout = "mono" if self.channels == 1 else "stereo"
    channel_major = pcm.reshape(-1, self.channels).T
    return AudioFrame.from_ndarray(channel_major, format="s16", layout=layout)
async def cleanup(self)
Expand source code
async def cleanup(self):
    """Flush all buffered audio and stop the underlying media track."""
    self.interrupt()
    self.stop()
def interrupt(self)
Expand source code
def interrupt(self):
    """Drop all pending audio: clears both the frame queue and the raw byte buffer."""
    self.frame_buffer.clear()
    self.audio_data_buffer.clear()
def next_timestamp(self)
Expand source code
def next_timestamp(self):
    """Return (pts, time_base) for the next frame and advance the pts counter."""
    current_pts = int(self.frame_time)
    self.frame_time += self.samples
    return current_pts, self.time_base_fraction
def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None
Expand source code
def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Set callback for when the final audio byte of synthesis is produced"""
    logger.info("on last audio callback")
    # Stored here; recv() schedules it once playback drains to silence.
    self._last_audio_callback = callback

Set callback for when the final audio byte of synthesis is produced

async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """Deliver the next frame paced to real time.

    Pops a buffered frame when one is queued; otherwise emits a silent
    frame and, on the speaking-to-silent transition, schedules
    _last_audio_callback. Returns None if an unexpected exception is
    caught (it is printed and logged instead of propagating).
    """
    try:
        if self.readyState != "live":
            raise MediaStreamError

        # First call anchors the wall-clock start; subsequent calls
        # advance the sample counter so delivery stays real-time.
        if self._start is None:
            self._start = time()
            self._timestamp = 0
        else:
            self._timestamp += self.samples

        # Sleep just long enough to keep output at the nominal rate.
        wait = self._start + (self._timestamp / self.sample_rate) - time()

        if wait > 0:
            await asyncio.sleep(wait)

        pts, time_base = self.next_timestamp()

        if len(self.frame_buffer) > 0:
            frame = self.frame_buffer.pop(0)
            self._is_speaking = True
        else:
            # No audio data available — silence
            if getattr(self, "_is_speaking", False):
                logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                self._is_speaking = False

                # Fire-and-forget: the callback runs on the event loop.
                if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                    asyncio.create_task(self._last_audio_callback())

            # Produce silence frame (zero-filled planes)
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))

        frame.pts = pts
        frame.time_base = time_base
        frame.sample_rate = self.sample_rate
        return frame
    except Exception as e:
        traceback.print_exc()
        logger.error(f"Error while creating tts->rtc frame: {e}")

Receive the next :class:~av.audio.frame.AudioFrame.

The base implementation just reads silence, subclass :class:AudioStreamTrack to provide a useful implementation.

class MediaStreamError (*args, **kwargs)
Expand source code
class MediaStreamError(Exception):
    """Raised when a media track is used after it is no longer live."""

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class MixingCustomAudioStreamTrack (loop)
Expand source code
class MixingCustomAudioStreamTrack(CustomAudioStreamTrack):
    """
    Audio track implementation with mixing capabilities.
    Inherits from CustomAudioStreamTrack and overrides methods to handle mixing.
    Frames are created just-in-time in the recv method.
    """
    def __init__(self, loop):
        super().__init__(loop)
        self.background_audio_buffer = bytearray()  # raw PCM for the background layer

    def interrupt(self):
        """Drop pending audio, including the background buffer."""
        super().interrupt()
        self.background_audio_buffer.clear()

    async def add_new_bytes(self, audio_data: bytes):
        """Overrides base method to buffer bytes instead of creating frames."""
        global_event_emitter.emit("ON_SPEECH_OUT", {"audio_data": audio_data})
        self.audio_data_buffer += audio_data

    async def add_background_bytes(self, audio_data: bytes):
        """Queue raw background-audio PCM bytes for mixing under the primary audio."""
        self.background_audio_buffer += audio_data

    def mix_audio(self, primary_chunk, background_chunk):
        """Mix two s16 PCM chunks sample-by-sample.

        Fix: the sum is computed in int32 and clipped to the s16 range.
        The previous direct int16 addition wrapped around on loud
        passages, producing audible distortion.
        """
        if not background_chunk:
            return primary_chunk

        primary_arr = np.frombuffer(primary_chunk, dtype=np.int16).astype(np.int32)
        background_arr = np.frombuffer(background_chunk, dtype=np.int16).astype(np.int32)

        # Pad or trim the background so both arrays align sample-for-sample.
        if len(background_arr) < len(primary_arr):
            background_arr = np.pad(background_arr, (0, len(primary_arr) - len(background_arr)), 'constant')
        elif len(background_arr) > len(primary_arr):
            background_arr = background_arr[:len(primary_arr)]

        mixed_arr = np.clip(primary_arr + background_arr, -32768, 32767).astype(np.int16)
        return mixed_arr.tobytes()

    async def recv(self) -> AudioFrame:
        """
        Overrides base method to perform mixing and just-in-time frame creation.

        Pulls one chunk each from the primary and background buffers (when
        full chunks are available), mixes them, and builds the frame on the
        spot; emits silence when neither buffer has a full chunk.
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError

            # Same real-time pacing scheme as the base class.
            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples

            wait = self._start + (self._timestamp / self.sample_rate) - time()
            if wait > 0:
                await asyncio.sleep(wait)

            pts, time_base = self.next_timestamp()

            primary_chunk = b''
            has_primary = len(self.audio_data_buffer) >= self.chunk_size
            if has_primary:
                primary_chunk = self.audio_data_buffer[: self.chunk_size]
                self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]

            background_chunk = b''
            has_background = len(self.background_audio_buffer) >= self.chunk_size
            if has_background:
                background_chunk = self.background_audio_buffer[: self.chunk_size]
                self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :]

            # Primary audio wins; background alone plays when there is no speech.
            final_chunk = None
            if has_primary:
                final_chunk = self.mix_audio(primary_chunk, background_chunk)
            elif has_background:
                final_chunk = background_chunk

            if final_chunk:
                frame = self.buildAudioFrames(final_chunk)
            else:
                # Silence frame (zero-filled planes)
                frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
                for p in frame.planes:
                    p.update(bytes(p.buffer_size))

            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")

Audio track implementation with mixing capabilities. Inherits from CustomAudioStreamTrack and overrides methods to handle mixing. Frames are created just-in-time in the recv method.

Ancestors

  • CustomAudioStreamTrack
  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Subclasses

Methods

async def add_background_bytes(self, audio_data: bytes)
Expand source code
async def add_background_bytes(self, audio_data: bytes):
    """Queue raw background-audio PCM bytes for later mixing."""
    self.background_audio_buffer.extend(audio_data)
async def add_new_bytes(self, audio_data: bytes)
Expand source code
async def add_new_bytes(self, audio_data: bytes):
    """Overrides base method to buffer bytes instead of creating frames."""
    # Frames are built just-in-time in recv(); here we only accumulate
    # raw bytes and emit the speech-out event for listeners.
    global_event_emitter.emit("ON_SPEECH_OUT", {"audio_data": audio_data})
    self.audio_data_buffer += audio_data

Overrides base method to buffer bytes instead of creating frames.

def interrupt(self)
Expand source code
def interrupt(self):
    """Drop pending audio, including the background buffer."""
    super().interrupt()
    self.background_audio_buffer.clear()
def mix_audio(self, primary_chunk, background_chunk)
Expand source code
def mix_audio(self, primary_chunk, background_chunk):
    """Mix two s16 PCM chunks sample-by-sample.

    Fix: the sum is computed in int32 and clipped to the s16 range.
    The previous direct int16 addition (np.add with dtype=np.int16)
    wrapped around on loud passages, producing audible distortion.

    Returns primary_chunk unchanged when there is no background audio.
    """
    if not background_chunk:
        return primary_chunk

    primary_arr = np.frombuffer(primary_chunk, dtype=np.int16).astype(np.int32)
    background_arr = np.frombuffer(background_chunk, dtype=np.int16).astype(np.int32)

    # Pad or trim the background so both arrays align sample-for-sample.
    if len(background_arr) < len(primary_arr):
        background_arr = np.pad(background_arr, (0, len(primary_arr) - len(background_arr)), 'constant')
    elif len(background_arr) > len(primary_arr):
        background_arr = background_arr[:len(primary_arr)]

    mixed_arr = np.clip(primary_arr + background_arr, -32768, 32767).astype(np.int16)
    return mixed_arr.tobytes()
async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """
    Overrides base method to perform mixing and just-in-time frame creation.

    Pulls one chunk each from the primary and background buffers (when
    a full chunk is available), mixes them, and builds the frame on the
    spot; emits silence when neither buffer holds a full chunk.
    Returns None if an unexpected exception is caught (logged).
    """
    try:
        if self.readyState != "live":
            raise MediaStreamError

        # Same real-time pacing scheme as the base class recv().
        if self._start is None:
            self._start = time()
            self._timestamp = 0
        else:
            self._timestamp += self.samples

        wait = self._start + (self._timestamp / self.sample_rate) - time()
        if wait > 0:
            await asyncio.sleep(wait)

        pts, time_base = self.next_timestamp()

        primary_chunk = b''
        has_primary = len(self.audio_data_buffer) >= self.chunk_size
        if has_primary:
            primary_chunk = self.audio_data_buffer[: self.chunk_size]
            self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]

        background_chunk = b''
        has_background = len(self.background_audio_buffer) >= self.chunk_size
        if has_background:
            background_chunk = self.background_audio_buffer[: self.chunk_size]
            self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :]
        
        # Primary audio wins; background alone plays when there is no speech.
        final_chunk = None
        if has_primary:
            final_chunk = self.mix_audio(primary_chunk, background_chunk)
        elif has_background:
            final_chunk = background_chunk
        
        if final_chunk:
            frame = self.buildAudioFrames(final_chunk)
        else:
            # Silence frame (zero-filled planes)
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))

        frame.pts = pts
        frame.time_base = time_base
        frame.sample_rate = self.sample_rate
        return frame
    except Exception as e:
        traceback.print_exc()
        logger.error(f"Error while creating tts->rtc frame: {e}")

Overrides base method to perform mixing and just-in-time frame creation.

Inherited members

class TeeCustomAudioStreamTrack (loop, sinks=None, pipeline=None)
Expand source code
class TeeCustomAudioStreamTrack(CustomAudioStreamTrack):
    """Audio track that tees each outgoing audio chunk to auxiliary sinks."""

    def __init__(self, loop, sinks=None, pipeline=None):
        super().__init__(loop)
        self.sinks = [] if sinks is None else sinks
        self.pipeline = pipeline

    async def add_new_bytes(self, audio_data: bytes):
        """Frame the audio via the base class, then fan it out to every sink."""
        await super().add_new_bytes(audio_data)

        # Route audio to sinks (avatars, etc.)
        for sink in self.sinks:
            if hasattr(sink, "handle_audio_input"):
                await sink.handle_audio_input(audio_data)

        # DO NOT route agent's own TTS audio back to pipeline
        # The pipeline should only receive audio from other participants
        # This prevents the agent from hearing itself speak

Base audio track implementation using a frame buffer. Audio frames are created as soon as audio data is received.

Ancestors

  • CustomAudioStreamTrack
  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Methods

async def add_new_bytes(self, audio_data: bytes)
Expand source code
async def add_new_bytes(self, audio_data: bytes):
    """Frame the audio via the base class, then fan it out to every sink."""
    await super().add_new_bytes(audio_data)

    # Route audio to sinks (avatars, etc.)
    for sink in self.sinks:
        if hasattr(sink, "handle_audio_input"):
            await sink.handle_audio_input(audio_data)

    # DO NOT route agent's own TTS audio back to pipeline
    # The pipeline should only receive audio from other participants
    # This prevents the agent from hearing itself speak

Inherited members

class TeeMixingCustomAudioStreamTrack (loop, sinks=None, pipeline=None)
Expand source code
class TeeMixingCustomAudioStreamTrack(MixingCustomAudioStreamTrack):
    """Mixing audio track that also tees each outgoing chunk to auxiliary sinks."""

    def __init__(self, loop, sinks=None, pipeline=None):
        super().__init__(loop)
        self.sinks = [] if sinks is None else sinks
        self.pipeline = pipeline

    async def add_new_bytes(self, audio_data: bytes):
        """Buffer the audio via the mixing base class, then fan it out to every sink."""
        await super().add_new_bytes(audio_data)

        # Route audio to sinks (avatars, etc.)
        for sink in self.sinks:
            if hasattr(sink, "handle_audio_input"):
                await sink.handle_audio_input(audio_data)

Audio track implementation with mixing capabilities. Inherits from CustomAudioStreamTrack and overrides methods to handle mixing. Frames are created just-in-time in the recv method.

Ancestors

Inherited members