Module agents.room.output_stream
Classes
class CustomAudioStreamTrack (loop)-
Expand source code
class CustomAudioStreamTrack(CustomAudioTrack):
    """
    Base audio track implementation using a frame buffer.
    Audio frames are created as soon as audio data is received.

    Supports optional pause/resume for false-interrupt detection while maintaining
    compatibility with avatar plugins that need simple audio flow.
    """

    def __init__(self, loop):
        """Initialise buffers, timing state and playback counters.

        Args:
            loop: Event loop handle; stored on ``self.loop`` and not otherwise
                used by this class.
        """
        super().__init__()
        self.loop = loop
        self._start = None
        self._timestamp = 0
        self.frame_buffer = []
        self.audio_data_buffer = bytearray()
        self.frame_time = 0
        self.sample_rate = 24000
        self.channels = 1
        self.sample_width = 2
        self.time_base_fraction = Fraction(1, self.sample_rate)
        self.samples = int(AUDIO_PTIME * self.sample_rate)
        self.chunk_size = int(self.samples * self.channels * self.sample_width)
        self._synthesis_complete = False
        self._needs_last_audio_callback = False
        self._last_speaking_time = 0.0
        self._speaking_grace_period = 0.2  # 200ms grace period for jitter

        # Pause/resume support - simple flag-based (no blocking)
        self._is_paused = False
        self._paused_frames = []  # Separate buffer for paused content
        self._accepting_audio = True
        self._manual_audio_control = False

        # Fade-out on real interruption (seconds). 0 disables the fade and
        # restores the legacy instant buffer-clear.
        self.interrupt_fade_duration: float = 0.4
        self._faded_tail_pending: bool = False

        self._samples_played: int = 0
        self._cumulative_input_samples: int = 0
        self._synthesis_start_played: int = 0
        self._synthesis_start_pushed: int = 0

        # Explicit defaults for state that was previously probed with
        # getattr()/hasattr() at every use.
        self._is_speaking = False
        self._last_audio_callback = None
        # Strong references to in-flight callback tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._last_audio_tasks = set()

    @property
    def can_pause(self) -> bool:
        """Returns True if this track supports pause/resume operations"""
        return True

    def mark_synthesis_start(self) -> None:
        """Anchor the per-synthesis baseline for played/pushed sample counters.

        Called by SpeechGeneration at the top of each synthesize() so that
        snapshot_playback() returns deltas relative to this turn only.
        """
        self._synthesis_start_played = self._samples_played
        self._synthesis_start_pushed = self._cumulative_input_samples

    def snapshot_playback(self) -> tuple[int, int]:
        """Return (samples_played, samples_pushed) deltas for the active synthesis."""
        played = max(0, self._samples_played - self._synthesis_start_played)
        pushed = max(0, self._cumulative_input_samples - self._synthesis_start_pushed)
        return played, pushed

    def _frame_to_pcm_bytes(self, frame: AudioFrame) -> bytes:
        """Extract raw int16 PCM bytes from an AudioFrame built by buildAudioFrames().

        Uses ``to_ndarray()`` (the exact inverse of ``from_ndarray()``) rather
        than reading ``frame.planes[0]`` directly, since a plane's buffer can be
        alignment-padded beyond ``samples * sample_width``.
        """
        arr = frame.to_ndarray()
        return arr.astype(np.int16, copy=False).tobytes()

    def _apply_fade_out(self, pcm: bytes) -> bytes:
        """Apply an exponential fade-out (0 dB → ~-60 dB) to mono int16 PCM.

        Mono assumption (channels == 1): the flat sample array is the
        per-sample timeline. A multi-channel track would need the ramp
        repeated per channel.

        A trailing partial sample (odd byte count) is dropped instead of
        raising, so an interrupt cannot fail half-way through.

        Returns:
            The faded PCM bytes, or ``b""`` when ``pcm`` holds no whole sample.
        """
        itemsize = np.dtype(np.int16).itemsize
        usable = len(pcm) - (len(pcm) % itemsize)
        if usable == 0:
            return b""
        samples = np.frombuffer(pcm[:usable], dtype=np.int16)
        n = samples.shape[0]
        t = np.linspace(0.0, 1.0, num=n, dtype=np.float32)
        gain = np.power(10.0, -3.0 * t, dtype=np.float32)
        gain[-1] = 0.0  # force the very last sample to true silence
        faded = np.clip(
            samples.astype(np.float32) * gain,
            np.iinfo(np.int16).min,
            np.iinfo(np.int16).max,
        )
        return faded.astype(np.int16).tobytes()

    def _collect_pending_pcm(self) -> bytes:
        """Return the TTS PCM still queued for playback, for fade-out on interrupt.

        The base track keeps it as built AudioFrames in ``frame_buffer`` (and in
        ``_paused_frames`` while paused). Overridden by mixing tracks, which keep
        raw bytes in ``audio_data_buffer`` instead.
        """
        pending = bytearray()
        for f in (*self.frame_buffer, *self._paused_frames):
            try:
                pending += self._frame_to_pcm_bytes(f)
            except Exception as e:
                logger.warning(f"Skipping frame during fade extraction: {e}")
        return bytes(pending)

    def _load_faded_audio(self, faded_bytes: bytes) -> None:
        """Re-chunk faded PCM into AudioFrames and queue them in frame_buffer.

        The final partial chunk is zero-padded to chunk_size (it is already near
        silence at the tail of the fade, so the padding is inaudible).
        """
        buf = bytearray(faded_bytes)
        rem = len(buf) % self.chunk_size
        if rem:
            buf += bytes(self.chunk_size - rem)
        for i in range(0, len(buf), self.chunk_size):
            try:
                self.frame_buffer.append(
                    self.buildAudioFrames(bytes(buf[i : i + self.chunk_size]))
                )
            except Exception as e:
                logger.error(f"Error building faded audio frame: {e}")

    def interrupt(self):
        """Interrupt playback.

        When ``interrupt_fade_duration > 0`` the agent's buffered TTS audio is
        not cut instantly — a short exponential fade-out tail is kept so the
        voice ducks gracefully to silence. Otherwise all buffers are cleared
        immediately (legacy behavior).
        """
        if self._faded_tail_pending:
            # NOTE(review): in this class the flag is only reset by
            # enable_audio_input(); confirm callers invoke it before each new
            # synthesis, otherwise later interrupts stop clearing buffers.
            logger.debug("Audio track interrupt re-entered — faded tail already pending.")
            self._is_paused = False
            self._last_speaking_time = 0.0
            self._synthesis_complete = False
            self._needs_last_audio_callback = False
            self._accepting_audio = not self._manual_audio_control
            return

        pending = b""
        if self.interrupt_fade_duration and self.interrupt_fade_duration > 0:
            pending = self._collect_pending_pcm()

        self.frame_buffer.clear()
        self.audio_data_buffer.clear()
        self._paused_frames.clear()

        faded_tail = b""
        if pending:
            fade_frames = max(1, round(self.interrupt_fade_duration / AUDIO_PTIME))
            fade_bytes_len = fade_frames * self.chunk_size
            faded_tail = self._apply_fade_out(bytes(pending[:fade_bytes_len]))

        if faded_tail:
            self._load_faded_audio(faded_tail)
            self._faded_tail_pending = True
            # Ceiling division: a trailing partial chunk is padded into one
            # more queued frame, so count it.
            queued_frames = -(-len(faded_tail) // self.chunk_size)
            logger.info(
                f"Audio track interrupted — fading out {queued_frames} frame(s)."
            )
        else:
            logger.info("Audio track interrupted, clearing buffers.")

        self._is_paused = False
        self._last_speaking_time = 0.0
        self._synthesis_complete = False
        self._needs_last_audio_callback = False

        # Handle manual audio control mode
        self._accepting_audio = not self._manual_audio_control

    async def pause(self) -> None:
        """
        Pause audio playback. Instead of blocking recv(), we move remaining
        frames to a separate buffer so they can be resumed later.
        This approach keeps the audio flow simple for avatars.
        """
        if self._is_paused:
            logger.warning("Audio track already paused")
            return

        logger.info("Audio track paused - preserving current buffer state.")
        self._is_paused = True

        # Move current frames to paused buffer for later resume
        self._paused_frames = self.frame_buffer.copy()
        self.frame_buffer.clear()

    async def resume(self) -> None:
        """
        Resume audio playback from paused position.
        Restores frames that were saved when paused.
        """
        if not self._is_paused:
            logger.warning("Audio track not paused, nothing to resume")
            return

        logger.info("Audio track resumed - restoring paused buffer.")
        self._is_paused = False

        # Restore frames from paused buffer
        self.frame_buffer = self._paused_frames.copy()
        self._paused_frames.clear()

    def enable_audio_input(self, manual_control: bool = False):
        """
        Allow fresh audio data to be buffered. When manual_control is True,
        future interrupts will pause intake until this method is called again.

        This is useful for preventing old audio from bleeding into new responses.
        """
        self._manual_audio_control = manual_control
        self._accepting_audio = True
        self._faded_tail_pending = False
        logger.debug(f"Audio input enabled (manual_control={manual_control})")

    def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when the final audio byte of synthesis is produced"""
        logger.info("on last audio callback")
        self._last_audio_callback = callback
        self._synthesis_complete = False
        self._needs_last_audio_callback = False

    @property
    def is_speaking(self) -> bool:
        """
        True if the track is currently playing audio or has playable buffered
        data, including a small grace period to bridge gaps in streaming TTS.

        Raw bytes in ``audio_data_buffer`` only count once they amount to a full
        chunk: a smaller remainder is never turned into a frame, so counting it
        would leave this property stuck True after the last utterance.
        """
        if self.frame_buffer or len(self.audio_data_buffer) >= self.chunk_size:
            return True
        return (time() - self._last_speaking_time) < self._speaking_grace_period

    def _fire_last_audio_callback(self) -> None:
        """Schedule the registered last-audio callback, keeping the task alive."""
        callback = getattr(self, "_last_audio_callback", None)
        if not callback:
            return
        task = asyncio.create_task(callback())
        self._last_audio_tasks.add(task)
        task.add_done_callback(self._last_audio_tasks.discard)

    def mark_synthesis_complete(self) -> None:
        """
        Mark that TTS synthesis has finished sending all audio data.
        If the buffer is already empty (all audio consumed), fires the
        on_last_audio_byte callback immediately. Otherwise, the callback
        will fire when recv() drains the remaining buffer.
        """
        self._synthesis_complete = True
        # If we're not currently speaking (grace period passed) and buffer is empty, fire immediately
        if not self.is_speaking and self._needs_last_audio_callback:
            self._needs_last_audio_callback = False
            self._synthesis_complete = False
            logger.info("[AudioTrack] Synthesis complete and buffer already empty — triggering last_audio_callback.")
            self._fire_last_audio_callback()

    async def add_new_bytes(self, audio_data: bytes):
        """
        Add new audio bytes to the buffer.
        Respects _accepting_audio flag for manual audio control mode.
        """
        if not self._accepting_audio:
            logger.debug("Audio input currently disabled, dropping audio data")
            return

        self._cumulative_input_samples += len(audio_data) // (self.channels * self.sample_width)
        self.audio_data_buffer += audio_data

        while len(self.audio_data_buffer) >= self.chunk_size:
            chunk = self.audio_data_buffer[: self.chunk_size]
            self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
            try:
                audio_frame = self.buildAudioFrames(chunk)

                # If paused, add to paused buffer instead
                if self._is_paused:
                    self._paused_frames.append(audio_frame)
                    logger.debug("Added frame to paused buffer")
                else:
                    self.frame_buffer.append(audio_frame)
                    logger.debug(
                        f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}"
                    )
            except Exception as e:
                logger.error(f"Error building audio frame: {e}")
                break

    def buildAudioFrames(self, chunk: bytes) -> AudioFrame:
        """Build one ``s16`` AudioFrame from a chunk of interleaved int16 PCM.

        A size mismatch against ``chunk_size`` / ``samples * channels`` is only
        logged; the frame is still built from whatever data was passed.
        """
        if len(chunk) != self.chunk_size:
            logger.warning(
                f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}"
            )

        data = np.frombuffer(chunk, dtype=np.int16)
        expected_samples = self.samples * self.channels
        if len(data) != expected_samples:
            logger.warning(
                f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}"
            )
        # "s16" is a packed (interleaved) format: from_ndarray() expects a
        # single row of samples * channels values. The previous
        # reshape(-1, channels).T only produced that shape for mono.
        packed = data.reshape(1, -1)
        layout = "mono" if self.channels == 1 else "stereo"
        audio_frame = AudioFrame.from_ndarray(packed, format="s16", layout=layout)
        return audio_frame

    def next_timestamp(self):
        """Return ``(pts, time_base)`` for the next frame and advance ``frame_time``."""
        pts = int(self.frame_time)
        time_base = self.time_base_fraction
        self.frame_time += self.samples
        return pts, time_base

    def _silence_frame(self) -> AudioFrame:
        """Build a zero-filled mono ``s16`` frame of ``self.samples`` samples."""
        frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
        for p in frame.planes:
            p.update(bytes(p.buffer_size))
        return frame

    async def recv(self) -> AudioFrame:
        """
        Receive next audio frame.
        When paused, produces silence frames but keeps timing synchronized.
        This ensures smooth resume without audio glitches.

        Raises:
            MediaStreamError: when ``readyState`` is not ``"live"``.

        NOTE(review): any other exception is logged and swallowed, so the
        coroutine then returns ``None`` instead of a frame — confirm the
        consumer tolerates that.
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError

            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples

            # Pace delivery against the wall clock anchored at the first call.
            wait = self._start + (self._timestamp / self.sample_rate) - time()
            if wait > 0:
                await asyncio.sleep(wait)

            pts, time_base = self.next_timestamp()

            # When paused, always produce silence but keep timing
            # This allows smooth resume without timing jumps
            if self._is_paused:
                frame = self._silence_frame()
            elif len(self.frame_buffer) > 0:
                frame = self.frame_buffer.pop(0)
                self._samples_played += self.samples
                self._is_speaking = True
                self._last_speaking_time = time()
            else:
                # No audio data available — silence
                if self._is_speaking:
                    # Only declare we've stopped speaking if the grace period has passed
                    # This bridges gaps in streaming TTS (like Sarvam jitter)
                    if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                        self._is_speaking = False

                        if self._synthesis_complete:
                            # TTS finished and buffer drained — fire callback
                            self._synthesis_complete = False
                            self._needs_last_audio_callback = False
                            logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                            self._fire_last_audio_callback()
                        else:
                            # Buffer temporarily empty but synthesis still in progress
                            logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                            self._needs_last_audio_callback = True

                # Produce silence frame
                frame = self._silence_frame()

            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except MediaStreamError:
            raise
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")

    async def cleanup(self):
        """Interrupt playback, then stop the track."""
        self.interrupt()
        self.stop()
Supports optional pause/resume for false-interrupt detection while maintaining compatibility with avatar plugins that need simple audio flow.
Ancestors
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Subclasses
Instance variables
prop can_pause : bool-
Expand source code
@property
def can_pause(self) -> bool:
    """Report whether pause/resume is available; this track always supports it."""
    return True
prop is_speaking : bool-
Expand source code
@property
def is_speaking(self) -> bool:
    """
    True if the track is currently playing audio or has playable buffered
    data, including a small grace period to bridge gaps in streaming TTS.

    Raw bytes in ``audio_data_buffer`` only count once they amount to a full
    ``chunk_size``: a smaller remainder is never consumed by playback, so
    counting it would leave this property stuck True after the last utterance
    and keep mark_synthesis_complete() from firing its callback.
    """
    if self.frame_buffer or len(self.audio_data_buffer) >= self.chunk_size:
        return True
    # Nothing playable is queued: fall back to the jitter grace window.
    return (time() - self._last_speaking_time) < self._speaking_grace_period
Methods
async def add_new_bytes(self, audio_data: bytes)-
Expand source code
async def add_new_bytes(self, audio_data: bytes):
    """
    Buffer incoming PCM bytes and turn every complete chunk into a frame.

    Data is dropped (with a debug log) while ``_accepting_audio`` is False.
    Frames go to ``_paused_frames`` while the track is paused, otherwise to
    ``frame_buffer``. If building a frame fails, the error is logged and
    chunking stops for this call; the failing chunk is discarded.
    """
    if not self._accepting_audio:
        logger.debug("Audio input currently disabled, dropping audio data")
        return

    bytes_per_sample = self.channels * self.sample_width
    self._cumulative_input_samples += len(audio_data) // bytes_per_sample
    self.audio_data_buffer += audio_data

    while len(self.audio_data_buffer) >= self.chunk_size:
        # Split off the leading chunk and keep the rest for the next pass.
        chunk, self.audio_data_buffer = (
            self.audio_data_buffer[: self.chunk_size],
            self.audio_data_buffer[self.chunk_size :],
        )
        try:
            built = self.buildAudioFrames(chunk)
            if not self._is_paused:
                self.frame_buffer.append(built)
                logger.debug(
                    f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}"
                )
            else:
                # Paused: park the frame so resume() can restore it.
                self._paused_frames.append(built)
                logger.debug("Added frame to paused buffer")
        except Exception as e:
            logger.error(f"Error building audio frame: {e}")
            break
