Package videosdk.plugins.anthropic

Sub-modules

videosdk.plugins.anthropic.llm

Classes

class AnthropicLLM (*,
api_key: str | None = None,
model: str = 'claude-sonnet-4-20250514',
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_tokens: int = 1024,
top_k: int | None = None,
top_p: float | None = None)
class AnthropicLLM(LLM):

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "claude-sonnet-4-20250514",
        base_url: str | None = None,
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_tokens: int = 1024,
        top_k: int | None = None,
        top_p: float | None = None,
    ) -> None:
        """Initialize the Anthropic LLM

        Args:
            api_key (str | None, optional): Anthropic API key. Uses ANTHROPIC_API_KEY environment variable if not provided. Defaults to None.
            model (str): The anthropic model to use for the LLM, e.g. "claude-sonnet-4-20250514". Defaults to "claude-sonnet-4-20250514".
            base_url (str | None, optional): The base URL to use for the LLM. Defaults to None.
            temperature (float): The temperature to use for the LLM, e.g. 0.7. Defaults to 0.7.
            tool_choice (ToolChoice): The tool choice to use for the LLM, e.g. "auto". Defaults to "auto".
            max_tokens (int): The maximum number of tokens to use for the LLM, e.g. 1024. Defaults to 1024.
            top_k (int | None, optional): The top K to use for the LLM. Defaults to None.
            top_p (float | None, optional): The top P to use for the LLM. Defaults to None.
        """
        super().__init__()

        self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Anthropic API key must be provided either through api_key parameter or ANTHROPIC_API_KEY environment variable")

        self.model = model
        self.temperature = temperature
        self.tool_choice = tool_choice
        self.max_tokens = max_tokens
        self.top_k = top_k
        self.top_p = top_p
        self._cancelled = False

        self._client = anthropic.AsyncClient(
            api_key=self.api_key,
            base_url=base_url or None,
            http_client=httpx.AsyncClient(
                timeout=httpx.Timeout(
                    connect=15.0, read=5.0, write=5.0, pool=5.0),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=50,
                    max_keepalive_connections=50,
                    keepalive_expiry=120,
                ),
            ),
        )

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        **kwargs: Any
    ) -> AsyncIterator[LLMResponse]:
        """
        Stream chat responses from Anthropic's Claude API.

        Args:
            messages: ChatContext containing conversation history
            tools: Optional list of function tools available to the model
            **kwargs: Additional arguments passed to the Anthropic API

        Yields:
            LLMResponse objects containing the model's responses
        """
        self._cancelled = False

        try:
            anthropic_messages, system_content = self._convert_messages_to_anthropic_format(
                messages)
            completion_params = {
                "model": self.model,
                "messages": anthropic_messages,
                "max_tokens": self.max_tokens,
                "temperature": self.temperature,
                "stream": True,
            }

            if system_content:
                completion_params["system"] = system_content

            if self.top_k is not None:
                completion_params["top_k"] = self.top_k
            if self.top_p is not None:
                completion_params["top_p"] = self.top_p

            if tools:
                formatted_tools = []
                seen_tool_names = set()

                for tool in tools:
                    if not is_function_tool(tool):
                        continue
                    try:
                        openai_schema = build_openai_schema(tool)
                        tool_name = openai_schema["name"]

                        if tool_name in seen_tool_names:
                            continue

                        seen_tool_names.add(tool_name)
                        anthropic_tool = {
                            "name": tool_name,
                            "description": openai_schema["description"],
                            "input_schema": openai_schema["parameters"]
                        }
                        formatted_tools.append(anthropic_tool)
                    except Exception as e:
                        self.emit(
                            "error", f"Failed to format tool {tool}: {e}")
                        continue

                if formatted_tools:
                    completion_params["tools"] = formatted_tools

                    if self.tool_choice == "required":
                        completion_params["tool_choice"] = {"type": "any"}
                    elif self.tool_choice == "auto":
                        completion_params["tool_choice"] = {"type": "auto"}
                    elif self.tool_choice == "none":
                        del completion_params["tools"]

            completion_params.update(kwargs)

            response_stream = await self._client.messages.create(**completion_params)

            current_content = ""
            current_tool_call = None
            current_tool_call_id = None
            current_tool_arguments = ""

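            # Text deltas are yielded to the caller as they arrive; tool-call
            # input JSON is buffered until its content block closes.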
            async for event in response_stream:
                if self._cancelled:
                    break

                if event.type == "content_block_start":
                    if event.content_block.type == "tool_use":
                        current_tool_call_id = event.content_block.id
                        current_tool_call = {
                            "name": event.content_block.name,
                            "arguments": ""
                        }
                        current_tool_arguments = ""

                elif event.type == "content_block_delta":
                    delta = event.delta
                    if delta.type == "text_delta":
                        current_content = delta.text
                        yield LLMResponse(
                            content=current_content,
                            role=ChatRole.ASSISTANT
                        )
                    elif delta.type == "input_json_delta":
                        if current_tool_call:
                            current_tool_arguments += delta.partial_json

                elif event.type == "content_block_stop":
                    if current_tool_call and current_tool_call_id:
                        try:
                            parsed_args = json.loads(
                                current_tool_arguments) if current_tool_arguments else {}
                            current_tool_call["arguments"] = parsed_args
                        except json.JSONDecodeError:
                            current_tool_call["arguments"] = {}

                        yield LLMResponse(
                            content="",
                            role=ChatRole.ASSISTANT,
                            metadata={
                                "function_call": {
                                    "id": current_tool_call_id,
                                    "name": current_tool_call["name"],
                                    "arguments": current_tool_call["arguments"],
                                    "call_id": current_tool_call_id
                                }
                            }
                        )
                        current_tool_call = None
                        current_tool_call_id = None
                        current_tool_arguments = ""

        except Exception as e:
            if not self._cancelled:
                self.emit("error", e)
            raise

    async def cancel_current_generation(self) -> None:
        """Request cancellation of the current generation by setting an internal flag."""
        self._cancelled = True

    def _convert_messages_to_anthropic_format(self, messages: ChatContext) -> tuple[list[dict], str | None]:
        """Internal Method: Convert ChatContext to Anthropic message format"""

        def _format_content(content: Union[str, List[ChatContent]]):
            if isinstance(content, str):
                return content

            has_images = any(isinstance(p, ImageContent) for p in content)

            if not has_images and len(content) == 1 and isinstance(content[0], str):
                return content[0]

            formatted_parts = []
            image_parts = [p for p in content if isinstance(p, ImageContent)]
            text_parts = [p for p in content if isinstance(p, str)]

            for part in image_parts:
                data_url = part.to_data_url()

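                # A data URL has the form "data:<media_type>;base64,<payload>";
                # split out the media type for Anthropic's base64 image source.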
                if data_url.startswith("data:"):
                    header, b64_data = data_url.split(",", 1)
                    media_type = header.split(";")[0].split(":")[1]
                    formatted_parts.append(
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": b64_data,
                            },
                        }
                    )
                else:
                    formatted_parts.append(
                        {
                            "type": "image",
                            "source": {"type": "url", "url": data_url},
                        }
                    )

            for part in text_parts:
                formatted_parts.append({"type": "text", "text": part})

            return formatted_parts

        anthropic_messages = []
        system_content = None

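        # Anthropic expects the system prompt as a top-level "system" parameter,
        # so system messages are pulled out here instead of staying in the list.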
        for item in messages.items:
            if isinstance(item, ChatMessage):
                if item.role == ChatRole.SYSTEM:
                    if isinstance(item.content, list):
                        system_content = next(
                            (str(p)
                             for p in item.content if isinstance(p, str)), ""
                        )
                    else:
                        system_content = str(item.content)
                    continue
                else:
                    anthropic_messages.append(
                        {"role": item.role.value,
                            "content": _format_content(item.content)}
                    )
            elif isinstance(item, FunctionCall):
                anthropic_messages.append({
                    "role": "assistant",
                    "content": [
                        {
                            "type": "tool_use",
                            "id": item.call_id,
                            "name": item.name,
                            "input": json.loads(item.arguments) if isinstance(item.arguments, str) else item.arguments
                        }
                    ]
                })
            elif isinstance(item, FunctionCallOutput):
                anthropic_messages.append({
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": item.call_id,
                            "content": item.output,
                            "is_error": item.is_error
                        }
                    ]
                })

        return anthropic_messages, system_content

    async def aclose(self) -> None:
        """Internal Method: Cleanup resources by closing the HTTP client"""
        await self.cancel_current_generation()
        if self._client:
            await self._client.close()

Base class for LLM implementations.

Initialize the Anthropic LLM.

Args

api_key : str | None, optional
Anthropic API key. Falls back to the ANTHROPIC_API_KEY environment variable if not provided. Defaults to None.
model : str
The Anthropic model to use, e.g. "claude-sonnet-4-20250514". Defaults to "claude-sonnet-4-20250514".
base_url : str | None, optional
Custom base URL for the Anthropic API. Defaults to None.
temperature : float
Sampling temperature. Defaults to 0.7.
tool_choice : ToolChoice
Tool-choice strategy ("auto", "required", or "none"). Defaults to "auto".
max_tokens : int
Maximum number of tokens to generate per response. Defaults to 1024.
top_k : int | None, optional
Top-k sampling parameter. Defaults to None.
top_p : float | None, optional
Nucleus (top-p) sampling parameter. Defaults to None.
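
For illustration, a minimal construction sketch (assumes the ANTHROPIC_API_KEY environment variable is set; the keyword names come from the signature above):

from videosdk.plugins.anthropic import AnthropicLLM

# api_key is omitted, so the key is read from ANTHROPIC_API_KEY.
llm = AnthropicLLM(
    model="claude-sonnet-4-20250514",
    temperature=0.7,
    max_tokens=1024,
    top_p=0.9,  # optional nucleus-sampling cap
)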

Ancestors

  • videosdk.agents.llm.llm.LLM
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None

Internal Method: Cancel any in-flight generation and close the HTTP client.
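
A typical lifecycle, sketched inside an async function (the elided body is a placeholder):

llm = AnthropicLLM()
try:
    ...  # run one or more chat() turns
finally:
    # Cancels any in-flight generation, then closes the httpx client.
    await llm.aclose()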

async def cancel_current_generation(self) ‑> None

Request cancellation of the current generation by setting an internal flag; the streaming loop in chat() checks the flag on each event and stops yielding further responses.
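
For illustration, a cancellation sketch (first_sentence is a hypothetical helper; building the ChatContext is not shown here):

async def first_sentence(llm: AnthropicLLM, ctx: ChatContext) -> str:
    # Collect streamed text and stop once a full sentence has arrived.
    text = ""
    async for response in llm.chat(ctx):
        text += response.content
        if "." in text:
            # Sets the internal flag; chat() exits on the next stream event.
            await llm.cancel_current_generation()
            break
    return text
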
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]

Stream chat responses from Anthropic's Claude API.

Args

messages
ChatContext containing conversation history
tools
Optional list of function tools available to the model
**kwargs
Additional arguments passed to the Anthropic API

Yields

LLMResponse objects containing the model's responses
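
For illustration, an end-to-end streaming sketch. The ChatContext import path and its add_message helper are assumptions about the videosdk.agents API, not something documented on this page:

import asyncio

from videosdk.agents import ChatContext, ChatRole  # assumed import path
from videosdk.plugins.anthropic import AnthropicLLM

async def main() -> None:
    llm = AnthropicLLM()
    ctx = ChatContext()  # assumed no-arg constructor
    ctx.add_message(role=ChatRole.USER, content="Say hello in one line.")  # assumed helper
    try:
        async for response in llm.chat(ctx):
            metadata = response.metadata or {}
            if "function_call" in metadata:
                call = metadata["function_call"]
                print(f"\ntool call: {call['name']}({call['arguments']})")
            else:
                # Each response carries the latest text delta.
                print(response.content, end="", flush=True)
    finally:
        await llm.aclose()

asyncio.run(main())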