
RAG (Retrieval-Augmented Generation) integration

RAG helps your AI agent find relevant information from documents to give better answers. It searches through your knowledge base and uses that context to respond more accurately.

Architecture

[Diagram: RAG pipeline architecture]

The RAG pipeline flow:

  1. Voice Input → Deepgram STT converts speech to text
  2. Retrieval → Query embeddings fetch relevant documents from ChromaDB
  3. Augmentation → Retrieved context is injected into the prompt
  4. Generation → OpenAI LLM generates a grounded response
  5. Voice Output → ElevenLabs TTS converts response to speech

Prerequisites

  • Install VideoSDK agents with all dependencies:

    pip install "videosdk-agents[deepgram,openai,elevenlabs,silero,turn_detector]"
    pip install chromadb openai numpy
  • Set API keys in your environment (a sketch for loading them locally follows this list):

    .env
    DEEPGRAM_API_KEY="Your Deepgram API Key"
    OPENAI_API_KEY="Your OpenAI API Key"
    ELEVENLABS_API_KEY="Your ElevenLabs API Key"
    VIDEOSDK_AUTH_TOKEN="VideoSDK Auth token"
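
The code below reads these keys with os.getenv, so export them in your shell or load the .env file before the agent starts. A minimal sketch, assuming you use the python-dotenv package (pip install python-dotenv); skip it if your deployment injects the variables directly:

# Top of main.py - optional .env loading via python-dotenv (an assumption,
# not required by the SDK itself).
import os
from dotenv import load_dotenv

load_dotenv()  # reads the .env file from the current working directory

# Fail fast if a required key is missing.
for key in ("DEEPGRAM_API_KEY", "OPENAI_API_KEY", "ELEVENLABS_API_KEY", "VIDEOSDK_AUTH_TOKEN"):
    if not os.getenv(key):
        raise RuntimeError(f"Missing environment variable: {key}")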

Complete Example

For a complete working example with all the code integrated together, check out our GitHub repository: RAG Implementation Example

Implementation

Step 1: Custom Voice Agent with RAG

Create a custom agent class that extends Agent and adds retrieval capabilities:

main.py
import asyncio
import os
from typing import AsyncIterator

import chromadb
from openai import AsyncOpenAI

# Agent, ConversationFlow, AgentSession, CascadingPipeline, JobContext, RoomOptions,
# WorkerJob and the Deepgram/OpenAI/ElevenLabs/Silero/TurnDetector plugins used below
# come from the videosdk-agents package and the plugin extras installed above.

class VoiceAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="""You are a helpful voice assistant that answers questions
            based on provided context. Use the retrieved documents to ground your answers.
            If no relevant context is found, say so. Be concise and conversational."""
        )

        # Initialize OpenAI client for embeddings
        self.openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

        # Define your knowledge base
        self.documents = [
            "What is VideoSDK? VideoSDK is a comprehensive video calling and live streaming platform...",
            "How do I authenticate with VideoSDK? Use JWT tokens generated with your API key...",
            # Add more documents
        ]

        # Set up ChromaDB
        self.chroma_client = chromadb.Client()  # In-memory
        # For persistence: chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.chroma_client.create_collection(
            name="videosdk_faq_collection"
        )

        # Generate embeddings and populate the database
        self._initialize_knowledge_base()

    def _initialize_knowledge_base(self):
        """Generate embeddings and store documents."""
        embeddings = [self._get_embedding_sync(doc) for doc in self.documents]
        self.collection.add(
            documents=self.documents,
            embeddings=embeddings,
            ids=[f"doc_{i}" for i in range(len(self.documents))]
        )
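
If the knowledge base should survive restarts, the in-memory client above can be swapped for ChromaDB's persistent client (see also Best Practices below). A minimal sketch of the lines inside __init__ that change; get_or_create_collection avoids errors when the collection already exists on disk:

# Persistent variant of the ChromaDB setup inside __init__ (stores embeddings on disk).
self.chroma_client = chromadb.PersistentClient(path="./chroma_db")
self.collection = self.chroma_client.get_or_create_collection(
    name="videosdk_faq_collection"
)
# Only re-embed when the collection is empty, e.g.:
# if self.collection.count() == 0:
#     self._initialize_knowledge_base()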

Step 2: Embedding Generation

Implement both synchronous (for initialization) and asynchronous (for runtime) embedding methods:

main.py
    def _get_embedding_sync(self, text: str) -> list[float]:
        """Synchronous embedding for initialization."""
        import openai
        client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        response = client.embeddings.create(
            input=text,
            model="text-embedding-ada-002"
        )
        return response.data[0].embedding

    async def get_embedding(self, text: str) -> list[float]:
        """Async embedding for runtime queries."""
        response = await self.openai_client.embeddings.create(
            input=text,
            model="text-embedding-ada-002"
        )
        return response.data[0].embedding
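
Retrieval quality hinges on these embeddings: texts with similar meaning map to nearby vectors. A small, self-contained illustration using the numpy dependency installed earlier (the cosine_similarity helper is ours, not part of any SDK):

import numpy as np

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors (closer to 1.0 = more similar)."""
    a, b = np.asarray(a), np.asarray(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Illustrative comparison (run from an async context):
# q = await agent.get_embedding("How do I log in to VideoSDK?")
# d = await agent.get_embedding(agent.documents[1])  # the authentication FAQ
# print(cosine_similarity(q, d))  # higher score = more relevant document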

Step 3: Retrieval Method

Add semantic search capability:

main.py
    async def retrieve(self, query: str, k: int = 2) -> list[str]:
        """Retrieve top-k most relevant documents from the vector database."""
        # Generate query embedding
        query_embedding = await self.get_embedding(query)

        # Query ChromaDB
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=k
        )

        # Return matching documents
        return results["documents"][0] if results["documents"] else []
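
This can be exercised in isolation before wiring up the full voice pipeline. An illustrative query, run from any async context where agent is the VoiceAgent instance (the query string is just an example and OPENAI_API_KEY must be set):

# Illustrative retrieval check from an async context:
docs = await agent.retrieve("How do I authenticate?", k=2)
for doc in docs:
    print(doc)  # should surface the authentication FAQ document first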

Step 4: Agent Lifecycle Hooks

Define agent behavior on entry and exit:

main.py
    async def on_enter(self) -> None:
        """Called when agent session starts."""
        await self.session.say("Hello! I'm your VideoSDK assistant. How can I help you today?")

    async def on_exit(self) -> None:
        """Called when agent session ends."""
        await self.session.say("Thank you for using VideoSDK. Goodbye!")

Step 5: Custom Conversation Flow

Override the conversation flow to inject retrieved context:

main.py
class RAGConversationFlow(ConversationFlow):
async def run(self, transcript: str) -> AsyncIterator[str]:
"""
Process user input with RAG pipeline.
Args:
transcript: User's speech transcribed to text
Yields:
Generated response chunks
"""
# Step 1: Retrieve relevant documents
context_docs = await self.agent.retrieve(transcript)

# Step 2: Format context
if context_docs:
context_str = "\n\n".join([f"Document {i+1}: {doc}"
for i, doc in enumerate(context_docs)])
else:
context_str = "No relevant context found."

# Step 3: Inject context into conversation
self.agent.chat_context.add_message(
role="system",
content=f"Retrieved Context:\n{context_str}\n\nUse this context to answer the user's question."
)

# Step 4: Generate response with LLM
async for response_chunk in self.process_with_llm():
yield response_chunk
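
To make the augmentation step concrete, this is what the injected system message looks like when two documents are retrieved (plain string formatting, no SDK calls involved):

context_docs = [
    "What is VideoSDK? VideoSDK is a comprehensive video calling and live streaming platform...",
    "How do I authenticate with VideoSDK? Use JWT tokens generated with your API key...",
]
context_str = "\n\n".join([f"Document {i+1}: {doc}"
                           for i, doc in enumerate(context_docs)])
print(f"Retrieved Context:\n{context_str}\n\nUse this context to answer the user's question.")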

Step 6: Session and Pipeline Setup

Configure the agent session and start the job:

main.py
async def entrypoint(ctx: JobContext):
    agent = VoiceAgent()

    conversation_flow = RAGConversationFlow(
        agent=agent,
    )

    session = AgentSession(
        agent=agent,
        pipeline=CascadingPipeline(
            stt=DeepgramSTT(),
            llm=OpenAILLM(),
            tts=ElevenLabsTTS(),
            vad=SileroVAD(),
            turn_detector=TurnDetector()
        ),
        conversation_flow=conversation_flow,
    )

    # Register cleanup
    ctx.add_shutdown_callback(lambda: session.close())

    # Start agent
    try:
        await ctx.connect()
        print("Waiting for participant...")
        await ctx.room.wait_for_participant()
        print("Participant joined - starting session")
        await session.start()
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        print("\nShutting down gracefully...")
    finally:
        await session.close()
        await ctx.shutdown()


def make_context() -> JobContext:
    room_options = RoomOptions(name="RAG Voice Assistant", playground=True)
    return JobContext(room_options=room_options)


if __name__ == "__main__":
    job = WorkerJob(entrypoint=entrypoint, jobctx=make_context)
    job.start()

Advanced Features

Dynamic Document Updates

Add documents at runtime:

main.py
    async def add_document(self, document: str, metadata: dict | None = None):
        """Add a new document to the knowledge base."""
        doc_id = f"doc_{len(self.documents)}"
        embedding = await self.get_embedding(document)
        self.collection.add(
            documents=[document],
            embeddings=[embedding],
            ids=[doc_id],
            metadatas=[metadata] if metadata else None
        )
        self.documents.append(document)
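
Illustrative call from any async context, for example a custom function tool or an admin endpoint (the document text and metadata here are made up):

# `agent` is the running VoiceAgent instance.
await agent.add_document(
    "Example FAQ entry: how to record a meeting with VideoSDK...",
    metadata={"topic": "recording"},
)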

Document Chunking

Split large documents for better retrieval:

main.py
    def chunk_document(self, document: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
        """Split a document into overlapping chunks."""
        words = document.split()
        chunks = []
        for i in range(0, len(words), chunk_size - overlap):
            chunk = " ".join(words[i:i + chunk_size])
            chunks.append(chunk)
        return chunks

    # Use when adding documents (before _initialize_knowledge_base runs, or via add_document)
    for doc in large_documents:
        chunks = self.chunk_document(doc)
        for chunk in chunks:
            self.documents.append(chunk)

Best Practices

  1. Document Quality: Use clear, well-structured documents with specific information
  2. Chunk Size: Keep chunks between 300 and 800 words for optimal retrieval
  3. Retrieval Count: Start with k=2-3, adjust based on response quality and latency
  4. Context Window: Ensure retrieved context fits within LLM token limits
  5. Persistent Storage: Use PersistentClient in production to save embeddings
  6. Error Handling: Always handle retrieval failures gracefully (see the sketch after this list)
  7. Testing: Test with diverse queries to ensure good coverage
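
For point 6, a minimal sketch of graceful degradation: if the embedding call or the vector query fails, the agent answers without retrieved context instead of breaking the session (safe_retrieve is our name, not an SDK method). The conversation flow would then call self.agent.safe_retrieve(transcript) in place of retrieve().

    async def safe_retrieve(self, query: str, k: int = 2) -> list[str]:
        """Wrap retrieve() so embedding or database errors never break the conversation."""
        try:
            return await self.retrieve(query, k=k)
        except Exception as exc:
            print(f"Retrieval failed, answering without context: {exc}")
            return []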

Common Issues

Issue | Solution
Slow responses | Reduce retrieval count (k), use a faster embedding model, or cache embeddings (see the sketch below)
Irrelevant results | Improve document quality, adjust the chunking strategy, or use metadata filtering
Out of memory | Use PersistentClient instead of the in-memory Client
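
A minimal sketch of the embedding cache mentioned under slow responses: repeated queries skip the OpenAI round trip. The cache here is a plain in-process dict; swap in an LRU or an external cache if memory becomes a concern.

    # Drop-in replacement for get_embedding with a simple in-memory cache.
    async def get_embedding(self, text: str) -> list[float]:
        if not hasattr(self, "_embedding_cache"):
            self._embedding_cache = {}  # maps text -> embedding vector
        if text in self._embedding_cache:
            return self._embedding_cache[text]
        response = await self.openai_client.embeddings.create(
            input=text,
            model="text-embedding-ada-002"
        )
        self._embedding_cache[text] = response.data[0].embedding
        return self._embedding_cache[text]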

Got a question? Ask us on Discord.