def buildAudioFrames(self, chunk: bytes) ‑> av.audio.frame.AudioFrame-
Expand source code
def buildAudioFrames(self, chunk: bytes) -> AudioFrame:
    """Build one ``s16`` AudioFrame from a chunk of interleaved int16 PCM.

    Args:
        chunk: Raw PCM bytes, expected to be exactly ``self.chunk_size`` long.

    Returns:
        An AudioFrame with layout ``"mono"`` when ``self.channels == 1`` and
        ``"stereo"`` otherwise.

    A size mismatch is only logged as a warning; the frame is still built
    from whatever data was passed.
    """
    if len(chunk) != self.chunk_size:
        logger.warning(
            f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}"
        )

    data = np.frombuffer(chunk, dtype=np.int16)
    expected_samples = self.samples * self.channels
    if len(data) != expected_samples:
        logger.warning(
            f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}"
        )

    # "s16" is a packed (interleaved) format, for which from_ndarray() expects
    # a single row holding samples * channels values. The previous
    # reshape(-1, channels).T only produced that shape for mono input.
    packed = data.reshape(1, -1)
    layout = "mono" if self.channels == 1 else "stereo"
    return AudioFrame.from_ndarray(packed, format="s16", layout=layout)
Expand source code
async def cleanup(self):
    """Tear the track down: interrupt playback first, then stop the track."""
    self.interrupt()
    self.stop()
