Module agents.llm

Sub-modules

agents.llm.chat_context
agents.llm.fallback_llm
agents.llm.llm

Classes

class ChatContext (items: Optional[List[ChatItem]] = None)
class ChatContext:
    """
    Manages a conversation context for LLM interactions.
    """

    def __init__(self, items: Optional[List[ChatItem]] = None):
        """
        Initialize the chat context.

        Args:
            items (Optional[List[ChatItem]]): Initial list of chat items. If None, starts with empty context.
        """
        self._items: List[ChatItem] = items or []

    @classmethod
    def empty(cls) -> ChatContext:
        """
        Create an empty chat context.

        Returns:
            ChatContext: A new empty chat context instance.
        """
        return cls([])

    @property
    def items(self) -> List[ChatItem]:
        """
        Get all items in the context.

        Returns:
            List[ChatItem]: List of all conversation items (messages, function calls, outputs).
        """
        return self._items

    def add_message(
        self,
        role: ChatRole,
        content: Union[str, List[ChatContent]],
        message_id: Optional[str] = None,
        created_at: Optional[float] = None,
        replace: bool = False,
    ) -> ChatMessage:
        """
        Add a new message to the context.

        Args:
            role (ChatRole): The role of the message sender.
            content (Union[str, List[ChatContent]]): The message content as text or content items.
            message_id (Optional[str], optional): Custom message ID. Auto-generated if not provided.
            created_at (Optional[float], optional): Custom creation timestamp. Uses current time if not provided.
            replace (bool, optional): If True and role is SYSTEM, replaces the existing system message. Defaults to False.

        Returns:
            ChatMessage: The newly created and added message.
        """
        if replace and role == ChatRole.SYSTEM:
            self._items = [
                item for item in self._items
                if not (isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM)
            ]

        if isinstance(content, str):
            content = [content]

        message = ChatMessage(
            role=role,
            content=content,
            id=message_id or f"msg_{int(time.time())}",
            created_at=created_at or time.time(),
        )
        self._items.append(message)
        return message

    def add_function_call(
        self,
        name: str,
        arguments: str,
        call_id: Optional[str] = None
    ) -> FunctionCall:
        """
        Add a function call to the context.

        Args:
            name (str): Name of the function to be called.
            arguments (str): JSON string containing the function arguments.
            call_id (Optional[str], optional): Custom call ID. Auto-generated if not provided.

        Returns:
            FunctionCall: The newly created and added function call.
        """
        call = FunctionCall(
            name=name,
            arguments=arguments,
            call_id=call_id or f"call_{int(time.time())}"
        )
        self._items.append(call)
        return call

    def add_function_output(
        self,
        name: str,
        output: str,
        call_id: str,
        is_error: bool = False
    ) -> FunctionCallOutput:
        """
        Add a function output to the context.

        Args:
            name (str): Name of the function that was executed.
            output (str): The result or output from the function execution.
            call_id (str): ID linking this output to the original function call.
            is_error (bool, optional): Flag indicating if the function execution failed. Defaults to False.

        Returns:
            FunctionCallOutput: The newly created and added function output.
        """
        function_output = FunctionCallOutput(
            name=name,
            output=output,
            call_id=call_id,
            is_error=is_error
        )
        self._items.append(function_output)
        return function_output

    def get_by_id(self, item_id: str) -> Optional[ChatItem]:
        """
        Find an item by its ID.

        Args:
            item_id (str): The ID of the item to find.

        Returns:
            Optional[ChatItem]: The found item or None if not found.
        """
        return next(
            (item for item in self._items if item.id == item_id),
            None
        )

    def copy(
        self,
        *,
        exclude_function_calls: bool = False,
        exclude_system_messages: bool = False,
        tools: Optional[List[FunctionTool]] = None,
    ) -> ChatContext:
        """
        Create a filtered copy of the chat context.

        Args:
            exclude_function_calls (bool, optional): Whether to exclude function calls and outputs. Defaults to False.
            exclude_system_messages (bool, optional): Whether to exclude system messages. Defaults to False.
            tools (Optional[List[FunctionTool]], optional): List of available tools to filter function calls by. Defaults to None.

        Returns:
            ChatContext: A new ChatContext instance with the filtered items.
        """
        items = []
        valid_tool_names = {get_tool_info(tool).name for tool in (
            tools or []) if is_function_tool(tool)}

        for item in self._items:
            # Skip function calls if excluded
            if exclude_function_calls and isinstance(item, (FunctionCall, FunctionCallOutput)):
                continue

            # Skip system messages if excluded
            if exclude_system_messages and isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM:
                continue

            # Filter by valid tools if tools are provided
            if tools and isinstance(item, (FunctionCall, FunctionCallOutput)):
                if item.name not in valid_tool_names:
                    continue

            items.append(item)

        return ChatContext(items)

    def truncate(self, max_items: int) -> ChatContext:
        """
        Truncate the context to the last N items while preserving system message.

        Args:
            max_items (int): Maximum number of items to keep in the context.

        Returns:
            ChatContext: The current context instance after truncation.
        """
        system_msg = next(
            (item for item in self._items
             if isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM),
            None
        )

        new_items = self._items[-max_items:]

        while new_items and isinstance(new_items[0], (FunctionCall, FunctionCallOutput)):
            new_items.pop(0)

        if system_msg and system_msg not in new_items:
            new_items.insert(0, system_msg)

        self._items = new_items
        return self

    def to_dict(self) -> dict:
        """
        Convert the context to a dictionary representation.

        Returns:
            dict: Dictionary representation of the chat context.
        """
        return {
            "items": [
                {
                    "type": item.type,
                    "id": item.id,
                    **({"role": item.role.value, "content": item.content}
                       if isinstance(item, ChatMessage) else {}),
                    **({"name": item.name, "arguments": item.arguments, "call_id": item.call_id}
                       if isinstance(item, FunctionCall) else {}),
                    **({"name": item.name, "output": item.output, "call_id": item.call_id, "is_error": item.is_error}
                       if isinstance(item, FunctionCallOutput) else {})
                }
                for item in self._items
            ]
        }

    @classmethod
    def from_dict(cls, data: dict) -> ChatContext:
        """
        Create a ChatContext from a dictionary representation.

        Args:
            data (dict): Dictionary containing the serialized chat context data.

        Returns:
            ChatContext: A new ChatContext instance reconstructed from the data.
        """
        items = []
        for item_data in data["items"]:
            if item_data["type"] == "message":
                items.append(ChatMessage(
                    role=ChatRole(item_data["role"]),
                    content=item_data["content"],
                    id=item_data["id"]
                ))
            elif item_data["type"] == "function_call":
                items.append(FunctionCall(
                    name=item_data["name"],
                    arguments=item_data["arguments"],
                    call_id=item_data["call_id"],
                    id=item_data["id"]
                ))
            elif item_data["type"] == "function_call_output":
                items.append(FunctionCallOutput(
                    name=item_data["name"],
                    output=item_data["output"],
                    call_id=item_data["call_id"],
                    is_error=item_data.get("is_error", False),
                    id=item_data["id"]
                ))
        return cls(items)

    def cleanup(self) -> None:
        """
        Clear all chat context items and references to free memory.
        """
        logger.info(f"Cleaning up ChatContext with {len(self._items)} items")
        for item in self._items:
            if isinstance(item, ChatMessage):
                if isinstance(item.content, list):
                    for content_item in item.content:
                        if isinstance(content_item, ImageContent):
                            content_item.image = None
                item.content = None
            elif isinstance(item, FunctionCall):
                item.arguments = None
            elif isinstance(item, FunctionCallOutput):
                item.output = None
        self._items.clear()
        try:
            import gc
            gc.collect()
            logger.info("ChatContext garbage collection completed")
        except Exception as e:
            logger.error(f"Error during ChatContext garbage collection: {e}")
        
        logger.info("ChatContext cleanup completed")

Manages a conversation context for LLM interactions.

Initialize the chat context.

Args

items : Optional[List[ChatItem]]
Initial list of chat items. If None, starts with empty context.
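
A minimal usage sketch. The classes are assumed to be importable from agents.llm; adjust the import to your installation.

from agents.llm import ChatContext, ChatRole

# Build a context with a system prompt and one user turn.
ctx = ChatContext.empty()
ctx.add_message(ChatRole.SYSTEM, "You are a helpful assistant.")
ctx.add_message(ChatRole.USER, "What is the capital of France?")

print(len(ctx.items))  # 2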

Static methods

def empty() ‑> ChatContext

Create an empty chat context.

Returns

ChatContext
A new empty chat context instance.
def from_dict(data: dict) ‑> ChatContext

Create a ChatContext from a dictionary representation.

Args

data : dict
Dictionary containing the serialized chat context data.

Returns

ChatContext
A new ChatContext instance reconstructed from the data.

Instance variables

prop items : List[ChatItem]
@property
def items(self) -> List[ChatItem]:
    """
    Get all items in the context.

    Returns:
        List[ChatItem]: List of all conversation items (messages, function calls, outputs).
    """
    return self._items

Get all items in the context.

Returns

List[ChatItem]
List of all conversation items (messages, function calls, outputs).

Methods

def add_function_call(self, name: str, arguments: str, call_id: Optional[str] = None) ‑> FunctionCall
def add_function_call(
    self,
    name: str,
    arguments: str,
    call_id: Optional[str] = None
) -> FunctionCall:
    """
    Add a function call to the context.

    Args:
        name (str): Name of the function to be called.
        arguments (str): JSON string containing the function arguments.
        call_id (Optional[str], optional): Custom call ID. Auto-generated if not provided.

    Returns:
        FunctionCall: The newly created and added function call.
    """
    call = FunctionCall(
        name=name,
        arguments=arguments,
        call_id=call_id or f"call_{int(time.time())}"
    )
    self._items.append(call)
    return call

Add a function call to the context.

Args

name : str
Name of the function to be called.
arguments : str
JSON string containing the function arguments.
call_id : Optional[str], optional
Custom call ID. Auto-generated if not provided.

Returns

FunctionCall
The newly created and added function call.
def add_function_output(self, name: str, output: str, call_id: str, is_error: bool = False) ‑> FunctionCallOutput
def add_function_output(
    self,
    name: str,
    output: str,
    call_id: str,
    is_error: bool = False
) -> FunctionCallOutput:
    """
    Add a function output to the context.

    Args:
        name (str): Name of the function that was executed.
        output (str): The result or output from the function execution.
        call_id (str): ID linking this output to the original function call.
        is_error (bool, optional): Flag indicating if the function execution failed. Defaults to False.

    Returns:
        FunctionCallOutput: The newly created and added function output.
    """
    function_output = FunctionCallOutput(
        name=name,
        output=output,
        call_id=call_id,
        is_error=is_error
    )
    self._items.append(function_output)
    return function_output

Add a function output to the context.

Args

name : str
Name of the function that was executed.
output : str
The result or output from the function execution.
call_id : str
ID linking this output to the original function call.
is_error : bool, optional
Flag indicating if the function execution failed. Defaults to False.

Returns

FunctionCallOutput
The newly created and added function output.
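
A sketch of pairing a call with its output via the shared call_id; the get_weather tool name and arguments are illustrative, not part of this module.

from agents.llm import ChatContext

ctx = ChatContext.empty()
# Record the model's tool invocation, then attach its result under the same call_id.
call = ctx.add_function_call(name="get_weather", arguments='{"city": "Paris"}')
ctx.add_function_output(
    name="get_weather",
    output='{"temp_c": 21}',
    call_id=call.call_id,
)
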
def add_message(self,
role: ChatRole,
content: Union[str, List[ChatContent]],
message_id: Optional[str] = None,
created_at: Optional[float] = None,
replace: bool = False) ‑> ChatMessage
def add_message(
    self,
    role: ChatRole,
    content: Union[str, List[ChatContent]],
    message_id: Optional[str] = None,
    created_at: Optional[float] = None,
    replace: bool = False,
) -> ChatMessage:
    """
    Add a new message to the context.

    Args:
        role (ChatRole): The role of the message sender.
        content (Union[str, List[ChatContent]]): The message content as text or content items.
        message_id (Optional[str], optional): Custom message ID. Auto-generated if not provided.
        created_at (Optional[float], optional): Custom creation timestamp. Uses current time if not provided.
        replace (bool, optional): If True and role is SYSTEM, replaces the existing system message. Defaults to False.

    Returns:
        ChatMessage: The newly created and added message.
    """
    if replace and role == ChatRole.SYSTEM:
        self._items = [
            item for item in self._items
            if not (isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM)
        ]

    if isinstance(content, str):
        content = [content]

    message = ChatMessage(
        role=role,
        content=content,
        id=message_id or f"msg_{int(time.time())}",
        created_at=created_at or time.time(),
    )
    self._items.append(message)
    return message

Add a new message to the context.

Args

role : ChatRole
The role of the message sender.
content : Union[str, List[ChatContent]]
The message content as text or content items.
message_id : Optional[str], optional
Custom message ID. Auto-generated if not provided.
created_at : Optional[float], optional
Custom creation timestamp. Uses current time if not provided.
replace : bool, optional
If True and role is SYSTEM, replaces the existing system message. Defaults to False.

Returns

ChatMessage
The newly created and added message.
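
For example, passing replace=True with a SYSTEM role swaps out the existing system prompt instead of accumulating a second one (a sketch, reusing the import assumptions above):

from agents.llm import ChatContext, ChatRole

ctx = ChatContext.empty()
ctx.add_message(ChatRole.SYSTEM, "Old instructions.")
ctx.add_message(ChatRole.SYSTEM, "New instructions.", replace=True)

# Only the replacement system message remains.
assert len(ctx.items) == 1
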
def cleanup(self) ‑> None
def cleanup(self) -> None:
    """
    Clear all chat context items and references to free memory.
    """
    logger.info(f"Cleaning up ChatContext with {len(self._items)} items")
    for item in self._items:
        if isinstance(item, ChatMessage):
            if isinstance(item.content, list):
                for content_item in item.content:
                    if isinstance(content_item, ImageContent):
                        content_item.image = None
            item.content = None
        elif isinstance(item, FunctionCall):
            item.arguments = None
        elif isinstance(item, FunctionCallOutput):
            item.output = None
    self._items.clear()
    try:
        import gc
        gc.collect()
        logger.info("ChatContext garbage collection completed")
    except Exception as e:
        logger.error(f"Error during ChatContext garbage collection: {e}")
    
    logger.info("ChatContext cleanup completed")

Clear all chat context items and references to free memory.
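
Typically called once a conversation has ended, for example in a finally block (a sketch):

from agents.llm import ChatContext, ChatRole

ctx = ChatContext.empty()
try:
    ctx.add_message(ChatRole.USER, "Hello")
    # ... run the conversation ...
finally:
    # Release message payloads and clear the item list.
    ctx.cleanup()

assert ctx.items == []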

def copy(self,
*,
exclude_function_calls: bool = False,
exclude_system_messages: bool = False,
tools: Optional[List[FunctionTool]] = None) ‑> ChatContext
def copy(
    self,
    *,
    exclude_function_calls: bool = False,
    exclude_system_messages: bool = False,
    tools: Optional[List[FunctionTool]] = None,
) -> ChatContext:
    """
    Create a filtered copy of the chat context.

    Args:
        exclude_function_calls (bool, optional): Whether to exclude function calls and outputs. Defaults to False.
        exclude_system_messages (bool, optional): Whether to exclude system messages. Defaults to False.
        tools (Optional[List[FunctionTool]], optional): List of available tools to filter function calls by. Defaults to None.

    Returns:
        ChatContext: A new ChatContext instance with the filtered items.
    """
    items = []
    valid_tool_names = {get_tool_info(tool).name for tool in (
        tools or []) if is_function_tool(tool)}

    for item in self._items:
        # Skip function calls if excluded
        if exclude_function_calls and isinstance(item, (FunctionCall, FunctionCallOutput)):
            continue

        # Skip system messages if excluded
        if exclude_system_messages and isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM:
            continue

        # Filter by valid tools if tools are provided
        if tools and isinstance(item, (FunctionCall, FunctionCallOutput)):
            if item.name not in valid_tool_names:
                continue

        items.append(item)

    return ChatContext(items)

Create a filtered copy of the chat context.

Args

exclude_function_calls : bool, optional
Whether to exclude function calls and outputs. Defaults to False.
exclude_system_messages : bool, optional
Whether to exclude system messages. Defaults to False.
tools : Optional[List[FunctionTool]], optional
List of available tools to filter function calls by. Defaults to None.

Returns

ChatContext
A new ChatContext instance with the filtered items.
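
For instance, to get a view of the conversation without tool traffic or the system prompt (a sketch; the lookup tool name is illustrative):

from agents.llm import ChatContext, ChatRole

ctx = ChatContext.empty()
ctx.add_message(ChatRole.SYSTEM, "You are a helpful assistant.")
ctx.add_message(ChatRole.USER, "Hi!")
ctx.add_function_call(name="lookup", arguments="{}")

# Only the user message survives both filters; the original context is unchanged.
visible = ctx.copy(exclude_function_calls=True, exclude_system_messages=True)
assert len(visible.items) == 1
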
def get_by_id(self, item_id: str) ‑> ChatMessage | FunctionCall | FunctionCallOutput | None
def get_by_id(self, item_id: str) -> Optional[ChatItem]:
    """
    Find an item by its ID.

    Args:
        item_id (str): The ID of the item to find.

    Returns:
        Optional[ChatItem]: The found item or None if not found.
    """
    return next(
        (item for item in self._items if item.id == item_id),
        None
    )

Find an item by its ID.

Args

item_id : str
The ID of the item to find.

Returns

Optional[ChatItem]
The found item or None if not found.
def to_dict(self) ‑> dict
def to_dict(self) -> dict:
    """
    Convert the context to a dictionary representation.

    Returns:
        dict: Dictionary representation of the chat context.
    """
    return {
        "items": [
            {
                "type": item.type,
                "id": item.id,
                **({"role": item.role.value, "content": item.content}
                   if isinstance(item, ChatMessage) else {}),
                **({"name": item.name, "arguments": item.arguments, "call_id": item.call_id}
                   if isinstance(item, FunctionCall) else {}),
                **({"name": item.name, "output": item.output, "call_id": item.call_id, "is_error": item.is_error}
                   if isinstance(item, FunctionCallOutput) else {})
            }
            for item in self._items
        ]
    }

Convert the context to a dictionary representation.

Returns

dict
Dictionary representation of the chat context.
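
to_dict() pairs with from_dict() for a round trip, with one caveat visible in the source: created_at and interrupted are not serialized, so reconstructed messages get fresh defaults. A sketch:

from agents.llm import ChatContext, ChatRole

ctx = ChatContext.empty()
msg = ctx.add_message(ChatRole.USER, "Hello")

restored = ChatContext.from_dict(ctx.to_dict())
# IDs survive the round trip, so lookups still work.
assert restored.get_by_id(msg.id) is not None
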
def truncate(self, max_items: int) ‑> ChatContext
def truncate(self, max_items: int) -> ChatContext:
    """
    Truncate the context to the last N items while preserving system message.

    Args:
        max_items (int): Maximum number of items to keep in the context.

    Returns:
        ChatContext: The current context instance after truncation.
    """
    system_msg = next(
        (item for item in self._items
         if isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM),
        None
    )

    new_items = self._items[-max_items:]

    while new_items and isinstance(new_items[0], (FunctionCall, FunctionCallOutput)):
        new_items.pop(0)

    if system_msg and system_msg not in new_items:
        new_items.insert(0, system_msg)

    self._items = new_items
    return self

Truncate the context to the last N items while preserving system message.

Args

max_items : int
Maximum number of items to keep in the context.

Returns

ChatContext
The current context instance after truncation.
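
For example, truncating a long context keeps the tail of the conversation, drops any function-call items left dangling at the new start, and re-inserts the system message (a sketch):

from agents.llm import ChatContext, ChatRole

ctx = ChatContext.empty()
ctx.add_message(ChatRole.SYSTEM, "You are a helpful assistant.")
for i in range(10):
    ctx.add_message(ChatRole.USER, f"message {i}")

ctx.truncate(max_items=3)
# The last three items plus the preserved system message.
assert len(ctx.items) == 4
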
class ChatMessage (**data: Any)
class ChatMessage(BaseModel):
    """
    Represents a single message in the chat context.

    Attributes:
        role (ChatRole): The role of the message sender (system, user, or assistant).
        content (Union[str, List[ChatContent]]): The message content as text or list of content items.
        id (str): Unique identifier for the message. Auto-generated if not provided.
        type (Literal["message"]): Type identifier, always "message".
        created_at (float): Unix timestamp when the message was created.
        interrupted (bool): Flag indicating if the message was interrupted during generation.
    """
    role: ChatRole
    content: Union[str, List[ChatContent]]
    id: str = Field(default_factory=lambda: f"msg_{int(time.time())}")
    type: Literal["message"] = "message"
    created_at: float = Field(default_factory=time.time)
    interrupted: bool = False

Represents a single message in the chat context.

Attributes

role : ChatRole
The role of the message sender (system, user, or assistant).
content : Union[str, List[ChatContent]]
The message content as text or list of content items.
id : str
Unique identifier for the message. Auto-generated if not provided.
type : Literal["message"]
Type identifier, always "message".
created_at : float
Unix timestamp when the message was created.
interrupted : bool
Flag indicating if the message was interrupted during generation.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var content : str | List[str | ImageContent]
var created_at : float
var id : str
var interrupted : bool
var model_config
var role : ChatRole
var type : Literal['message']
class ChatRole (*args, **kwds)
class ChatRole(str, Enum):
    """
    Enumeration of chat roles for message classification.

    Defines the three standard roles used in chat conversations:
    - SYSTEM: Instructions and context for the AI assistant
    - USER: Messages from the human user
    - ASSISTANT: Responses from the AI assistant
    """
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"

Enumeration of chat roles for message classification.

Defines the three standard roles used in chat conversations:

  • SYSTEM: Instructions and context for the AI assistant
  • USER: Messages from the human user
  • ASSISTANT: Responses from the AI assistant

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var ASSISTANT
var SYSTEM
var USER
class ConversationalGraphResponse (**data: Any)
class ConversationalGraphResponse(BaseModel):
    """Data model to hold Conversational Graph response data."""

    response_to_user: str = Field(..., description="Response to the user by the agent")
    extracted_values: List[ExtractedField] = Field(default_factory=list, description="List of values extracted from the user input")
    move_forward: bool = Field(False, description="Whether to move forward to the next state")
    reasoning: str = Field("", description="Reasoning for the response")
    chosen_branch: Optional[str] = Field(None, description="Chosen branch when moving forward")
    is_off_topic: bool = Field(False, description="Whether the user input is off topic")
    backtrack_to_state: Optional[str] = Field(None, description="State to backtrack to")
    current_state_id: Optional[str] = Field(None, description="Exact state_id of the current state")

Data model to hold Conversational Graph response data.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var backtrack_to_state : str | None
var chosen_branch : str | None
var current_state_id : str | None
var extracted_values : List[ExtractedField]
var is_off_topic : bool
var model_config
var move_forward : bool
var reasoning : str
var response_to_user : str
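
A construction sketch; in practice instances are usually parsed from structured LLM output rather than built by hand, and the import path is assumed:

from agents.llm import ConversationalGraphResponse

resp = ConversationalGraphResponse(
    response_to_user="Sure, which city are you in?",
    move_forward=False,
    reasoning="The destination city has not been provided yet.",
)
print(resp.move_forward)  # False
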
class FallbackLLM (providers: List[LLM],
temporary_disable_sec: float = 60.0,
permanent_disable_after_attempts: int = 3)
class FallbackLLM(LLM, FallbackBase):
    def __init__(self, providers: List[LLM], temporary_disable_sec: float = 60.0, permanent_disable_after_attempts: int = 3):
        LLM.__init__(self)
        FallbackBase.__init__(self, providers, "LLM", temporary_disable_sec=temporary_disable_sec, permanent_disable_after_attempts=permanent_disable_after_attempts)
        self._setup_event_listeners()

    def _setup_event_listeners(self):
        self.active_provider.on("error", self._on_provider_error)

    def _on_provider_error(self, error_msg):
        failed_p = self.active_provider
        asyncio.create_task(self._handle_async_error(str(error_msg), failed_p))

    async def _handle_async_error(self, error_msg: str, failed_provider: Any):
        switched = await self._switch_provider(f"Async Error: {error_msg}", failed_provider=failed_provider)
        if not switched:
            self.emit("error", error_msg)

    async def _switch_provider(self, reason: str, failed_provider: Any = None):
        provider_to_cleanup = failed_provider if failed_provider else self.active_provider
        try:
            provider_to_cleanup.off("error", self._on_provider_error)
        except Exception:
            pass

        active_before = self.active_provider
        switched = await super()._switch_provider(reason, failed_provider)
        active_after = self.active_provider
        
        if switched:
            if active_before != active_after:
                self.active_provider.on("error", self._on_provider_error)
            return True
        return False

    async def chat(self, messages: ChatContext, **kwargs) -> AsyncIterator[LLMResponse]:
        """
        Attempts to chat with the current provider.
        Loops until one succeeds or all providers fail.
        Checks for recovery of primary providers before starting.
        """
        self.check_recovery()

        while True:
            current_provider = self.active_provider
            try:
                async for chunk in current_provider.chat(messages, **kwargs):
                    yield chunk
                return 
            except Exception as e:
                switched = await self._switch_provider(str(e), failed_provider=current_provider)
                if not switched:
                    raise e

    async def cancel_current_generation(self) -> None:
        await self.active_provider.cancel_current_generation()

    async def aclose(self) -> None:
        for p in self.providers:
            await p.aclose()
        await super().aclose()

Chains multiple LLM providers and fails over to the next one when the active provider errors, temporarily disabling failed providers and permanently disabling them after repeated failures.

Ancestors

  • LLM
  • FallbackBase

Methods

async def chat(self,
messages: ChatContext,
**kwargs) ‑> AsyncIterator[LLMResponse]
async def chat(self, messages: ChatContext, **kwargs) -> AsyncIterator[LLMResponse]:
    """
    Attempts to chat with the current provider.
    Loops until one succeeds or all providers fail.
    Checks for recovery of primary providers before starting.
    """
    self.check_recovery()

    while True:
        current_provider = self.active_provider
        try:
            async for chunk in current_provider.chat(messages, **kwargs):
                yield chunk
            return 
        except Exception as e:
            switched = await self._switch_provider(str(e), failed_provider=current_provider)
            if not switched:
                raise e

Attempts to chat with the current provider, looping until one succeeds or all providers fail. Checks for recovery of primary providers before starting.
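
A usage sketch; primary and backup stand in for concrete LLM subclasses (for example, provider plugins, or the EchoLLM sketched under the LLM base class below):

from agents.llm import ChatContext, ChatRole
from agents.llm.fallback_llm import FallbackLLM

async def run(primary, backup):
    # primary and backup must be concrete LLM instances.
    llm = FallbackLLM(
        providers=[primary, backup],
        temporary_disable_sec=60.0,
        permanent_disable_after_attempts=3,
    )
    ctx = ChatContext.empty()
    ctx.add_message(ChatRole.USER, "Hello!")
    try:
        # Streams from the active provider, failing over on errors.
        async for chunk in llm.chat(ctx):
            print(chunk.content, end="")
    finally:
        await llm.aclose()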

Inherited members

  • LLM: label

class FunctionCall (**data: Any)
class FunctionCall(BaseModel):
    """
    Represents a function call in the chat context.

    Attributes:
        id (str): Unique identifier for the function call. Auto-generated if not provided.
        type (Literal["function_call"]): Type identifier, always "function_call".
        name (str): Name of the function to be called.
        arguments (str): JSON string containing the function arguments.
        call_id (str): Unique identifier linking this call to its output.
    """
    id: str = Field(default_factory=lambda: f"call_{int(time.time())}")
    type: Literal["function_call"] = "function_call"
    name: str
    arguments: str
    call_id: str

Represents a function call in the chat context.

Attributes

id : str
Unique identifier for the function call. Auto-generated if not provided.
type : Literal["function_call"]
Type identifier, always "function_call".
name : str
Name of the function to be called.
arguments : str
JSON string containing the function arguments.
call_id : str
Unique identifier linking this call to its output.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var arguments : str
var call_id : str
var id : str
var model_config
var name : str
var type : Literal['function_call']
class FunctionCallOutput (**data: Any)
class FunctionCallOutput(BaseModel):
    """
    Represents the output of a function call.

    Attributes:
        id (str): Unique identifier for the function output. Auto-generated if not provided.
        type (Literal["function_call_output"]): Type identifier, always "function_call_output".
        name (str): Name of the function that was called.
        call_id (str): Identifier linking this output to the original function call.
        output (str): The result or output from the function execution.
        is_error (bool): Flag indicating if the function execution failed.
    """
    id: str = Field(default_factory=lambda: f"output_{int(time.time())}")
    type: Literal["function_call_output"] = "function_call_output"
    name: str
    call_id: str
    output: str
    is_error: bool = False

Represents the output of a function call.

Attributes

id : str
Unique identifier for the function output. Auto-generated if not provided.
type : Literal["function_call_output"]
Type identifier, always "function_call_output".
name : str
Name of the function that was called.
call_id : str
Identifier linking this output to the original function call.
output : str
The result or output from the function execution.
is_error : bool
Flag indicating if the function execution failed.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var call_id : str
var id : str
var is_error : bool
var model_config
var name : str
var output : str
var type : Literal['function_call_output']
class ImageContent (**data: Any)
class ImageContent(BaseModel):
    """
    Represents image content in chat messages.

    Attributes:
        id (str): Unique identifier for the image. Auto-generated if not provided.
        type (Literal["image"]): Type identifier, always "image".
        image (Union[av.VideoFrame, str]): The image data as VideoFrame or URL string.
        inference_detail (Literal["auto", "high", "low"]): Detail level for LLM image analysis.
        encode_options (EncodeOptions): Configuration for image encoding and resizing.
    """
    id: str = Field(default_factory=lambda: f"img_{int(time.time())}")
    type: Literal["image"] = "image"
    image: Union[av.VideoFrame, str]
    inference_detail: Literal["auto", "high", "low"] = "auto"
    encode_options: EncodeOptions = Field(
        default_factory=lambda: EncodeOptions(
            format="JPEG",
            quality=90,
            resize_options=ResizeOptions(
                width=320, height=240
            ),
        )
    )

    class Config:
        arbitrary_types_allowed = True

    def to_data_url(self) -> str:
        """
        Convert the image to a data URL format.

        Returns:
            str: A data URL string representing the image.
        """
        if isinstance(self.image, str):
            return self.image

        encoded_image = images.encode(self.image, self.encode_options)
        b64_image = base64.b64encode(encoded_image).decode("utf-8")
        return f"data:image/{self.encode_options.format.lower()};base64,{b64_image}"

Represents image content in chat messages.

Attributes

id : str
Unique identifier for the image. Auto-generated if not provided.
type : Literal["image"]
Type identifier, always "image".
image : Union[av.VideoFrame, str]
The image data as VideoFrame or URL string.
inference_detail : Literal["auto", "high", "low"]
Detail level for LLM image analysis.
encode_options : EncodeOptions
Configuration for image encoding and resizing.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var Config
var encode_options : EncodeOptions
var id : str
var image : av.video.frame.VideoFrame | str
var inference_detail : Literal['auto', 'high', 'low']
var model_config
var type : Literal['image']

Methods

def to_data_url(self) ‑> str
def to_data_url(self) -> str:
    """
    Convert the image to a data URL format.

    Returns:
        str: A data URL string representing the image.
    """
    if isinstance(self.image, str):
        return self.image

    encoded_image = images.encode(self.image, self.encode_options)
    b64_image = base64.b64encode(encoded_image).decode("utf-8")
    return f"data:image/{self.encode_options.format.lower()};base64,{b64_image}"

Convert the image to a data URL format.

Returns

str
A data URL string representing the image.
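
A sketch for the URL case; av.VideoFrame input is instead encoded (JPEG at quality 90 by default) and returned as a base64 data URL. The import path is assumed.

from agents.llm import ImageContent

img = ImageContent(image="https://example.com/photo.jpg")  # illustrative URL
# URL-backed images are returned unchanged.
assert img.to_data_url() == "https://example.com/photo.jpg"
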
class LLM
class LLM(EventEmitter[Literal["error"]]):
    """
    Base class for LLM implementations.
    """

    def __init__(self) -> None:
        """
        Initialize the LLM base class.
        """
        super().__init__()
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        """
        Get the LLM provider label.

        Returns:
            str: A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM").
        """
        return self._label

    @abstractmethod
    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        **kwargs: Any
    ) -> AsyncIterator[LLMResponse]:
        """
        Main method to interact with the LLM.

        Args:
            messages (ChatContext): The conversation context containing message history.
            tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use.
            **kwargs (Any): Additional arguments specific to the LLM provider implementation.

        Returns:
            AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated.

        Raises:
            NotImplementedError: This method must be implemented by subclasses.
        """
        raise NotImplementedError

    @abstractmethod
    async def cancel_current_generation(self) -> None:
        """
        Cancel the current LLM generation if active.

        Raises:
            NotImplementedError: This method must be implemented by subclasses.
        """
        # override in subclasses
        pass

    async def aclose(self) -> None:
        """
        Cleanup resources.
        """
        logger.info(f"Cleaning up LLM: {self.label}")
        
        await self.cancel_current_generation()
        
        try:
            import gc
            gc.collect()
            logger.info(f"LLM garbage collection completed: {self.label}")
        except Exception as e:
            logger.error(f"Error during LLM garbage collection: {e}")
        
        logger.info(f"LLM cleanup completed: {self.label}")

    async def __aenter__(self) -> LLM:
        """
        Async context manager entry point.
        """
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Async context manager exit point.
        """
        await self.aclose()

Base class for LLM implementations.

Initialize the LLM base class.

Ancestors

  • EventEmitter

Subclasses

  • FallbackLLM

Instance variables

prop label : str
@property
def label(self) -> str:
    """
    Get the LLM provider label.

    Returns:
        str: A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM").
    """
    return self._label

Get the LLM provider label.

Returns

str
A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM").

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """
    Cleanup resources.
    """
    logger.info(f"Cleaning up LLM: {self.label}")
    
    await self.cancel_current_generation()
    
    try:
        import gc
        gc.collect()
        logger.info(f"LLM garbage collection completed: {self.label}")
    except Exception as e:
        logger.error(f"Error during LLM garbage collection: {e}")
    
    logger.info(f"LLM cleanup completed: {self.label}")

Cleanup resources.

async def cancel_current_generation(self) ‑> None
@abstractmethod
async def cancel_current_generation(self) -> None:
    """
    Cancel the current LLM generation if active.

    Raises:
        NotImplementedError: This method must be implemented by subclasses.
    """
    # override in subclasses
    pass

Cancel the current LLM generation if active.

Raises

NotImplementedError
This method must be implemented by subclasses.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
**kwargs: Any) ‑> AsyncIterator[LLMResponse]
@abstractmethod
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMResponse]:
    """
    Main method to interact with the LLM.

    Args:
        messages (ChatContext): The conversation context containing message history.
        tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use.
        **kwargs (Any): Additional arguments specific to the LLM provider implementation.

    Returns:
        AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated.

    Raises:
        NotImplementedError: This method must be implemented by subclasses.
    """
    raise NotImplementedError

Main method to interact with the LLM.

Args

messages : ChatContext
The conversation context containing message history.
tools : list[FunctionTool] | None, optional
List of available function tools for the LLM to use.
**kwargs : Any
Additional arguments specific to the LLM provider implementation.

Returns

AsyncIterator[LLMResponse]
An async iterator yielding LLMResponse objects as they're generated.

Raises

NotImplementedError
This method must be implemented by subclasses.
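
A minimal subclass sketch showing the contract: chat is implemented as an async generator yielding LLMResponse chunks, and cancel_current_generation must be provided even as a no-op. EchoLLM is illustrative, not part of the module.

from typing import Any, AsyncIterator

from agents.llm import LLM, LLMResponse, ChatContext, ChatRole

class EchoLLM(LLM):
    """Toy provider that streams the last message's text back word by word."""

    async def chat(
        self,
        messages: ChatContext,
        tools=None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        last = messages.items[-1] if messages.items else None
        # add_message stores plain text as a list of strings, but ChatMessage
        # also allows a bare str, so handle both.
        content = getattr(last, "content", [])
        parts = [content] if isinstance(content, str) else content
        text = " ".join(p for p in parts if isinstance(p, str))
        for word in text.split():
            yield LLMResponse(content=word + " ", role=ChatRole.ASSISTANT)

    async def cancel_current_generation(self) -> None:
        pass  # nothing buffered to cancel in this toy implementation
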
class LLMResponse (**data: Any)
class LLMResponse(BaseModel):
    """
    Data model to hold LLM response data.

    Attributes:
        content (str): The text content generated by the LLM.
        role (ChatRole): The role of the response (typically ASSISTANT).
        metadata (Optional[dict[str, Any]]): Additional response metadata from the LLM provider.
    """
    content: str
    role: ChatRole
    metadata: Optional[dict[str, Any]] = None

Data model to hold LLM response data.

Attributes

content : str
The text content generated by the LLM.
role : ChatRole
The role of the response (typically ASSISTANT).
metadata : Optional[dict[str, Any]]
Additional response metadata from the LLM provider.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var content : str
var metadata : dict[str, typing.Any] | None
var model_config
var role : ChatRole
class ResponseChunk (content: str, metadata: dict[str, Any] | None = None, role: str | None = None)
class ResponseChunk(str):
    def __new__(cls, content: str, metadata: dict[str, Any] | None = None, role: str | None = None):
        obj = super().__new__(cls, content or "")
        obj.metadata = metadata
        obj.role = role
        return obj

    @property
    def content(self) -> str:
        return str(self)

A str subclass representing one streamed chunk of LLM output: it behaves exactly like a plain string while carrying optional metadata and role attributes alongside the text.

Ancestors

  • builtins.str

Instance variables

prop content : str
@property
def content(self) -> str:
    return str(self)
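
A sketch; ResponseChunk is assumed to be importable from agents.llm.llm, and the metadata shown is illustrative.

from agents.llm.llm import ResponseChunk

chunk = ResponseChunk("Hello", metadata={"finish_reason": None}, role="assistant")
# Behaves like a plain string...
assert chunk.upper() == "HELLO"
# ...while carrying extra attributes alongside the text.
print(chunk.content, chunk.metadata, chunk.role)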