Module agents.room.output_stream

Classes

class CustomAudioStreamTrack (loop)
Expand source code
class CustomAudioStreamTrack(CustomAudioTrack):
    """
    Base audio track implementation using a frame buffer.
    Audio frames are created as soon as audio data is received.

    Supports optional pause/resume for false-interrupt detection while maintaining
    compatibility with avatar plugins that need simple audio flow.
    """

    def __init__(self, loop):
        super().__init__()
        self.loop = loop  # event loop handle (kept for callers/subclasses; not used directly here)
        self._start = None  # wall-clock time of the first recv(); anchors output pacing
        self._timestamp = 0  # total samples emitted so far; used to compute pacing sleep
        self.frame_buffer = []  # ready AudioFrame objects awaiting recv()
        self.audio_data_buffer = bytearray()  # raw PCM bytes not yet assembled into frames
        self.frame_time = 0  # running pts counter, in samples
        self.sample_rate = 24000  # Hz; s16 PCM expected from upstream TTS
        self.channels = 1
        self.sample_width = 2  # bytes per sample (s16)
        self.time_base_fraction = Fraction(1, self.sample_rate)
        self.samples = int(AUDIO_PTIME * self.sample_rate)  # samples per output frame
        self.chunk_size = int(self.samples * self.channels * self.sample_width)
        self._synthesis_complete = False  # set by mark_synthesis_complete()
        self._needs_last_audio_callback = False  # deferred last-audio notification pending
        # Registered via on_last_audio_byte(). Initialized here so all call
        # sites can test `is not None` instead of relying on hasattr().
        self._last_audio_callback = None
        self._last_speaking_time = 0.0
        self._speaking_grace_period = 0.5 # 500ms grace period for jitter

        # Pause/resume support - simple flag-based (no blocking)
        self._is_paused = False
        self._paused_frames = []  # Separate buffer for paused content
        self._accepting_audio = True
        self._manual_audio_control = False

    @property
    def can_pause(self) -> bool:
        """Returns True if this track supports pause/resume operations"""
        return True

    def interrupt(self):
        """Clear all buffers and reset state"""
        logger.info("Audio track interrupted, clearing buffers.")
        self.frame_buffer.clear()
        self.audio_data_buffer.clear()
        self._paused_frames.clear()
        self._is_paused = False
        self._last_speaking_time = 0.0
        self._synthesis_complete = False
        self._needs_last_audio_callback = False

        # Handle manual audio control mode: in manual mode an interrupt stops
        # audio intake until enable_audio_input() is called again.
        if self._manual_audio_control:
            self._accepting_audio = False
        else:
            self._accepting_audio = True

    async def pause(self) -> None:
        """
        Pause audio playback. Instead of blocking recv(), we move remaining
        frames to a separate buffer so they can be resumed later.
        This approach keeps the audio flow simple for avatars.
        """
        if self._is_paused:
            logger.warning("Audio track already paused")
            return

        logger.info("Audio track paused - preserving current buffer state.")
        self._is_paused = True

        # Move current frames to paused buffer for later resume
        self._paused_frames = self.frame_buffer.copy()
        self.frame_buffer.clear()

    async def resume(self) -> None:
        """
        Resume audio playback from paused position.
        Restores frames that were saved when paused.
        """
        if not self._is_paused:
            logger.warning("Audio track not paused, nothing to resume")
            return

        logger.info("Audio track resumed - restoring paused buffer.")
        self._is_paused = False

        # Restore frames from paused buffer
        self.frame_buffer = self._paused_frames.copy()
        self._paused_frames.clear()

    def enable_audio_input(self, manual_control: bool = False):
        """
        Allow fresh audio data to be buffered. When manual_control is True,
        future interrupts will pause intake until this method is called again.

        This is useful for preventing old audio from bleeding into new responses.
        """
        self._manual_audio_control = manual_control
        self._accepting_audio = True
        logger.debug(f"Audio input enabled (manual_control={manual_control})")

    def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when the final audio byte of synthesis is produced"""
        logger.info("on last audio callback")
        self._last_audio_callback = callback
        # Registering a new callback resets any in-flight completion state.
        self._synthesis_complete = False
        self._needs_last_audio_callback = False

    @property
    def is_speaking(self) -> bool:
        """
        True if the track is currently playing audio or has buffered data,
        including a small grace period to bridge gaps in streaming TTS.
        """
        has_data = len(self.frame_buffer) > 0 or len(self.audio_data_buffer) > 0
        if has_data:
            return True
        return (time() - self._last_speaking_time) < self._speaking_grace_period

    def mark_synthesis_complete(self) -> None:
        """
        Mark that TTS synthesis has finished sending all audio data.
        If the buffer is already empty (all audio consumed), fires the
        on_last_audio_byte callback immediately. Otherwise, the callback
        will fire when recv() drains the remaining buffer.
        """
        self._synthesis_complete = True
        # If we're not currently speaking (grace period passed) and buffer is empty, fire immediately
        if not self.is_speaking and self._needs_last_audio_callback:
            self._needs_last_audio_callback = False
            self._synthesis_complete = False
            logger.info("[AudioTrack] Synthesis complete and buffer already empty — triggering last_audio_callback.")
            # NOTE(review): asyncio.create_task requires a running loop in the
            # calling thread — confirm this is only invoked from loop context.
            self._fire_last_audio_callback()

    def _fire_last_audio_callback(self) -> None:
        """Schedule the registered last-audio callback on the running loop, if any."""
        if self._last_audio_callback is not None:
            asyncio.create_task(self._last_audio_callback())

    def _silence_frame(self) -> AudioFrame:
        """Build one frame of s16 silence sized to this track's frame length.

        NOTE(review): layout is always "mono", mirroring the original silence
        path even when self.channels == 2 — confirm stereo is never configured.
        """
        frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
        for plane in frame.planes:
            plane.update(bytes(plane.buffer_size))
        return frame

    async def add_new_bytes(self, audio_data: bytes):
        """
        Add new audio bytes to the buffer. Respects _accepting_audio flag
        for manual audio control mode.
        """
        if not self._accepting_audio:
            logger.debug("Audio input currently disabled, dropping audio data")
            return

        self.audio_data_buffer += audio_data

        # Assemble as many full frames as the accumulated bytes allow; any
        # remainder stays in audio_data_buffer for the next call.
        while len(self.audio_data_buffer) >= self.chunk_size:
            chunk = self.audio_data_buffer[: self.chunk_size]
            self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
            try:
                audio_frame = self.buildAudioFrames(chunk)

                # If paused, add to paused buffer instead
                if self._is_paused:
                    self._paused_frames.append(audio_frame)
                    logger.debug("Added frame to paused buffer")
                else:
                    self.frame_buffer.append(audio_frame)
                    logger.debug(
                        f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}"
                    )
            except Exception as e:
                logger.error(f"Error building audio frame: {e}")
                break

    def buildAudioFrames(self, chunk: bytes) -> AudioFrame:
        """Convert one chunk of raw s16 PCM bytes into an AudioFrame.

        Warns (but does not fail) on size mismatches so a malformed chunk is
        visible in logs without killing the audio path.
        """
        if len(chunk) != self.chunk_size:
            logger.warning(
                f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}"
            )

        data = np.frombuffer(chunk, dtype=np.int16)
        expected_samples = self.samples * self.channels
        if len(data) != expected_samples:
            logger.warning(
                f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}"
            )

        # Interleaved samples -> (samples, channels), then transposed to the
        # (channels, samples) shape AudioFrame.from_ndarray expects for s16.
        data = data.reshape(-1, self.channels)
        layout = "mono" if self.channels == 1 else "stereo"

        audio_frame = AudioFrame.from_ndarray(data.T, format="s16", layout=layout)
        return audio_frame

    def next_timestamp(self):
        """Return (pts, time_base) for the next frame and advance the pts counter."""
        pts = int(self.frame_time)
        time_base = self.time_base_fraction
        self.frame_time += self.samples
        return pts, time_base

    async def recv(self) -> AudioFrame:
        """
        Receive next audio frame. When paused, produces silence frames but keeps
        timing synchronized. This ensures smooth resume without audio glitches.

        Returns None if an unexpected error occurs (errors are logged, not
        re-raised, so the WebRTC pump keeps running).
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError

            # Pace frame delivery against wall-clock time so frames are
            # emitted in real time regardless of how fast the buffer fills.
            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples

            wait = self._start + (self._timestamp / self.sample_rate) - time()

            if wait > 0:
                await asyncio.sleep(wait)

            pts, time_base = self.next_timestamp()

            # When paused, always produce silence but keep timing
            # This allows smooth resume without timing jumps
            if self._is_paused:
                frame = self._silence_frame()
            elif len(self.frame_buffer) > 0:
                frame = self.frame_buffer.pop(0)
                self._is_speaking = True
                self._last_speaking_time = time()
            else:
                # No audio data available — silence
                if getattr(self, "_is_speaking", False):
                    # Only declare we've stopped speaking if the grace period has passed
                    # This bridges gaps in streaming TTS (like Sarvam jitter)
                    if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                        self._is_speaking = False

                        if self._synthesis_complete:
                            # TTS finished and buffer drained — fire callback
                            self._synthesis_complete = False
                            self._needs_last_audio_callback = False
                            logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                            self._fire_last_audio_callback()
                        else:
                            # Buffer temporarily empty but synthesis still in progress
                            logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                            self._needs_last_audio_callback = True

                # Produce silence frame
                frame = self._silence_frame()

            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except MediaStreamError:
            raise
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")

    async def cleanup(self):
        """Clear all state and stop the underlying media track."""
        self.interrupt()
        self.stop()