Expand source code
def enable_audio_input(self, manual_control: bool = False):
    """
    Re-open the track for incoming audio.

    Sets ``_accepting_audio`` to True, clears ``_faded_tail_pending`` and
    records ``manual_control``. When ``manual_control`` is True, a later
    interrupt() stops intake again until this method is called once more,
    which keeps audio from an old response out of the next one.
    """
    self._faded_tail_pending = False
    self._accepting_audio = True
    self._manual_audio_control = manual_control
    logger.debug(f"Audio input enabled (manual_control={manual_control})")
This is useful for preventing old audio from bleeding into new responses.
def interrupt(self)-
Expand source code
def interrupt(self):
    """Interrupt playback.

    When ``interrupt_fade_duration > 0`` the agent's buffered TTS audio is
    not cut instantly — a short exponential fade-out tail is kept so the
    voice ducks gracefully to silence. Otherwise all buffers are cleared
    immediately (legacy behavior).

    The slice handed to the fade is trimmed to whole sample frames
    (``channels * sample_width`` bytes), so a pending buffer with a stray
    trailing byte cannot make the fade raise after the buffers were cleared.
    """
    if self._faded_tail_pending:
        # NOTE(review): in the code visible here this flag is only reset by
        # enable_audio_input(); confirm callers invoke it before each new
        # synthesis, otherwise later interrupts stop clearing buffers.
        logger.debug("Audio track interrupt re-entered — faded tail already pending.")
        self._is_paused = False
        self._last_speaking_time = 0.0
        self._synthesis_complete = False
        self._needs_last_audio_callback = False
        self._accepting_audio = not self._manual_audio_control
        return

    pending = b""
    if self.interrupt_fade_duration and self.interrupt_fade_duration > 0:
        # Snapshot queued audio before the buffers are wiped below.
        pending = self._collect_pending_pcm()

    self.frame_buffer.clear()
    self.audio_data_buffer.clear()
    self._paused_frames.clear()

    faded_tail = b""
    if pending:
        fade_frames = max(1, round(self.interrupt_fade_duration / AUDIO_PTIME))
        fade_bytes_len = fade_frames * self.chunk_size
        tail = pending[:fade_bytes_len]
        # Keep whole sample frames only.
        frame_bytes = self.channels * self.sample_width
        tail = tail[: len(tail) - (len(tail) % frame_bytes)]
        if tail:
            faded_tail = self._apply_fade_out(bytes(tail))

    if faded_tail:
        self._load_faded_audio(faded_tail)
        self._faded_tail_pending = True
        # Ceiling division: a trailing partial chunk is padded into one more
        # queued frame, so count it.
        queued_frames = -(-len(faded_tail) // self.chunk_size)
        logger.info(
            f"Audio track interrupted — fading out {queued_frames} frame(s)."
        )
    else:
        logger.info("Audio track interrupted, clearing buffers.")

    self._is_paused = False
    self._last_speaking_time = 0.0
    self._synthesis_complete = False
    self._needs_last_audio_callback = False

    # Handle manual audio control mode: intake stays closed until
    # enable_audio_input() is called again.
    self._accepting_audio = not self._manual_audio_control
When `interrupt_fade_duration > 0` the agent's buffered TTS audio is not cut instantly — a short exponential fade-out tail is kept so the voice ducks gracefully to silence. Otherwise all buffers are cleared immediately (legacy behavior).
def mark_synthesis_complete(self) ‑> None-
Expand source code
def mark_synthesis_complete(self) -> None:
    """
    Mark that TTS synthesis has finished sending all audio data.
    If the buffer is already empty (all audio consumed), fires the
    on_last_audio_byte callback immediately. Otherwise, the callback
    will fire when recv() drains the remaining buffer.

    The scheduled callback task is kept in ``self._last_audio_tasks`` until
    it finishes: the event loop holds only weak references to tasks, so an
    unreferenced task can be garbage-collected before it completes.
    """
    self._synthesis_complete = True

    # Still speaking, or recv() has not yet seen an empty buffer: recv()
    # will fire the callback once the remaining audio drains.
    if self.is_speaking or not self._needs_last_audio_callback:
        return

    self._needs_last_audio_callback = False
    self._synthesis_complete = False
    logger.info("[AudioTrack] Synthesis complete and buffer already empty — triggering last_audio_callback.")

    callback = getattr(self, "_last_audio_callback", None)
    if not callback:
        return

    running = getattr(self, "_last_audio_tasks", None)
    if running is None:
        running = set()
        self._last_audio_tasks = running
    task = asyncio.create_task(callback())
    running.add(task)
    task.add_done_callback(running.discard)
def mark_synthesis_start(self) ‑> None-
Expand source code
def mark_synthesis_start(self) -> None:
    """Record the current played/pushed sample totals as this turn's baseline.

    snapshot_playback() subtracts these baselines, so its deltas cover only
    audio handled after this call. Per the original note, SpeechGeneration
    calls this at the top of each synthesize().
    """
    self._synthesis_start_played, self._synthesis_start_pushed = (
        self._samples_played,
        self._cumulative_input_samples,
    )
def next_timestamp(self)-
Expand source code
def next_timestamp(self):
    """Return ``(pts, time_base)`` for the next frame and advance the clock.

    ``pts`` is the integer value of ``frame_time`` before it is advanced by
    ``self.samples``; ``time_base`` is ``self.time_base_fraction``.
    """
    current_pts = int(self.frame_time)
    self.frame_time += self.samples
    return current_pts, self.time_base_fraction
Expand source code
def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Register the coroutine callback to run once synthesis audio has drained.

    Registering also resets the completion bookkeeping
    (``_synthesis_complete`` and ``_needs_last_audio_callback``).
    """
    logger.info("on last audio callback")
    self._last_audio_callback = callback
    self._needs_last_audio_callback = False
    self._synthesis_complete = False
async def pause(self) ‑> None-
Expand source code
async def pause(self) -> None:
    """
    Pause playback without blocking recv().

    The frames currently queued are moved into ``_paused_frames`` so that
    resume() can restore them. Calling this while already paused only logs
    a warning.
    """
    if self._is_paused:
        logger.warning("Audio track already paused")
        return

    logger.info("Audio track paused - preserving current buffer state.")
    self._is_paused = True

    # Park a snapshot of the queue, then empty the live queue.
    self._paused_frames = list(self.frame_buffer)
    self.frame_buffer.clear()
async def recv(self) ‑> av.audio.frame.AudioFrame-
Expand source code
async def recv(self) -> AudioFrame:
    """
    Receive next audio frame.
    When paused, produces silence frames but keeps timing synchronized.
    This ensures smooth resume without audio glitches.

    Returns:
        The next queued frame, or a zero-filled mono ``s16`` frame of
        ``self.samples`` samples when paused or when nothing is queued.
        ``pts``, ``time_base`` and ``sample_rate`` are set on every frame.

    Raises:
        MediaStreamError: when ``readyState`` is not ``"live"``.

    NOTE(review): any other exception is printed, logged and swallowed, so
    the coroutine then returns ``None`` instead of a frame — confirm the
    consumer tolerates that.
    """
    try:
        if self.readyState != "live":
            raise MediaStreamError

        # First call anchors the wall clock; later calls advance by one frame.
        if self._start is None:
            self._start = time()
            self._timestamp = 0
        else:
            self._timestamp += self.samples

        # Sleep until this frame's scheduled wall-clock time.
        wait = self._start + (self._timestamp / self.sample_rate) - time()
        if wait > 0:
            await asyncio.sleep(wait)

        pts, time_base = self.next_timestamp()

        # When paused, always produce silence but keep timing
        # This allows smooth resume without timing jumps
        if self._is_paused:
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))
        elif len(self.frame_buffer) > 0:
            frame = self.frame_buffer.pop(0)
            self._samples_played += self.samples
            self._is_speaking = True
            self._last_speaking_time = time()
        else:
            # No audio data available — silence
            if getattr(self, "_is_speaking", False):
                # Only declare we've stopped speaking if the grace period has passed
                # This bridges gaps in streaming TTS (like Sarvam jitter)
                if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                    self._is_speaking = False

                    if self._synthesis_complete:
                        # TTS finished and buffer drained — fire callback
                        self._synthesis_complete = False
                        self._needs_last_audio_callback = False
                        logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                        if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                            asyncio.create_task(self._last_audio_callback())
                    else:
                        # Buffer temporarily empty but synthesis still in progress
                        logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                        # Lets mark_synthesis_complete() fire the callback itself.
                        self._needs_last_audio_callback = True

            # Produce silence frame
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))

        frame.pts = pts
        frame.time_base = time_base
        frame.sample_rate = self.sample_rate
        return frame
    except MediaStreamError:
        raise
    except Exception as e:
        traceback.print_exc()
        logger.error(f"Error while creating tts->rtc frame: {e}")
async def resume(self) ‑> None-
Expand source code
async def resume(self) -> None:
    """
    Resume playback after pause().

    The frames parked in ``_paused_frames`` become the live ``frame_buffer``
    again. Calling this while not paused only logs a warning.
    """
    if not self._is_paused:
        logger.warning("Audio track not paused, nothing to resume")
        return

    logger.info("Audio track resumed - restoring paused buffer.")
    self._is_paused = False

    # Hand the parked frames back to the live queue and empty the parking lot.
    self.frame_buffer = list(self._paused_frames)
    self._paused_frames.clear()
def snapshot_playback(self) ‑> tuple[int, int]-
Expand source code
def snapshot_playback(self) -> tuple[int, int]:
    """Return ``(played, pushed)`` sample counts since mark_synthesis_start().

    Each value is the running total minus its recorded baseline, floored at 0.
    """
    played_delta = self._samples_played - self._synthesis_start_played
    pushed_delta = self._cumulative_input_samples - self._synthesis_start_pushed
    return (
        played_delta if played_delta > 0 else 0,
        pushed_delta if pushed_delta > 0 else 0,
    )
class MediaStreamError (*args, **kwargs)-
Expand source code
class MediaStreamError(Exception):
    """Raised by a track's recv() when the track is not in the "live" state."""
Ancestors
- builtins.Exception
- builtins.BaseException
class MixingCustomAudioStreamTrack (loop)-
Expand source code
class MixingCustomAudioStreamTrack(CustomAudioStreamTrack):
    """Audio track that mixes primary TTS audio with a background audio buffer, creating frames just-in-time during recv."""

    def __init__(self, loop):
        """Set up the base track plus an empty background-audio buffer."""
        super().__init__(loop)
        self.background_audio_buffer = bytearray()

    def interrupt(self):
        """Interrupt playback.

        The base implementation applies the exponential fade-out — here the
        overridden ``_collect_pending_pcm`` / ``_load_faded_audio`` make it act on
        ``audio_data_buffer`` (where this track keeps queued TTS audio).
        ``background_audio_buffer`` is cleared outright — background audio is
        never faded.
        """
        super().interrupt()
        self.background_audio_buffer.clear()

    def _collect_pending_pcm(self) -> bytes:
        """Mixing tracks build frames just-in-time, so queued TTS audio lives in
        ``audio_data_buffer`` as raw bytes. ``background_audio_buffer`` is
        deliberately excluded — background audio is never faded.
        """
        return bytes(self.audio_data_buffer)

    def _load_faded_audio(self, faded_bytes: bytes) -> None:
        """Mixing tracks play from ``audio_data_buffer``; queue the faded tail there.

        Padded to a chunk_size multiple so recv() fully drains it — a sub-chunk
        remainder would never be consumed and would leave ``is_speaking`` stuck True.
        """
        buf = bytearray(faded_bytes)
        rem = len(buf) % self.chunk_size
        if rem:
            buf += bytes(self.chunk_size - rem)
        self.audio_data_buffer = buf

    async def add_new_bytes(self, audio_data: bytes):
        """Overrides base method to buffer bytes instead of creating frames.

        Like the base implementation, incoming audio is dropped while
        ``_accepting_audio`` is False, so manual audio control set up through
        enable_audio_input(manual_control=True) also holds on mixing tracks
        after an interrupt.
        """
        if not self._accepting_audio:
            logger.debug("Audio input currently disabled, dropping audio data")
            return
        self._cumulative_input_samples += len(audio_data) // (self.channels * self.sample_width)
        self.audio_data_buffer += audio_data

    async def add_background_bytes(self, audio_data: bytes):
        """Append raw PCM bytes to the background-audio buffer."""
        self.background_audio_buffer += audio_data

    def mix_audio(self, primary_chunk, background_chunk):
        """Sum two int16 PCM chunks sample-by-sample with saturation.

        Returns ``primary_chunk`` unchanged when ``background_chunk`` is empty.
        The background is zero-padded or truncated to the primary's length.
        """
        if not background_chunk:
            return primary_chunk

        primary_arr = np.frombuffer(primary_chunk, dtype=np.int16)
        background_arr = np.frombuffer(background_chunk, dtype=np.int16)

        if len(background_arr) < len(primary_arr):
            background_arr = np.pad(background_arr, (0, len(primary_arr) - len(background_arr)), 'constant')
        elif len(background_arr) > len(primary_arr):
            background_arr = background_arr[:len(primary_arr)]

        # Sum in int32 then clip to int16 range to avoid wrap-around distortion on loud peaks.
        mixed_arr = np.clip(
            primary_arr.astype(np.int32) + background_arr.astype(np.int32),
            np.iinfo(np.int16).min,
            np.iinfo(np.int16).max,
        ).astype(np.int16)
        return mixed_arr.tobytes()

    def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when the final audio byte of synthesis is produced"""
        logger.info("on last audio callback")
        self._last_audio_callback = callback
        self._synthesis_complete = False
        self._needs_last_audio_callback = False

    async def recv(self) -> AudioFrame:
        """
        Overrides base method to perform mixing and just-in-time frame creation.

        Raises:
            MediaStreamError: when ``readyState`` is not ``"live"``.

        NOTE(review): any other exception is logged and swallowed, so the
        coroutine then returns ``None`` — confirm the consumer tolerates that.
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError

            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples

            # Pace delivery against the wall clock anchored at the first call.
            wait = self._start + (self._timestamp / self.sample_rate) - time()
            if wait > 0:
                await asyncio.sleep(wait)

            pts, time_base = self.next_timestamp()

            primary_chunk = b''
            has_primary = len(self.audio_data_buffer) >= self.chunk_size
            if has_primary:
                primary_chunk = self.audio_data_buffer[: self.chunk_size]
                self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
                self._samples_played += self.samples
                self._is_speaking = True
                self._last_speaking_time = time()
            elif getattr(self, "_is_speaking", False):
                # Apply grace period for mixing track as well
                if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                    self._is_speaking = False

                    if self._synthesis_complete:
                        # TTS finished and buffer drained — fire callback
                        self._synthesis_complete = False
                        self._needs_last_audio_callback = False
                        logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                        if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                            asyncio.create_task(self._last_audio_callback())
                    else:
                        # Buffer temporarily empty but synthesis still in progress
                        logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                        self._needs_last_audio_callback = True

            background_chunk = b''
            has_background = len(self.background_audio_buffer) >= self.chunk_size
            if has_background:
                background_chunk = self.background_audio_buffer[: self.chunk_size]
                self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :]

            final_chunk = None
            if has_primary:
                final_chunk = self.mix_audio(primary_chunk, background_chunk)
            elif has_background:
                final_chunk = background_chunk

            if final_chunk:
                frame = self.buildAudioFrames(final_chunk)
            else:
                # Neither source has a full chunk: emit silence.
                frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
                for p in frame.planes:
                    p.update(bytes(p.buffer_size))

            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except MediaStreamError:
            raise
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")
Ancestors
- CustomAudioStreamTrack
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Subclasses
Methods
async def add_background_bytes(self, audio_data: bytes)-
Expand source code
async def add_background_bytes(self, audio_data: bytes):
    """Append raw PCM bytes to the end of ``background_audio_buffer``."""
    self.background_audio_buffer.extend(audio_data)
