Package videosdk.plugins.anthropic
Sub-modules
videosdk.plugins.anthropic.llm
Classes
class AnthropicLLM (*,
api_key: str | None = None,
model: str = 'claude-sonnet-4-20250514',
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_tokens: int = 1024,
top_k: int | None = None,
top_p: float | None = None,
caching: "Literal['ephemeral'] | None" = None,
parallel_tool_calls: bool | None = None,
thinking: dict | None = None,
client: anthropic.AsyncClient | None = None,
max_retries: int = 0)
Expand source code
class AnthropicLLM(LLM): def __init__( self, *, api_key: str | None = None, model: str = "claude-sonnet-4-20250514", base_url: str | None = None, temperature: float = 0.7, tool_choice: ToolChoice = "auto", max_tokens: int = 1024, top_k: int | None = None, top_p: float | None = None, caching: Literal["ephemeral"] | None = None, parallel_tool_calls: bool | None = None, thinking: dict | None = None, client: anthropic.AsyncClient | None = None, max_retries: int = 0, ) -> None: """Initialize the Anthropic LLM. Args: api_key: Anthropic API key. Falls back to ``ANTHROPIC_API_KEY`` env var. model: Claude model name. Defaults to "claude-sonnet-4-20250514". base_url: Override the default Anthropic API base URL. temperature: Sampling temperature. Defaults to 0.7. tool_choice: Controls which (if any) tool is called. Defaults to "auto". max_tokens: Maximum tokens in the response. Defaults to 1024. top_k: Top-K tokens considered during sampling. top_p: Nucleus sampling probability mass. caching: Set to ``"ephemeral"`` to enable Anthropic prompt caching. When enabled, ``cache_control`` markers are applied to the system prompt, tool schemas, and recent conversation turns, and cache token counts are surfaced in usage metadata. parallel_tool_calls: Allow (``True``) or disallow (``False``) the model from issuing multiple tool calls in a single turn. Maps to ``disable_parallel_tool_use`` in the Anthropic API. thinking: Extended-thinking configuration dict, e.g. ``{"type": "enabled", "budget_tokens": 1024}``. When set, the ``interleaved-thinking-2025-05-14`` beta path is used. client: Optional pre-built ``anthropic.AsyncClient`` instance. When provided, ``api_key``, ``base_url``, ``timeout``, and ``max_retries`` are ignored. The caller retains ownership and is responsible for closing the client. max_retries: Number of automatic retries on transient errors. Defaults to 0. 
""" super().__init__() self.model = model self.temperature = temperature self.tool_choice = tool_choice self.max_tokens = max_tokens self.top_k = top_k self.top_p = top_p self.caching = caching self.parallel_tool_calls = parallel_tool_calls self.thinking = thinking self._cancelled = False self._owns_client = client is None if client is not None: self._client = client else: self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY") if not self.api_key: raise ValueError( "Anthropic API key must be provided either through api_key parameter " "or ANTHROPIC_API_KEY environment variable" ) self._client = anthropic.AsyncClient( api_key=self.api_key, base_url=base_url or None, max_retries=max_retries, http_client=httpx.AsyncClient( timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=1000, max_keepalive_connections=100, keepalive_expiry=120, ), ), ) async def chat( self, messages: ChatContext, tools: list[FunctionTool] | None = None, conversational_graph: Any | None = None, **kwargs: Any, ) -> AsyncIterator[LLMResponse]: """ Implement chat functionality using Anthropic's Claude API. Args: messages: ChatContext containing conversation history. tools: Optional list of function tools available to the model. **kwargs: Additional arguments forwarded to the Anthropic API. Yields: LLMResponse objects containing the model's responses. 
""" self._cancelled = False response_stream = None try: anthropic_messages, system_content = self._convert_messages_to_anthropic_format( messages ) completion_params: dict = { "model": self.model, "messages": anthropic_messages, "max_tokens": self.max_tokens, "temperature": self.temperature, "stream": True, } if system_content: # system can be a plain string or a list of content blocks if self.caching == "ephemeral": completion_params["system"] = [ { "type": "text", "text": system_content, "cache_control": {"type": "ephemeral"}, } ] else: completion_params["system"] = system_content if self.top_k is not None: completion_params["top_k"] = self.top_k if self.top_p is not None: completion_params["top_p"] = self.top_p formatted_tools: list[dict] = [] if tools: seen_tool_names: set[str] = set() for tool in tools: if not is_function_tool(tool): continue try: openai_schema = build_openai_schema(tool) tool_name = openai_schema["name"] if tool_name in seen_tool_names: continue seen_tool_names.add(tool_name) formatted_tools.append( { "name": tool_name, "description": openai_schema["description"], "input_schema": openai_schema["parameters"], } ) except Exception as e: self.emit("error", f"Failed to format tool {tool}: {e}") continue if formatted_tools: # Apply cache_control to the last tool schema when caching is on if self.caching == "ephemeral": formatted_tools[-1]["cache_control"] = {"type": "ephemeral"} completion_params["tools"] = formatted_tools # Build tool_choice dict tool_choice_dict: dict = {} if isinstance(self.tool_choice, dict) and self.tool_choice.get("type") == "function": # Specific tool by name: OpenAI-style → Anthropic-style tool_choice_dict = { "type": "tool", "name": self.tool_choice["function"]["name"], } elif self.tool_choice == "required": tool_choice_dict = {"type": "any"} elif self.tool_choice == "auto": tool_choice_dict = {"type": "auto"} elif self.tool_choice == "none": del completion_params["tools"] tool_choice_dict = {} else: tool_choice_dict = 
{"type": "auto"} if tool_choice_dict: if self.parallel_tool_calls is not None: tool_choice_dict["disable_parallel_tool_use"] = not self.parallel_tool_calls completion_params["tool_choice"] = tool_choice_dict # Prompt caching: mark last user/assistant messages if self.caching == "ephemeral" and completion_params.get("messages"): msgs = completion_params["messages"] last_user_idx = last_asst_idx = -1 for idx, m in enumerate(msgs): if m["role"] == "user": last_user_idx = idx elif m["role"] == "assistant": last_asst_idx = idx for idx in (last_user_idx, last_asst_idx): if idx < 0: continue content = msgs[idx]["content"] if isinstance(content, str): msgs[idx]["content"] = [ {"type": "text", "text": content, "cache_control": {"type": "ephemeral"}} ] elif isinstance(content, list) and content: content[-1]["cache_control"] = {"type": "ephemeral"} completion_params.update(kwargs) # Build extra_headers extra_headers: dict = {} if conversational_graph: extra_headers["anthropic-beta"] = "structured-outputs-2025-11-13" if self.thinking: existing_betas = extra_headers.get("anthropic-beta", "") thinking_beta = "interleaved-thinking-2025-05-14" extra_headers["anthropic-beta"] = ( f"{existing_betas},{thinking_beta}" if existing_betas else thinking_beta ) completion_params["thinking"] = self.thinking if extra_headers: completion_params["extra_headers"] = extra_headers # Use beta API path when extended thinking is active if self.thinking: response_stream = await self._client.beta.messages.create(**completion_params) else: response_stream = await self._client.messages.create(**completion_params) current_content = "" current_tool_call: dict | None = None current_tool_call_id: str | None = None current_tool_arguments = "" usage_metadata: dict = { "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, "prompt_cached_tokens": 0, "cache_creation_tokens": 0, "cache_read_tokens": 0, } streaming_state = { "in_response": False, "response_start_index": -1, "yielded_content_length": 0, } 
async for event in response_stream: if self._cancelled: break if event.type == "message_start": usage_metadata["prompt_tokens"] = event.message.usage.input_tokens usage_metadata["prompt_cached_tokens"] = ( getattr(event.message.usage, "cache_read_input_tokens", 0) or 0 ) usage_metadata["cache_creation_tokens"] = ( getattr(event.message.usage, "cache_creation_input_tokens", 0) or 0 ) usage_metadata["cache_read_tokens"] = ( getattr(event.message.usage, "cache_read_input_tokens", 0) or 0 ) yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata} ) elif event.type == "message_delta": if hasattr(event, "usage"): usage_metadata["completion_tokens"] = event.usage.output_tokens usage_metadata["total_tokens"] = ( usage_metadata["prompt_tokens"] + usage_metadata["completion_tokens"] ) yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata} ) if event.type == "content_block_start": if event.content_block.type == "tool_use": current_tool_call_id = event.content_block.id current_tool_call = { "name": event.content_block.name, "arguments": "", } current_tool_arguments = "" elif event.type == "content_block_delta": delta = event.delta if delta.type == "text_delta": current_content += delta.text if conversational_graph: for content_chunk in conversational_graph.stream_conversational_graph_response( current_content, streaming_state ): yield LLMResponse( content=content_chunk, role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata}, ) else: yield LLMResponse( content=delta.text, role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata}, ) elif delta.type == "input_json_delta": if current_tool_call is not None: current_tool_arguments += delta.partial_json elif event.type == "content_block_stop": if current_tool_call is not None and current_tool_call_id is not None: try: parsed_args = ( json.loads(current_tool_arguments) if current_tool_arguments else {} ) current_tool_call["arguments"] = parsed_args except 
json.JSONDecodeError: current_tool_call["arguments"] = {} yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={ "function_call": { "id": current_tool_call_id, "name": current_tool_call["name"], "arguments": current_tool_call["arguments"], "call_id": current_tool_call_id, }, "usage": usage_metadata, }, ) current_tool_call = None current_tool_call_id = None current_tool_arguments = "" if not self._cancelled: yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata} ) if current_content and not self._cancelled and conversational_graph: try: parsed_json = json.loads(current_content.strip()) yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"graph_response": parsed_json, "usage": usage_metadata}, ) except json.JSONDecodeError: yield LLMResponse( content=current_content, role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata}, ) except anthropic.APIError as e: if not self._cancelled: self.emit("error", e) raise except Exception as e: if not self._cancelled: self.emit("error", e) raise finally: if response_stream is not None: try: await response_stream.close() except Exception: pass async def cancel_current_generation(self) -> None: self._cancelled = True def _convert_messages_to_anthropic_format( self, messages: ChatContext ) -> tuple[list[dict], str | None]: """Convert ChatContext to Anthropic message format with role alternation enforced.""" def _format_content(content: Union[str, List[ChatContent]]): if isinstance(content, str): return content has_images = any(isinstance(p, ImageContent) for p in content) if not has_images and len(content) == 1 and isinstance(content[0], str): return content[0] formatted_parts = [] image_parts = [p for p in content if isinstance(p, ImageContent)] text_parts = [p for p in content if isinstance(p, str)] for part in image_parts: data_url = part.to_data_url() if data_url.startswith("data:"): header, b64_data = data_url.split(",", 1) media_type = 
header.split(";")[0].split(":")[1] formatted_parts.append( { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": b64_data, }, } ) else: formatted_parts.append( {"type": "image", "source": {"type": "url", "url": data_url}} ) for part in text_parts: formatted_parts.append({"type": "text", "text": part}) return formatted_parts anthropic_messages: list[dict] = [] system_content: str | None = None pending_tool_results: dict[str, FunctionCallOutput] = {} for item in messages.items: if isinstance(item, ChatMessage): if item.role == ChatRole.SYSTEM: if isinstance(item.content, list): system_content = next( (str(p) for p in item.content if isinstance(p, str)), "" ) else: system_content = str(item.content) continue else: anthropic_messages.append( {"role": item.role.value, "content": _format_content(item.content)} ) elif isinstance(item, FunctionCall): # Collect consecutive FunctionCall items into a single assistant message # so we properly represent parallel tool use in the history. 
if ( anthropic_messages and anthropic_messages[-1]["role"] == "assistant" and isinstance(anthropic_messages[-1]["content"], list) and any( p.get("type") == "tool_use" for p in anthropic_messages[-1]["content"] ) ): anthropic_messages[-1]["content"].append( { "type": "tool_use", "id": item.call_id, "name": item.name, "input": ( json.loads(item.arguments) if isinstance(item.arguments, str) else item.arguments ), } ) else: anthropic_messages.append( { "role": "assistant", "content": [ { "type": "tool_use", "id": item.call_id, "name": item.name, "input": ( json.loads(item.arguments) if isinstance(item.arguments, str) else item.arguments ), } ], } ) elif isinstance(item, FunctionCallOutput): pending_tool_results[item.call_id] = item # Pair tool results with their preceding tool_use turns final_messages: list[dict] = [] i = 0 while i < len(anthropic_messages): msg = anthropic_messages[i] final_messages.append(msg) if isinstance(msg.get("content"), list) and any( part.get("type") == "tool_use" for part in msg["content"] ): tool_result_blocks = [] for part in msg["content"]: if part.get("type") != "tool_use": continue call_id = part["id"] if call_id in pending_tool_results: tool_result = pending_tool_results.pop(call_id) tool_result_blocks.append( { "type": "tool_result", "tool_use_id": call_id, "content": tool_result.output, "is_error": tool_result.is_error, } ) if tool_result_blocks: final_messages.append({"role": "user", "content": tool_result_blocks}) i += 1 # Enforce Anthropic's strict role-alternation requirement by merging consecutive # messages that share the same role into a single message. 
merged: list[dict] = [] for msg in final_messages: if not merged or merged[-1]["role"] != msg["role"]: merged.append({"role": msg["role"], "content": msg["content"]}) else: prev_content = merged[-1]["content"] new_content = msg["content"] # Normalise both to lists of content blocks before extending if isinstance(prev_content, str): prev_content = [{"type": "text", "text": prev_content}] if isinstance(new_content, str): new_content = [{"type": "text", "text": new_content}] merged[-1]["content"] = prev_content + new_content return merged, system_content async def aclose(self) -> None: """Cleanup resources. Only closes the underlying HTTP client if this instance owns it.""" await self.cancel_current_generation() if self._owns_client and self._client: await self._client.close() await super().aclose()Base class for LLM implementations.
Initialize the Anthropic LLM.
Args
api_key - Anthropic API key. Falls back to the ANTHROPIC_API_KEY env var.
model - Claude model name. Defaults to "claude-sonnet-4-20250514".
base_url - Override the default Anthropic API base URL.
temperature - Sampling temperature. Defaults to 0.7.
tool_choice - Controls which (if any) tool is called. Defaults to "auto".
max_tokens - Maximum tokens in the response. Defaults to 1024.
top_k - Top-K tokens considered during sampling.
top_p - Nucleus sampling probability mass.
caching - Set to "ephemeral" to enable Anthropic prompt caching. When enabled, cache_control markers are applied to the system prompt, tool schemas, and recent conversation turns, and cache token counts are surfaced in usage metadata.
parallel_tool_calls - Allow (True) or disallow (False) the model from issuing multiple tool calls in a single turn. Maps to disable_parallel_tool_use in the Anthropic API.
thinking - Extended-thinking configuration dict, e.g. {"type": "enabled", "budget_tokens": 1024}. When set, the interleaved-thinking-2025-05-14 beta path is used.
client - Optional pre-built anthropic.AsyncClient instance. When provided, api_key, base_url, timeout, and max_retries are ignored. The caller retains ownership and is responsible for closing the client.
max_retries - Number of automatic retries on transient errors. Defaults to 0.
Ancestors
- videosdk.agents.llm.llm.LLM
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Cleanup resources. Only closes the underlying HTTP client if this instance owns it.""" await self.cancel_current_generation() if self._owns_client and self._client: await self._client.close() await super().aclose()Cleanup resources. Only closes the underlying HTTP client if this instance owns it.
async def cancel_current_generation(self) ‑> None-
Expand source code
async def cancel_current_generation(self) -> None: self._cancelled = TrueCancel the current LLM generation if active.
Raises
NotImplementedError- This method must be implemented by subclasses.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]-
Expand source code
async def chat( self, messages: ChatContext, tools: list[FunctionTool] | None = None, conversational_graph: Any | None = None, **kwargs: Any, ) -> AsyncIterator[LLMResponse]: """ Implement chat functionality using Anthropic's Claude API. Args: messages: ChatContext containing conversation history. tools: Optional list of function tools available to the model. **kwargs: Additional arguments forwarded to the Anthropic API. Yields: LLMResponse objects containing the model's responses. """ self._cancelled = False response_stream = None try: anthropic_messages, system_content = self._convert_messages_to_anthropic_format( messages ) completion_params: dict = { "model": self.model, "messages": anthropic_messages, "max_tokens": self.max_tokens, "temperature": self.temperature, "stream": True, } if system_content: # system can be a plain string or a list of content blocks if self.caching == "ephemeral": completion_params["system"] = [ { "type": "text", "text": system_content, "cache_control": {"type": "ephemeral"}, } ] else: completion_params["system"] = system_content if self.top_k is not None: completion_params["top_k"] = self.top_k if self.top_p is not None: completion_params["top_p"] = self.top_p formatted_tools: list[dict] = [] if tools: seen_tool_names: set[str] = set() for tool in tools: if not is_function_tool(tool): continue try: openai_schema = build_openai_schema(tool) tool_name = openai_schema["name"] if tool_name in seen_tool_names: continue seen_tool_names.add(tool_name) formatted_tools.append( { "name": tool_name, "description": openai_schema["description"], "input_schema": openai_schema["parameters"], } ) except Exception as e: self.emit("error", f"Failed to format tool {tool}: {e}") continue if formatted_tools: # Apply cache_control to the last tool schema when caching is on if self.caching == "ephemeral": formatted_tools[-1]["cache_control"] = {"type": "ephemeral"} completion_params["tools"] = formatted_tools # Build tool_choice dict tool_choice_dict: dict 
= {} if isinstance(self.tool_choice, dict) and self.tool_choice.get("type") == "function": # Specific tool by name: OpenAI-style → Anthropic-style tool_choice_dict = { "type": "tool", "name": self.tool_choice["function"]["name"], } elif self.tool_choice == "required": tool_choice_dict = {"type": "any"} elif self.tool_choice == "auto": tool_choice_dict = {"type": "auto"} elif self.tool_choice == "none": del completion_params["tools"] tool_choice_dict = {} else: tool_choice_dict = {"type": "auto"} if tool_choice_dict: if self.parallel_tool_calls is not None: tool_choice_dict["disable_parallel_tool_use"] = not self.parallel_tool_calls completion_params["tool_choice"] = tool_choice_dict # Prompt caching: mark last user/assistant messages if self.caching == "ephemeral" and completion_params.get("messages"): msgs = completion_params["messages"] last_user_idx = last_asst_idx = -1 for idx, m in enumerate(msgs): if m["role"] == "user": last_user_idx = idx elif m["role"] == "assistant": last_asst_idx = idx for idx in (last_user_idx, last_asst_idx): if idx < 0: continue content = msgs[idx]["content"] if isinstance(content, str): msgs[idx]["content"] = [ {"type": "text", "text": content, "cache_control": {"type": "ephemeral"}} ] elif isinstance(content, list) and content: content[-1]["cache_control"] = {"type": "ephemeral"} completion_params.update(kwargs) # Build extra_headers extra_headers: dict = {} if conversational_graph: extra_headers["anthropic-beta"] = "structured-outputs-2025-11-13" if self.thinking: existing_betas = extra_headers.get("anthropic-beta", "") thinking_beta = "interleaved-thinking-2025-05-14" extra_headers["anthropic-beta"] = ( f"{existing_betas},{thinking_beta}" if existing_betas else thinking_beta ) completion_params["thinking"] = self.thinking if extra_headers: completion_params["extra_headers"] = extra_headers # Use beta API path when extended thinking is active if self.thinking: response_stream = await 
self._client.beta.messages.create(**completion_params) else: response_stream = await self._client.messages.create(**completion_params) current_content = "" current_tool_call: dict | None = None current_tool_call_id: str | None = None current_tool_arguments = "" usage_metadata: dict = { "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, "prompt_cached_tokens": 0, "cache_creation_tokens": 0, "cache_read_tokens": 0, } streaming_state = { "in_response": False, "response_start_index": -1, "yielded_content_length": 0, } async for event in response_stream: if self._cancelled: break if event.type == "message_start": usage_metadata["prompt_tokens"] = event.message.usage.input_tokens usage_metadata["prompt_cached_tokens"] = ( getattr(event.message.usage, "cache_read_input_tokens", 0) or 0 ) usage_metadata["cache_creation_tokens"] = ( getattr(event.message.usage, "cache_creation_input_tokens", 0) or 0 ) usage_metadata["cache_read_tokens"] = ( getattr(event.message.usage, "cache_read_input_tokens", 0) or 0 ) yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata} ) elif event.type == "message_delta": if hasattr(event, "usage"): usage_metadata["completion_tokens"] = event.usage.output_tokens usage_metadata["total_tokens"] = ( usage_metadata["prompt_tokens"] + usage_metadata["completion_tokens"] ) yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata} ) if event.type == "content_block_start": if event.content_block.type == "tool_use": current_tool_call_id = event.content_block.id current_tool_call = { "name": event.content_block.name, "arguments": "", } current_tool_arguments = "" elif event.type == "content_block_delta": delta = event.delta if delta.type == "text_delta": current_content += delta.text if conversational_graph: for content_chunk in conversational_graph.stream_conversational_graph_response( current_content, streaming_state ): yield LLMResponse( content=content_chunk, 
role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata}, ) else: yield LLMResponse( content=delta.text, role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata}, ) elif delta.type == "input_json_delta": if current_tool_call is not None: current_tool_arguments += delta.partial_json elif event.type == "content_block_stop": if current_tool_call is not None and current_tool_call_id is not None: try: parsed_args = ( json.loads(current_tool_arguments) if current_tool_arguments else {} ) current_tool_call["arguments"] = parsed_args except json.JSONDecodeError: current_tool_call["arguments"] = {} yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={ "function_call": { "id": current_tool_call_id, "name": current_tool_call["name"], "arguments": current_tool_call["arguments"], "call_id": current_tool_call_id, }, "usage": usage_metadata, }, ) current_tool_call = None current_tool_call_id = None current_tool_arguments = "" if not self._cancelled: yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata} ) if current_content and not self._cancelled and conversational_graph: try: parsed_json = json.loads(current_content.strip()) yield LLMResponse( content="", role=ChatRole.ASSISTANT, metadata={"graph_response": parsed_json, "usage": usage_metadata}, ) except json.JSONDecodeError: yield LLMResponse( content=current_content, role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata}, ) except anthropic.APIError as e: if not self._cancelled: self.emit("error", e) raise except Exception as e: if not self._cancelled: self.emit("error", e) raise finally: if response_stream is not None: try: await response_stream.close() except Exception: passImplement chat functionality using Anthropic's Claude API.
Args
messages- ChatContext containing conversation history.
tools- Optional list of function tools available to the model.
**kwargs- Additional arguments forwarded to the Anthropic API.
Yields
LLMResponse objects containing the model's responses.