Module agents.knowledge_base
Sub-modules
agents.knowledge_base.baseagents.knowledge_base.config
Classes
class KnowledgeBase (config: KnowledgeBaseConfig)-
Expand source code
class KnowledgeBase(ABC): """ Base class for handling knowledge-base retrieval operations. Provides hooks developers can override: - allow_retrieval: Decide if the knowledge base should be used. - pre_process_query: Preprocess the query before searching. - format_context: Format retrieved documents for the final prompt. """ def __init__(self, config: KnowledgeBaseConfig): """ Initialize the knowledge base handler. Args: config (KnowledgeBaseConfig): Configuration for retrieval settings. """ self.config = config def allow_retrieval(self, transcript: str) -> bool: """ Decide whether the knowledge base should be used for this message. Args: transcript (str): User message. Returns: bool: True to perform retrieval, False otherwise. """ return True def pre_process_query(self, transcript: str) -> str: """ Preprocess the user message before searching the knowledge base. Args: transcript (str): Original user message. Returns: str: Processed query string. """ return transcript def format_context(self, documents: List[str]) -> str: """ Format retrieved documents into a context string. Args: documents (List[str]): Retrieved document texts. Returns: str: Formatted context for the model. """ if not documents: return "" doc_str = "\n".join([f"- {doc}" for doc in documents]) return f"Use the following context to answer the user:\n{doc_str}\n" async def retrieve_documents(self, query: str) -> List[str]: """ Fetch documents from the configured knowledge base. Args: query (str): Search query. Returns: List[str]: Retrieved document texts. """ api_base_url = "https://api.videosdk.live/ai/v1" auth_token = os.getenv("VIDEOSDK_AUTH_TOKEN") if not auth_token: logger.warning("VIDEOSDK_AUTH_TOKEN not set, skipping KB retrieval") return [] try: url = f"{api_base_url}/knowledge-bases/{self.config.id}/search" headers = { "Authorization": auth_token, "Content-Type": "application/json" } payload = { "queryText": query, "topK": self.config.top_k } cascading_metrics_collector.on_knowledge_base_start() async with aiohttp.ClientSession() as session: async with session.post(url, json=payload, headers=headers) as response: if response.status == 200: data = await response.json() results = data.get("results", []) # Extract text from each result's payload documents = [] scores = [] record_ids = [] for result in results: if isinstance(result, dict): payload = result.get("payload", {}) if isinstance(payload, dict): text = payload.get("text", "") if text and text.strip(): # Only add non-empty text documents.append(text.strip()) scores.append(result.get("score", 0)) record_ids.append(result.get("recordId", "")) logger.debug(f"Retrieved {len(documents)} documents from knowledge base") cascading_metrics_collector.on_knowledge_base_complete(documents, scores, record_ids) return documents else: error_text = await response.text() logger.error( f"KB API error {response.status}: {error_text}" ) return [] except Exception as e: logger.error(f"Error retrieving KB documents: {e}") return [] async def process_query(self, transcript: str) -> Optional[str]: """ Run the full knowledge-base retrieval flow for a user message. Args: transcript (str): User message. Returns: Optional[str]: Formatted context or None if retrieval is skipped. """ # Check if KB should be triggered if not self.allow_retrieval(transcript): return None # Transform the query query = self.pre_process_query(transcript) # Retrieve documents documents = await self.retrieve_documents(query) if not documents: return None # Format the prompt formatted_context = self.format_context(documents) return formatted_contextBase class for handling knowledge-base retrieval operations.
Provides hooks developers can override: - allow_retrieval: Decide if the knowledge base should be used. - pre_process_query: Preprocess the query before searching. - format_context: Format retrieved documents for the final prompt.
Initialize the knowledge base handler.
Args
config:KnowledgeBaseConfig- Configuration for retrieval settings.
Ancestors
- abc.ABC
Methods
def allow_retrieval(self, transcript: str) ‑> bool-
Expand source code
def allow_retrieval(self, transcript: str) -> bool: """ Decide whether the knowledge base should be used for this message. Args: transcript (str): User message. Returns: bool: True to perform retrieval, False otherwise. """ return TrueDecide whether the knowledge base should be used for this message.
Args
transcript:str- User message.
Returns
bool- True to perform retrieval, False otherwise.
def format_context(self, documents: List[str]) ‑> str-
Expand source code
def format_context(self, documents: List[str]) -> str: """ Format retrieved documents into a context string. Args: documents (List[str]): Retrieved document texts. Returns: str: Formatted context for the model. """ if not documents: return "" doc_str = "\n".join([f"- {doc}" for doc in documents]) return f"Use the following context to answer the user:\n{doc_str}\n"Format retrieved documents into a context string.
Args
documents:List[str]- Retrieved document texts.
Returns
str- Formatted context for the model.
def pre_process_query(self, transcript: str) ‑> str-
Expand source code
def pre_process_query(self, transcript: str) -> str: """ Preprocess the user message before searching the knowledge base. Args: transcript (str): Original user message. Returns: str: Processed query string. """ return transcriptPreprocess the user message before searching the knowledge base.
Args
transcript:str- Original user message.
Returns
str- Processed query string.
async def process_query(self, transcript: str) ‑> str | None-
Expand source code
async def process_query(self, transcript: str) -> Optional[str]: """ Run the full knowledge-base retrieval flow for a user message. Args: transcript (str): User message. Returns: Optional[str]: Formatted context or None if retrieval is skipped. """ # Check if KB should be triggered if not self.allow_retrieval(transcript): return None # Transform the query query = self.pre_process_query(transcript) # Retrieve documents documents = await self.retrieve_documents(query) if not documents: return None # Format the prompt formatted_context = self.format_context(documents) return formatted_contextRun the full knowledge-base retrieval flow for a user message.
Args
transcript:str- User message.
Returns
Optional[str]- Formatted context or None if retrieval is skipped.
async def retrieve_documents(self, query: str) ‑> List[str]-
Expand source code
async def retrieve_documents(self, query: str) -> List[str]: """ Fetch documents from the configured knowledge base. Args: query (str): Search query. Returns: List[str]: Retrieved document texts. """ api_base_url = "https://api.videosdk.live/ai/v1" auth_token = os.getenv("VIDEOSDK_AUTH_TOKEN") if not auth_token: logger.warning("VIDEOSDK_AUTH_TOKEN not set, skipping KB retrieval") return [] try: url = f"{api_base_url}/knowledge-bases/{self.config.id}/search" headers = { "Authorization": auth_token, "Content-Type": "application/json" } payload = { "queryText": query, "topK": self.config.top_k } cascading_metrics_collector.on_knowledge_base_start() async with aiohttp.ClientSession() as session: async with session.post(url, json=payload, headers=headers) as response: if response.status == 200: data = await response.json() results = data.get("results", []) # Extract text from each result's payload documents = [] scores = [] record_ids = [] for result in results: if isinstance(result, dict): payload = result.get("payload", {}) if isinstance(payload, dict): text = payload.get("text", "") if text and text.strip(): # Only add non-empty text documents.append(text.strip()) scores.append(result.get("score", 0)) record_ids.append(result.get("recordId", "")) logger.debug(f"Retrieved {len(documents)} documents from knowledge base") cascading_metrics_collector.on_knowledge_base_complete(documents, scores, record_ids) return documents else: error_text = await response.text() logger.error( f"KB API error {response.status}: {error_text}" ) return [] except Exception as e: logger.error(f"Error retrieving KB documents: {e}") return []Fetch documents from the configured knowledge base.
Args
query:str- Search query.
Returns
List[str]- Retrieved document texts.
class KnowledgeBaseConfig (id: str, top_k: int = 3)-
Expand source code
@dataclass class KnowledgeBaseConfig: """ Configuration for managed RAG (Retrieval-Augmented Generation). Attributes: id: The ID of the knowledge base provided by your app dashboard top_k: Optional number of documents to retrieve (default: 3) """ id: str top_k: int = 3 def __post_init__(self): if not self.id: raise ValueError("id cannot be empty") if self.top_k < 1: raise ValueError("top_k must be at least 1")Configuration for managed RAG (Retrieval-Augmented Generation).
Attributes
id- The ID of the knowledge base provided by your app dashboard
top_k- Optional number of documents to retrieve (default: 3)
Instance variables
var id : strvar top_k : int