Expand source code
async def add_new_bytes(self, audio_data: bytes):
    """Overrides base method to buffer bytes instead of creating frames.

    Like the base implementation, incoming audio is dropped while
    ``_accepting_audio`` is False. Without this gate, manual audio control
    (enable_audio_input(manual_control=True)) had no effect on this track:
    audio pushed after an interrupt was still appended behind the faded tail.

    Args:
        audio_data: Raw PCM bytes to append to ``audio_data_buffer``.
    """
    if not self._accepting_audio:
        logger.debug("Audio input currently disabled, dropping audio data")
        return
    # Count whole sample frames only; the raw bytes are buffered as-is.
    self._cumulative_input_samples += len(audio_data) // (self.channels * self.sample_width)
    self.audio_data_buffer += audio_data
def interrupt(self)-
Expand source code
def interrupt(self):
    """Interrupt playback.

    The base implementation applies the exponential fade-out — here the
    overridden ``_collect_pending_pcm`` / ``_load_faded_audio`` make it act on
    ``audio_data_buffer`` (where this track keeps queued TTS audio).
    ``background_audio_buffer`` is cleared outright — background audio is
    never faded.
    """
    # Base logic first: it handles the fade and resets the playback flags.
    super().interrupt()
    # Background audio is dropped on every interrupt.
    self.background_audio_buffer.clear()
