Module agents.llm
Sub-modules
- agents.llm.chat_context
- agents.llm.fallback_llm
- agents.llm.llm
Classes
class ChatContext(items: Optional[List[ChatItem]] = None)

```python
class ChatContext:
    """
    Manages a conversation context for LLM interactions.
    """

    def __init__(self, items: Optional[List[ChatItem]] = None):
        """
        Initialize the chat context.

        Args:
            items (Optional[List[ChatItem]]): Initial list of chat items.
                If None, starts with empty context.
        """
        self._items: List[ChatItem] = items or []

    @classmethod
    def empty(cls) -> ChatContext:
        """
        Create an empty chat context.

        Returns:
            ChatContext: A new empty chat context instance.
        """
        return cls([])

    @property
    def items(self) -> List[ChatItem]:
        """
        Get all items in the context.

        Returns:
            List[ChatItem]: List of all conversation items (messages, function calls, outputs).
        """
        return self._items

    def add_message(
        self,
        role: ChatRole,
        content: Union[str, List[ChatContent]],
        message_id: Optional[str] = None,
        created_at: Optional[float] = None,
        replace: bool = False,
    ) -> ChatMessage:
        """
        Add a new message to the context.

        Args:
            role (ChatRole): The role of the message sender.
            content (Union[str, List[ChatContent]]): The message content as text or content items.
            message_id (Optional[str], optional): Custom message ID. Auto-generated if not provided.
            created_at (Optional[float], optional): Custom creation timestamp. Uses current time if not provided.
            replace (bool, optional): If True and role is SYSTEM, replaces the existing system message. Defaults to False.

        Returns:
            ChatMessage: The newly created and added message.
        """
        if replace and role == ChatRole.SYSTEM:
            self._items = [
                item for item in self._items
                if not (isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM)
            ]
        if isinstance(content, str):
            content = [content]
        message = ChatMessage(
            role=role,
            content=content,
            id=message_id or f"msg_{int(time.time())}",
            created_at=created_at or time.time(),
        )
        self._items.append(message)
        return message

    def add_function_call(
        self, name: str, arguments: str, call_id: Optional[str] = None
    ) -> FunctionCall:
        """
        Add a function call to the context.

        Args:
            name (str): Name of the function to be called.
            arguments (str): JSON string containing the function arguments.
            call_id (Optional[str], optional): Custom call ID. Auto-generated if not provided.

        Returns:
            FunctionCall: The newly created and added function call.
        """
        call = FunctionCall(
            name=name,
            arguments=arguments,
            call_id=call_id or f"call_{int(time.time())}"
        )
        self._items.append(call)
        return call

    def add_function_output(
        self, name: str, output: str, call_id: str, is_error: bool = False
    ) -> FunctionCallOutput:
        """
        Add a function output to the context.

        Args:
            name (str): Name of the function that was executed.
            output (str): The result or output from the function execution.
            call_id (str): ID linking this output to the original function call.
            is_error (bool, optional): Flag indicating if the function execution failed. Defaults to False.

        Returns:
            FunctionCallOutput: The newly created and added function output.
        """
        function_output = FunctionCallOutput(
            name=name,
            output=output,
            call_id=call_id,
            is_error=is_error
        )
        self._items.append(function_output)
        return function_output

    def get_by_id(self, item_id: str) -> Optional[ChatItem]:
        """
        Find an item by its ID.

        Args:
            item_id (str): The ID of the item to find.

        Returns:
            Optional[ChatItem]: The found item or None if not found.
        """
        return next(
            (item for item in self._items if item.id == item_id),
            None
        )

    def copy(
        self,
        *,
        exclude_function_calls: bool = False,
        exclude_system_messages: bool = False,
        tools: Optional[List[FunctionTool]] = None,
    ) -> ChatContext:
        """
        Create a filtered copy of the chat context.

        Args:
            exclude_function_calls (bool, optional): Whether to exclude function calls and outputs. Defaults to False.
            exclude_system_messages (bool, optional): Whether to exclude system messages. Defaults to False.
            tools (Optional[List[FunctionTool]], optional): List of available tools to filter function calls by. Defaults to None.

        Returns:
            ChatContext: A new ChatContext instance with the filtered items.
        """
        items = []
        valid_tool_names = {get_tool_info(tool).name for tool in (
            tools or []) if is_function_tool(tool)}

        for item in self._items:
            # Skip function calls if excluded
            if exclude_function_calls and isinstance(item, (FunctionCall, FunctionCallOutput)):
                continue
            # Skip system messages if excluded
            if exclude_system_messages and isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM:
                continue
            # Filter by valid tools if tools are provided
            if tools and isinstance(item, (FunctionCall, FunctionCallOutput)):
                if item.name not in valid_tool_names:
                    continue
            items.append(item)

        return ChatContext(items)

    def truncate(self, max_items: int) -> ChatContext:
        """
        Truncate the context to the last N items while preserving system message.

        Args:
            max_items (int): Maximum number of items to keep in the context.

        Returns:
            ChatContext: The current context instance after truncation.
        """
        system_msg = next(
            (item for item in self._items
             if isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM),
            None
        )

        new_items = self._items[-max_items:]

        while new_items and isinstance(new_items[0], (FunctionCall, FunctionCallOutput)):
            new_items.pop(0)

        if system_msg and system_msg not in new_items:
            new_items.insert(0, system_msg)

        self._items = new_items
        return self

    def to_dict(self) -> dict:
        """
        Convert the context to a dictionary representation.

        Returns:
            dict: Dictionary representation of the chat context.
        """
        return {
            "items": [
                {
                    "type": item.type,
                    "id": item.id,
                    **({"role": item.role.value, "content": item.content}
                       if isinstance(item, ChatMessage) else {}),
                    **({"name": item.name, "arguments": item.arguments, "call_id": item.call_id}
                       if isinstance(item, FunctionCall) else {}),
                    **({"name": item.name, "output": item.output, "call_id": item.call_id, "is_error": item.is_error}
                       if isinstance(item, FunctionCallOutput) else {})
                }
                for item in self._items
            ]
        }

    @classmethod
    def from_dict(cls, data: dict) -> ChatContext:
        """
        Create a ChatContext from a dictionary representation.

        Args:
            data (dict): Dictionary containing the serialized chat context data.

        Returns:
            ChatContext: A new ChatContext instance reconstructed from the data.
        """
        items = []
        for item_data in data["items"]:
            if item_data["type"] == "message":
                items.append(ChatMessage(
                    role=ChatRole(item_data["role"]),
                    content=item_data["content"],
                    id=item_data["id"]
                ))
            elif item_data["type"] == "function_call":
                items.append(FunctionCall(
                    name=item_data["name"],
                    arguments=item_data["arguments"],
                    call_id=item_data["call_id"],
                    id=item_data["id"]
                ))
            elif item_data["type"] == "function_call_output":
                items.append(FunctionCallOutput(
                    name=item_data["name"],
                    output=item_data["output"],
                    call_id=item_data["call_id"],
                    is_error=item_data.get("is_error", False),
                    id=item_data["id"]
                ))
        return cls(items)

    def cleanup(self) -> None:
        """
        Clear all chat context items and references to free memory.
        """
        logger.info(f"Cleaning up ChatContext with {len(self._items)} items")

        for item in self._items:
            if isinstance(item, ChatMessage):
                if isinstance(item.content, list):
                    for content_item in item.content:
                        if isinstance(content_item, ImageContent):
                            content_item.image = None
                item.content = None
            elif isinstance(item, FunctionCall):
                item.arguments = None
            elif isinstance(item, FunctionCallOutput):
                item.output = None

        self._items.clear()

        try:
            import gc
            gc.collect()
            logger.info("ChatContext garbage collection completed")
        except Exception as e:
            logger.error(f"Error during ChatContext garbage collection: {e}")

        logger.info("ChatContext cleanup completed")
```

Manages a conversation context for LLM interactions.
Initialize the chat context.
Args
- items (Optional[List[ChatItem]]): Initial list of chat items. If None, starts with empty context.
Static methods
def empty() -> ChatContext

Create an empty chat context.

Returns

- ChatContext: A new empty chat context instance.

def from_dict(data: dict) -> ChatContext
Create a ChatContext from a dictionary representation.
Args
- data (dict): Dictionary containing the serialized chat context data.
Returns
- ChatContext: A new ChatContext instance reconstructed from the data.
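For orientation, here is a minimal round-trip sketch; the import path agents.llm.chat_context is assumed from this module's layout and may differ in your installation:

```python
# Hedged sketch: import path assumed from this module's layout.
from agents.llm.chat_context import ChatContext, ChatRole

ctx = ChatContext.empty()
ctx.add_message(ChatRole.SYSTEM, "You are a helpful assistant.")
ctx.add_message(ChatRole.USER, "Summarize our refund policy.")

data = ctx.to_dict()                 # plain dict, e.g. for persistence
restored = ChatContext.from_dict(data)
assert len(restored.items) == len(ctx.items)
```

Note that to_dict does not serialize created_at, so reconstructed messages receive fresh timestamps from the field's default factory.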
Instance variables
prop items : List[ChatItem]
Get all items in the context.
Returns
- List[ChatItem]: List of all conversation items (messages, function calls, outputs).
Methods
def add_function_call(self, name: str, arguments: str, call_id: Optional[str] = None) -> FunctionCall
Add a function call to the context.
Args
- name (str): Name of the function to be called.
- arguments (str): JSON string containing the function arguments.
- call_id (Optional[str], optional): Custom call ID. Auto-generated if not provided.
Returns
- FunctionCall: The newly created and added function call.
def add_function_output(self, name: str, output: str, call_id: str, is_error: bool = False) -> FunctionCallOutput
Add a function output to the context.
Args
- name (str): Name of the function that was executed.
- output (str): The result or output from the function execution.
- call_id (str): ID linking this output to the original function call.
- is_error (bool, optional): Flag indicating if the function execution failed. Defaults to False.
Returns
- FunctionCallOutput: The newly created and added function output.
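A function call and its output are linked through call_id. A short sketch, with get_weather as a hypothetical tool name:

```python
import json

ctx = ChatContext.empty()

# Record the model's request to call a tool (hypothetical tool name)...
call = ctx.add_function_call(
    name="get_weather",
    arguments=json.dumps({"city": "Paris"}),
)

# ...then record the tool's result under the same call_id so both
# items stay linked in the context.
ctx.add_function_output(
    name="get_weather",
    output=json.dumps({"temp_c": 18}),
    call_id=call.call_id,
)
```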
def add_message(self, role: ChatRole, content: Union[str, List[ChatContent]], message_id: Optional[str] = None, created_at: Optional[float] = None, replace: bool = False) -> ChatMessage
Add a new message to the context.
Args
- role (ChatRole): The role of the message sender.
- content (Union[str, List[ChatContent]]): The message content as text or content items.
- message_id (Optional[str], optional): Custom message ID. Auto-generated if not provided.
- created_at (Optional[float], optional): Custom creation timestamp. Uses current time if not provided.
- replace (bool, optional): If True and role is SYSTEM, replaces the existing system message. Defaults to False.
Returns
- ChatMessage: The newly created and added message.
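One behavior worth noting: with replace=True the old system message is removed and the new one is appended, so it lands at the end of items rather than back at the front. A brief sketch:

```python
ctx = ChatContext.empty()
ctx.add_message(ChatRole.SYSTEM, "You are terse.")
ctx.add_message(ChatRole.USER, "Hi!")

# Removes the existing system message, then appends the new one.
ctx.add_message(ChatRole.SYSTEM, "You are verbose.", replace=True)
```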
def cleanup(self) -> None
Clear all chat context items and references to free memory.
def copy(self, *, exclude_function_calls: bool = False, exclude_system_messages: bool = False, tools: Optional[List[FunctionTool]] = None) -> ChatContext
Create a filtered copy of the chat context.
Args
- exclude_function_calls (bool, optional): Whether to exclude function calls and outputs. Defaults to False.
- exclude_system_messages (bool, optional): Whether to exclude system messages. Defaults to False.
- tools (Optional[List[FunctionTool]], optional): List of available tools to filter function calls by. Defaults to None.
Returns
- ChatContext: A new ChatContext instance with the filtered items.
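A typical use is producing a user-visible transcript, as in this sketch; the original context is not modified:

```python
ctx = ChatContext.empty()
ctx.add_message(ChatRole.SYSTEM, "You are terse.")
ctx.add_message(ChatRole.USER, "Hi!")

# Drop tool traffic and the system prompt before display.
visible = ctx.copy(
    exclude_function_calls=True,
    exclude_system_messages=True,
)
```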
def get_by_id(self, item_id: str) -> ChatMessage | FunctionCall | FunctionCallOutput | None
Find an item by its ID.
Args
- item_id (str): The ID of the item to find.
Returns
- Optional[ChatItem]: The found item or None if not found.
def to_dict(self) -> dict
Convert the context to a dictionary representation.
Returns
- dict: Dictionary representation of the chat context.
def truncate(self, max_items: int) -> ChatContext
Truncate the context to the last N items while preserving the system message.
Args
- max_items (int): Maximum number of items to keep in the context.
Returns
- ChatContext: The current context instance after truncation.
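Unlike copy, truncate mutates the context in place and returns self. A sketch:

```python
ctx = ChatContext.empty()
ctx.add_message(ChatRole.SYSTEM, "You are terse.")
for i in range(50):
    ctx.add_message(ChatRole.USER, f"message {i}")

# Keep roughly the last 20 items. Leading orphaned function calls and
# outputs are dropped, and the system message is re-inserted at the
# front if it fell outside the window.
ctx.truncate(max_items=20)
```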
class ChatMessage(**data: Any)

```python
class ChatMessage(BaseModel):
    """
    Represents a single message in the chat context.

    Attributes:
        role (ChatRole): The role of the message sender (system, user, or assistant).
        content (Union[str, List[ChatContent]]): The message content as text or list of content items.
        id (str): Unique identifier for the message. Auto-generated if not provided.
        type (Literal["message"]): Type identifier, always "message".
        created_at (float): Unix timestamp when the message was created.
        interrupted (bool): Flag indicating if the message was interrupted during generation.
    """
    role: ChatRole
    content: Union[str, List[ChatContent]]
    id: str = Field(default_factory=lambda: f"msg_{int(time.time())}")
    type: Literal["message"] = "message"
    created_at: float = Field(default_factory=time.time)
    interrupted: bool = False
```

Represents a single message in the chat context.
Attributes
- role (ChatRole): The role of the message sender (system, user, or assistant).
- content (Union[str, List[ChatContent]]): The message content as text or list of content items.
- id (str): Unique identifier for the message. Auto-generated if not provided.
- type (Literal["message"]): Type identifier, always "message".
- created_at (float): Unix timestamp when the message was created.
- interrupted (bool): Flag indicating if the message was interrupted during generation.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.

Ancestors
- pydantic.main.BaseModel
Class variables
- var content : str | List[str | ImageContent]
- var created_at : float
- var id : str
- var interrupted : bool
- var model_config
- var role : ChatRole
- var type : Literal['message']
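Messages are usually created through ChatContext.add_message, but they can also be constructed directly, as in this sketch. Note that the default id factory uses int(time.time()), so ids generated within the same second collide; pass an explicit id if that matters:

```python
# Hedged sketch: import path assumed from this module's layout.
from agents.llm.chat_context import ChatMessage, ChatRole

msg = ChatMessage(role=ChatRole.USER, content="Hello!")
print(msg.id, msg.created_at)  # auto-generated id and timestamp
```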
class ChatRole(*args, **kwds)

```python
class ChatRole(str, Enum):
    """
    Enumeration of chat roles for message classification.

    Defines the three standard roles used in chat conversations:
    - SYSTEM: Instructions and context for the AI assistant
    - USER: Messages from the human user
    - ASSISTANT: Responses from the AI assistant
    """
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
```

Enumeration of chat roles for message classification.
Defines the three standard roles used in chat conversations:

- SYSTEM: Instructions and context for the AI assistant
- USER: Messages from the human user
- ASSISTANT: Responses from the AI assistant
Ancestors
- builtins.str
- enum.Enum
Class variables
- var ASSISTANT
- var SYSTEM
- var USER
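Because ChatRole also subclasses str, members compare equal to their raw values and serialize cleanly:

```python
assert ChatRole.USER == "user"
assert ChatRole("assistant") is ChatRole.ASSISTANT
```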
class ConversationalGraphResponse(**data: Any)

```python
class ConversationalGraphResponse(BaseModel):
    """ Data model to hold Conversational Graph response data."""
    response_to_user: str = Field(..., description="Response to the user by agent")
    extracted_values: List[ExtractedField] = Field(default_factory=list, description="List of extracted values from the user input")
    move_forward: bool = Field(False, description="If we want to Move forward to the next state")
    reasoning: str = Field("", description="Reasoning for the response")
    chosen_branch: str = Field(None, description="Chosen branch for the move forward")
    is_off_topic: bool = Field(False, description="Is the user input off topic")
    backtrack_to_state: str = Field(None, description="Backtrack to the state")
    current_state_id: str = Field(None, description="exact state_id of current state")
```

Data model to hold Conversational Graph response data.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.

Ancestors
- pydantic.main.BaseModel
Class variables
- var backtrack_to_state : str
- var chosen_branch : str
- var current_state_id : str
- var extracted_values : List[ExtractedField]
- var is_off_topic : bool
- var model_config
- var move_forward : bool
- var reasoning : str
- var response_to_user : str
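A sketch of validating structured graph output with pydantic v2's model_validate; the payload here is illustrative, not a real provider response:

```python
raw = {
    "response_to_user": "Sure, which date works for you?",
    "move_forward": False,
    "reasoning": "Need a date before booking.",
}
parsed = ConversationalGraphResponse.model_validate(raw)
print(parsed.response_to_user)
```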
class FallbackLLM(providers: List[LLM], temporary_disable_sec: float = 60.0, permanent_disable_after_attempts: int = 3)

```python
class FallbackLLM(LLM, FallbackBase):
    def __init__(self, providers: List[LLM], temporary_disable_sec: float = 60.0,
                 permanent_disable_after_attempts: int = 3):
        LLM.__init__(self)
        FallbackBase.__init__(self, providers, "LLM",
                              temporary_disable_sec=temporary_disable_sec,
                              permanent_disable_after_attempts=permanent_disable_after_attempts)
        self._setup_event_listeners()

    def _setup_event_listeners(self):
        self.active_provider.on("error", self._on_provider_error)

    def _on_provider_error(self, error_msg):
        failed_p = self.active_provider
        asyncio.create_task(self._handle_async_error(str(error_msg), failed_p))

    async def _handle_async_error(self, error_msg: str, failed_provider: Any):
        switched = await self._switch_provider(f"Async Error: {error_msg}",
                                               failed_provider=failed_provider)
        if not switched:
            self.emit("error", error_msg)

    async def _switch_provider(self, reason: str, failed_provider: Any = None):
        provider_to_cleanup = failed_provider if failed_provider else self.active_provider
        try:
            provider_to_cleanup.off("error", self._on_provider_error)
        except:
            pass
        active_before = self.active_provider
        switched = await super()._switch_provider(reason, failed_provider)
        active_after = self.active_provider
        if switched:
            if active_before != active_after:
                self.active_provider.on("error", self._on_provider_error)
            return True
        return False

    async def chat(self, messages: ChatContext, **kwargs) -> AsyncIterator[LLMResponse]:
        """
        Attempts to chat with current provider.
        Loops until one succeeds or all fail.
        Checks for recovery of primary providers before starting.
        """
        self.check_recovery()
        while True:
            current_provider = self.active_provider
            try:
                async for chunk in current_provider.chat(messages, **kwargs):
                    yield chunk
                return
            except Exception as e:
                switched = await self._switch_provider(str(e), failed_provider=current_provider)
                if not switched:
                    raise e

    async def cancel_current_generation(self) -> None:
        await self.active_provider.cancel_current_generation()

    async def aclose(self) -> None:
        for p in self.providers:
            await p.aclose()
        await super().aclose()
```

Base class for LLM implementations.
Initialize the LLM base class.
Ancestors
- LLM
- EventEmitter
- typing.Generic
- FallbackBase
Methods
async def chat(self, messages: ChatContext, **kwargs) -> AsyncIterator[LLMResponse]
Attempts to chat with the current provider, looping until one succeeds or all fail. Checks for recovery of primary providers before starting.
Inherited members
class FunctionCall(**data: Any)

```python
class FunctionCall(BaseModel):
    """
    Represents a function call in the chat context.

    Attributes:
        id (str): Unique identifier for the function call. Auto-generated if not provided.
        type (Literal["function_call"]): Type identifier, always "function_call".
        name (str): Name of the function to be called.
        arguments (str): JSON string containing the function arguments.
        call_id (str): Unique identifier linking this call to its output.
    """
    id: str = Field(default_factory=lambda: f"call_{int(time.time())}")
    type: Literal["function_call"] = "function_call"
    name: str
    arguments: str
    call_id: str
```

Represents a function call in the chat context.
Attributes
- id (str): Unique identifier for the function call. Auto-generated if not provided.
- type (Literal["function_call"]): Type identifier, always "function_call".
- name (str): Name of the function to be called.
- arguments (str): JSON string containing the function arguments.
- call_id (str): Unique identifier linking this call to its output.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.

Ancestors
- pydantic.main.BaseModel
Class variables
- var arguments : str
- var call_id : str
- var id : str
- var model_config
- var name : str
- var type : Literal['function_call']
class FunctionCallOutput(**data: Any)

```python
class FunctionCallOutput(BaseModel):
    """
    Represents the output of a function call.

    Attributes:
        id (str): Unique identifier for the function output. Auto-generated if not provided.
        type (Literal["function_call_output"]): Type identifier, always "function_call_output".
        name (str): Name of the function that was called.
        call_id (str): Identifier linking this output to the original function call.
        output (str): The result or output from the function execution.
        is_error (bool): Flag indicating if the function execution failed.
    """
    id: str = Field(default_factory=lambda: f"output_{int(time.time())}")
    type: Literal["function_call_output"] = "function_call_output"
    name: str
    call_id: str
    output: str
    is_error: bool = False
```

Represents the output of a function call.
Attributes
- id (str): Unique identifier for the function output. Auto-generated if not provided.
- type (Literal["function_call_output"]): Type identifier, always "function_call_output".
- name (str): Name of the function that was called.
- call_id (str): Identifier linking this output to the original function call.
- output (str): The result or output from the function execution.
- is_error (bool): Flag indicating if the function execution failed.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.

Ancestors
- pydantic.main.BaseModel
Class variables
- var call_id : str
- var id : str
- var is_error : bool
- var model_config
- var name : str
- var output : str
- var type : Literal['function_call_output']
class ImageContent(**data: Any)

```python
class ImageContent(BaseModel):
    """
    Represents image content in chat messages.

    Attributes:
        id (str): Unique identifier for the image. Auto-generated if not provided.
        type (Literal["image"]): Type identifier, always "image".
        image (Union[av.VideoFrame, str]): The image data as VideoFrame or URL string.
        inference_detail (Literal["auto", "high", "low"]): Detail level for LLM image analysis.
        encode_options (EncodeOptions): Configuration for image encoding and resizing.
    """
    id: str = Field(default_factory=lambda: f"img_{int(time.time())}")
    type: Literal["image"] = "image"
    image: Union[av.VideoFrame, str]
    inference_detail: Literal["auto", "high", "low"] = "auto"
    encode_options: EncodeOptions = Field(
        default_factory=lambda: EncodeOptions(
            format="JPEG",
            quality=90,
            resize_options=ResizeOptions(
                width=320,
                height=240
            ),
        )
    )

    class Config:
        arbitrary_types_allowed = True

    def to_data_url(self) -> str:
        """
        Convert the image to a data URL format.

        Returns:
            str: A data URL string representing the image.
        """
        if isinstance(self.image, str):
            return self.image

        encoded_image = images.encode(self.image, self.encode_options)
        b64_image = base64.b64encode(encoded_image).decode("utf-8")
        return f"data:image/{self.encode_options.format.lower()};base64,{b64_image}"
```

Represents image content in chat messages.
Attributes
- id (str): Unique identifier for the image. Auto-generated if not provided.
- type (Literal["image"]): Type identifier, always "image".
- image (Union[av.VideoFrame, str]): The image data as VideoFrame or URL string.
- inference_detail (Literal["auto", "high", "low"]): Detail level for LLM image analysis.
- encode_options (EncodeOptions): Configuration for image encoding and resizing.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.

Ancestors
- pydantic.main.BaseModel
Class variables
- var Config
- var encode_options : EncodeOptions
- var id : str
- var image : av.video.frame.VideoFrame | str
- var inference_detail : Literal['auto', 'high', 'low']
- var model_config
- var type : Literal['image']
Methods
def to_data_url(self) -> str
Convert the image to a data URL format.
Returns
- str: A data URL string representing the image.
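Behavior sketch: URL strings pass through unchanged, while av.VideoFrame images are encoded to a base64 data URL according to encode_options (the URL below is illustrative):

```python
img = ImageContent(image="https://example.com/cat.jpg")
print(img.to_data_url())  # -> "https://example.com/cat.jpg"
```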
class LLM

```python
class LLM(EventEmitter[Literal["error"]]):
    """
    Base class for LLM implementations.
    """

    def __init__(self) -> None:
        """
        Initialize the LLM base class.
        """
        super().__init__()
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        """
        Get the LLM provider label.

        Returns:
            str: A string identifier for the LLM provider
                (e.g., "videosdk.plugins.openai.llm.OpenAILLM").
        """
        return self._label

    @abstractmethod
    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        **kwargs: Any
    ) -> AsyncIterator[LLMResponse]:
        """
        Main method to interact with the LLM.

        Args:
            messages (ChatContext): The conversation context containing message history.
            tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use.
            **kwargs (Any): Additional arguments specific to the LLM provider implementation.

        Returns:
            AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated.

        Raises:
            NotImplementedError: This method must be implemented by subclasses.
        """
        raise NotImplementedError

    @abstractmethod
    async def cancel_current_generation(self) -> None:
        """
        Cancel the current LLM generation if active.

        Raises:
            NotImplementedError: This method must be implemented by subclasses.
        """
        # override in subclasses
        pass

    async def aclose(self) -> None:
        """
        Cleanup resources.
        """
        logger.info(f"Cleaning up LLM: {self.label}")
        await self.cancel_current_generation()
        try:
            import gc
            gc.collect()
            logger.info(f"LLM garbage collection completed: {self.label}")
        except Exception as e:
            logger.error(f"Error during LLM garbage collection: {e}")
        logger.info(f"LLM cleanup completed: {self.label}")

    async def __aenter__(self) -> LLM:
        """
        Async context manager entry point.
        """
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Async context manager exit point.
        """
        await self.aclose()
```

Base class for LLM implementations.
Initialize the LLM base class.
Ancestors
- EventEmitter
- typing.Generic
Subclasses

- FallbackLLM
Instance variables
prop label : str
Get the LLM provider label.
Returns
- str: A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM").
Methods
async def aclose(self) -> None
Cleanup resources.
async def cancel_current_generation(self) -> None
Cancel the current LLM generation if active.
Raises
- NotImplementedError: This method must be implemented by subclasses.
async def chat(self, messages: ChatContext, tools: list[FunctionTool] | None = None, **kwargs: Any) -> AsyncIterator[LLMResponse]
Main method to interact with the LLM.
Args
- messages (ChatContext): The conversation context containing message history.
- tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use.
- **kwargs (Any): Additional arguments specific to the LLM provider implementation.
Returns
- AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated.
Raises
- NotImplementedError: This method must be implemented by subclasses.
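To illustrate the required surface, a minimal hypothetical subclass; EchoLLM is not part of this module:

```python
class EchoLLM(LLM):
    async def chat(self, messages, tools=None, **kwargs):
        # Echo the last item's content back as a single response chunk.
        last = messages.items[-1] if messages.items else None
        text = str(last.content) if last is not None else ""
        yield LLMResponse(content=text, role=ChatRole.ASSISTANT)

    async def cancel_current_generation(self) -> None:
        pass  # nothing to cancel in this toy implementation
```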
class LLMResponse(**data: Any)

```python
class LLMResponse(BaseModel):
    """
    Data model to hold LLM response data.

    Attributes:
        content (str): The text content generated by the LLM.
        role (ChatRole): The role of the response (typically ASSISTANT).
        metadata (Optional[dict[str, Any]]): Additional response metadata from the LLM provider.
    """
    content: str
    role: ChatRole
    metadata: Optional[dict[str, Any]] = None
```

Data model to hold LLM response data.
Attributes
- content (str): The text content generated by the LLM.
- role (ChatRole): The role of the response (typically ASSISTANT).
- metadata (Optional[dict[str, Any]]): Additional response metadata from the LLM provider.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.

Ancestors
- pydantic.main.BaseModel
Class variables
- var content : str
- var metadata : dict[str, typing.Any] | None
- var model_config
- var role : ChatRole
class ResponseChunk(content: str, metadata: dict[str, Any] | None = None, role: str | None = None)

```python
class ResponseChunk(str):
    def __new__(cls, content: str, metadata: dict[str, Any] | None = None,
                role: str | None = None):
        obj = super().__new__(cls, content or "")
        obj.metadata = metadata
        obj.role = role
        return obj

    @property
    def content(self) -> str:
        return str(self)
```

str(object='') -> str
str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.
Ancestors
- builtins.str
Instance variables
prop content : str
Return the chunk's text as a plain str.
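A small sketch of the resulting behavior: a chunk is a plain str that also carries its metadata and role alongside the text:

```python
chunk = ResponseChunk("Hello", metadata={"finish_reason": None}, role="assistant")
assert chunk == "Hello" and chunk.content == "Hello"
print(chunk.metadata, chunk.role)
```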