Module agents.room.output_stream
Classes
class CustomAudioStreamTrack (loop)-
Expand source code
class CustomAudioStreamTrack(CustomAudioTrack):
    """
    Base audio track implementation using a frame buffer.
    Audio frames are created as soon as audio data is received.
    Supports optional pause/resume for false-interrupt detection
    while maintaining compatibility with avatar plugins that need
    simple audio flow.
    """

    def __init__(self, loop):
        super().__init__()
        self.loop = loop
        self._start = None        # wall-clock time of the first recv(), anchors pacing
        self._timestamp = 0       # sample-count clock used to pace recv()
        self.frame_buffer = []    # AudioFrames ready to be sent
        self.audio_data_buffer = bytearray()  # raw PCM bytes awaiting framing
        self.frame_time = 0       # running pts counter, in samples
        self.sample_rate = 24000
        self.channels = 1
        self.sample_width = 2     # bytes per sample (s16)
        self.time_base_fraction = Fraction(1, self.sample_rate)
        self.samples = int(AUDIO_PTIME * self.sample_rate)  # samples per frame
        self.chunk_size = int(self.samples * self.channels * self.sample_width)
        self._synthesis_complete = False
        self._needs_last_audio_callback = False
        self._last_speaking_time = 0.0
        self._speaking_grace_period = 0.5  # 500ms grace period for jitter
        # Robustness fix: initialize the speaking flag explicitly instead of
        # relying on recv()'s lazy getattr() default.
        self._is_speaking = False
        # Pause/resume support - simple flag-based (no blocking)
        self._is_paused = False
        self._paused_frames = []  # Separate buffer for paused content
        self._accepting_audio = True
        self._manual_audio_control = False

    @property
    def can_pause(self) -> bool:
        """Returns True if this track supports pause/resume operations"""
        return True

    def interrupt(self):
        """Clear all buffers and reset state"""
        logger.info("Audio track interrupted, clearing buffers.")
        self.frame_buffer.clear()
        self.audio_data_buffer.clear()
        self._paused_frames.clear()
        self._is_paused = False
        self._last_speaking_time = 0.0
        self._synthesis_complete = False
        self._needs_last_audio_callback = False
        # Handle manual audio control mode: in manual mode, intake stays
        # disabled until enable_audio_input() is called again.
        self._accepting_audio = not self._manual_audio_control

    async def pause(self) -> None:
        """
        Pause audio playback.

        Instead of blocking recv(), we move remaining frames to a separate
        buffer so they can be resumed later. This approach keeps the audio
        flow simple for avatars.
        """
        if self._is_paused:
            logger.warning("Audio track already paused")
            return
        logger.info("Audio track paused - preserving current buffer state.")
        self._is_paused = True
        # Move current frames to paused buffer for later resume
        self._paused_frames = self.frame_buffer.copy()
        self.frame_buffer.clear()

    async def resume(self) -> None:
        """
        Resume audio playback from paused position.
        Restores frames that were saved when paused.
        """
        if not self._is_paused:
            logger.warning("Audio track not paused, nothing to resume")
            return
        logger.info("Audio track resumed - restoring paused buffer.")
        self._is_paused = False
        # Restore frames from paused buffer
        self.frame_buffer = self._paused_frames.copy()
        self._paused_frames.clear()

    def enable_audio_input(self, manual_control: bool = False):
        """
        Allow fresh audio data to be buffered.

        When manual_control is True, future interrupts will pause intake
        until this method is called again. This is useful for preventing
        old audio from bleeding into new responses.
        """
        self._manual_audio_control = manual_control
        self._accepting_audio = True
        logger.debug(f"Audio input enabled (manual_control={manual_control})")

    def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when the final audio byte of synthesis is produced"""
        logger.info("on last audio callback")
        self._last_audio_callback = callback
        self._synthesis_complete = False
        self._needs_last_audio_callback = False

    @property
    def is_speaking(self) -> bool:
        """
        True if the track is currently playing audio or has buffered data,
        including a small grace period to bridge gaps in streaming TTS.
        """
        has_data = len(self.frame_buffer) > 0 or len(self.audio_data_buffer) > 0
        if has_data:
            return True
        return (time() - self._last_speaking_time) < self._speaking_grace_period

    def mark_synthesis_complete(self) -> None:
        """
        Mark that TTS synthesis has finished sending all audio data.

        If the buffer is already empty (all audio consumed), fires the
        on_last_audio_byte callback immediately. Otherwise, the callback
        will fire when recv() drains the remaining buffer.
        """
        self._synthesis_complete = True
        # If we're not currently speaking (grace period passed) and buffer is empty, fire immediately
        if not self.is_speaking and self._needs_last_audio_callback:
            self._needs_last_audio_callback = False
            self._synthesis_complete = False
            logger.info("[AudioTrack] Synthesis complete and buffer already empty — triggering last_audio_callback.")
            if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                asyncio.create_task(self._last_audio_callback())

    async def add_new_bytes(self, audio_data: bytes):
        """
        Add new audio bytes to the buffer.
        Respects _accepting_audio flag for manual audio control mode.
        """
        if not self._accepting_audio:
            logger.debug("Audio input currently disabled, dropping audio data")
            return
        self.audio_data_buffer += audio_data
        while len(self.audio_data_buffer) >= self.chunk_size:
            chunk = self.audio_data_buffer[: self.chunk_size]
            self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
            try:
                audio_frame = self.buildAudioFrames(chunk)
                # If paused, add to paused buffer instead
                if self._is_paused:
                    self._paused_frames.append(audio_frame)
                    logger.debug("Added frame to paused buffer")
                else:
                    self.frame_buffer.append(audio_frame)
                    logger.debug(
                        f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}"
                    )
            except Exception as e:
                logger.error(f"Error building audio frame: {e}")
                break

    def buildAudioFrames(self, chunk: bytes) -> AudioFrame:
        """Convert one chunk of raw s16 PCM bytes into an AudioFrame.

        Logs (but does not reject) size mismatches so a malformed chunk
        surfaces in the logs instead of failing silently.
        """
        if len(chunk) != self.chunk_size:
            logger.warning(
                f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}"
            )
        data = np.frombuffer(chunk, dtype=np.int16)
        expected_samples = self.samples * self.channels
        if len(data) != expected_samples:
            logger.warning(
                f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}"
            )
        data = data.reshape(-1, self.channels)
        layout = "mono" if self.channels == 1 else "stereo"
        audio_frame = AudioFrame.from_ndarray(data.T, format="s16", layout=layout)
        return audio_frame

    def _make_silence_frame(self) -> AudioFrame:
        """Build one frame of silence matching the track configuration.

        Fix: honors self.channels instead of hard-coding a mono layout, so
        silence frames stay consistent with buildAudioFrames() output.
        (Identical behavior for the default channels == 1.)
        """
        layout = "mono" if self.channels == 1 else "stereo"
        frame = AudioFrame(format="s16", layout=layout, samples=self.samples)
        for p in frame.planes:
            p.update(bytes(p.buffer_size))
        return frame

    def next_timestamp(self):
        """Return (pts, time_base) for the next frame and advance the pts clock."""
        pts = int(self.frame_time)
        time_base = self.time_base_fraction
        self.frame_time += self.samples
        return pts, time_base

    async def recv(self) -> AudioFrame:
        """
        Receive next audio frame.

        When paused, produces silence frames but keeps timing synchronized.
        This ensures smooth resume without audio glitches.
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError
            # Pace frame delivery against wall-clock time.
            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples
            wait = self._start + (self._timestamp / self.sample_rate) - time()
            if wait > 0:
                await asyncio.sleep(wait)
            pts, time_base = self.next_timestamp()

            # When paused, always produce silence but keep timing
            # This allows smooth resume without timing jumps
            if self._is_paused:
                frame = self._make_silence_frame()
            elif len(self.frame_buffer) > 0:
                frame = self.frame_buffer.pop(0)
                self._is_speaking = True
                self._last_speaking_time = time()
            else:
                # No audio data available — silence
                if self._is_speaking:
                    # Only declare we've stopped speaking if the grace period has passed
                    # This bridges gaps in streaming TTS (like Sarvam jitter)
                    if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                        self._is_speaking = False
                        if self._synthesis_complete:
                            # TTS finished and buffer drained — fire callback
                            self._synthesis_complete = False
                            self._needs_last_audio_callback = False
                            logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                            if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                                asyncio.create_task(self._last_audio_callback())
                        else:
                            # Buffer temporarily empty but synthesis still in progress
                            logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                            self._needs_last_audio_callback = True
                # Produce silence frame
                frame = self._make_silence_frame()

            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except MediaStreamError:
            raise
        except Exception as e:
            # Best-effort: log and fall through (returns None) rather than
            # killing the WebRTC sender loop on a transient framing error.
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")

    async def cleanup(self):
        """Flush all buffers and stop the underlying media track."""
        self.interrupt()
        self.stop()