Base audio track implementation using a frame buffer. Audio frames are created as soon as audio data is received.

Supports optional pause/resume for false-interrupt detection while maintaining compatibility with avatar plugins that need simple audio flow.

Ancestors

  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Subclasses

  • MixingCustomAudioStreamTrack
  • TeeCustomAudioStreamTrack

Instance variables

prop can_pause : bool
Expand source code
@property
def can_pause(self) -> bool:
    """Returns True if this track supports pause/resume operations"""
    return True

Returns True if this track supports pause/resume operations

prop is_speaking : bool
Expand source code
@property
def is_speaking(self) -> bool:
    """
    True if the track is currently playing audio or has buffered data,
    including a small grace period to bridge gaps in streaming TTS.
    """
    has_data = len(self.frame_buffer) > 0 or len(self.audio_data_buffer) > 0
    if has_data:
        return True
    return (time() - self._last_speaking_time) < self._speaking_grace_period

True if the track is currently playing audio or has buffered data, including a small grace period to bridge gaps in streaming TTS.

Methods

async def add_new_bytes(self, audio_data: bytes)
Expand source code
async def add_new_bytes(self, audio_data: bytes):
    """
    Add new audio bytes to the buffer. Respects _accepting_audio flag
    for manual audio control mode.
    """
    if not self._accepting_audio:
        logger.debug("Audio input currently disabled, dropping audio data")
        return
    
    self.audio_data_buffer += audio_data

    while len(self.audio_data_buffer) >= self.chunk_size:
        chunk = self.audio_data_buffer[: self.chunk_size]
        self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
        try:
            audio_frame = self.buildAudioFrames(chunk)
            
            # If paused, add to paused buffer instead
            if self._is_paused:
                self._paused_frames.append(audio_frame)
                logger.debug("Added frame to paused buffer")
            else:
                self.frame_buffer.append(audio_frame)
                logger.debug(
                    f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}"
                )
        except Exception as e:
            logger.error(f"Error building audio frame: {e}")
            break

