Module videosdk.plugins.silero.vad

Classes

class SileroVAD (input_sample_rate: int = 48000,
model_sample_rate: Literal[8000, 16000] = 16000,
threshold: float = 0.5,
start_threshold: float = 0.4,
end_threshold: float = 0.25,
min_speech_duration: float = 0.3,
min_silence_duration: float = 0.4,
padding_duration: float = 0.5,
max_buffered_speech: float = 60.0,
force_cpu: bool = True)
Expand source code
class SileroVAD(BaseVAD):
    """Silero Voice Activity Detection implementation using ONNX runtime.
    
    This implementation buffers audio, runs it through the Silero model,
    applies exponential smoothing to the probabilities to accurately detect the start and end of speech.
    """

    def __init__(
        self,
        input_sample_rate: int = 48000,
        model_sample_rate: Literal[8000, 16000] = 16000,
        threshold: float = 0.5,
        start_threshold: float = 0.4,
        end_threshold: float = 0.25,
        min_speech_duration: float = 0.3,
        min_silence_duration: float = 0.4,
        padding_duration: float = 0.5,
        max_buffered_speech: float = 60.0,
        force_cpu: bool = True,
    ) -> None:
        if model_sample_rate not in SAMPLE_RATES:
            self.emit("error", f"Invalid model sample rate {model_sample_rate}: must be one of {SAMPLE_RATES}")
            raise ValueError(f"Model sample rate {model_sample_rate} not supported. Must be one of {SAMPLE_RATES}")

        super().__init__(
            sample_rate=model_sample_rate,
            threshold=threshold,
            min_speech_duration=min_speech_duration,
            min_silence_duration=min_silence_duration,
        )

        # Config properties
        self._start_thresh = start_threshold
        self._stop_thresh = end_threshold
        self._padding_sec = padding_duration
        self._max_buffer_sec = max_buffered_speech

        self._in_rate = input_sample_rate
        self._mod_rate = model_sample_rate
        self._requires_resample = input_sample_rate != model_sample_rate

        try:
            self._onnx_sess = VadModelWrapper.create_inference_session(force_cpu)
            self._silero = VadModelWrapper(session=self._onnx_sess, rate=model_sample_rate)
        except Exception as e:
            self.emit("error", f"Failed to init VAD model: {e}")
            raise

        # Smoothing
        self._smooth_factor = 0.35
        self._smoothed_prob = 0.0

        # State tracking
        self._speech_run_time = 0.0
        self._silence_run_time = 0.0

        self._is_active = False
        self._active_speech_time = 0.0
        self._active_silence_time = 0.0
        self._total_time = 0.0

        self._fract_offset = 0.0

        # Buffers
        self._raw_queue = np.array([], dtype=np.int16)
        self._model_queue = np.array([], dtype=np.float32)

        # Padding / Speech
        self._pad_frames = int(self._padding_sec * self._in_rate)
        buffer_size = int(self._max_buffer_sec * self._in_rate) + self._pad_frames
        self._audio_capture = np.empty(buffer_size, dtype=np.int16)
        self._capture_ptr = 0
        self._buffer_full = False

        self._lag_time = 0.0

    def _smooth_probability(self, val: float) -> float:
        self._smoothed_prob = (self._smooth_factor * val) + ((1 - self._smooth_factor) * self._smoothed_prob)
        return self._smoothed_prob

    def _flush_capture_buffer(self) -> None:
        if self._capture_ptr <= self._pad_frames:
            return

        retained = self._audio_capture[self._capture_ptr - self._pad_frames : self._capture_ptr].copy()
        self._buffer_full = False
        self._audio_capture[: self._pad_frames] = retained
        self._capture_ptr = self._pad_frames

    async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
        try:
            incoming = np.frombuffer(audio_frames, dtype=np.int16)

            self._raw_queue = np.concatenate([self._raw_queue, incoming])

            if self._requires_resample:
                normalized = incoming.astype(np.float32) / 32768.0
                target_len = int(len(normalized) * self._mod_rate / self._in_rate)
                if target_len > 0:
                    resampled = signal.resample(normalized, target_len)
                    self._model_queue = np.concatenate([self._model_queue, resampled.astype(np.float32)])
            else:
                normalized = incoming.astype(np.float32) / 32768.0
                self._model_queue = np.concatenate([self._model_queue, normalized])

            frame_size = self._silero.frame_size

            while len(self._model_queue) >= frame_size:
                t0 = time.perf_counter()
                chunk = self._model_queue[:frame_size]

                try:
                    p_raw = self._silero.process(chunk)
                except Exception as err:
                    self.emit("error", f"VAD error: {err}")
                    p_raw = 0.0

                p = self._smooth_probability(p_raw)
                step_time = frame_size / self._mod_rate
                self._total_time += step_time

                ratio = self._in_rate / self._mod_rate
                samples_needed = (frame_size * ratio) + self._fract_offset
                consume_count = int(samples_needed)
                self._fract_offset = samples_needed - consume_count

                space_left = len(self._audio_capture) - self._capture_ptr
                copy_amt = min(consume_count, space_left)

                if copy_amt > 0 and len(self._raw_queue) >= consume_count:
                    self._audio_capture[self._capture_ptr : self._capture_ptr + copy_amt] = self._raw_queue[:copy_amt]
                    self._capture_ptr += copy_amt
                elif not self._buffer_full:
                    self._buffer_full = True
                    logger.warning("VAD buffer full, dropping new samples")

                exec_time = time.perf_counter() - t0
                self._lag_time = max(0.0, self._lag_time + exec_time - step_time)
                if exec_time > INFERENCE_DELAY_TOLERANCE:
                    logger.warning(f"VAD slow: delay {self._lag_time:.3f}s")

                if self._is_active:
                    self._active_speech_time += step_time
                else:
                    self._active_silence_time += step_time

                if p >= self._start_thresh or (self._is_active and p > self._stop_thresh):
                    self._speech_run_time += step_time
                    self._silence_run_time = 0.0

                    if not self._is_active:
                        if self._speech_run_time >= self._min_speech_duration:
                            self._is_active = True
                            self._active_silence_time = 0.0
                            self._active_speech_time = self._speech_run_time
                            self._dispatch_event(VADEventType.START_OF_SPEECH)
                            logger.info("[VAD DEBUG]: START_OF_SPEECH")
                else:
                    self._silence_run_time += step_time
                    self._speech_run_time = 0.0

                    if not self._is_active:
                        self._flush_capture_buffer()

                    if self._is_active and self._silence_run_time >= self._min_silence_duration:
                        self._is_active = False
                        self._active_silence_time = self._silence_run_time
                        self._dispatch_event(VADEventType.END_OF_SPEECH)
                        logger.info("[VAD DEBUG]: END_OF_SPEECH")
                        self._active_speech_time = 0.0
                        self._flush_capture_buffer()

                if len(self._raw_queue) >= consume_count:
                    self._raw_queue = self._raw_queue[consume_count:]
                else:
                    self._raw_queue = np.array([], dtype=np.int16)

                self._model_queue = self._model_queue[frame_size:]

        except Exception as e:
            self.emit("error", f"VAD processing failed: {e}")

    def _dispatch_event(self, event_type: VADEventType) -> None:
        dur = self._active_speech_time
        if event_type == VADEventType.END_OF_SPEECH:
            dur = max(0.0, self._active_speech_time - self._silence_run_time)

        evt = VADResponse(
            event_type=event_type,
            data=VADData(
                is_speech=event_type == VADEventType.START_OF_SPEECH,
                confidence=self._smoothed_prob,
                timestamp=self._total_time,
                speech_duration=dur,
                silence_duration=self._active_silence_time,
            ),
        )
        if self._vad_callback:
            asyncio.create_task(self._vad_callback(evt))

    async def aclose(self) -> None:
        try:
            logger.info("SileroVAD garbage collection completed")

            self._raw_queue = np.array([], dtype=np.int16)
            self._model_queue = np.array([], dtype=np.float32)
            
            if hasattr(self, "_silero") and self._silero is not None:
                try:
                    if hasattr(self._silero, "_hidden_state"):
                        self._silero._hidden_state = None
                    if hasattr(self._silero, "_prev_context"):
                        self._silero._prev_context = None
                    if hasattr(self._silero, "_model_session"):
                        self._silero._model_session = None
                    self._silero = None
                except Exception as e:
                    logger.error(f"Error closing model: {e}")

            if hasattr(self, "_onnx_sess") and self._onnx_sess is not None:
                try:
                    self._onnx_sess = None
                except Exception as e:
                    logger.error(f"Error closing session: {e}")

            try:
                import gc
                gc.collect()
            except Exception as e:
                logger.error(f"GC error: {e}")

            await super().aclose()
        except Exception as e:
            self.emit("error", f"VAD close error: {e}")