Supports optional pause/resume for false-interrupt detection while maintaining compatibility with avatar plugins that need simple audio flow.
Ancestors
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Subclasses
Instance variables
prop can_pause : bool-
Expand source code
@property def can_pause(self) -> bool: """Returns True if this track supports pause/resume operations""" return TrueReturns True if this track supports pause/resume operations
prop is_speaking : bool-
Expand source code
@property def is_speaking(self) -> bool: """ True if the track is currently playing audio or has buffered data, including a small grace period to bridge gaps in streaming TTS. """ has_data = len(self.frame_buffer) > 0 or len(self.audio_data_buffer) > 0 if has_data: return True return (time() - self._last_speaking_time) < self._speaking_grace_periodTrue if the track is currently playing audio or has buffered data, including a small grace period to bridge gaps in streaming TTS.
Methods
async def add_new_bytes(self, audio_data: bytes)-
Expand source code
async def add_new_bytes(self, audio_data: bytes): """ Add new audio bytes to the buffer. Respects _accepting_audio flag for manual audio control mode. """ if not self._accepting_audio: logger.debug("Audio input currently disabled, dropping audio data") return self.audio_data_buffer += audio_data while len(self.audio_data_buffer) >= self.chunk_size: chunk = self.audio_data_buffer[: self.chunk_size] self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :] try: audio_frame = self.buildAudioFrames(chunk) # If paused, add to paused buffer instead if self._is_paused: self._paused_frames.append(audio_frame) logger.debug("Added frame to paused buffer") else: self.frame_buffer.append(audio_frame) logger.debug( f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}" ) except Exception as e: logger.error(f"Error building audio frame: {e}") breakAdd new audio bytes to the buffer. Respects _accepting_audio flag for manual audio control mode.
def buildAudioFrames(self, chunk: bytes) ‑> av.audio.frame.AudioFrame-
Expand source code
def buildAudioFrames(self, chunk: bytes) -> AudioFrame: if len(chunk) != self.chunk_size: logger.warning( f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}" ) data = np.frombuffer(chunk, dtype=np.int16) expected_samples = self.samples * self.channels if len(data) != expected_samples: logger.warning( f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}" ) data = data.reshape(-1, self.channels) layout = "mono" if self.channels == 1 else "stereo" audio_frame = AudioFrame.from_ndarray(data.T, format="s16", layout=layout) return audio_frame async def cleanup(self)-
Expand source code
async def cleanup(self): self.interrupt() self.stop() def enable_audio_input(self, manual_control: bool = False)-
Expand source code
def enable_audio_input(self, manual_control: bool = False): """ Allow fresh audio data to be buffered. When manual_control is True, future interrupts will pause intake until this method is called again. This is useful for preventing old audio from bleeding into new responses. """ self._manual_audio_control = manual_control self._accepting_audio = True logger.debug(f"Audio input enabled (manual_control={manual_control})")Allow fresh audio data to be buffered. When manual_control is True, future interrupts will pause intake until this method is called again.
This is useful for preventing old audio from bleeding into new responses.
def interrupt(self)-
Expand source code
def interrupt(self): """Clear all buffers and reset state""" logger.info("Audio track interrupted, clearing buffers.") self.frame_buffer.clear() self.audio_data_buffer.clear() self._paused_frames.clear() self._is_paused = False self._last_speaking_time = 0.0 self._synthesis_complete = False self._needs_last_audio_callback = False # Handle manual audio control mode if self._manual_audio_control: self._accepting_audio = False else: self._accepting_audio = TrueClear all buffers and reset state
def mark_synthesis_complete(self) ‑> None-
Expand source code
def mark_synthesis_complete(self) -> None: """ Mark that TTS synthesis has finished sending all audio data. If the buffer is already empty (all audio consumed), fires the on_last_audio_byte callback immediately. Otherwise, the callback will fire when recv() drains the remaining buffer. """ self._synthesis_complete = True # If we're not currently speaking (grace period passed) and buffer is empty, fire immediately if not self.is_speaking and self._needs_last_audio_callback: self._needs_last_audio_callback = False self._synthesis_complete = False logger.info("[AudioTrack] Synthesis complete and buffer already empty — triggering last_audio_callback.") if hasattr(self, "_last_audio_callback") and self._last_audio_callback: asyncio.create_task(self._last_audio_callback())Mark that TTS synthesis has finished sending all audio data. If the buffer is already empty (all audio consumed), fires the on_last_audio_byte callback immediately. Otherwise, the callback will fire when recv() drains the remaining buffer.
def next_timestamp(self)-
Expand source code
def next_timestamp(self): pts = int(self.frame_time) time_base = self.time_base_fraction self.frame_time += self.samples return pts, time_base def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None-
Expand source code
def on_last_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None: """Set callback for when the final audio byte of synthesis is produced""" logger.info("on last audio callback") self._last_audio_callback = callback self._synthesis_complete = False self._needs_last_audio_callback = FalseSet callback for when the final audio byte of synthesis is produced
async def pause(self) ‑> None-
Expand source code
async def pause(self) -> None: """ Pause audio playback. Instead of blocking recv(), we move remaining frames to a separate buffer so they can be resumed later. This approach keeps the audio flow simple for avatars. """ if self._is_paused: logger.warning("Audio track already paused") return logger.info("Audio track paused - preserving current buffer state.") self._is_paused = True # Move current frames to paused buffer for later resume self._paused_frames = self.frame_buffer.copy() self.frame_buffer.clear()Pause audio playback. Instead of blocking recv(), we move remaining frames to a separate buffer so they can be resumed later. This approach keeps the audio flow simple for avatars.
async def recv(self) ‑> av.audio.frame.AudioFrame-
Expand source code
async def recv(self) -> AudioFrame: """ Receive next audio frame. When paused, produces silence frames but keeps timing synchronized. This ensures smooth resume without audio glitches. """ try: if self.readyState != "live": raise MediaStreamError if self._start is None: self._start = time() self._timestamp = 0 else: self._timestamp += self.samples wait = self._start + (self._timestamp / self.sample_rate) - time() if wait > 0: await asyncio.sleep(wait) pts, time_base = self.next_timestamp() # When paused, always produce silence but keep timing # This allows smooth resume without timing jumps if self._is_paused: frame = AudioFrame(format="s16", layout="mono", samples=self.samples) for p in frame.planes: p.update(bytes(p.buffer_size)) elif len(self.frame_buffer) > 0: frame = self.frame_buffer.pop(0) self._is_speaking = True self._last_speaking_time = time() else: # No audio data available — silence if getattr(self, "_is_speaking", False): # Only declare we've stopped speaking if the grace period has passed # This bridges gaps in streaming TTS (like Sarvam jitter) if (time() - self._last_speaking_time) >= self._speaking_grace_period: self._is_speaking = False if self._synthesis_complete: # TTS finished and buffer drained — fire callback self._synthesis_complete = False self._needs_last_audio_callback = False logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.") if hasattr(self, "_last_audio_callback") and self._last_audio_callback: asyncio.create_task(self._last_audio_callback()) else: # Buffer temporarily empty but synthesis still in progress logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.") self._needs_last_audio_callback = True # Produce silence frame frame = AudioFrame(format="s16", layout="mono", samples=self.samples) for p in frame.planes: p.update(bytes(p.buffer_size)) frame.pts = pts frame.time_base = time_base frame.sample_rate = self.sample_rate return frame except MediaStreamError: raise except Exception 
as e: traceback.print_exc() logger.error(f"Error while creating tts->rtc frame: {e}")Receive next audio frame. When paused, produces silence frames but keeps timing synchronized. This ensures smooth resume without audio glitches.
async def resume(self) ‑> None-
Expand source code
async def resume(self) -> None: """ Resume audio playback from paused position. Restores frames that were saved when paused. """ if not self._is_paused: logger.warning("Audio track not paused, nothing to resume") return logger.info("Audio track resumed - restoring paused buffer.") self._is_paused = False # Restore frames from paused buffer self.frame_buffer = self._paused_frames.copy() self._paused_frames.clear()Resume audio playback from paused position. Restores frames that were saved when paused.
class MediaStreamError (*args, **kwargs)-
Expand source code
class MediaStreamError(Exception):
    """Raised when a media track is consumed while its readyState is not 'live'."""