Add new audio bytes to the buffer. Respects _accepting_audio flag for manual audio control mode.

def buildAudioFrames(self, chunk: bytes) ‑> av.audio.frame.AudioFrame
Expand source code
def buildAudioFrames(self, chunk: bytes) -> AudioFrame:
    if len(chunk) != self.chunk_size:
        logger.warning(
            f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}"
        )

    data = np.frombuffer(chunk, dtype=np.int16)
    expected_samples = self.samples * self.channels
    if len(data) != expected_samples:
        logger.warning(
            f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}"
        )

    data = data.reshape(-1, self.channels)
    layout = "mono" if self.channels == 1 else "stereo"

    audio_frame = AudioFrame.from_ndarray(data.T, format="s16", layout=layout)
    return audio_frame
async def cleanup(self)
Expand source code
async def cleanup(self):
    self.interrupt()
    self.stop()
def enable_audio_input(self, manual_control: bool = False)
Expand source code
def enable_audio_input(self, manual_control: bool = False):
    """
    Allow fresh audio data to be buffered. When manual_control is True,
    future interrupts will pause intake until this method is called again.
    
    This is useful for preventing old audio from bleeding into new responses.
    """
    self._manual_audio_control = manual_control
    self._accepting_audio = True
    logger.debug(f"Audio input enabled (manual_control={manual_control})")