The base implementation applies the exponential fade-out — here the overridden `_collect_pending_pcm` / `_load_faded_audio` make it act on `audio_data_buffer` (where this track keeps queued TTS audio). `background_audio_buffer` is cleared outright — background audio is never faded.
def mix_audio(self, primary_chunk, background_chunk)-
Expand source code
def mix_audio(self, primary_chunk, background_chunk):
    """Mix a primary int16 PCM chunk with a background int16 PCM chunk.

    Args:
        primary_chunk: Bytes-like int16 PCM for the primary audio.
        background_chunk: Bytes-like int16 PCM for the background audio;
            when empty, ``primary_chunk`` is returned unchanged.

    Returns:
        ``primary_chunk`` itself when there is no background audio,
        otherwise the mixed audio as ``bytes`` with the same number of
        samples as the primary chunk.
    """
    if not background_chunk:
        return primary_chunk

    voice = np.frombuffer(primary_chunk, dtype=np.int16)
    bed = np.frombuffer(background_chunk, dtype=np.int16)

    # Fit the background to the primary length: zero-fill the tail when the
    # background is shorter, drop the excess when it is longer.
    target_len = voice.shape[0]
    overlap = min(target_len, bed.shape[0])
    aligned_bed = np.zeros(target_len, dtype=np.int16)
    aligned_bed[:overlap] = bed[:overlap]

    # Sum in int32 then clip to int16 range to avoid wrap-around distortion
    # on loud peaks.
    int16_range = np.iinfo(np.int16)
    summed = voice.astype(np.int32) + aligned_bed.astype(np.int32)
    clipped = np.clip(summed, int16_range.min, int16_range.max)
    return clipped.astype(np.int16).tobytes()