Ancestors
- builtins.Exception
- builtins.BaseException
class MixingCustomAudioStreamTrack (loop)-
Expand source code
class MixingCustomAudioStreamTrack(CustomAudioStreamTrack):
    """Audio track that mixes primary TTS audio with a background audio
    buffer, creating frames just-in-time during recv."""

    def __init__(self, loop):
        super().__init__(loop)
        self.background_audio_buffer = bytearray()  # raw background PCM bytes

    def interrupt(self):
        """Clear base-class buffers plus the background audio buffer."""
        super().interrupt()
        self.background_audio_buffer.clear()

    async def add_new_bytes(self, audio_data: bytes):
        """Overrides base method to buffer bytes instead of creating frames."""
        self.audio_data_buffer += audio_data

    async def add_background_bytes(self, audio_data: bytes):
        # Background audio (e.g. ambience) mixed underneath the primary voice.
        self.background_audio_buffer += audio_data

    def mix_audio(self, primary_chunk, background_chunk):
        """Mix background PCM into primary PCM, sample by sample.

        The background chunk is zero-padded or truncated to the primary
        length. Samples are summed in int32 and clipped to the s16 range:
        the previous np.add(..., dtype=np.int16) wrapped around on
        overflow, producing loud clicks when both sources were near full
        scale.
        """
        if not background_chunk:
            return primary_chunk
        primary_arr = np.frombuffer(primary_chunk, dtype=np.int16)
        background_arr = np.frombuffer(background_chunk, dtype=np.int16)
        if len(background_arr) < len(primary_arr):
            background_arr = np.pad(
                background_arr, (0, len(primary_arr) - len(background_arr)), 'constant'
            )
        elif len(background_arr) > len(primary_arr):
            background_arr = background_arr[:len(primary_arr)]
        # Sum in a wider dtype, then clip back into the valid s16 range.
        mixed = primary_arr.astype(np.int32) + background_arr.astype(np.int32)
        mixed_arr = np.clip(mixed, -32768, 32767).astype(np.int16)
        return mixed_arr.tobytes()

    # NOTE: the previous on_last_audio_byte override here was byte-identical
    # to the base-class implementation and has been removed; the inherited
    # method provides the same behavior.

    async def recv(self) -> AudioFrame:
        """
        Overrides base method to perform mixing and just-in-time frame creation.
        """
        try:
            if self.readyState != "live":
                raise MediaStreamError
            # Pace frame delivery against wall-clock time.
            if self._start is None:
                self._start = time()
                self._timestamp = 0
            else:
                self._timestamp += self.samples
            wait = self._start + (self._timestamp / self.sample_rate) - time()
            if wait > 0:
                await asyncio.sleep(wait)
            pts, time_base = self.next_timestamp()

            primary_chunk = b''
            has_primary = len(self.audio_data_buffer) >= self.chunk_size
            if has_primary:
                primary_chunk = self.audio_data_buffer[: self.chunk_size]
                self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :]
                self._is_speaking = True
                self._last_speaking_time = time()
            elif getattr(self, "_is_speaking", False):
                # Apply grace period for mixing track as well
                if (time() - self._last_speaking_time) >= self._speaking_grace_period:
                    self._is_speaking = False
                    if self._synthesis_complete:
                        # TTS finished and buffer drained — fire callback
                        self._synthesis_complete = False
                        self._needs_last_audio_callback = False
                        logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.")
                        if hasattr(self, "_last_audio_callback") and self._last_audio_callback:
                            asyncio.create_task(self._last_audio_callback())
                    else:
                        # Buffer temporarily empty but synthesis still in progress
                        logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.")
                        self._needs_last_audio_callback = True

            background_chunk = b''
            has_background = len(self.background_audio_buffer) >= self.chunk_size
            if has_background:
                background_chunk = self.background_audio_buffer[: self.chunk_size]
                self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :]

            # Choose the payload: mixed voice+background, background alone,
            # or None (silence) when neither buffer has a full chunk.
            final_chunk = None
            if has_primary:
                final_chunk = self.mix_audio(primary_chunk, background_chunk)
            elif has_background:
                final_chunk = background_chunk

            if final_chunk:
                frame = self.buildAudioFrames(final_chunk)
            else:
                frame = AudioFrame(format="s16", layout="mono", samples=self.samples)
                for p in frame.planes:
                    p.update(bytes(p.buffer_size))
            frame.pts = pts
            frame.time_base = time_base
            frame.sample_rate = self.sample_rate
            return frame
        except MediaStreamError:
            raise
        except Exception as e:
            # Best-effort: log and fall through rather than killing the sender loop.
            traceback.print_exc()
            logger.error(f"Error while creating tts->rtc frame: {e}")