Allow fresh audio data to be buffered. When manual_control is True, future interrupts will pause intake until this method is called again.

This is useful for preventing old audio from bleeding into new responses.

def interrupt(self)
Expand source code
def interrupt(self):
    """Clear all buffers and reset state"""
    logger.info("Audio track interrupted, clearing buffers.")
    self.frame_buffer.clear()
    self.audio_data_buffer.clear()
    self._paused_frames.clear()
    self._is_paused = False
    self._last_speaking_time = 0.0
    self._synthesis_complete = False
    self._needs_last_audio_callback = False
    
    # Handle manual audio control mode
    if self._manual_audio_control:
        self._accepting_audio = False
    else:
        self._accepting_audio = True

Clear all buffers and reset state

def mark_synthesis_complete(self) ‑> None
Expand source code
def mark_synthesis_complete(self) -> None:
    """
    Mark that TTS synthesis has finished sending all audio data.
    If the buffer is already empty (all audio consumed), fires the
    on_last_audio_byte callback immediately. Otherwise, the callback
    will fire when recv() drains the remaining buffer.
    """
    self._synthesis_complete = True
    # If we're not currently speaking (grace period passed) and buffer is empty, fire immediately
    if not self.is_speaking and self._needs_last_audio_callback:
        self._needs_last_audio_callback = False
        self._synthesis_complete = False
        logger.info("[AudioTrack] Synthesis complete and buffer already empty — triggering last_audio_callback.")
        if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
            asyncio.create_task(self._last_audio_callback())

Mark that TTS synthesis has finished sending all audio data. If the buffer is already empty (all audio consumed), fires the on_last_audio_byte callback immediately. Otherwise, the callback will fire when recv() drains the remaining buffer.

def next_timestamp(self)
Expand source code
def next_timestamp(self):
    pts = int(self.frame_time)
    time_base = self.time_base_fraction
    self.frame_time += self.samples
    return pts, time_base