Expand source code
async def recv(self) -> AudioFrame:
    """
    Overrides base method to perform mixing and just-in-time frame creation.

    Each call paces itself against a wall-clock start time, then pulls at
    most one ``chunk_size`` slice from ``audio_data_buffer`` (primary) and
    one from ``background_audio_buffer``. A buffer holding fewer than
    ``chunk_size`` bytes is left untouched for this call. The result is the
    mixed chunk, the background chunk alone, or an all-zero frame when
    neither buffer has a full chunk.

    Returns:
        The frame for this tick, with ``pts``, ``time_base`` and
        ``sample_rate`` set. If an exception other than
        ``MediaStreamError`` occurs, it is printed and logged and the
        method falls through, returning ``None`` implicitly.

    Raises:
        MediaStreamError: When ``readyState`` is not ``"live"``.
    """
    # NOTE(review): this body was reconstructed from a whitespace-collapsed
    # listing. The nesting of the ``_synthesis_complete`` check inside the
    # grace-period check is the most plausible reading — confirm against the
    # original file.
    try:
        if self.readyState != "live":
            raise MediaStreamError

        # The first call anchors the clock; later calls advance by one frame
        # worth of samples.
        if self._start is None:
            self._start = time()
            self._timestamp = 0
        else:
            self._timestamp += self.samples

        # Sleep until this frame is due relative to the anchored start time.
        wait = self._start + (self._timestamp / self.sample_rate) - time()
        if wait > 0:
            await asyncio.sleep(wait)

        pts, time_base = self.next_timestamp()

        primary_chunk = b''
        has_primary = len(self.audio_data_buffer) >= self.chunk_size
        if has_primary:
            # Consume exactly one chunk and record it as played.
            primary_chunk = self.audio_data_buffer[: self.chunk_size]
            self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
            self._samples_played += self.samples
            self._is_speaking = True
            self._last_speaking_time = time()
        elif getattr(self, "_is_speaking", False):
            # Apply grace period for mixing track as well
            if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                self._is_speaking = False

                if self._synthesis_complete:
                    # TTS finished and buffer drained — fire callback
                    self._synthesis_complete = False
                    self._needs_last_audio_callback = False
                    logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                    if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                        # NOTE(review): the task returned by create_task is
                        # not stored; the asyncio docs advise keeping a
                        # reference so it cannot be garbage-collected
                        # mid-execution.
                        asyncio.create_task(self._last_audio_callback())
                else:
                    # Buffer temporarily empty but synthesis still in progress
                    logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                    self._needs_last_audio_callback = True

        background_chunk = b''
        has_background = len(self.background_audio_buffer) >= self.chunk_size
        if has_background:
            background_chunk = self.background_audio_buffer[: self.chunk_size]
            self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :]

        # Primary audio (mixed with any background) takes precedence;
        # background alone is used only when there is no primary chunk.
        final_chunk = None
        if has_primary:
            final_chunk = self.mix_audio(primary_chunk, background_chunk)
        elif has_background:
            final_chunk = background_chunk

        if final_chunk:
            frame = self.buildAudioFrames(final_chunk)
        else:
            # Nothing to play: emit a frame whose planes are zero-filled.
            frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in frame.planes:
                p.update(bytes(p.buffer_size))

        frame.pts = pts
        frame.time_base = time_base
        frame.sample_rate = self.sample_rate
        return frame
    except MediaStreamError:
        raise
    except Exception as e:
        traceback.print_exc()
        logger.error(f"Error while creating tts->rtc frame: {e}")
