RealTime Pipeline

The Realtime Pipeline provides direct speech-to-speech processing with minimal latency. It uses unified models that handle the entire audio processing pipeline in a single step, offering the fastest possible response times for conversational AI.

Realtime Pipeline Architecture

tip

The RealTimePipeline is specifically designed for real-time AI models that provide end-to-end speech processing, which makes it a strong fit for conversational agents. For use cases that require more granular control over the individual components (STT, LLM, TTS), such as custom context handling and response control, the CascadingPipeline ↗ is more appropriate.

Basic Usage

Setting up a RealTimePipeline is straightforward. You simply need to initialize your chosen real-time model and pass it to the pipeline's constructor.

from videosdk.agents import RealTimePipeline
from videosdk.plugins.openai import OpenAIRealtime, OpenAIRealtimeConfig

# Initialize the desired real-time model
model = OpenAIRealtime(
    model="gpt-4o-realtime-preview",
    config=OpenAIRealtimeConfig(
        voice="alloy",
        response_modalities=["AUDIO"]
    )
)

# Create the pipeline with the model
pipeline = RealTimePipeline(model=model)
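
The pipeline is then paired with an agent and run inside a session. The wiring below is a minimal sketch: the AgentSession class and its parameters are shown as assumptions based on the agent framework's conventions, so verify the exact API in the session documentation.

from videosdk.agents import Agent, AgentSession

class VoiceAssistant(Agent):
    def __init__(self):
        super().__init__(instructions="You are a helpful voice assistant.")

# Pair the agent with the pipeline created above (sketch; confirm the
# AgentSession constructor parameters in the session docs)
session = AgentSession(
    agent=VoiceAssistant(),
    pipeline=pipeline,
)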

In addition to OpenAI ↗, the Realtime Pipeline also supports other advanced models such as Google Gemini (Live API) ↗ and AWS Nova Sonic ↗, each offering unique features for building high-performance conversational agents. See their respective pages for advanced configuration options.

tip
  • Choose a model whose native audio sample rate (OpenAI/Nova Sonic: 16 kHz, Gemini: 24 kHz) best matches your audio setup.

  • For cloud providers like AWS, select the server region closest to your users to minimize network latency.
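
For example, swapping in the Gemini Live model follows the same pattern as the OpenAI setup above. The import path, model name, and config fields below are assumptions that mirror the OpenAI example; verify them on the Gemini plugin page.

from videosdk.agents import RealTimePipeline
from videosdk.plugins.google import GeminiRealtime, GeminiLiveConfig

# Assumed import path, model name, and config fields, mirroring the
# OpenAI example above
model = GeminiRealtime(
    model="gemini-2.0-flash-live-001",
    config=GeminiLiveConfig(
        voice="Puck",
        response_modalities=["AUDIO"]
    )
)

pipeline = RealTimePipeline(model=model)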

Custom Model Integration

To integrate a custom real-time model, implement the RealtimeBaseModel interface, which requires methods such as connect(), handle_audio_input(), send_message(), and interrupt().

from videosdk.agents import RealtimeBaseModel, Agent, CustomAudioStreamTrack
from typing import Literal, Optional

class CustomRealtime(RealtimeBaseModel[Literal["user_speech_started", "error"]]):
    """Custom real-time AI model implementation"""

    def __init__(self, model_name: str, api_key: str):
        super().__init__()
        self.model_name = model_name
        self.api_key = api_key
        self.audio_track: Optional[CustomAudioStreamTrack] = None
        self._instructions = ""
        self._tools = []
        self._connected = False

    def set_agent(self, agent: Agent) -> None:
        """Set agent instructions and tools"""
        self._instructions = agent.instructions
        self._tools = agent.tools

    async def connect(self) -> None:
        """Initialize the connection to your AI service"""
        # Your connection logic here
        self._connected = True
        print(f"Connected to {self.model_name}")

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Process incoming audio from the user"""
        if not self._connected:
            return

        # Process audio and generate a response
        # Your audio processing logic here

        # Emit user speech detection
        self.emit("user_speech_started", {"type": "detected"})

        # Generate and play response audio
        if self.audio_track:
            response_audio = b"your_generated_audio_bytes"
            await self.audio_track.add_new_bytes(response_audio)

    async def send_message(self, message: str) -> None:
        """Send a text message to the model"""
        # Your text processing logic here
        pass

    async def interrupt(self) -> None:
        """Interrupt the current response"""
        if self.audio_track:
            self.audio_track.interrupt()

    async def aclose(self) -> None:
        """Clean up resources"""
        self._connected = False
        if self.audio_track:
            await self.audio_track.cleanup()
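
Once implemented, the custom model plugs into the pipeline just like a built-in one. The model name and API key below are placeholders:

from videosdk.agents import RealTimePipeline

# Placeholder model name and API key
model = CustomRealtime(model_name="my-model-v1", api_key="YOUR_API_KEY")
pipeline = RealTimePipeline(model=model)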

Comparison with Cascading Pipeline

The key architectural difference is that RealTimePipeline uses integrated models that handle the entire speech-to-speech pipeline internally, while cascading pipelines coordinate separate STT, LLM, and TTS components.

| Feature | Realtime Pipeline | Cascading Pipeline |
| --- | --- | --- |
| Latency | Significantly lower; ideal for highly interactive, real-time conversations. | Higher, due to coordinating separate STT, LLM, and TTS components. |
| Control | Less granular; tools are handled directly by the integrated model. | Granular control over each step (STT, LLM, TTS), allowing more complex logic. |
| Flexibility | Limited to the capabilities of the single chosen real-time model. | Allows mixing and matching providers for each component (e.g., Google STT, OpenAI LLM). |
| Complexity | Simpler to configure, since it involves a single, unified model. | More complex to set up, due to coordinating multiple separate components. |
| Cost | Varies with the chosen real-time model and usage patterns. | Varies with the combination of providers and per-component usage. |
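
In code, this difference shows up in the constructors: the realtime pipeline takes a single model, while the cascading pipeline takes separate components. The CascadingPipeline call below is a sketch; the exact constructor parameters and plugin classes depend on the providers you choose, so check the CascadingPipeline page.

from videosdk.agents import RealTimePipeline, CascadingPipeline
from videosdk.plugins.openai import OpenAIRealtime

# Realtime: one integrated speech-to-speech model
realtime_pipeline = RealTimePipeline(
    model=OpenAIRealtime(model="gpt-4o-realtime-preview")
)

# Cascading: separate STT, LLM, and TTS components, each from a
# provider plugin of your choice (sketch):
# cascading_pipeline = CascadingPipeline(stt=my_stt, llm=my_llm, tts=my_tts)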

Examples - Try It Yourself

We have examples to get you started. Try them out, talk to the agent, and customize them to fit your needs.