def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None
Expand source code
def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Set callback for when the final audio byte of synthesis is produced"""
    logger.info("on last audio callback")
    self._last_audio_callback = callback
    self._synthesis_complete = False
    self._needs_last_audio_callback = False

Set callback for when the final audio byte of synthesis is produced

async def pause(self) ‑> None
Expand source code
async def pause(self) -> None:
    """
    Pause audio playback. Instead of blocking recv(), we move remaining
    frames to a separate buffer so they can be resumed later.
    This approach keeps the audio flow simple for avatars.
    """
    if self._is_paused:
        logger.warning("Audio track already paused")
        return
        
    logger.info("Audio track paused - preserving current buffer state.")
    self._is_paused = True
    
    # Move current frames to paused buffer for later resume
    self._paused_frames = self.frame_buffer.copy()
    self.frame_buffer.clear()

Pause audio playback. Instead of blocking recv(), we move remaining frames to a separate buffer so they can be resumed later. This approach keeps the audio flow simple for avatars.

async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """
    Receive next audio frame. When paused, produces silence frames but keeps
    timing synchronized. This ensures smooth resume without audio glitches.
    """
    try:
        if self.readyState != "live":
            raise MediaStreamError

        if self._start is None:
            self._start = time()
            self._timestamp = 0
        else:
            self._timestamp += self.samples

        wait = self._start + (self._timestamp / self.sample_rate) - time()

        if wait > 0:
            await asyncio.sleep(wait)

        pts, time_base = self.next_timestamp()

        # When paused, always produce silence but keep timing
        # This allows smooth resume without timing jumps
        if self._is_paused:
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))
        elif len(self.frame_buffer) > 0:
            frame = self.frame_buffer.pop(0)
            self._is_speaking = True
            self._last_speaking_time = time()
        else:
            # No audio data available — silence
            if getattr(self, "_is_speaking", False):
                # Only declare we've stopped speaking if the grace period has passed
                # This bridges gaps in streaming TTS (like Sarvam jitter)
                if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                    self._is_speaking = False

                    if self._synthesis_complete:
                        # TTS finished and buffer drained — fire callback
                        self._synthesis_complete = False
                        self._needs_last_audio_callback = False
                        logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                        if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                            asyncio.create_task(self._last_audio_callback())
                    else:
                        # Buffer temporarily empty but synthesis still in progress
                        logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                        self._needs_last_audio_callback = True

            # Produce silence frame
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))

        frame.pts = pts
        frame.time_base = time_base
        frame.sample_rate = self.sample_rate
        return frame
    except MediaStreamError:
        raise
    except Exception as e:
        traceback.print_exc()
        logger.error(f"Error while creating tts->rtc frame: {e}")

Receive next audio frame. When paused, produces silence frames but keeps timing synchronized. This ensures smooth resume without audio glitches.

async def resume(self) ‑> None
Expand source code
async def resume(self) -> None:
    """
    Resume audio playback from paused position.
    Restores frames that were saved when paused.
    """
    if not self._is_paused:
        logger.warning("Audio track not paused, nothing to resume")
        return
        
    logger.info("Audio track resumed - restoring paused buffer.")
    self._is_paused = False
    
    # Restore frames from paused buffer
    self.frame_buffer = self._paused_frames.copy()
    self._paused_frames.clear()

Resume audio playback from paused position. Restores frames that were saved when paused.

class MediaStreamError (*args, **kwargs)
Expand source code
class MediaStreamError(Exception):
    """Raised when a media track is used after it is no longer live."""

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class MixingCustomAudioStreamTrack (loop)
Expand source code
class MixingCustomAudioStreamTrack(CustomAudioStreamTrack):
    """Audio track that mixes primary TTS audio with a background audio buffer, creating frames just-in-time during recv."""

    def __init__(self, loop):
        super().__init__(loop)
        self.background_audio_buffer = bytearray()  # raw s16 PCM for the background layer

    def interrupt(self):
        """Clear primary buffers (base class) plus the background buffer."""
        super().interrupt()
        self.background_audio_buffer.clear()

    async def add_new_bytes(self, audio_data: bytes):
        """Overrides base method to buffer bytes instead of creating frames."""
        self.audio_data_buffer += audio_data

    async def add_background_bytes(self, audio_data: bytes):
        """Append raw background PCM bytes; consumed one chunk at a time in recv()."""
        self.background_audio_buffer += audio_data

    def mix_audio(self, primary_chunk, background_chunk):
        """
        Mix two s16 PCM byte chunks sample-by-sample and return the mixed bytes.

        The sum is computed in int32 and clipped to the int16 range: a plain
        int16 addition (the previous implementation) wraps around on overflow,
        which produces loud wraparound distortion when both streams are near
        full scale. Background shorter than primary is zero-padded; longer is
        truncated.
        """
        if not background_chunk:
            return primary_chunk

        # Widen to int32 so the sum cannot overflow before clipping.
        primary_arr = np.frombuffer(primary_chunk, dtype=np.int16).astype(np.int32)
        background_arr = np.frombuffer(background_chunk, dtype=np.int16).astype(np.int32)

        if len(background_arr) < len(primary_arr):
            background_arr = np.pad(background_arr, (0, len(primary_arr) - len(background_arr)), 'constant')
        elif len(background_arr) > len(primary_arr):
            background_arr = background_arr[:len(primary_arr)]

        mixed_arr = np.clip(primary_arr + background_arr, -32768, 32767).astype(np.int16)
        return mixed_arr.tobytes()

    # NOTE: the previous byte-identical on_last_audio_byte override was removed;
    # the inherited CustomAudioStreamTrack.on_last_audio_byte behaves identically.

    async def recv(self) -> AudioFrame:
        """
        Overrides base method to perform mixing and just-in-time frame creation.

        Returns None if an unexpected error occurs (errors are logged, not
        re-raised, so the WebRTC pump keeps running).
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError

            # Pace frame delivery against wall-clock time.
            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples

            wait = self._start + (self._timestamp / self.sample_rate) - time()
            if wait > 0:
                await asyncio.sleep(wait)

            pts, time_base = self.next_timestamp()

            # Pull one chunk of primary (TTS) audio if a full chunk is available.
            primary_chunk = b''
            has_primary = len(self.audio_data_buffer) >= self.chunk_size
            if has_primary:
                primary_chunk = self.audio_data_buffer[: self.chunk_size]
                self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
                self._is_speaking = True
                self._last_speaking_time = time()
            elif getattr(self, "_is_speaking", False):
                # Apply grace period for mixing track as well
                if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                    self._is_speaking = False

                    if self._synthesis_complete:
                        # TTS finished and buffer drained — fire callback
                        self._synthesis_complete = False
                        self._needs_last_audio_callback = False
                        logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                        if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                            asyncio.create_task(self._last_audio_callback())
                    else:
                        # Buffer temporarily empty but synthesis still in progress
                        logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                        self._needs_last_audio_callback = True

            # Pull one chunk of background audio if a full chunk is available.
            background_chunk = b''
            has_background = len(self.background_audio_buffer) >= self.chunk_size
            if has_background:
                background_chunk = self.background_audio_buffer[: self.chunk_size]
                self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :]

            # Primary is mixed with background; background alone plays through;
            # neither -> silence.
            final_chunk = None
            if has_primary:
                final_chunk = self.mix_audio(primary_chunk, background_chunk)
            elif has_background:
                final_chunk = background_chunk

            if final_chunk:
                frame = self.buildAudioFrames(final_chunk)
            else:
                frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
                for p in frame.planes:
                    p.update(bytes(p.buffer_size))

            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except MediaStreamError:
            raise
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")