Inherited members
class TeeCustomAudioStreamTrack (loop, sinks=None, pipeline=None)-
Expand source code
class TeeCustomAudioStreamTrack(CustomAudioStreamTrack):
    """Audio track that duplicates outgoing audio bytes to registered sinks such as avatar plugins or local speakers.

    A sink is either an object exposing an awaitable
    ``handle_audio_input(audio_data)`` method or a callable taking the audio
    bytes. While at least one sink is registered, ``recv()`` hands silence
    to WebRTC instead of the real audio.
    """

    def __init__(self, loop, sinks=None, pipeline=None):
        """Create the track.

        Args:
            loop: Passed through to the base track constructor.
            sinks: Optional initial list of sinks. The list is stored as
                given (not copied); a new empty list is used when ``None``.
            pipeline: Stored on the instance as ``pipeline``; not otherwise
                used by this class.
        """
        super().__init__(loop)
        self.sinks = sinks if sinks is not None else []
        self.pipeline = pipeline

    def add_sink(self, sink):
        """Add a new sink (callback or object)"""
        if sink not in self.sinks:
            self.sinks.append(sink)

    def remove_sink(self, sink):
        """Remove a previously registered sink; unknown sinks are ignored."""
        if sink in self.sinks:
            self.sinks.remove(sink)

    async def add_new_bytes(self, audio_data: bytes):
        """Feed audio to the base track, then forward it to every sink.

        A failure in one sink is logged as a warning and does not prevent
        the remaining sinks from receiving the audio.

        Args:
            audio_data: Raw audio bytes to play and forward.
        """
        import inspect

        await super().add_new_bytes(audio_data)
        # Route audio to sinks (avatars, etc.)
        for sink in self.sinks:
            try:
                if hasattr(sink, "handle_audio_input"):
                    await sink.handle_audio_input(audio_data)
                elif callable(sink):
                    # Decide from the call result rather than from the
                    # callable's type: functools.partial wrappers, objects
                    # with an async __call__, and lambdas returning a
                    # coroutine are not recognised by iscoroutinefunction(),
                    # so their coroutine would be created but never awaited.
                    result = sink(audio_data)
                    if inspect.isawaitable(result):
                        await result
            except Exception as e:
                import logging as _logging
                _logging.getLogger(__name__).warning("Avatar sink error (audio will continue): %s", e)

    async def recv(self) -> AudioFrame:
        """
        When avatar sinks are present the Avatar Server publishes audio to the meeting,
        so we must NOT publish the raw TTS audio directly (it would cause double audio).
        We still call super().recv() so that all internal state tracking and callbacks
        (is_speaking, on_first_audio_byte, on_last_audio_byte) continue to work correctly.
        The actual audio payload is replaced with silence before handing back to WebRTC.

        Returns:
            The base track's frame when no sinks are registered (or when the
            base returned ``None``); otherwise a zero-filled frame carrying
            the same ``pts``, ``time_base`` and ``sample_rate``.
        """
        frame = await super().recv()
        if self.sinks and frame is not None:
            silence = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in silence.planes:
                p.update(bytes(p.buffer_size))
            silence.pts = frame.pts
            silence.time_base = frame.time_base
            silence.sample_rate = frame.sample_rate
            return silence
        return frame
