Module agents.room.output_stream

Classes

class CustomAudioStreamTrack (loop)
Expand source code
class CustomAudioStreamTrack(CustomAudioTrack):
    """
    Base audio track implementation using a frame buffer.
    Audio frames are created as soon as audio data is received.

    Supports optional pause/resume for false-interrupt detection while maintaining
    compatibility with avatar plugins that need simple audio flow.

    Data flow: ``add_new_bytes()`` slices incoming PCM into ``chunk_size``
    pieces and queues them as ``AudioFrame`` objects in ``frame_buffer``;
    ``recv()`` pops one frame per call, or emits a silent frame when the track
    is paused or the buffer is empty. ``interrupt()`` drops queued audio and,
    when ``interrupt_fade_duration > 0``, leaves a short faded tail behind.
    """

    def __init__(self, loop):
        """Initialise buffers, audio format and playback bookkeeping.

        Args:
            loop: Stored as ``self.loop``; this class does not use it otherwise.
        """
        super().__init__()
        self.loop = loop
        self._start = None
        self._timestamp = 0
        self.frame_buffer = []
        self.audio_data_buffer = bytearray()
        self.frame_time = 0
        self.sample_rate = 24000
        self.channels = 1
        self.sample_width = 2
        self.time_base_fraction = Fraction(1, self.sample_rate)
        # Samples per frame and the matching byte size of one frame.
        self.samples = int(AUDIO_PTIME * self.sample_rate)
        self.chunk_size = int(self.samples * self.channels * self.sample_width)
        self._synthesis_complete = False
        self._needs_last_audio_callback = False
        self._last_speaking_time = 0.0
        self._speaking_grace_period = 0.2 # 200ms grace period for jitter

        # Initialised here so recv()/mark_synthesis_complete() never depend on
        # on_last_audio_byte() or a first played frame having happened first.
        self._is_speaking = False
        self._last_audio_callback = None
        # Strong references to in-flight callback tasks; the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._callback_tasks = set()

        # Pause/resume support - simple flag-based (no blocking)
        self._is_paused = False
        self._paused_frames = []  # Separate buffer for paused content
        self._accepting_audio = True
        self._manual_audio_control = False

        # Fade-out on real interruption (seconds). 0 disables the fade and
        # restores the legacy instant buffer-clear.
        self.interrupt_fade_duration: float = 0.4
        self._faded_tail_pending: bool = False

        self._samples_played: int = 0
        self._cumulative_input_samples: int = 0
        self._synthesis_start_played: int = 0
        self._synthesis_start_pushed: int = 0

    @property
    def can_pause(self) -> bool:
        """Returns True if this track supports pause/resume operations"""
        return True

    def mark_synthesis_start(self) -> None:
        """Anchor the per-synthesis baseline for played/pushed sample counters.
        Called by SpeechGeneration at the top of each synthesize() so that
        snapshot_playback() returns deltas relative to this turn only.
        """
        self._synthesis_start_played = self._samples_played
        self._synthesis_start_pushed = self._cumulative_input_samples

    def snapshot_playback(self) -> tuple[int, int]:
        """Return (samples_played, samples_pushed) deltas for the active synthesis."""
        played = max(0, self._samples_played - self._synthesis_start_played)
        pushed = max(0, self._cumulative_input_samples - self._synthesis_start_pushed)
        return played, pushed

    def _frame_to_pcm_bytes(self, frame: AudioFrame) -> bytes:
        """Extract raw int16 PCM bytes from an AudioFrame built by buildAudioFrames().

        Uses ``to_ndarray()`` (the exact inverse of ``from_ndarray()``) rather
        than reading ``frame.planes[0]`` directly, since a plane's buffer can be
        alignment-padded beyond ``samples * sample_width``.
        """
        arr = frame.to_ndarray()
        return arr.astype(np.int16, copy=False).tobytes()

    def _apply_fade_out(self, pcm: bytes) -> bytes:
        """Apply an exponential fade-out (0 dB → ~-60 dB) to mono int16 PCM.

        Mono assumption (channels == 1): the flat sample array is the per-sample
        timeline. A multi-channel track would need the ramp repeated per channel.

        Args:
            pcm: Raw int16 PCM bytes. A trailing odd byte (half a sample) is
                dropped instead of letting ``np.frombuffer`` raise.

        Returns:
            The faded PCM bytes, or ``b""`` when ``pcm`` holds no whole sample.
        """
        itemsize = np.dtype(np.int16).itemsize
        usable = len(pcm) - (len(pcm) % itemsize)
        samples = np.frombuffer(pcm[:usable], dtype=np.int16)
        n = samples.shape[0]
        if n == 0:
            return b""
        t = np.linspace(0.0, 1.0, num=n, dtype=np.float32)
        # 10 ** (-3 * t) falls from 1.0 to 0.001, i.e. 0 dB down to -60 dB.
        gain = np.power(10.0, -3.0 * t, dtype=np.float32)
        gain[-1] = 0.0  # force the very last sample to exact silence
        faded = np.clip(
            samples.astype(np.float32) * gain,
            np.iinfo(np.int16).min,
            np.iinfo(np.int16).max,
        )
        return faded.astype(np.int16).tobytes()

    def _collect_pending_pcm(self) -> bytes:
        """Return the TTS PCM still queued for playback, for fade-out on interrupt.

        The base track keeps it as built AudioFrames in ``frame_buffer`` (and in
        ``_paused_frames`` while paused). Overridden by mixing tracks, which keep
        raw bytes in ``audio_data_buffer`` instead.
        """
        pending = bytearray()
        for f in (*self.frame_buffer, *self._paused_frames):
            try:
                pending += self._frame_to_pcm_bytes(f)
            except Exception as e:
                # Best effort: one unreadable frame must not abort the interrupt.
                logger.warning(f"Skipping frame during fade extraction: {e}")
        return bytes(pending)

    def _load_faded_audio(self, faded_bytes: bytes) -> None:
        """Re-chunk faded PCM into AudioFrames and queue them in frame_buffer.

        The final partial chunk is zero-padded to chunk_size (it is already near
        silence at the tail of the fade, so the padding is inaudible).
        """
        buf = bytearray(faded_bytes)
        rem = len(buf) % self.chunk_size
        if rem:
            buf += bytes(self.chunk_size - rem)
        for i in range(0, len(buf), self.chunk_size):
            try:
                self.frame_buffer.append(
                    self.buildAudioFrames(bytes(buf[i : i + self.chunk_size]))
                )
            except Exception as e:
                logger.error(f"Error building faded audio frame: {e}")

    def _reset_interrupt_state(self) -> None:
        """Reset the playback flags that every interrupt() path must clear."""
        self._is_paused = False
        self._last_speaking_time = 0.0
        self._synthesis_complete = False
        self._needs_last_audio_callback = False
        # In manual audio control mode intake stays closed until
        # enable_audio_input() is called again.
        self._accepting_audio = not self._manual_audio_control

    def interrupt(self):
        """Interrupt playback.

        When ``interrupt_fade_duration > 0`` the agent's buffered TTS audio is
        not cut instantly — a short exponential fade-out tail is kept so the
        voice ducks gracefully to silence. Otherwise all buffers are cleared
        immediately (legacy behavior).

        A repeated call while a faded tail is still queued only resets the
        playback flags, so the tail is not faded a second time.
        """
        if self._faded_tail_pending:
            logger.debug("Audio track interrupt re-entered — faded tail already pending.")
            self._reset_interrupt_state()
            return

        pending = b""
        if self.interrupt_fade_duration and self.interrupt_fade_duration > 0:
            pending = self._collect_pending_pcm()

        self.frame_buffer.clear()
        self.audio_data_buffer.clear()
        self._paused_frames.clear()

        if pending:
            # Keep at most interrupt_fade_duration worth of whole frames.
            fade_frames = max(1, round(self.interrupt_fade_duration / AUDIO_PTIME))
            fade_bytes_len = fade_frames * self.chunk_size
            faded_tail = self._apply_fade_out(bytes(pending[:fade_bytes_len]))
            if faded_tail:
                self._load_faded_audio(faded_tail)
                self._faded_tail_pending = True
                logger.info(
                    f"Audio track interrupted — fading out {len(faded_tail) // self.chunk_size} frame(s)."
                )
        else:
            logger.info("Audio track interrupted, clearing buffers.")

        self._reset_interrupt_state()

    async def pause(self) -> None:
        """
        Pause audio playback. Instead of blocking recv(), we move remaining
        frames to a separate buffer so they can be resumed later.
        This approach keeps the audio flow simple for avatars.
        """
        if self._is_paused:
            logger.warning("Audio track already paused")
            return

        logger.info("Audio track paused - preserving current buffer state.")
        self._is_paused = True

        # Move current frames to paused buffer for later resume
        self._paused_frames = self.frame_buffer.copy()
        self.frame_buffer.clear()

    async def resume(self) -> None:
        """
        Resume audio playback from paused position.
        Restores frames that were saved when paused.
        """
        if not self._is_paused:
            logger.warning("Audio track not paused, nothing to resume")
            return

        logger.info("Audio track resumed - restoring paused buffer.")
        self._is_paused = False

        # Restore frames from paused buffer
        self.frame_buffer = self._paused_frames.copy()
        self._paused_frames.clear()

    def enable_audio_input(self, manual_control: bool = False):
        """
        Allow fresh audio data to be buffered. When manual_control is True,
        future interrupts will pause intake until this method is called again.

        This is useful for preventing old audio from bleeding into new responses.
        """
        self._manual_audio_control = manual_control
        self._accepting_audio = True
        self._faded_tail_pending = False
        logger.debug(f"Audio input enabled (manual_control={manual_control})")

    def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when the final audio byte of synthesis is produced"""
        logger.info("on last audio callback")
        self._last_audio_callback = callback
        self._synthesis_complete = False
        self._needs_last_audio_callback = False

    def _fire_last_audio_callback(self) -> None:
        """Schedule the registered last-audio callback, if one is set.

        The created task is held in ``_callback_tasks`` until it completes,
        because the event loop itself only keeps a weak reference to tasks.
        """
        callback = getattr(self, "_last_audio_callback", None)
        if not callback:
            return
        task = asyncio.create_task(callback())
        self._callback_tasks.add(task)
        task.add_done_callback(self._callback_tasks.discard)

    @property
    def is_speaking(self) -> bool:
        """
        True if the track is currently playing audio or has buffered data,
        including a small grace period to bridge gaps in streaming TTS.
        """
        has_data = len(self.frame_buffer) > 0 or len(self.audio_data_buffer) > 0
        if has_data:
            return True
        return (time() - self._last_speaking_time) < self._speaking_grace_period

    def mark_synthesis_complete(self) -> None:
        """
        Mark that TTS synthesis has finished sending all audio data.
        If the buffer is already empty (all audio consumed), fires the
        on_last_audio_byte callback immediately. Otherwise, the callback
        will fire when recv() drains the remaining buffer.
        """
        self._synthesis_complete = True
        # If we're not currently speaking (grace period passed) and buffer is empty, fire immediately
        if not self.is_speaking and self._needs_last_audio_callback:
            self._needs_last_audio_callback = False
            self._synthesis_complete = False
            logger.info("[AudioTrack] Synthesis complete and buffer already empty — triggering last_audio_callback.")
            self._fire_last_audio_callback()

    async def add_new_bytes(self, audio_data: bytes):
        """
        Add new audio bytes to the buffer. Respects _accepting_audio flag
        for manual audio control mode.

        Complete ``chunk_size`` pieces are turned into frames right away and
        queued in ``frame_buffer`` (or ``_paused_frames`` while paused); any
        remainder stays in ``audio_data_buffer`` for the next call.
        """
        if not self._accepting_audio:
            logger.debug("Audio input currently disabled, dropping audio data")
            return
        self._cumulative_input_samples += len(audio_data) // (self.channels * self.sample_width)
        self.audio_data_buffer += audio_data

        while len(self.audio_data_buffer) >= self.chunk_size:
            chunk = self.audio_data_buffer[: self.chunk_size]
            # Consume in place; re-slicing the remainder would copy the whole
            # buffer once per chunk.
            del self.audio_data_buffer[: self.chunk_size]
            try:
                audio_frame = self.buildAudioFrames(chunk)

                # If paused, add to paused buffer instead
                if self._is_paused:
                    self._paused_frames.append(audio_frame)
                    logger.debug("Added frame to paused buffer")
                else:
                    self.frame_buffer.append(audio_frame)
                    logger.debug(
                        f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}"
                    )
            except Exception as e:
                logger.error(f"Error building audio frame: {e}")
                break

    def buildAudioFrames(self, chunk: bytes) -> AudioFrame:
        """Build one s16 ``AudioFrame`` from a chunk of raw int16 PCM bytes.

        A size mismatch against ``chunk_size`` / ``samples * channels`` is only
        logged as a warning; the frame is still built from whatever was given.
        """
        if len(chunk) != self.chunk_size:
            logger.warning(
                f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}"
            )

        data = np.frombuffer(chunk, dtype=np.int16)
        expected_samples = self.samples * self.channels
        if len(data) != expected_samples:
            logger.warning(
                f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}"
            )

        data = data.reshape(-1, self.channels)
        layout = "mono" if self.channels == 1 else "stereo"

        audio_frame = AudioFrame.from_ndarray(data.T, format="s16", layout=layout)
        return audio_frame

    def next_timestamp(self):
        """Return ``(pts, time_base)`` for the next frame and advance ``frame_time`` by one frame."""
        pts = int(self.frame_time)
        time_base = self.time_base_fraction
        self.frame_time += self.samples
        return pts, time_base

    def _silence_frame(self) -> AudioFrame:
        """Build a zero-filled mono s16 frame holding ``self.samples`` samples."""
        frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
        for p in frame.planes:
            p.update(bytes(p.buffer_size))
        return frame

    async def recv(self) -> AudioFrame:
        """
        Receive next audio frame. When paused, produces silence frames but keeps
        timing synchronized. This ensures smooth resume without audio glitches.

        Raises:
            MediaStreamError: If ``readyState`` is not ``"live"``.
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError

            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples

            # Pace output against the wall clock: frame N is due at
            # _start + N * samples / sample_rate.
            wait = self._start + (self._timestamp / self.sample_rate) - time()

            if wait > 0:
                await asyncio.sleep(wait)

            pts, time_base = self.next_timestamp()

            # When paused, always produce silence but keep timing
            # This allows smooth resume without timing jumps
            if self._is_paused:
                frame = self._silence_frame()
            elif len(self.frame_buffer) > 0:
                frame = self.frame_buffer.pop(0)
                self._samples_played += self.samples
                self._is_speaking = True
                self._last_speaking_time = time()
            else:
                # No audio data available — silence
                if (
                    self._faded_tail_pending
                    and len(self.audio_data_buffer) < self.chunk_size
                ):
                    # The faded tail has fully drained, so the re-entry guard in
                    # interrupt() is released; otherwise later interrupts would
                    # be ignored until enable_audio_input() is called. The
                    # audio_data_buffer check keeps the guard in place for
                    # tracks that queue the tail there as raw bytes.
                    self._faded_tail_pending = False

                if self._is_speaking:
                    # Only declare we've stopped speaking if the grace period has passed
                    # This bridges gaps in streaming TTS (like Sarvam jitter)
                    if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                        self._is_speaking = False

                        if self._synthesis_complete:
                            # TTS finished and buffer drained — fire callback
                            self._synthesis_complete = False
                            self._needs_last_audio_callback = False
                            logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                            self._fire_last_audio_callback()
                        else:
                            # Buffer temporarily empty but synthesis still in progress
                            logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                            self._needs_last_audio_callback = True

                # Produce silence frame
                frame = self._silence_frame()

            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except MediaStreamError:
            raise
        except Exception as e:
            # NOTE: after logging, control falls off the end of the method, so
            # the caller receives None for this call.
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")

    async def cleanup(self):
        """Interrupt playback, then stop the track."""
        self.interrupt()
        self.stop()

Base audio track implementation using a frame buffer. Audio frames are created as soon as audio data is received.

Supports optional pause/resume for false-interrupt detection while maintaining compatibility with avatar plugins that need simple audio flow.

Ancestors

  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Subclasses

Instance variables

prop can_pause : bool
Expand source code
@property
def can_pause(self) -> bool:
    """Report pause/resume capability; this track always answers True."""
    supports_pause = True
    return supports_pause

Returns True if this track supports pause/resume operations

prop is_speaking : bool
Expand source code
@property
def is_speaking(self) -> bool:
    """
    Tell whether the track counts as speaking right now.

    Any queued frame or any buffered raw byte means "speaking". With both
    buffers empty, the track still counts as speaking until
    ``_speaking_grace_period`` seconds have elapsed since
    ``_last_speaking_time``, which bridges short gaps in streaming TTS.
    """
    if self.frame_buffer or self.audio_data_buffer:
        return True
    silent_for = time() - self._last_speaking_time
    return silent_for < self._speaking_grace_period

True if the track is currently playing audio or has buffered data, including a small grace period to bridge gaps in streaming TTS.

Methods

async def add_new_bytes(self, audio_data: bytes)
Expand source code
async def add_new_bytes(self, audio_data: bytes):
    """
    Add new audio bytes to the buffer. Respects _accepting_audio flag
    for manual audio control mode.

    Every complete ``chunk_size`` piece is converted with
    ``buildAudioFrames()`` and queued in ``frame_buffer`` (or in
    ``_paused_frames`` while paused). Any remainder shorter than one chunk
    stays in ``audio_data_buffer`` until more data arrives.

    Args:
        audio_data: Raw PCM bytes to append. Dropped entirely when
            ``_accepting_audio`` is False.
    """
    if not self._accepting_audio:
        logger.debug("Audio input currently disabled, dropping audio data")
        return
    self._cumulative_input_samples += len(audio_data) // (self.channels * self.sample_width)
    self.audio_data_buffer += audio_data

    while len(self.audio_data_buffer) >= self.chunk_size:
        chunk = self.audio_data_buffer[: self.chunk_size]
        # Consume the chunk in place. Rebinding the buffer to a slice of the
        # remainder copied the whole tail once per chunk (quadratic on large
        # pushes).
        del self.audio_data_buffer[: self.chunk_size]
        try:
            audio_frame = self.buildAudioFrames(chunk)

            # If paused, add to paused buffer instead
            if self._is_paused:
                self._paused_frames.append(audio_frame)
                logger.debug("Added frame to paused buffer")
            else:
                self.frame_buffer.append(audio_frame)
                logger.debug(
                    f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}"
                )
        except Exception as e:
            # The failing chunk has already been consumed; stop processing and
            # leave the rest of the buffer for a later call.
            logger.error(f"Error building audio frame: {e}")
            break

Add new audio bytes to the buffer. Respects _accepting_audio flag for manual audio control mode.

def buildAudioFrames(self, chunk: bytes) ‑> av.audio.frame.AudioFrame
Expand source code
def buildAudioFrames(self, chunk: bytes) -> AudioFrame:
    """Turn one chunk of raw int16 PCM bytes into an s16 ``AudioFrame``.

    Size mismatches against ``chunk_size`` and ``samples * channels`` are
    reported as warnings only; the frame is still built from the given data.
    """
    received_bytes = len(chunk)
    if received_bytes != self.chunk_size:
        logger.warning(
            f"Incorrect chunk size received {received_bytes}, expected {self.chunk_size}"
        )

    pcm = np.frombuffer(chunk, dtype=np.int16)
    expected_samples = self.samples * self.channels
    received_samples = len(pcm)
    if received_samples != expected_samples:
        logger.warning(
            f"Incorrect number of samples in chunk {received_samples}, expected {expected_samples}"
        )

    # One row per channel after the transpose.
    channel_rows = pcm.reshape(-1, self.channels).T
    if self.channels == 1:
        layout = "mono"
    else:
        layout = "stereo"

    return AudioFrame.from_ndarray(channel_rows, format="s16", layout=layout)
async def cleanup(self)
Expand source code
async def cleanup(self):
    """Tear the track down: interrupt playback first, then stop the track."""
    # interrupt() runs first, then stop().
    self.interrupt()
    self.stop()
def enable_audio_input(self, manual_control: bool = False)
Expand source code
def enable_audio_input(self, manual_control: bool = False):
    """
    Re-open the track for incoming audio.

    Sets ``_accepting_audio`` to True and clears ``_faded_tail_pending``.
    ``manual_control`` is stored as ``_manual_audio_control``; when it is
    True, a later ``interrupt()`` closes intake again until this method is
    called once more, which keeps old audio from bleeding into new responses.
    """
    self._accepting_audio = True
    self._faded_tail_pending = False
    self._manual_audio_control = manual_control
    logger.debug(f"Audio input enabled (manual_control={manual_control})")

Allow fresh audio data to be buffered. When manual_control is True, future interrupts will pause intake until this method is called again.

This is useful for preventing old audio from bleeding into new responses.

def interrupt(self)
Expand source code
def interrupt(self):
    """Interrupt playback.

    With ``interrupt_fade_duration > 0`` the queued TTS audio is replaced by
    a short exponentially faded tail instead of being cut off; otherwise all
    buffers are simply cleared (legacy behavior). If a faded tail is already
    pending, the buffers are left alone and only the playback flags are
    reset. In every case intake is closed afterwards when manual audio
    control is active, and open otherwise.
    """
    if self._faded_tail_pending:
        logger.debug("Audio track interrupt re-entered — faded tail already pending.")
    else:
        fade_seconds = self.interrupt_fade_duration
        if fade_seconds and fade_seconds > 0:
            queued_pcm = self._collect_pending_pcm()
        else:
            queued_pcm = b""

        for stale in (self.frame_buffer, self.audio_data_buffer, self._paused_frames):
            stale.clear()

        if not queued_pcm:
            logger.info("Audio track interrupted, clearing buffers.")
        else:
            # Keep at most the configured fade duration, in whole frames.
            frame_budget = max(1, round(fade_seconds / AUDIO_PTIME))
            byte_budget = frame_budget * self.chunk_size
            tail = self._apply_fade_out(bytes(queued_pcm[:byte_budget]))
            if tail:
                self._load_faded_audio(tail)
                self._faded_tail_pending = True
                logger.info(
                    f"Audio track interrupted — fading out {len(tail) // self.chunk_size} frame(s)."
                )

    # Flag reset shared by both paths.
    self._is_paused = False
    self._last_speaking_time = 0.0
    self._synthesis_complete = False
    self._needs_last_audio_callback = False

    # Handle manual audio control mode
    self._accepting_audio = not self._manual_audio_control

Interrupt playback.

When interrupt_fade_duration > 0 the agent's buffered TTS audio is not cut instantly — a short exponential fade-out tail is kept so the voice ducks gracefully to silence. Otherwise all buffers are cleared immediately (legacy behavior).

def mark_synthesis_complete(self) ‑> None
Expand source code
def mark_synthesis_complete(self) -> None:
    """
    Record that TTS synthesis has delivered all of its audio.

    ``_synthesis_complete`` is always set. If the track is no longer speaking
    and ``_needs_last_audio_callback`` is set, both flags are cleared and the
    registered last-audio callback (if any) is scheduled right away.
    Otherwise the callback is left for ``recv()`` to fire once the remaining
    buffer has drained.
    """
    self._synthesis_complete = True

    # Still speaking, or nothing is waiting on the callback: leave it to recv().
    if self.is_speaking or not self._needs_last_audio_callback:
        return

    self._needs_last_audio_callback = False
    self._synthesis_complete = False
    logger.info("[AudioTrack] Synthesis complete and buffer already empty — triggering last_audio_callback.")
    registered = getattr(self, "_last_audio_callback", None)
    if registered:
        asyncio.create_task(self._last_audio_callback())

Mark that TTS synthesis has finished sending all audio data. If the buffer is already empty (all audio consumed), fires the on_last_audio_byte callback immediately. Otherwise, the callback will fire when recv() drains the remaining buffer.

def mark_synthesis_start(self) ‑> None
Expand source code
def mark_synthesis_start(self) -> None:
    """Snapshot the running played/pushed sample counters as the new baseline.

    Called by SpeechGeneration at the top of each synthesize(); afterwards
    snapshot_playback() reports counts relative to this point only.
    """
    self._synthesis_start_played, self._synthesis_start_pushed = (
        self._samples_played,
        self._cumulative_input_samples,
    )

Anchor the per-synthesis baseline for played/pushed sample counters. Called by SpeechGeneration at the top of each synthesize() so that snapshot_playback() returns deltas relative to this turn only.

def next_timestamp(self)
Expand source code
def next_timestamp(self):
    """Return ``(pts, time_base)`` for the next frame and advance the clock.

    ``pts`` is the integer value of ``frame_time`` before it is advanced by
    one frame (``self.samples``); ``time_base`` is ``time_base_fraction``.
    """
    current_pts = int(self.frame_time)
    self.frame_time += self.samples
    return current_pts, self.time_base_fraction
def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None
Expand source code
def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Register the callback for the final audio byte of a synthesis.

    Registering also clears ``_synthesis_complete`` and
    ``_needs_last_audio_callback``.
    """
    logger.info("on last audio callback")
    self._needs_last_audio_callback = False
    self._synthesis_complete = False
    self._last_audio_callback = callback

Set callback for when the final audio byte of synthesis is produced

async def pause(self) ‑> None
Expand source code
async def pause(self) -> None:
    """
    Pause playback without blocking recv().

    The frames still queued are copied into ``_paused_frames`` and
    ``frame_buffer`` is emptied, so they can be restored by ``resume()``.
    Calling this while already paused only logs a warning.
    """
    if not self._is_paused:
        logger.info("Audio track paused - preserving current buffer state.")
        self._is_paused = True

        # Park the queued frames so resume() can restore them.
        self._paused_frames = self.frame_buffer.copy()
        self.frame_buffer.clear()
    else:
        logger.warning("Audio track already paused")

Pause audio playback. Instead of blocking recv(), we move remaining frames to a separate buffer so they can be resumed later. This approach keeps the audio flow simple for avatars.

async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """
    Receive next audio frame. When paused, produces silence frames but keeps
    timing synchronized. This ensures smooth resume without audio glitches.

    Each call is paced against the wall clock: the first call records the
    start time, and every later call sleeps until its frame is due at
    ``_start + _timestamp / sample_rate``.

    Returns:
        The next queued frame, or a zero-filled frame of ``self.samples``
        samples when paused or when ``frame_buffer`` is empty. ``pts``,
        ``time_base`` and ``sample_rate`` are set on the frame before it is
        returned. NOTE: when an unexpected exception is caught at the bottom
        of this method, nothing is returned explicitly, so the caller gets
        ``None``.

    Raises:
        MediaStreamError: If ``readyState`` is not ``"live"``.
    """
    try:
        if self.readyState != "live":
            raise MediaStreamError

        # First call anchors the clock; later calls advance it by one frame.
        if self._start is None:
            self._start = time()
            self._timestamp = 0
        else:
            self._timestamp += self.samples

        # Time remaining until this frame is due; not positive when running late.
        wait = self._start + (self._timestamp / self.sample_rate) - time()

        if wait > 0:
            await asyncio.sleep(wait)

        # pts advances on every call, including silent ones.
        pts, time_base = self.next_timestamp()

        # When paused, always produce silence but keep timing
        # This allows smooth resume without timing jumps
        if self._is_paused:
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))
        elif len(self.frame_buffer) > 0:
            # Real audio: count it as played and refresh the speaking timestamp.
            frame = self.frame_buffer.pop(0)
            self._samples_played += self.samples
            self._is_speaking = True
            self._last_speaking_time = time()
        else:
            # No audio data available — silence
            # _is_speaking is first assigned in the branch above, hence getattr.
            if getattr(self, "_is_speaking", False):
                # Only declare we've stopped speaking if the grace period has passed
                # This bridges gaps in streaming TTS (like Sarvam jitter)
                if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                    self._is_speaking = False

                    if self._synthesis_complete:
                        # TTS finished and buffer drained — fire callback
                        self._synthesis_complete = False
                        self._needs_last_audio_callback = False
                        logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                        # _last_audio_callback exists only after on_last_audio_byte().
                        if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                            asyncio.create_task(self._last_audio_callback())
                    else:
                        # Buffer temporarily empty but synthesis still in progress
                        # mark_synthesis_complete() checks this flag to fire the callback.
                        logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                        self._needs_last_audio_callback = True

            # Produce silence frame
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))

        frame.pts = pts
        frame.time_base = time_base
        frame.sample_rate = self.sample_rate
        return frame
    except MediaStreamError:
        # Propagate untouched so the caller sees the track is not live.
        raise
    except Exception as e:
        # Any other failure is logged and swallowed; the method then returns None.
        traceback.print_exc()
        logger.error(f"Error while creating tts->rtc frame: {e}")

Receive next audio frame. When paused, produces silence frames but keeps timing synchronized. This ensures smooth resume without audio glitches.

async def resume(self) ‑> None
Expand source code
async def resume(self) -> None:
    """
    Resume playback after ``pause()``.

    ``frame_buffer`` is replaced by a copy of the frames parked in
    ``_paused_frames``, which is then emptied. Calling this while not paused
    only logs a warning.
    """
    if self._is_paused:
        logger.info("Audio track resumed - restoring paused buffer.")
        self._is_paused = False

        # Bring the parked frames back as the live queue.
        self.frame_buffer = self._paused_frames.copy()
        self._paused_frames.clear()
    else:
        logger.warning("Audio track not paused, nothing to resume")

Resume audio playback from paused position. Restores frames that were saved when paused.

def snapshot_playback(self) ‑> tuple[int, int]
Expand source code
def snapshot_playback(self) -> tuple[int, int]:
    """Return ``(samples_played, samples_pushed)`` since the last synthesis baseline.

    Both values are measured against the baselines stored by
    ``mark_synthesis_start()`` and are clamped so they never go below zero.
    """
    played_delta = self._samples_played - self._synthesis_start_played
    pushed_delta = self._cumulative_input_samples - self._synthesis_start_pushed
    return max(0, played_delta), max(0, pushed_delta)

Return (samples_played, samples_pushed) deltas for the active synthesis.

class MediaStreamError (*args, **kwargs)
Expand source code
class MediaStreamError(Exception):
    """Raised by a track's ``recv()`` when its ``readyState`` is not ``"live"``."""

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class MixingCustomAudioStreamTrack (loop)
Expand source code
class MixingCustomAudioStreamTrack(CustomAudioStreamTrack):
    """Audio track that mixes primary TTS audio with a background audio buffer, creating frames just-in-time during recv."""

    def __init__(self, loop):
        """Create the track with empty TTS and background buffers.

        Args:
            loop: Event loop handed straight to the base track constructor.
        """
        super().__init__(loop)
        self.background_audio_buffer = bytearray()
        # Strong references to in-flight last-audio callback tasks. The event
        # loop only keeps weak references, so an unreferenced task could be
        # garbage-collected before it finishes.
        self._callback_tasks = set()

    def interrupt(self):
        """Interrupt playback.

        The base implementation applies the exponential fade-out — here the
        overridden ``_collect_pending_pcm`` / ``_load_faded_audio`` make it act
        on ``audio_data_buffer`` (where this track keeps queued TTS audio).
        ``background_audio_buffer`` is cleared outright — background audio is
        never faded.
        """
        super().interrupt()
        self.background_audio_buffer.clear()

    def _collect_pending_pcm(self) -> bytes:
        """Mixing tracks build frames just-in-time, so queued TTS audio lives in
        ``audio_data_buffer`` as raw bytes. ``background_audio_buffer`` is
        deliberately excluded — background audio is never faded.
        """
        return bytes(self.audio_data_buffer)

    def _load_faded_audio(self, faded_bytes: bytes) -> None:
        """Mixing tracks play from ``audio_data_buffer``; queue the faded tail
        there. Padded to a chunk_size multiple so recv() fully drains it — a
        sub-chunk remainder would never be consumed and would leave
        ``is_speaking`` stuck True.
        """
        buf = bytearray(faded_bytes)
        rem = len(buf) % self.chunk_size
        if rem:
            # Zero bytes are silence in s16 PCM.
            buf += bytes(self.chunk_size - rem)
        self.audio_data_buffer = buf

    async def add_new_bytes(self, audio_data: bytes):
        """Overrides base method to buffer bytes instead of creating frames.

        Also advances ``_cumulative_input_samples`` by the number of whole
        samples contained in ``audio_data``.
        """
        self._cumulative_input_samples += len(audio_data) // (self.channels * self.sample_width)
        self.audio_data_buffer += audio_data

    async def add_background_bytes(self, audio_data: bytes):
        """Append raw background audio bytes to ``background_audio_buffer``."""
        self.background_audio_buffer += audio_data

    def mix_audio(self, primary_chunk, background_chunk):
        """Sum two int16 PCM chunks sample by sample with saturation.

        Args:
            primary_chunk: Primary audio as raw int16 bytes; it fixes the
                length of the result.
            background_chunk: Background audio as raw int16 bytes. It is
                zero-padded or truncated to the primary length.

        Returns:
            ``primary_chunk`` itself when ``background_chunk`` is empty,
            otherwise the mixed audio as ``bytes``.
        """
        if not background_chunk:
            return primary_chunk

        primary_arr = np.frombuffer(primary_chunk, dtype=np.int16)
        background_arr = np.frombuffer(background_chunk, dtype=np.int16)

        if len(background_arr) < len(primary_arr):
            background_arr = np.pad(background_arr, (0, len(primary_arr) - len(background_arr)), 'constant')
        elif len(background_arr) > len(primary_arr):
            background_arr = background_arr[:len(primary_arr)]

        # Sum in int32 then clip to int16 range to avoid wrap-around distortion on loud peaks.
        mixed_arr = np.clip(
            primary_arr.astype(np.int32) + background_arr.astype(np.int32),
            np.iinfo(np.int16).min,
            np.iinfo(np.int16).max,
        ).astype(np.int16)
        return mixed_arr.tobytes()

    def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when the final audio byte of synthesis is produced.

        Registering a callback also resets ``_synthesis_complete`` and
        ``_needs_last_audio_callback``.
        """
        logger.info("on last audio callback")
        self._last_audio_callback = callback
        self._synthesis_complete = False
        self._needs_last_audio_callback = False

    async def recv(self) -> AudioFrame:
        """
        Overrides base method to perform mixing and just-in-time frame creation.

        Paces itself against wall-clock time, takes at most one ``chunk_size``
        chunk from each buffer, and returns the mix, the background alone, or
        a silence frame when neither buffer holds a full chunk.

        Returns:
            The next audio frame. When an unexpected exception is caught it
            is logged and the method falls through, returning ``None``.

        Raises:
            MediaStreamError: If the track's ``readyState`` is not ``"live"``.
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError

            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples

            # Sleep until this frame's scheduled wall-clock time.
            wait = self._start + (self._timestamp / self.sample_rate) - time()
            if wait > 0:
                await asyncio.sleep(wait)

            pts, time_base = self.next_timestamp()

            primary_chunk = b''
            has_primary = len(self.audio_data_buffer) >= self.chunk_size
            if has_primary:
                primary_chunk = self.audio_data_buffer[: self.chunk_size]
                self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
                self._samples_played += self.samples
                self._is_speaking = True
                self._last_speaking_time = time()
            elif getattr(self, "_is_speaking", False):
                # Apply grace period for mixing track as well
                if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                    self._is_speaking = False

                    if self._synthesis_complete:
                        # TTS finished and buffer drained — fire callback
                        self._synthesis_complete = False
                        self._needs_last_audio_callback = False
                        logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                        callback = getattr(self, "_last_audio_callback", None)
                        if callback:
                            # Hold the task until it completes so it cannot be
                            # garbage-collected mid-execution.
                            task = asyncio.create_task(callback())
                            self._callback_tasks.add(task)
                            task.add_done_callback(self._callback_tasks.discard)
                    else:
                        # Buffer temporarily empty but synthesis still in progress
                        logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                        self._needs_last_audio_callback = True

            background_chunk = b''
            has_background = len(self.background_audio_buffer) >= self.chunk_size
            if has_background:
                background_chunk = self.background_audio_buffer[: self.chunk_size]
                self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :]

            final_chunk = None
            if has_primary:
                final_chunk = self.mix_audio(primary_chunk, background_chunk)
            elif has_background:
                final_chunk = background_chunk

            if final_chunk:
                frame = self.buildAudioFrames(final_chunk)
            else:
                # Nothing to play: emit a zero-filled (silent) frame.
                frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
                for p in frame.planes:
                    p.update(bytes(p.buffer_size))

            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except MediaStreamError:
            raise
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")

Audio track that mixes primary TTS audio with a background audio buffer, creating frames just-in-time during recv.

Ancestors

  • CustomAudioStreamTrack
  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Subclasses

  • TeeMixingCustomAudioStreamTrack

Methods

async def add_background_bytes(self, audio_data: bytes)
Expand source code
async def add_background_bytes(self, audio_data: bytes):
    """Append raw background audio bytes to ``background_audio_buffer``."""
    background = self.background_audio_buffer
    background += audio_data
    self.background_audio_buffer = background
async def add_new_bytes(self, audio_data: bytes)
Expand source code
async def add_new_bytes(self, audio_data: bytes):
    """Queue raw TTS bytes in ``audio_data_buffer`` rather than building frames.

    ``_cumulative_input_samples`` grows by the number of whole samples in
    ``audio_data`` (byte length divided by ``channels * sample_width``).
    """
    bytes_per_sample = self.channels * self.sample_width
    self._cumulative_input_samples += len(audio_data) // bytes_per_sample
    queued = self.audio_data_buffer
    queued += audio_data
    self.audio_data_buffer = queued

Overrides base method to buffer bytes instead of creating frames.

def interrupt(self)
Expand source code
def interrupt(self):
    """Stop playback: delegate to the base track, then drop background audio.

    The base ``interrupt()`` applies the exponential fade-out; because this
    track overrides ``_collect_pending_pcm`` / ``_load_faded_audio``, that
    fade operates on ``audio_data_buffer``, where queued TTS audio is kept.
    Background audio is never faded, so ``background_audio_buffer`` is simply
    emptied in place.
    """
    super().interrupt()
    del self.background_audio_buffer[:]

Interrupt playback.

The base implementation applies the exponential fade-out — here the overridden _collect_pending_pcm / _load_faded_audio make it act on audio_data_buffer (where this track keeps queued TTS audio). background_audio_buffer is cleared outright — background audio is never faded.

def mix_audio(self, primary_chunk, background_chunk)
Expand source code
def mix_audio(self, primary_chunk, background_chunk):
    """Add background PCM onto primary PCM, saturating at the int16 limits.

    Both arguments are raw int16 byte strings. The primary chunk fixes the
    output length: a shorter background is zero-padded at the end and a longer
    one is truncated. An empty background returns ``primary_chunk`` unchanged.
    """
    if not background_chunk:
        return primary_chunk

    speech = np.frombuffer(primary_chunk, dtype=np.int16)
    ambience = np.frombuffer(background_chunk, dtype=np.int16)

    shortfall = len(speech) - len(ambience)
    if shortfall > 0:
        ambience = np.pad(ambience, (0, shortfall), 'constant')
    elif shortfall < 0:
        ambience = ambience[:len(speech)]

    # Widen to int32 before adding so loud peaks saturate instead of wrapping.
    limits = np.iinfo(np.int16)
    summed = speech.astype(np.int32) + ambience.astype(np.int32)
    return np.clip(summed, limits.min, limits.max).astype(np.int16).tobytes()
async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """
    Overrides base method to perform mixing and just-in-time frame creation.

    Paces itself against wall-clock time, takes at most one ``chunk_size``
    chunk from each buffer, and returns the mix, the background alone, or a
    silence frame when neither buffer holds a full chunk.

    Returns:
        The next audio frame. When an unexpected exception is caught it is
        logged and the method falls through, returning ``None``.

    Raises:
        MediaStreamError: If the track's ``readyState`` is not ``"live"``.
    """
    try:
        if self.readyState != "live":
            raise MediaStreamError

        if self._start is None:
            self._start = time()
            self._timestamp = 0
        else:
            self._timestamp += self.samples

        # Sleep until this frame's scheduled wall-clock time.
        wait = self._start + (self._timestamp / self.sample_rate) - time()
        if wait > 0:
            await asyncio.sleep(wait)

        pts, time_base = self.next_timestamp()

        primary_chunk = b''
        has_primary = len(self.audio_data_buffer) >= self.chunk_size
        if has_primary:
            primary_chunk = self.audio_data_buffer[: self.chunk_size]
            self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
            self._samples_played += self.samples
            self._is_speaking = True
            self._last_speaking_time = time()
        elif getattr(self, "_is_speaking", False):
            # Apply grace period for mixing track as well
            if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                self._is_speaking = False

                if self._synthesis_complete:
                    # TTS finished and buffer drained — fire callback
                    self._synthesis_complete = False
                    self._needs_last_audio_callback = False
                    logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                    callback = getattr(self, "_last_audio_callback", None)
                    if callback:
                        # The event loop only keeps weak references to tasks,
                        # so hold a strong one until the callback completes.
                        task = asyncio.create_task(callback())
                        pending = getattr(self, "_callback_tasks", None)
                        if pending is None:
                            pending = self._callback_tasks = set()
                        pending.add(task)
                        task.add_done_callback(pending.discard)
                else:
                    # Buffer temporarily empty but synthesis still in progress
                    logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                    self._needs_last_audio_callback = True

        background_chunk = b''
        has_background = len(self.background_audio_buffer) >= self.chunk_size
        if has_background:
            background_chunk = self.background_audio_buffer[: self.chunk_size]
            self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :]

        final_chunk = None
        if has_primary:
            final_chunk = self.mix_audio(primary_chunk, background_chunk)
        elif has_background:
            final_chunk = background_chunk

        if final_chunk:
            frame = self.buildAudioFrames(final_chunk)
        else:
            # Nothing to play: emit a zero-filled (silent) frame.
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))

        frame.pts = pts
        frame.time_base = time_base
        frame.sample_rate = self.sample_rate
        return frame
    except MediaStreamError:
        raise
    except Exception as e:
        traceback.print_exc()
        logger.error(f"Error while creating tts->rtc frame: {e}")

Overrides base method to perform mixing and just-in-time frame creation.

Inherited members

class TeeCustomAudioStreamTrack (loop, sinks=None, pipeline=None)
Expand source code
class TeeCustomAudioStreamTrack(CustomAudioStreamTrack):
    """Audio track that duplicates outgoing audio bytes to registered sinks such as avatar plugins or local speakers."""

    def __init__(self, loop, sinks=None, pipeline=None):
        """Create the track.

        Args:
            loop: Event loop handed straight to the base track constructor.
            sinks: Optional list of sinks; the given list object is stored
                as-is, and a new empty list is used when omitted.
            pipeline: Stored on the instance as ``self.pipeline``.
        """
        super().__init__(loop)
        self.sinks = sinks if sinks is not None else []
        self.pipeline = pipeline

    def add_sink(self, sink):
        """Add a new sink (callback or object) unless it is already registered."""
        if sink not in self.sinks:
            self.sinks.append(sink)

    def remove_sink(self, sink):
        """Remove ``sink`` if it is registered; otherwise do nothing."""
        if sink in self.sinks:
            self.sinks.remove(sink)

    async def add_new_bytes(self, audio_data: bytes):
        """Feed audio to the base track, then forward the same bytes to every sink.

        A sink is either an object exposing an awaitable
        ``handle_audio_input(audio_data)`` or a plain/coroutine callable. A
        failing sink is logged and skipped so the remaining sinks still
        receive the audio.
        """
        await super().add_new_bytes(audio_data)

        # Route audio to sinks (avatars, etc.). Iterate over a snapshot: the
        # awaits below yield to the event loop, and an add_sink/remove_sink
        # call in that window would otherwise mutate the list mid-iteration
        # and could make the loop skip a sink.
        for sink in list(self.sinks):
            try:
                if hasattr(sink, "handle_audio_input"):
                    await sink.handle_audio_input(audio_data)
                elif callable(sink):
                    if asyncio.iscoroutinefunction(sink):
                        await sink(audio_data)
                    else:
                        sink(audio_data)
            except Exception as e:
                import logging as _logging
                _logging.getLogger(__name__).warning("Avatar sink error (audio will continue): %s", e)

    async def recv(self) -> AudioFrame:
        """
        When avatar sinks are present the Avatar Server publishes audio to the meeting,
        so we must NOT publish the raw TTS audio directly (it would cause double audio).
        We still call super().recv() so that all internal state tracking and callbacks
        (is_speaking, on_first_audio_byte, on_last_audio_byte) continue to work correctly.
        The actual audio payload is replaced with silence before handing back to WebRTC.

        With no sinks registered, or when the base ``recv()`` returns ``None``,
        the base result is returned unchanged.
        """
        frame = await super().recv()
        if self.sinks and frame is not None:
            silence = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in silence.planes:
                p.update(bytes(p.buffer_size))
            # Carry over the timing fields so the silent frame takes the
            # original frame's place in the stream.
            silence.pts = frame.pts
            silence.time_base = frame.time_base
            silence.sample_rate = frame.sample_rate
            return silence
        return frame

Audio track that duplicates outgoing audio bytes to registered sinks such as avatar plugins or local speakers.

Ancestors

  • CustomAudioStreamTrack
  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Subclasses

Methods

def add_sink(self, sink)
Expand source code
def add_sink(self, sink):
    """Register ``sink`` (a callback or object); an already-registered sink is ignored."""
    if sink in self.sinks:
        return
    self.sinks.append(sink)

Add a new sink (callback or object)

async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """Run the base ``recv()`` and mute its output while sinks are registered.

    Per the design of this track, when avatar sinks are present the Avatar
    Server publishes the audio to the meeting, so sending the raw TTS audio
    as well would cause double audio. The base ``recv()`` is still awaited so
    internal state tracking and callbacks (is_speaking, on_first_audio_byte,
    on_last_audio_byte) keep working; only the payload handed back to WebRTC
    is swapped for silence carrying the same pts, time base and sample rate.

    With no sinks, or when the base call returns ``None``, the base result is
    returned untouched.
    """
    upstream = await super().recv()
    if not self.sinks or upstream is None:
        return upstream

    muted = AudioFrame(format="s16", layout="mono", samples=self.samples)
    for plane in muted.planes:
        # A zero-filled plane is silence in s16 PCM.
        plane.update(bytes(plane.buffer_size))
    muted.pts = upstream.pts
    muted.time_base = upstream.time_base
    muted.sample_rate = upstream.sample_rate
    return muted

When avatar sinks are present the Avatar Server publishes audio to the meeting, so we must NOT publish the raw TTS audio directly (it would cause double audio). We still call super().recv() so that all internal state tracking and callbacks (is_speaking, on_first_audio_byte, on_last_audio_byte) continue to work correctly. The actual audio payload is replaced with silence before handing back to WebRTC.

def remove_sink(self, sink)
Expand source code
def remove_sink(self, sink):
    """Unregister ``sink``; a sink that is not registered is silently ignored."""
    if sink not in self.sinks:
        return
    self.sinks.remove(sink)

Inherited members

class TeeMixingCustomAudioStreamTrack (loop, sinks=None, pipeline=None)
Expand source code
class TeeMixingCustomAudioStreamTrack(MixingCustomAudioStreamTrack):
    """Combines mixing and tee functionality, mixing background audio while also forwarding audio bytes to registered sinks."""

    def __init__(self, loop, sinks=None, pipeline=None):
        """Create the track.

        Args:
            loop: Event loop handed straight to the base track constructor.
            sinks: Optional list of sinks; the given list object is stored
                as-is, and a new empty list is used when omitted.
            pipeline: Stored on the instance as ``self.pipeline``.
        """
        super().__init__(loop)
        self.sinks = sinks if sinks is not None else []
        self.pipeline = pipeline

    def add_sink(self, sink):
        """Add a new sink (callback or object) unless it is already registered."""
        if sink not in self.sinks:
            self.sinks.append(sink)

    def remove_sink(self, sink):
        """Remove ``sink`` if it is registered; otherwise do nothing."""
        if sink in self.sinks:
            self.sinks.remove(sink)

    async def add_new_bytes(self, audio_data: bytes):
        """Buffer audio via the mixing base track, then forward the same bytes to every sink.

        A sink is either an object exposing an awaitable
        ``handle_audio_input(audio_data)`` or a plain/coroutine callable. A
        failing sink is logged and skipped so the remaining sinks still
        receive the audio.
        """
        await super().add_new_bytes(audio_data)

        # Route audio to sinks (avatars, etc.). Iterate over a snapshot: the
        # awaits below yield to the event loop, and a change to ``self.sinks``
        # in that window would otherwise mutate the list mid-iteration and
        # could make the loop skip a sink.
        for sink in list(self.sinks):
            try:
                if hasattr(sink, "handle_audio_input"):
                    await sink.handle_audio_input(audio_data)
                elif callable(sink):
                    if asyncio.iscoroutinefunction(sink):
                        await sink(audio_data)
                    else:
                        sink(audio_data)
            except Exception as e:
                import logging as _logging
                _logging.getLogger(__name__).warning("Avatar sink error (audio will continue): %s", e)

    async def recv(self) -> AudioFrame:
        """Silence the direct WebRTC output when avatar sinks are handling playback.

        The base ``recv()`` is always awaited first. With no sinks registered,
        or when it returns ``None``, its result is returned unchanged.
        """
        frame = await super().recv()
        if self.sinks and frame is not None:
            silence = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in silence.planes:
                p.update(bytes(p.buffer_size))
            # Carry over the timing fields so the silent frame takes the
            # original frame's place in the stream.
            silence.pts = frame.pts
            silence.time_base = frame.time_base
            silence.sample_rate = frame.sample_rate
            return silence
        return frame

Combines mixing and tee functionality, mixing background audio while also forwarding audio bytes to registered sinks.

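Ancestors

  • MixingCustomAudioStreamTrack
  • CustomAudioStreamTrack
  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter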

Methods

async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """Return silence in place of the mixed frame while sinks are registered.

    The base ``recv()`` is always awaited first. Its frame is passed through
    unchanged when there are no sinks or when it returned ``None``; otherwise
    a zero-filled frame carrying the same pts, time base and sample rate is
    returned in its place.
    """
    produced = await super().recv()
    if produced is None or not self.sinks:
        return produced

    quiet = AudioFrame(format="s16", layout="mono", samples=self.samples)
    for plane in quiet.planes:
        # A zero-filled plane is silence in s16 PCM.
        plane.update(bytes(plane.buffer_size))
    quiet.pts = produced.pts
    quiet.time_base = produced.time_base
    quiet.sample_rate = produced.sample_rate
    return quiet

Silence the direct WebRTC output when avatar sinks are handling playback.

Inherited members