Audio track that mixes primary TTS audio with a background audio buffer, creating frames just-in-time during recv.

Ancestors

  • CustomAudioStreamTrack
  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Subclasses

Methods

async def add_background_bytes(self, audio_data: bytes)
Expand source code
async def add_background_bytes(self, audio_data: bytes):
    self.background_audio_buffer += audio_data
async def add_new_bytes(self, audio_data: bytes)
Expand source code
async def add_new_bytes(self, audio_data: bytes):
    """Overrides base method to buffer bytes instead of creating frames."""
    self.audio_data_buffer += audio_data

Overrides base method to buffer bytes instead of creating frames.

def mix_audio(self, primary_chunk, background_chunk)
Expand source code
def mix_audio(self, primary_chunk, background_chunk):
    if not background_chunk:
        return primary_chunk

    primary_arr = np.frombuffer(primary_chunk, dtype=np.int16)
    background_arr = np.frombuffer(background_chunk, dtype=np.int16)

    if len(background_arr) < len(primary_arr):
        background_arr = np.pad(background_arr, (0, len(primary_arr) - len(background_arr)), 'constant')
    elif len(background_arr) > len(primary_arr):
        background_arr = background_arr[:len(primary_arr)]

    mixed_arr = np.add(primary_arr, background_arr, dtype=np.int16)
    return mixed_arr.tobytes()
async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """
    Overrides base method to perform mixing and just-in-time frame creation.
    """
    try:
        if self.readyState != "live":
            raise MediaStreamError

        if self._start is None:
            self._start = time()
            self._timestamp = 0
        else:
            self._timestamp += self.samples

        wait = self._start + (self._timestamp / self.sample_rate) - time()
        if wait > 0:
            await asyncio.sleep(wait)

        pts, time_base = self.next_timestamp()

        primary_chunk = b''
        has_primary = len(self.audio_data_buffer) >= self.chunk_size
        if has_primary:
            primary_chunk = self.audio_data_buffer[: self.chunk_size]
            self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
            self._is_speaking = True
            self._last_speaking_time = time()
        elif getattr(self, "_is_speaking", False):
            # Apply grace period for mixing track as well
            if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                self._is_speaking = False

                if self._synthesis_complete:
                    # TTS finished and buffer drained — fire callback
                    self._synthesis_complete = False
                    self._needs_last_audio_callback = False
                    logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                    if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                        asyncio.create_task(self._last_audio_callback())
                else:
                    # Buffer temporarily empty but synthesis still in progress
                    logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                    self._needs_last_audio_callback = True

        background_chunk = b''
        has_background = len(self.background_audio_buffer) >= self.chunk_size
        if has_background:
            background_chunk = self.background_audio_buffer[: self.chunk_size]
            self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :]
        
        final_chunk = None
        if has_primary:
            final_chunk = self.mix_audio(primary_chunk, background_chunk)
        elif has_background:
            final_chunk = background_chunk
        
        if final_chunk:
            frame = self.buildAudioFrames(final_chunk)
        else:
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))

        frame.pts = pts
        frame.time_base = time_base
        frame.sample_rate = self.sample_rate
        return frame
    except MediaStreamError:
        raise
    except Exception as e:
        traceback.print_exc()
        logger.error(f"Error while creating tts->rtc frame: {e}")