Ancestors
- CustomAudioStreamTrack
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Subclasses
Methods
async def add_background_bytes(self, audio_data: bytes)-
Expand source code
async def add_background_bytes(self, audio_data: bytes): self.background_audio_buffer += audio_data async def add_new_bytes(self, audio_data: bytes)-
Expand source code
async def add_new_bytes(self, audio_data: bytes): """Overrides base method to buffer bytes instead of creating frames.""" self.audio_data_buffer += audio_dataOverrides base method to buffer bytes instead of creating frames.
def mix_audio(self, primary_chunk, background_chunk)-
Expand source code
def mix_audio(self, primary_chunk, background_chunk): if not background_chunk: return primary_chunk primary_arr = np.frombuffer(primary_chunk, dtype=np.int16) background_arr = np.frombuffer(background_chunk, dtype=np.int16) if len(background_arr) < len(primary_arr): background_arr = np.pad(background_arr, (0, len(primary_arr) - len(background_arr)), 'constant') elif len(background_arr) > len(primary_arr): background_arr = background_arr[:len(primary_arr)] mixed_arr = np.add(primary_arr, background_arr, dtype=np.int16) return mixed_arr.tobytes() async def recv(self) ‑> av.audio.frame.AudioFrame-
Expand source code
async def recv(self) -> AudioFrame: """ Overrides base method to perform mixing and just-in-time frame creation. """ try: if self.readyState != "live": raise MediaStreamError if self._start is None: self._start = time() self._timestamp = 0 else: self._timestamp += self.samples wait = self._start + (self._timestamp / self.sample_rate) - time() if wait > 0: await asyncio.sleep(wait) pts, time_base = self.next_timestamp() primary_chunk = b'' has_primary = len(self.audio_data_buffer) >= self.chunk_size if has_primary: primary_chunk = self.audio_data_buffer[: self.chunk_size] self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :] self._is_speaking = True self._last_speaking_time = time() elif getattr(self, "_is_speaking", False): # Apply grace period for mixing track as well if (time() - self._last_speaking_time) >= self._speaking_grace_period: self._is_speaking = False if self._synthesis_complete: # TTS finished and buffer drained — fire callback self._synthesis_complete = False self._needs_last_audio_callback = False logger.info("[AudioTrack] Agent finished speaking — triggering last_audio_callback.") if hasattr(self, "_last_audio_callback") and self._last_audio_callback: asyncio.create_task(self._last_audio_callback()) else: # Buffer temporarily empty but synthesis still in progress logger.debug("[AudioTrack] Buffer empty — waiting for more TTS audio.") self._needs_last_audio_callback = True background_chunk = b'' has_background = len(self.background_audio_buffer) >= self.chunk_size if has_background: background_chunk = self.background_audio_buffer[: self.chunk_size] self.background_audio_buffer = self.background_audio_buffer[self.chunk_size :] final_chunk = None if has_primary: final_chunk = self.mix_audio(primary_chunk, background_chunk) elif has_background: final_chunk = background_chunk if final_chunk: frame = self.buildAudioFrames(final_chunk) else: frame = AudioFrame(format="s16", layout="mono", samples=self.samples) for p in frame.planes: 
p.update(bytes(p.buffer_size)) frame.pts = pts frame.time_base = time_base frame.sample_rate = self.sample_rate return frame except MediaStreamError: raise except Exception as e: traceback.print_exc() logger.error(f"Error while creating tts->rtc frame: {e}")Overrides base method to perform mixing and just-in-time frame creation.
Inherited members
class TeeCustomAudioStreamTrack (loop, sinks=None, pipeline=None)-
Expand source code
class TeeCustomAudioStreamTrack(CustomAudioStreamTrack):
    """Audio track that duplicates outgoing audio bytes to registered sinks
    such as avatar plugins or local speakers."""

    def __init__(self, loop, sinks=None, pipeline=None):
        super().__init__(loop)
        self.sinks = sinks if sinks is not None else []
        self.pipeline = pipeline

    def add_sink(self, sink):
        """Add a new sink (callback or object)"""
        if sink not in self.sinks:
            self.sinks.append(sink)

    def remove_sink(self, sink):
        """Detach a previously registered sink; no-op if it is unknown."""
        if sink in self.sinks:
            self.sinks.remove(sink)

    async def _fan_out_to_sinks(self, audio_data: bytes) -> None:
        """Forward raw audio bytes to every sink.

        A failing sink is logged and skipped so audio delivery continues.
        Sinks may be objects exposing handle_audio_input(), coroutine
        functions, or plain callables.
        """
        for sink in self.sinks:
            try:
                if hasattr(sink, "handle_audio_input"):
                    await sink.handle_audio_input(audio_data)
                elif callable(sink):
                    if asyncio.iscoroutinefunction(sink):
                        await sink(audio_data)
                    else:
                        sink(audio_data)
            except Exception as e:
                # Fix: the previous code did `import logging as _logging` and
                # getLogger(__name__) inside this handler on every failure;
                # use the module logger instead.
                # NOTE(review): assumes module-level `logger` is
                # logging.getLogger(__name__) — confirm.
                logger.warning("Avatar sink error (audio will continue): %s", e)

    async def add_new_bytes(self, audio_data: bytes):
        """Buffer audio for WebRTC and tee the same bytes to all sinks."""
        await super().add_new_bytes(audio_data)
        # Route audio to sinks (avatars, etc.)
        await self._fan_out_to_sinks(audio_data)

    async def recv(self) -> AudioFrame:
        """
        When avatar sinks are present the Avatar Server publishes audio to
        the meeting, so we must NOT publish the raw TTS audio directly (it
        would cause double audio). We still call super().recv() so that all
        internal state tracking and callbacks (is_speaking,
        on_first_audio_byte, on_last_audio_byte) continue to work correctly.
        The actual audio payload is replaced with silence before handing
        back to WebRTC.
        """
        frame = await super().recv()
        if self.sinks and frame is not None:
            silence = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in silence.planes:
                p.update(bytes(p.buffer_size))
            # Preserve the original frame's timing metadata.
            silence.pts = frame.pts
            silence.time_base = frame.time_base
            silence.sample_rate = frame.sample_rate
            return silence
        return frame