Silero Voice Activity Detection implementation using ONNX runtime.

This implementation buffers audio, runs it through the Silero model, applies exponential smoothing to the probabilities to accurately detect the start and end of speech.

Ancestors

  • videosdk.agents.vad.VAD
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    try:
        logger.info("SileroVAD garbage collection completed")

        self._raw_queue = np.array([], dtype=np.int16)
        self._model_queue = np.array([], dtype=np.float32)
        
        if hasattr(self, "_silero") and self._silero is not None:
            try:
                if hasattr(self._silero, "_hidden_state"):
                    self._silero._hidden_state = None
                if hasattr(self._silero, "_prev_context"):
                    self._silero._prev_context = None
                if hasattr(self._silero, "_model_session"):
                    self._silero._model_session = None
                self._silero = None
            except Exception as e:
                logger.error(f"Error closing model: {e}")

        if hasattr(self, "_onnx_sess") and self._onnx_sess is not None:
            try:
                self._onnx_sess = None
            except Exception as e:
                logger.error(f"Error closing session: {e}")

        try:
            import gc
            gc.collect()
        except Exception as e:
            logger.error(f"GC error: {e}")

        await super().aclose()
    except Exception as e:
        self.emit("error", f"VAD close error: {e}")

Cleanup resources

async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
Expand source code
async def process_audio(self, audio_frames: bytes, **kwargs: Any) -> None:
    try:
        incoming = np.frombuffer(audio_frames, dtype=np.int16)

        self._raw_queue = np.concatenate([self._raw_queue, incoming])

        if self._requires_resample:
            normalized = incoming.astype(np.float32) / 32768.0
            target_len = int(len(normalized) * self._mod_rate / self._in_rate)
            if target_len > 0:
                resampled = signal.resample(normalized, target_len)
                self._model_queue = np.concatenate([self._model_queue, resampled.astype(np.float32)])
        else:
            normalized = incoming.astype(np.float32) / 32768.0
            self._model_queue = np.concatenate([self._model_queue, normalized])

        frame_size = self._silero.frame_size

        while len(self._model_queue) >= frame_size:
            t0 = time.perf_counter()
            chunk = self._model_queue[:frame_size]

            try:
                p_raw = self._silero.process(chunk)
            except Exception as err:
                self.emit("error", f"VAD error: {err}")
                p_raw = 0.0

            p = self._smooth_probability(p_raw)
            step_time = frame_size / self._mod_rate
            self._total_time += step_time

            ratio = self._in_rate / self._mod_rate
            samples_needed = (frame_size * ratio) + self._fract_offset
            consume_count = int(samples_needed)
            self._fract_offset = samples_needed - consume_count

            space_left = len(self._audio_capture) - self._capture_ptr
            copy_amt = min(consume_count, space_left)

            if copy_amt > 0 and len(self._raw_queue) >= consume_count:
                self._audio_capture[self._capture_ptr : self._capture_ptr + copy_amt] = self._raw_queue[:copy_amt]
                self._capture_ptr += copy_amt
            elif not self._buffer_full:
                self._buffer_full = True
                logger.warning("VAD buffer full, dropping new samples")

            exec_time = time.perf_counter() - t0
            self._lag_time = max(0.0, self._lag_time + exec_time - step_time)
            if exec_time > INFERENCE_DELAY_TOLERANCE:
                logger.warning(f"VAD slow: delay {self._lag_time:.3f}s")

            if self._is_active:
                self._active_speech_time += step_time
            else:
                self._active_silence_time += step_time

            if p >= self._start_thresh or (self._is_active and p > self._stop_thresh):
                self._speech_run_time += step_time
                self._silence_run_time = 0.0

                if not self._is_active:
                    if self._speech_run_time >= self._min_speech_duration:
                        self._is_active = True
                        self._active_silence_time = 0.0
                        self._active_speech_time = self._speech_run_time
                        self._dispatch_event(VADEventType.START_OF_SPEECH)
                        logger.info("[VAD DEBUG]: START_OF_SPEECH")
            else:
                self._silence_run_time += step_time
                self._speech_run_time = 0.0

                if not self._is_active:
                    self._flush_capture_buffer()

                if self._is_active and self._silence_run_time >= self._min_silence_duration:
                    self._is_active = False
                    self._active_silence_time = self._silence_run_time
                    self._dispatch_event(VADEventType.END_OF_SPEECH)
                    logger.info("[VAD DEBUG]: END_OF_SPEECH")
                    self._active_speech_time = 0.0
                    self._flush_capture_buffer()

            if len(self._raw_queue) >= consume_count:
                self._raw_queue = self._raw_queue[consume_count:]
            else:
                self._raw_queue = np.array([], dtype=np.int16)

            self._model_queue = self._model_queue[frame_size:]

    except Exception as e:
        self.emit("error", f"VAD processing failed: {e}")

Process audio frames and detect voice activity

Args

audio_frames
Iterator of audio frames to process
**kwargs
Additional provider-specific arguments

Returns

AsyncIterator yielding VADResponse objects