Overrides base method to perform mixing and just-in-time frame creation.

Inherited members

class TeeCustomAudioStreamTrack (loop, sinks=None, pipeline=None)
Expand source code
class TeeCustomAudioStreamTrack(CustomAudioStreamTrack):
    """Audio track that duplicates outgoing audio bytes to registered sinks such as avatar plugins or local speakers."""

    def __init__(self, loop, sinks=None, pipeline=None):
        super().__init__(loop)
        # Sinks are either objects exposing an async handle_audio_input(bytes)
        # hook or plain (sync/async) callables taking the raw bytes.
        self.sinks = [] if sinks is None else sinks
        self.pipeline = pipeline

    def add_sink(self, sink):
        """Register *sink* (callback or object); duplicates are ignored."""
        if sink not in self.sinks:
            self.sinks.append(sink)

    def remove_sink(self, sink):
        """Deregister *sink*; silently a no-op when it was never registered."""
        try:
            self.sinks.remove(sink)
        except ValueError:
            pass

    async def add_new_bytes(self, audio_data: bytes):
        """Queue the bytes on the base track, then fan them out to every sink."""
        await super().add_new_bytes(audio_data)

        # Forward the same payload to each sink (avatars, local speakers, ...).
        # A broken sink must never interrupt the primary audio path.
        for sink in self.sinks:
            try:
                if hasattr(sink, "handle_audio_input"):
                    await sink.handle_audio_input(audio_data)
                elif not callable(sink):
                    pass
                elif asyncio.iscoroutinefunction(sink):
                    await sink(audio_data)
                else:
                    sink(audio_data)
            except Exception as exc:
                import logging as _logging
                _logging.getLogger(__name__).warning("Avatar sink error (audio will continue): %s", exc)

    async def recv(self) -> AudioFrame:
        """
        Pull the next frame from the base track, but hand WebRTC silence
        whenever sinks are attached: with an avatar sink present the Avatar
        Server publishes the audio to the meeting, so emitting the raw TTS
        frame here would double the audio. Calling super().recv() first keeps
        all internal state tracking and callbacks (is_speaking,
        on_first_audio_byte, on_last_audio_byte) working as usual.
        """
        frame = await super().recv()
        if frame is None or not self.sinks:
            return frame
        # Zero-filled frame carrying the original timing metadata so the
        # WebRTC timeline stays continuous.
        muted = AudioFrame(format="s16", layout="mono", samples=self.samples)
        for plane in muted.planes:
            plane.update(bytes(plane.buffer_size))
        muted.pts = frame.pts
        muted.time_base = frame.time_base
        muted.sample_rate = frame.sample_rate
        return muted

Audio track that duplicates outgoing audio bytes to registered sinks such as avatar plugins or local speakers.

Ancestors

  • CustomAudioStreamTrack
  • videosdk.custom_audio_track.CustomAudioTrack
  • vsaiortc.mediastreams.MediaStreamTrack
  • pyee.asyncio.AsyncIOEventEmitter
  • pyee.base.EventEmitter

Subclasses

Methods

def add_sink(self, sink)
Expand source code
def add_sink(self, sink):
    """Add a new sink (callback or object)"""
    if sink not in self.sinks:
        self.sinks.append(sink)

Add a new sink (callback or object)

