Module videosdk.plugins.silero.onnx_runtime

Classes

class VadModelWrapper (*, session: onnxruntime.InferenceSession, rate: int)

Wraps an ONNX Runtime inference session for the Silero VAD model, carrying the model's recurrent hidden state and a short window of trailing audio context across calls. Only 8000 Hz and 16000 Hz sample rates are supported.

Source code
class VadModelWrapper:
    def __init__(self, *, session: onnxruntime.InferenceSession, rate: int) -> None:
        # SAMPLE_RATES is a module-level constant holding the supported rates (8000 and 16000).
        if rate not in SAMPLE_RATES:
            raise ValueError(f"Rate {rate} not supported; use 8000 or 16000")
        
        self._model_session = session
        self._audio_rate = rate
        # Silero VAD consumes 256-sample frames at 8 kHz and 512-sample frames at 16 kHz (32 ms either way).
        self._frame_size = 256 if rate == 8000 else 512
        self._history_len = 32 if rate == 8000 else 64

        # Recurrent hidden state plus the trailing context carried over from the previous frame.
        self._hidden_state = np.zeros((2, 1, 128), dtype=np.float32)
        self._prev_context = np.zeros((1, self._history_len), dtype=np.float32)
        
    @property
    def frame_size(self) -> int:
        return self._frame_size
    
    @property
    def history_len(self) -> int:
        return self._history_len

    def process(self, input_audio: np.ndarray) -> float:
        # Accept a 1-D frame and lift it to a (1, N) row vector.
        if input_audio.ndim == 1:
            input_audio = input_audio.reshape(1, -1)
        if input_audio.ndim > 2:
            raise ValueError(f"Too many dimensions for input audio chunk: {input_audio.ndim}")

        # Reject chunks shorter than one 32 ms frame (rate / samples must not exceed 31.25).
        if self._audio_rate / input_audio.shape[1] > 31.25:
            raise ValueError("Input audio chunk is too short")
        
        # Exactly one frame of audio is expected per call.
        if input_audio.shape[-1] != self._frame_size:
            raise ValueError(
                f"Provided number of samples is {input_audio.shape[-1]} "
                f"(supported values: 256 at 8000 Hz, 512 at 16000 Hz)"
            )
        
        # Prepend the context carried over from the previous frame.
        buffer = np.concatenate((self._prev_context, input_audio.reshape(1, -1)), axis=1)

        inputs = {
            "input": buffer.astype(np.float32),
            "state": self._hidden_state,
            "sr": np.array([self._audio_rate], dtype=np.int64),
        }

        # The model returns the speech probability and its updated recurrent state.
        prob, state = self._model_session.run(None, inputs)

        self._hidden_state = state
        self._prev_context = buffer[:, -self._history_len:]

        return prob.item()
    
    @staticmethod
    def create_inference_session(use_cpu_only: bool) -> onnxruntime.InferenceSession:
        # Resolve the ONNX model file bundled with this package.
        model_path = importlib.resources.files("videosdk.plugins.silero.model") / "silero_vad.onnx"
        with importlib.resources.as_file(model_path) as temp_path:
            # Keep inference single-threaded and sequential.
            session_opts = onnxruntime.SessionOptions()
            session_opts.inter_op_num_threads = 1
            session_opts.intra_op_num_threads = 1
            session_opts.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL

            # Pin the session to the CPU provider when requested and available;
            # otherwise fall back to ONNX Runtime's default provider selection.
            providers = (
                ["CPUExecutionProvider"]
                if use_cpu_only and "CPUExecutionProvider" in onnxruntime.get_available_providers()
                else None
            )

            return onnxruntime.InferenceSession(
                str(temp_path),
                sess_options=session_opts,
                providers=providers
            )
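
A minimal usage sketch; the audio array and the 0.5 threshold are illustrative assumptions, not part of this module:

import numpy as np
from videosdk.plugins.silero.onnx_runtime import VadModelWrapper

# Build a session and wrap it for 16 kHz audio.
session = VadModelWrapper.create_inference_session(use_cpu_only=True)
vad = VadModelWrapper(session=session, rate=16000)

# One second of silence stands in for real 16 kHz float32 samples in [-1, 1].
audio = np.zeros(16000, dtype=np.float32)
step = vad.frame_size  # 512 samples == 32 ms at 16 kHz
probs = [vad.process(audio[i:i + step]) for i in range(0, len(audio) - step + 1, step)]
is_speech = [p > 0.5 for p in probs]  # 0.5 is a common starting threshold, not fixed by the model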

Static methods

def create_inference_session(use_cpu_only: bool) -> onnxruntime.InferenceSession

Loads the silero_vad.onnx model bundled with the package and returns an InferenceSession configured for single-threaded, sequential execution (inter- and intra-op thread counts of 1). When use_cpu_only is True and CPUExecutionProvider is available, the session is restricted to the CPU provider; otherwise ONNX Runtime's default provider selection applies. See the class source above for the full implementation.
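To confirm which execution provider was actually selected, inspect the session with get_providers(), a standard onnxruntime.InferenceSession method:

session = VadModelWrapper.create_inference_session(use_cpu_only=False)
print(session.get_providers())  # e.g. ['CPUExecutionProvider']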

Instance variables

prop frame_size : int

Number of samples the model consumes per call to process(): 256 at 8000 Hz, 512 at 16000 Hz.

prop history_len : int

Number of trailing samples kept as context for the next frame: 32 at 8000 Hz, 64 at 16000 Hz.

Methods

def process(self, input_audio: numpy.ndarray) -> float

Runs one frame of audio through the model and returns the speech probability as a float. input_audio must be a 1-D array (or a (1, N) row vector) containing exactly frame_size samples; arrays with more than two dimensions, or with the wrong sample count, raise ValueError. Before inference, the previous frame's trailing context is prepended to the input; afterwards the recurrent state and the last history_len samples are stored for the next call. See the class source above for the full implementation.
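
A short sketch of adapting raw 16-bit PCM (for example, from a microphone callback) to what process() expects; the helper name and the [-1, 1] scaling are assumptions, since this module itself only consumes float arrays:

import numpy as np

def frame_probability(vad: VadModelWrapper, pcm: bytes) -> float:
    # One frame of little-endian int16 PCM is frame_size * 2 bytes.
    samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0
    return vad.process(samples)  # raises ValueError if the frame length is wrong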