Ancestors
- CustomAudioStreamTrack
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Subclasses
Methods
def add_sink(self, sink)-
Expand source code
def add_sink(self, sink):
    """Add a new sink (callback or object).

    Registering a sink that is already present is a no-op, so each sink
    appears in ``self.sinks`` at most once.
    """
    if sink in self.sinks:
        return
    self.sinks.append(sink)
async def recv(self) ‑> av.audio.frame.AudioFrame-
Expand source code
async def recv(self) -> AudioFrame:
    """
    When avatar sinks are present the Avatar Server publishes audio to the meeting,
    so we must NOT publish the raw TTS audio directly (it would cause double audio).
    We still call super().recv() so that all internal state tracking and callbacks
    (is_speaking, on_first_audio_byte, on_last_audio_byte) continue to work correctly.
    The actual audio payload is replaced with silence before handing back to WebRTC.
    """
    frame = await super().recv()

    # No sinks registered, or nothing came back from the base track: pass
    # the base result through untouched.
    if not self.sinks or frame is None:
        return frame

    # Build a zero-filled stand-in that keeps the original frame's timing.
    muted = AudioFrame(format="s16", layout="mono", samples=self.samples)
    for plane in muted.planes:
        plane.update(bytes(plane.buffer_size))
    muted.pts = frame.pts
    muted.time_base = frame.time_base
    muted.sample_rate = frame.sample_rate
    return muted
def remove_sink(self, sink)-
Expand source code
def remove_sink(self, sink):
    """Remove a sink from ``self.sinks``.

    Removing a sink that is not registered is a no-op. When the same sink
    occurs more than once, only its first occurrence is removed.
    """
    if sink not in self.sinks:
        return
    self.sinks.remove(sink)
Inherited members
class TeeMixingCustomAudioStreamTrack (loop, sinks=None, pipeline=None)-
Expand source code
class TeeMixingCustomAudioStreamTrack(MixingCustomAudioStreamTrack):
    """Combines mixing and tee functionality, mixing background audio while also forwarding audio bytes to registered sinks.

    A sink is either an object exposing an awaitable
    ``handle_audio_input(audio_data)`` method or a callable taking the audio
    bytes. While at least one sink is registered, ``recv()`` hands silence
    to WebRTC instead of the mixed audio.
    """

    def __init__(self, loop, sinks=None, pipeline=None):
        """Create the track.

        Args:
            loop: Passed through to the base track constructor.
            sinks: Optional initial list of sinks. The list is stored as
                given (not copied); a new empty list is used when ``None``.
            pipeline: Stored on the instance as ``pipeline``; not otherwise
                used by this class.
        """
        super().__init__(loop)
        self.sinks = sinks if sinks is not None else []
        self.pipeline = pipeline

    def add_sink(self, sink):
        """Add a new sink (callback or object); duplicates are ignored."""
        if sink not in self.sinks:
            self.sinks.append(sink)

    def remove_sink(self, sink):
        """Remove a previously registered sink; unknown sinks are ignored."""
        if sink in self.sinks:
            self.sinks.remove(sink)

    async def add_new_bytes(self, audio_data: bytes):
        """Buffer audio on the mixing track, then forward it to every sink.

        A failure in one sink is logged as a warning and does not prevent
        the remaining sinks from receiving the audio.

        Args:
            audio_data: Raw audio bytes to play and forward.
        """
        import inspect

        await super().add_new_bytes(audio_data)
        # Route audio to sinks (avatars, etc.)
        for sink in self.sinks:
            try:
                if hasattr(sink, "handle_audio_input"):
                    await sink.handle_audio_input(audio_data)
                elif callable(sink):
                    # Decide from the call result rather than from the
                    # callable's type: functools.partial wrappers, objects
                    # with an async __call__, and lambdas returning a
                    # coroutine are not recognised by iscoroutinefunction(),
                    # so their coroutine would be created but never awaited.
                    result = sink(audio_data)
                    if inspect.isawaitable(result):
                        await result
            except Exception as e:
                import logging as _logging
                _logging.getLogger(__name__).warning("Avatar sink error (audio will continue): %s", e)

    async def recv(self) -> AudioFrame:
        """Silence the direct WebRTC output when avatar sinks are handling playback.

        Returns:
            The mixing track's frame when no sinks are registered (or when
            the base returned ``None``); otherwise a zero-filled frame
            carrying the same ``pts``, ``time_base`` and ``sample_rate``.
        """
        frame = await super().recv()
        if self.sinks and frame is not None:
            silence = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in silence.planes:
                p.update(bytes(p.buffer_size))
            silence.pts = frame.pts
            silence.time_base = frame.time_base
            silence.sample_rate = frame.sample_rate
            return silence
        return frame
Ancestors
- MixingCustomAudioStreamTrack
- CustomAudioStreamTrack
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Methods
async def recv(self) ‑> av.audio.frame.AudioFrame-
Expand source code
async def recv(self) -> AudioFrame:
    """Silence the direct WebRTC output when avatar sinks are handling playback.

    The base ``recv()`` is always awaited first. Its frame is returned as-is
    when no sinks are registered or when it produced ``None``; otherwise a
    zero-filled frame with the same ``pts``, ``time_base`` and
    ``sample_rate`` is returned in its place.
    """
    source_frame = await super().recv()
    if not self.sinks or source_frame is None:
        return source_frame

    quiet = AudioFrame(format="s16", layout="mono", samples=self.samples)
    for plane in quiet.planes:
        # Overwrite the plane with zero bytes of its own size.
        plane.update(bytes(plane.buffer_size))
    quiet.pts = source_frame.pts
    quiet.time_base = source_frame.time_base
    quiet.sample_rate = source_frame.sample_rate
    return quiet
Inherited members