Ancestors
- CustomAudioStreamTrack
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Subclasses
Methods
def add_sink(self, sink)-
Expand source code
def add_sink(self, sink): """Add a new sink (callback or object)""" if sink not in self.sinks: self.sinks.append(sink)Add a new sink (callback or object)
async def recv(self) ‑> av.audio.frame.AudioFrame-
Expand source code
async def recv(self) -> AudioFrame: """ When avatar sinks are present the Avatar Server publishes audio to the meeting, so we must NOT publish the raw TTS audio directly (it would cause double audio). We still call super().recv() so that all internal state tracking and callbacks (is_speaking, on_first_audio_byte, on_last_audio_byte) continue to work correctly. The actual audio payload is replaced with silence before handing back to WebRTC. """ frame = await super().recv() if self.sinks and frame is not None: silence = AudioFrame(format="s16", layout="mono", samples=self.samples) for p in silence.planes: p.update(bytes(p.buffer_size)) silence.pts = frame.pts silence.time_base = frame.time_base silence.sample_rate = frame.sample_rate return silence return frameWhen avatar sinks are present the Avatar Server publishes audio to the meeting, so we must NOT publish the raw TTS audio directly (it would cause double audio). We still call super().recv() so that all internal state tracking and callbacks (is_speaking, on_first_audio_byte, on_last_audio_byte) continue to work correctly. The actual audio payload is replaced with silence before handing back to WebRTC.
def remove_sink(self, sink)-
Expand source code
def remove_sink(self, sink): if sink in self.sinks: self.sinks.remove(sink)
Inherited members
class TeeMixingCustomAudioStreamTrack (loop, sinks=None, pipeline=None)-
Expand source code
class TeeMixingCustomAudioStreamTrack(MixingCustomAudioStreamTrack):
    """Combines mixing and tee functionality, mixing background audio while
    also forwarding audio bytes to registered sinks."""

    def __init__(self, loop, sinks=None, pipeline=None):
        super().__init__(loop)
        self.sinks = sinks if sinks is not None else []
        self.pipeline = pipeline

    async def add_new_bytes(self, audio_data: bytes):
        """Buffer primary audio for mixing and tee the bytes to all sinks."""
        await super().add_new_bytes(audio_data)
        # Route audio to sinks (avatars, etc.)
        for sink in self.sinks:
            try:
                if hasattr(sink, "handle_audio_input"):
                    await sink.handle_audio_input(audio_data)
                elif callable(sink):
                    if asyncio.iscoroutinefunction(sink):
                        await sink(audio_data)
                    else:
                        sink(audio_data)
            except Exception as e:
                # Fix: the previous code did `import logging as _logging` and
                # getLogger(__name__) inside this handler on every failure;
                # use the module logger instead.
                # NOTE(review): assumes module-level `logger` is
                # logging.getLogger(__name__) — confirm.
                logger.warning("Avatar sink error (audio will continue): %s", e)

    async def recv(self) -> AudioFrame:
        """Silence the direct WebRTC output when avatar sinks are handling playback."""
        frame = await super().recv()
        if self.sinks and frame is not None:
            silence = AudioFrame(format="s16", layout="mono", samples=self.samples)
            for p in silence.planes:
                p.update(bytes(p.buffer_size))
            # Preserve the original frame's timing metadata.
            silence.pts = frame.pts
            silence.time_base = frame.time_base
            silence.sample_rate = frame.sample_rate
            return silence
        return frame
Ancestors
- MixingCustomAudioStreamTrack
- CustomAudioStreamTrack
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Methods
async def recv(self) ‑> av.audio.frame.AudioFrame-
Expand source code
async def recv(self) -> AudioFrame: """Silence the direct WebRTC output when avatar sinks are handling playback.""" frame = await super().recv() if self.sinks and frame is not None: silence = AudioFrame(format="s16", layout="mono", samples=self.samples) for p in silence.planes: p.update(bytes(p.buffer_size)) silence.pts = frame.pts silence.time_base = frame.time_base silence.sample_rate = frame.sample_rate return silence return frameSilence the direct WebRTC output when avatar sinks are handling playback.
Inherited members