async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """
    When avatar sinks are present the Avatar Server publishes audio to the meeting,
    so we must NOT publish the raw TTS audio directly (it would cause double audio).
    We still call super().recv() so that all internal state tracking and callbacks
    (is_speaking, on_first_audio_byte, on_last_audio_byte) continue to work correctly.
    The actual audio payload is replaced with silence before handing back to WebRTC.
    """
    frame = await super().recv()
    if self.sinks and frame is not None:
        silence = AudioFrame(format="s16", layout="mono", samples=self.samples)
        for p in silence.planes:
            p.update(bytes(p.buffer_size))
        silence.pts = frame.pts
        silence.time_base = frame.time_base
        silence.sample_rate = frame.sample_rate
        return silence
    return frame

When avatar sinks are present the Avatar Server publishes audio to the meeting, so we must NOT publish the raw TTS audio directly (it would cause double audio). We still call super().recv() so that all internal state tracking and callbacks (is_speaking, on_first_audio_byte, on_last_audio_byte) continue to work correctly. The actual audio payload is replaced with silence before handing back to WebRTC.

def remove_sink(self, sink)
Expand source code
def remove_sink(self, sink):
    if sink in self.sinks:
        self.sinks.remove(sink)

Inherited members

class TeeMixingCustomAudioStreamTrack (loop, sinks=None, pipeline=None)
Expand source code
class TeeMixingCustomAudioStreamTrack(MixingCustomAudioStreamTrack):
    """Combines mixing and tee functionality, mixing background audio while also forwarding audio bytes to registered sinks."""

    def __init__(self, loop, sinks=None, pipeline=None):
        """
        Args:
            loop: asyncio event loop forwarded to the mixing base track.
            sinks: optional list of audio consumers — objects exposing an
                async ``handle_audio_input(bytes)`` hook or plain callables;
                defaults to an empty list.
            pipeline: optional pipeline reference, stored as-is (not used here).
        """
        super().__init__(loop)
        self.sinks = sinks if sinks is not None else []
        self.pipeline = pipeline

    async def add_new_bytes(self, audio_data: bytes):
        """Queue the bytes on the mixing base track, then copy them to every sink."""
        await super().add_new_bytes(audio_data)

        # Route audio to sinks (avatars, etc.)
        for sink in self.sinks:
            try:
                # Object sinks expose an async handle_audio_input(bytes) hook.
                if hasattr(sink, "handle_audio_input"):
                    await sink.handle_audio_input(audio_data)
                elif callable(sink):
                    # Bare callables may be sync or async; dispatch accordingly.
                    if asyncio.iscoroutinefunction(sink):
                        await sink(audio_data)
                    else:
                        sink(audio_data)
            except Exception as e:
                # A failing sink must never break the main audio path; log and continue.
                import logging as _logging
                _logging.getLogger(__name__).warning("Avatar sink error (audio will continue): %s", e)

    async def recv(self) -> AudioFrame:
        """Silence the direct WebRTC output when avatar sinks are handling playback."""
        frame = await super().recv()
        if self.sinks and frame is not None:
            # Emit an all-zero frame carrying the original timing metadata so
            # the WebRTC timeline stays intact while the avatar server (fed via
            # the sinks above) publishes the real audio to the meeting.
            silence = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in silence.planes:
                p.update(bytes(p.buffer_size))
            silence.pts = frame.pts
            silence.time_base = frame.time_base
            silence.sample_rate = frame.sample_rate
            return silence
        return frame

Combines mixing and tee functionality, mixing background audio while also forwarding audio bytes to registered sinks.

Ancestors

Methods

async def recv(self) ‑> av.audio.frame.AudioFrame
Expand source code
async def recv(self) -> AudioFrame:
    """Silence the direct WebRTC output when avatar sinks are handling playback."""
    frame = await super().recv()
    if self.sinks and frame is not None:
        silence = AudioFrame(format="s16", layout="mono", samples=self.samples)
        for p in silence.planes:
            p.update(bytes(p.buffer_size))
        silence.pts = frame.pts
        silence.time_base = frame.time_base
        silence.sample_rate = frame.sample_rate
        return silence
    return frame

Silence the direct WebRTC output when avatar sinks are handling playback.

Inherited members