Module videosdk.plugins.openai.llm
Classes
class OpenAILLM (*,
api_key: str | None = None,
model: str = 'gpt-4o-mini',
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_completion_tokens: int | None = None)
class OpenAILLM(LLM):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "gpt-4o-mini",
        base_url: str | None = None,
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_completion_tokens: int | None = None,
    ) -> None:
        """Initialize the OpenAI LLM plugin.

        Args:
            api_key (Optional[str], optional): OpenAI API key. Defaults to None.
            model (str): The model to use for the LLM plugin. Defaults to "gpt-4o-mini".
            base_url (Optional[str], optional): The base URL for the OpenAI API. Defaults to None.
            temperature (float): The temperature to use for the LLM plugin. Defaults to 0.7.
            tool_choice (ToolChoice): The tool choice to use for the LLM plugin. Defaults to "auto".
            max_completion_tokens (Optional[int], optional): The maximum completion tokens to use for the LLM plugin. Defaults to None.
        """
        super().__init__()
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError(
                "OpenAI API key must be provided either through api_key parameter or OPENAI_API_KEY environment variable"
            )
        self.model = model
        self.temperature = temperature
        self.tool_choice = tool_choice
        self.max_completion_tokens = max_completion_tokens
        self._cancelled = False
        self._client = openai.AsyncOpenAI(
            api_key=self.api_key,
            base_url=base_url or None,
            max_retries=0,
            http_client=httpx.AsyncClient(
                timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=50,
                    max_keepalive_connections=50,
                    keepalive_expiry=120,
                ),
            ),
        )

    # azure(), chat(), cancel_current_generation(), and aclose() are listed
    # with their full source under "Static methods" and "Methods" below.

Base class for LLM implementations.
Initialize the OpenAI LLM plugin.
Args
api_key (Optional[str], optional): OpenAI API key. Defaults to None.
model (str): The model to use for the LLM plugin. Defaults to "gpt-4o-mini".
base_url (Optional[str], optional): The base URL for the OpenAI API. Defaults to None.
temperature (float): The sampling temperature to use for the LLM plugin. Defaults to 0.7.
tool_choice (ToolChoice): The tool choice to use for the LLM plugin. Defaults to "auto".
max_completion_tokens (Optional[int], optional): The maximum number of completion tokens to generate. Defaults to None.
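A minimal construction sketch follows; the import path mirrors the module name at the top of this page, and the base_url value is a hypothetical OpenAI-compatible endpoint:

    import os
    from videosdk.plugins.openai.llm import OpenAILLM

    # Reads OPENAI_API_KEY from the environment when api_key is omitted;
    # raises ValueError if neither is available.
    llm = OpenAILLM(
        model="gpt-4o-mini",
        temperature=0.7,
        max_completion_tokens=512,
    )

    # base_url can redirect requests to any OpenAI-compatible server (hypothetical URL):
    local_llm = OpenAILLM(
        api_key=os.getenv("OPENAI_API_KEY"),
        base_url="http://localhost:8000/v1",
    )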
Ancestors
- videosdk.agents.llm.llm.LLM
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Static methods
def azure(*,
model: str = 'gpt-4o-mini',
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_completion_tokens: int | None = None,
timeout: httpx.Timeout | None = None) -> OpenAILLM
@staticmethod
def azure(
    *,
    model: str = "gpt-4o-mini",
    azure_endpoint: str | None = None,
    azure_deployment: str | None = None,
    api_version: str | None = None,
    api_key: str | None = None,
    azure_ad_token: str | None = None,
    organization: str | None = None,
    project: str | None = None,
    base_url: str | None = None,
    temperature: float = 0.7,
    tool_choice: ToolChoice = "auto",
    max_completion_tokens: int | None = None,
    timeout: httpx.Timeout | None = None,
) -> "OpenAILLM":
    """
    Create a new instance of Azure OpenAI LLM.

    This automatically infers the following arguments from their corresponding
    environment variables if they are not provided:
    - `api_key` from `AZURE_OPENAI_API_KEY`
    - `organization` from `OPENAI_ORG_ID`
    - `project` from `OPENAI_PROJECT_ID`
    - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
    - `api_version` from `OPENAI_API_VERSION`
    - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`
    - `azure_deployment` from `AZURE_OPENAI_DEPLOYMENT` (if not provided, uses `model` as deployment name)
    """
    azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
    azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
    api_version = api_version or os.getenv("OPENAI_API_VERSION")
    api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
    azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
    organization = organization or os.getenv("OPENAI_ORG_ID")
    project = project or os.getenv("OPENAI_PROJECT_ID")

    if not azure_deployment:
        azure_deployment = model
    if not azure_endpoint:
        raise ValueError(
            "Azure endpoint must be provided either through azure_endpoint parameter or AZURE_OPENAI_ENDPOINT environment variable"
        )
    if not api_key and not azure_ad_token:
        raise ValueError("Either API key or Azure AD token must be provided")

    azure_client = openai.AsyncAzureOpenAI(
        max_retries=0,
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        api_version=api_version,
        api_key=api_key,
        azure_ad_token=azure_ad_token,
        organization=organization,
        project=project,
        base_url=base_url,
        timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
    )

    instance = OpenAILLM(
        model=model,
        temperature=temperature,
        tool_choice=tool_choice,
        max_completion_tokens=max_completion_tokens,
    )
    instance._client = azure_client
    return instance

Create a new instance of Azure OpenAI LLM.
This automatically infers the following arguments from their corresponding environment variables if they are not provided:
- api_key from AZURE_OPENAI_API_KEY
- organization from OPENAI_ORG_ID
- project from OPENAI_PROJECT_ID
- azure_ad_token from AZURE_OPENAI_AD_TOKEN
- api_version from OPENAI_API_VERSION
- azure_endpoint from AZURE_OPENAI_ENDPOINT
- azure_deployment from AZURE_OPENAI_DEPLOYMENT (if not provided, uses model as deployment name)
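A minimal sketch of the Azure path, assuming AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, and OPENAI_API_VERSION are available. Note that, as written in the source above, azure() constructs an OpenAILLM internally, so OPENAI_API_KEY must also be set in the environment even though the Azure client ultimately replaces the default one:

    import os
    from videosdk.plugins.openai.llm import OpenAILLM

    # Explicit arguments shown here; each also falls back to the environment
    # variables listed above when omitted.
    azure_llm = OpenAILLM.azure(
        model="gpt-4o-mini",                                 # also used as the deployment name if none is given
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),   # e.g. https://<resource>.openai.azure.com (hypothetical)
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        api_version=os.getenv("OPENAI_API_VERSION"),
        temperature=0.5,
    )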
Methods
async def aclose(self) -> None
async def aclose(self) -> None:
    """Cleanup resources by closing the HTTP client"""
    await self.cancel_current_generation()
    if self._client:
        await self._client.close()
    await super().aclose()

Cleanup resources by closing the HTTP client.
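A shutdown sketch, assuming an instance created as in the constructor example above:

    import asyncio
    from videosdk.plugins.openai.llm import OpenAILLM

    async def main() -> None:
        llm = OpenAILLM()  # requires OPENAI_API_KEY in the environment
        try:
            pass  # use llm.chat(...) here
        finally:
            # Cancels any in-flight generation and closes the underlying HTTP client.
            await llm.aclose()

    asyncio.run(main())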
async def cancel_current_generation(self) -> None
async def cancel_current_generation(self) -> None:
    self._cancelled = True

Cancel the current LLM generation if active.
This implementation does not raise; it sets an internal flag that the streaming loop in chat() checks, so the current response stops after the next received chunk.
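A cancellation sketch under the same assumptions as the other examples; the generation runs as a background task and is stopped mid-stream:

    import asyncio

    async def stream_reply(llm, ctx) -> None:
        # ctx is a ChatContext; see the usage sketch under chat() below.
        async for response in llm.chat(messages=ctx):
            print(response.content, end="", flush=True)

    async def run_briefly(llm, ctx) -> None:
        task = asyncio.create_task(stream_reply(llm, ctx))
        await asyncio.sleep(2.0)               # let it stream for a moment
        await llm.cancel_current_generation()  # the streaming loop exits on the next chunk
        await task                             # the iterator finishes without raising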
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
**kwargs: Any) -> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMResponse]:
    """
    Implement chat functionality using OpenAI's chat completion API

    Args:
        messages: ChatContext containing conversation history
        tools: Optional list of function tools available to the model
        **kwargs: Additional arguments passed to the OpenAI API

    Yields:
        LLMResponse objects containing the model's responses
    """
    self._cancelled = False

    def _format_content(content: Union[str, List[ChatContent]]):
        if isinstance(content, str):
            return content
        formatted_parts = []
        for part in content:
            if isinstance(part, str):
                formatted_parts.append({"type": "text", "text": part})
            elif isinstance(part, ImageContent):
                image_url_data = {"url": part.to_data_url()}
                if part.inference_detail != "auto":
                    image_url_data["detail"] = part.inference_detail
                formatted_parts.append(
                    {
                        "type": "image_url",
                        "image_url": image_url_data,
                    }
                )
        return formatted_parts

    completion_params = {
        "model": self.model,
        "messages": [
            {
                "role": msg.role.value,
                "content": _format_content(msg.content),
                **({"name": msg.name} if hasattr(msg, "name") else {}),
            }
            if isinstance(msg, ChatMessage)
            else {
                "role": "assistant",
                "content": None,
                "function_call": {"name": msg.name, "arguments": msg.arguments},
            }
            if isinstance(msg, FunctionCall)
            else {
                "role": "function",
                "name": msg.name,
                "content": msg.output,
            }
            if isinstance(msg, FunctionCallOutput)
            else None
            for msg in messages.items
            if msg is not None
        ],
        "temperature": self.temperature,
        "stream": True,
        "max_tokens": self.max_completion_tokens,
    }

    if tools:
        formatted_tools = []
        for tool in tools:
            if not is_function_tool(tool):
                continue
            try:
                tool_schema = build_openai_schema(tool)
                formatted_tools.append(tool_schema)
            except Exception as e:
                self.emit("error", f"Failed to format tool {tool}: {e}")
                continue
        if formatted_tools:
            completion_params["functions"] = formatted_tools
            completion_params["function_call"] = self.tool_choice

    completion_params.update(kwargs)

    try:
        response_stream = await self._client.chat.completions.create(**completion_params)

        current_content = ""
        current_function_call = None

        async for chunk in response_stream:
            if self._cancelled:
                break
            if not chunk.choices:
                continue

            delta = chunk.choices[0].delta

            if delta.function_call:
                if current_function_call is None:
                    current_function_call = {
                        "name": delta.function_call.name or "",
                        "arguments": delta.function_call.arguments or "",
                    }
                else:
                    if delta.function_call.name:
                        current_function_call["name"] += delta.function_call.name
                    if delta.function_call.arguments:
                        current_function_call["arguments"] += delta.function_call.arguments
            elif current_function_call is not None:
                try:
                    args = json.loads(current_function_call["arguments"])
                    current_function_call["arguments"] = args
                except json.JSONDecodeError:
                    self.emit("error", f"Failed to parse function arguments: {current_function_call['arguments']}")
                    current_function_call["arguments"] = {}
                yield LLMResponse(
                    content="",
                    role=ChatRole.ASSISTANT,
                    metadata={"function_call": current_function_call},
                )
                current_function_call = None
            elif delta.content is not None:
                current_content = delta.content
                yield LLMResponse(
                    content=current_content,
                    role=ChatRole.ASSISTANT,
                )
    except Exception as e:
        if not self._cancelled:
            self.emit("error", e)
        raise

Implement chat functionality using OpenAI's chat completion API.
Args
messages: ChatContext containing the conversation history.
tools: Optional list of function tools available to the model.
**kwargs: Additional arguments passed to the OpenAI API.
Yields
LLMResponse objects containing the model's responses
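A streaming consumption sketch. The exact ChatContext construction API is not documented on this page, so the import path and the add_message call below are hypothetical placeholders; the iteration and metadata handling follow the source above:

    from videosdk.agents import ChatContext, ChatRole  # import path is an assumption
    from videosdk.plugins.openai.llm import OpenAILLM

    async def ask(llm: OpenAILLM, question: str) -> str:
        ctx = ChatContext()
        ctx.add_message(role=ChatRole.USER, content=question)  # hypothetical helper; adapt to the real ChatContext API

        parts: list[str] = []
        async for response in llm.chat(messages=ctx):
            metadata = getattr(response, "metadata", None)
            if metadata and "function_call" in metadata:
                # The model requested a tool call; its name and parsed arguments live in the metadata.
                call = metadata["function_call"]
                print("tool requested:", call["name"], call["arguments"])
            else:
                parts.append(response.content)  # text deltas arrive chunk by chunk
        return "".join(parts)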