Module videosdk.plugins.openai.llm
Functions
def prepare_strict_schema(schema_dict)
Expand source code
def prepare_strict_schema(schema_dict):
    if isinstance(schema_dict, dict):
        if schema_dict.get("type") == "object":
            schema_dict["additionalProperties"] = False
            if "properties" in schema_dict:
                all_props = list(schema_dict["properties"].keys())
                schema_dict["required"] = all_props
        for key, value in schema_dict.items():
            if isinstance(value, dict):
                prepare_strict_schema(value)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        prepare_strict_schema(item)
    return schema_dict
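The function walks a JSON schema recursively and, for every object node, disallows additional properties and marks all declared properties as required, which is the shape OpenAI's strict structured-output mode expects. A minimal usage sketch (the example schema itself is hypothetical):

    from videosdk.plugins.openai.llm import prepare_strict_schema

    schema = {
        "type": "object",
        "properties": {
            "city": {"type": "string"},
            "forecast": {
                "type": "object",
                "properties": {"temp_c": {"type": "number"}},
            },
        },
    }

    strict = prepare_strict_schema(schema)
    # Every object level now has additionalProperties=False and all of its
    # properties listed as required, e.g.:
    #   strict["additionalProperties"] is False
    #   strict["required"] == ["city", "forecast"]
    #   strict["properties"]["forecast"]["required"] == ["temp_c"]

Note that the schema is modified in place and also returned.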
Classes
class OpenAILLM (*,
api_key: str | None = None,
model: str = 'gpt-4o-mini',
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_completion_tokens: int | None = None,
top_p: float | None = None,
frequency_penalty: float | None = None,
presence_penalty: float | None = None,
seed: int | None = None,
organization: str | None = None,
project: str | None = None,
parallel_tool_calls: bool | None = None,
timeout: httpx.Timeout | None = None,
extra_headers: dict | None = None,
extra_query: dict | None = None,
extra_body: dict | None = None,
client: openai.AsyncOpenAI | None = None,
max_retries: int = 0,
reasoning_effort: "Literal['none', 'low', 'medium', 'high'] | None" = None,
verbosity: "Literal['low', 'medium', 'high'] | None" = None)-
Expand source code
class OpenAILLM(LLM):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "gpt-4o-mini",
        base_url: str | None = None,
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_completion_tokens: int | None = None,
        top_p: float | None = None,
        frequency_penalty: float | None = None,
        presence_penalty: float | None = None,
        seed: int | None = None,
        organization: str | None = None,
        project: str | None = None,
        parallel_tool_calls: bool | None = None,
        timeout: httpx.Timeout | None = None,
        extra_headers: dict | None = None,
        extra_query: dict | None = None,
        extra_body: dict | None = None,
        client: openai.AsyncOpenAI | None = None,
        max_retries: int = 0,
        reasoning_effort: Literal["none", "low", "medium", "high"] | None = None,
        verbosity: Literal["low", "medium", "high"] | None = None,
    ) -> None:
        """Initialize the OpenAI LLM plugin.

        Args:
            api_key: OpenAI API key. Falls back to OPENAI_API_KEY env var.
            model: Chat model name. Defaults to "gpt-4o-mini".
            base_url: Override the default OpenAI API base URL.
            temperature: Sampling temperature. Defaults to 0.7.
            tool_choice: Controls which (if any) tool is called. Defaults to "auto".
            max_completion_tokens: Maximum tokens in the completion.
            top_p: Nucleus sampling probability mass.
            frequency_penalty: Penalise repeated tokens by frequency.
            presence_penalty: Penalise tokens that have already appeared.
            seed: Seed for deterministic sampling.
            organization: OpenAI organisation ID.
            project: OpenAI project ID.
            parallel_tool_calls: Allow the model to call multiple tools in one turn.
            timeout: Custom httpx.Timeout for the underlying HTTP client.
            extra_headers: Additional HTTP headers forwarded to every API call.
            extra_query: Additional query-string parameters forwarded to every API call.
            extra_body: Additional JSON body fields forwarded to every API call.
            client: Optional pre-built ``openai.AsyncOpenAI`` instance to use instead of
                creating a new one. Useful for sharing a client across instances or for
                testing. When provided, ``api_key``, ``base_url``, ``organization``,
                ``project``, ``timeout``, and ``max_retries`` are ignored.
            max_retries: Number of automatic retries on transient errors. Defaults to 0.
            reasoning_effort: Controls reasoning depth for reasoning models. Supported
                values: "none", "low", "medium", "high". Defaults to None (uses the
                model's default). Only applied for reasoning / GPT-5 models.
            verbosity: Controls output verbosity for reasoning / GPT-5 models. Supported
                values: "low", "medium", "high". Defaults to None.
        """
        super().__init__()
        self.model = model
        self.temperature = temperature
        self.tool_choice = tool_choice
        self.max_completion_tokens = max_completion_tokens
        self.top_p = top_p
        self.frequency_penalty = frequency_penalty
        self.presence_penalty = presence_penalty
        self.seed = seed
        self.parallel_tool_calls = parallel_tool_calls
        self.extra_headers = extra_headers
        self.extra_query = extra_query
        self.extra_body = extra_body
        self.reasoning_effort = reasoning_effort
        self.verbosity = verbosity
        self._cancelled = False
        self._owns_client = client is None
        if client is not None:
            self._client = client
        else:
            self.api_key = api_key or os.getenv("OPENAI_API_KEY")
            if not self.api_key:
                raise ValueError(
                    "OpenAI API key must be provided either through api_key parameter "
                    "or OPENAI_API_KEY environment variable"
                )
            _timeout = timeout or httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0)
            self._client = openai.AsyncOpenAI(
                api_key=self.api_key,
                base_url=base_url or None,
                organization=organization or os.getenv("OPENAI_ORG_ID"),
                project=project or os.getenv("OPENAI_PROJECT_ID"),
                max_retries=max_retries,
                http_client=httpx.AsyncClient(
                    timeout=_timeout,
                    follow_redirects=True,
                    limits=httpx.Limits(
                        max_connections=50,
                        max_keepalive_connections=50,
                        keepalive_expiry=120,
                    ),
                ),
            )

    def _is_reasoning_model(self) -> bool:
        """Return True if the configured model is a reasoning / GPT-5 family model
        that requires special parameter handling."""
        model_lower = self.model.lower()
        if model_lower.startswith(("o1", "o3", "o4")):
            return True
        if model_lower.startswith("gpt-5"):
            return True
        return False

    @staticmethod
    def azure(
        *,
        model: str = "gpt-4o-mini",
        azure_endpoint: str | None = None,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | None = None,
        azure_ad_token: str | None = None,
        organization: str | None = None,
        project: str | None = None,
        base_url: str | None = None,
        temperature: float = 0.7,
        tool_choice: ToolChoice = "auto",
        max_completion_tokens: int | None = None,
        top_p: float | None = None,
        frequency_penalty: float | None = None,
        presence_penalty: float | None = None,
        seed: int | None = None,
        parallel_tool_calls: bool | None = None,
        timeout: httpx.Timeout | None = None,
        extra_headers: dict | None = None,
        extra_query: dict | None = None,
        extra_body: dict | None = None,
        client: openai.AsyncAzureOpenAI | None = None,
        max_retries: int = 0,
        reasoning_effort: Literal["none", "low", "medium", "high"] | None = "none",
        verbosity: Literal["low", "medium", "high"] | None = "low",
    ) -> "OpenAILLM":
        """
        Create a new instance of Azure OpenAI LLM.

        Automatically infers the following from environment variables when not provided:
        - ``api_key`` from ``AZURE_OPENAI_API_KEY``
        - ``organization`` from ``OPENAI_ORG_ID``
        - ``project`` from ``OPENAI_PROJECT_ID``
        - ``azure_ad_token`` from ``AZURE_OPENAI_AD_TOKEN``
        - ``api_version`` from ``OPENAI_API_VERSION``
        - ``azure_endpoint`` from ``AZURE_OPENAI_ENDPOINT``
        - ``azure_deployment`` from ``AZURE_OPENAI_DEPLOYMENT`` (falls back to ``model``)

        Pass ``client`` to supply a pre-built ``openai.AsyncAzureOpenAI`` instance.
        When ``client`` is provided, connection/credential params are ignored.
        """
        if client is not None:
            instance = OpenAILLM(
                model=model,
                temperature=temperature,
                tool_choice=tool_choice,
                max_completion_tokens=max_completion_tokens,
                top_p=top_p,
                frequency_penalty=frequency_penalty,
                presence_penalty=presence_penalty,
                seed=seed,
                parallel_tool_calls=parallel_tool_calls,
                extra_headers=extra_headers,
                extra_query=extra_query,
                extra_body=extra_body,
                client=client,
                reasoning_effort=reasoning_effort,
                verbosity=verbosity,
            )
            return instance

        azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
        azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
        api_version = api_version or os.getenv("OPENAI_API_VERSION")
        api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
        azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
        organization = organization or os.getenv("OPENAI_ORG_ID")
        project = project or os.getenv("OPENAI_PROJECT_ID")

        if not azure_deployment:
            azure_deployment = model
        if not azure_endpoint:
            raise ValueError(
                "Azure endpoint must be provided either through azure_endpoint parameter "
                "or AZURE_OPENAI_ENDPOINT environment variable"
            )
        if not api_key and not azure_ad_token:
            raise ValueError("Either API key or Azure AD token must be provided")

        _timeout = timeout or httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0)
        azure_client = openai.AsyncAzureOpenAI(
            max_retries=max_retries,
            azure_endpoint=azure_endpoint,
            azure_deployment=azure_deployment,
            api_version=api_version,
            api_key=api_key,
            azure_ad_token=azure_ad_token,
            organization=organization,
            project=project,
            base_url=base_url,
            timeout=_timeout,
        )
        instance = OpenAILLM(
            model=model,
            temperature=temperature,
            tool_choice=tool_choice,
            max_completion_tokens=max_completion_tokens,
            top_p=top_p,
            frequency_penalty=frequency_penalty,
            presence_penalty=presence_penalty,
            seed=seed,
            parallel_tool_calls=parallel_tool_calls,
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            client=azure_client,
            reasoning_effort=reasoning_effort,
            verbosity=verbosity,
        )
        return instance

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        conversational_graph: Any | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """
        Implement chat functionality using OpenAI's chat completion API.

        Args:
            messages: ChatContext containing conversation history.
            tools: Optional list of function tools available to the model.
            **kwargs: Additional arguments forwarded to the OpenAI API.

        Yields:
            LLMResponse objects containing the model's responses.
        """
        self._cancelled = False

        def _format_content(content: Union[str, List[ChatContent]]):
            if isinstance(content, str):
                return content
            formatted_parts = []
            for part in content:
                if isinstance(part, str):
                    formatted_parts.append({"type": "text", "text": part})
                elif isinstance(part, ImageContent):
                    image_url_data = {"url": part.to_data_url()}
                    if part.inference_detail != "auto":
                        image_url_data["detail"] = part.inference_detail
                    formatted_parts.append(
                        {
                            "type": "image_url",
                            "image_url": image_url_data,
                        }
                    )
            return formatted_parts

        is_reasoning = self._is_reasoning_model()

        # Build the messages list using the modern tool_calls / tool role format.
        # FunctionCall → assistant turn with tool_calls; FunctionCallOutput → tool role.
        openai_messages = []
        i = 0
        items = messages.items
        while i < len(items):
            msg = items[i]
            if msg is None:
                i += 1
                continue
            if isinstance(msg, ChatMessage):
                role = msg.role.value
                if is_reasoning and role == "system":
                    role = "developer"
                openai_messages.append({
                    "role": role,
                    "content": _format_content(msg.content),
                    **({"name": msg.name} if hasattr(msg, "name") and msg.name else {}),
                })
                i += 1
            elif isinstance(msg, FunctionCall):
                # Collect all consecutive FunctionCall items (parallel tool calls) into one
                # assistant message with a tool_calls list.
                tool_calls = []
                while i < len(items) and isinstance(items[i], FunctionCall):
                    fc = items[i]
                    tool_calls.append({
                        "id": fc.call_id,
                        "type": "function",
                        "function": {
                            "name": fc.name,
                            "arguments": fc.arguments,
                        },
                    })
                    i += 1
                openai_messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": tool_calls,
                })
            elif isinstance(msg, FunctionCallOutput):
                openai_messages.append({
                    "role": "tool",
                    "tool_call_id": msg.call_id,
                    "content": msg.output,
                })
                i += 1
            else:
                i += 1

        completion_params: dict = {
            "model": self.model,
            "messages": openai_messages,
            "stream": True,
            "stream_options": {"include_usage": True},
        }

        if is_reasoning:
            if self.max_completion_tokens is not None:
                completion_params["max_completion_tokens"] = self.max_completion_tokens
            if self.reasoning_effort is not None:
                completion_params["reasoning_effort"] = self.reasoning_effort
            if self.verbosity is not None:
                completion_params["text"] = {"format": {"type": "text"}, "verbosity": self.verbosity}
        else:
            completion_params["temperature"] = self.temperature
            if self.max_completion_tokens is not None:
                completion_params["max_completion_tokens"] = self.max_completion_tokens
            if self.top_p is not None:
                completion_params["top_p"] = self.top_p
            if self.frequency_penalty is not None:
                completion_params["frequency_penalty"] = self.frequency_penalty
            if self.presence_penalty is not None:
                completion_params["presence_penalty"] = self.presence_penalty
            if self.seed is not None:
                completion_params["seed"] = self.seed

        if conversational_graph:
            completion_params["response_format"] = {
                "type": "json_schema",
                "json_schema": {
                    "name": "conversational_graph_response",
                    "strict": True,
                    "schema": conversational_graph_schema
                }
            }

        # Modern tools API (replaces deprecated functions/function_call)
        if tools:
            formatted_tools = []
            for tool in tools:
                if not is_function_tool(tool):
                    continue
                try:
                    tool_schema = build_openai_schema(tool)
                    formatted_tools.append({"type": "function", "function": tool_schema})
                except Exception as e:
                    self.emit("error", f"Failed to format tool {tool}: {e}")
                    continue
            if formatted_tools:
                completion_params["tools"] = formatted_tools
                # tool_choice: "auto"|"required"|"none" or {"type":"function","function":{"name":"..."}}
                if isinstance(self.tool_choice, dict):
                    completion_params["tool_choice"] = self.tool_choice
                else:
                    completion_params["tool_choice"] = self.tool_choice
                if self.parallel_tool_calls is not None:
                    completion_params["parallel_tool_calls"] = self.parallel_tool_calls

        # Pass-through overrides from caller
        completion_params.update(kwargs)

        # Passthrough extra headers / query / body
        create_kwargs: dict = {}
        if self.extra_headers:
            create_kwargs["extra_headers"] = self.extra_headers
        if self.extra_query:
            create_kwargs["extra_query"] = self.extra_query
        if self.extra_body:
            create_kwargs["extra_body"] = self.extra_body

        response_stream = None
        try:
            response_stream = await self._client.chat.completions.create(
                **completion_params, **create_kwargs
            )
            current_content = ""
            # Accumulate streamed tool call fragments keyed by delta index
            pending_tool_calls: dict[int, dict] = {}
            streaming_state = {
                "in_response": False,
                "response_start_index": -1,
                "yielded_content_length": 0
            }
            usage_metadata: dict = {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0,
                "prompt_cached_tokens": 0,
                "reasoning_tokens": 0,
                "request_id": None,
                "model": self.model,
            }

            async for chunk in response_stream:
                if self._cancelled:
                    break
                if hasattr(chunk, 'usage') and chunk.usage is not None:
                    usage_metadata["prompt_tokens"] = chunk.usage.prompt_tokens or 0
                    usage_metadata["completion_tokens"] = chunk.usage.completion_tokens or 0
                    usage_metadata["total_tokens"] = chunk.usage.total_tokens or 0
                    usage_metadata["request_id"] = getattr(chunk, "id", None)
                    usage_metadata["model"] = getattr(chunk, "model", self.model)
                    if hasattr(chunk.usage, 'prompt_tokens_details') and chunk.usage.prompt_tokens_details:
                        usage_metadata["prompt_cached_tokens"] = getattr(
                            chunk.usage.prompt_tokens_details, 'cached_tokens', 0
                        ) or 0
                    if hasattr(chunk.usage, 'completion_tokens_details') and chunk.usage.completion_tokens_details:
                        usage_metadata["reasoning_tokens"] = getattr(
                            chunk.usage.completion_tokens_details, 'reasoning_tokens', 0
                        ) or 0
                    yield LLMResponse(content="", role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata})

                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta
                finish_reason = chunk.choices[0].finish_reason

                # Accumulate tool call fragments per index
                if delta.tool_calls:
                    for tc in delta.tool_calls:
                        idx = tc.index
                        if idx not in pending_tool_calls:
                            pending_tool_calls[idx] = {
                                "id": tc.id or "",
                                "name": (tc.function.name or "") if tc.function else "",
                                "arguments": (tc.function.arguments or "") if tc.function else "",
                            }
                        else:
                            if tc.function:
                                if tc.function.name:
                                    pending_tool_calls[idx]["name"] += tc.function.name
                                if tc.function.arguments:
                                    pending_tool_calls[idx]["arguments"] += tc.function.arguments

                # Emit all accumulated tool calls once the model signals it is done
                if finish_reason == "tool_calls" and pending_tool_calls:
                    for tc_data in sorted(pending_tool_calls.values(), key=lambda x: x["id"]):
                        try:
                            args = json.loads(tc_data["arguments"])
                        except json.JSONDecodeError:
                            self.emit("error", f"Failed to parse tool call arguments: {tc_data['arguments']}")
                            args = {}
                        yield LLMResponse(
                            content="",
                            role=ChatRole.ASSISTANT,
                            metadata={
                                "function_call": {"name": tc_data["name"], "arguments": args, "id": tc_data["id"]},
                                "usage": usage_metadata,
                            }
                        )
                    pending_tool_calls = {}
                elif delta.content is not None:
                    current_content += delta.content
                    if conversational_graph:
                        for content_chunk in conversational_graph.stream_conversational_graph_response(
                            current_content, streaming_state
                        ):
                            yield LLMResponse(
                                content=content_chunk,
                                role=ChatRole.ASSISTANT,
                                metadata={"usage": usage_metadata},
                            )
                    else:
                        yield LLMResponse(
                            content=delta.content,
                            role=ChatRole.ASSISTANT,
                            metadata={"usage": usage_metadata},
                        )

            # Flush any tool calls not yet emitted (stream ended without explicit finish_reason)
            if pending_tool_calls and not self._cancelled:
                for tc_data in sorted(pending_tool_calls.values(), key=lambda x: x["id"]):
                    try:
                        args = json.loads(tc_data["arguments"])
                    except json.JSONDecodeError:
                        self.emit("error", f"Failed to parse tool call arguments: {tc_data['arguments']}")
                        args = {}
                    yield LLMResponse(
                        content="",
                        role=ChatRole.ASSISTANT,
                        metadata={
                            "function_call": {"name": tc_data["name"], "arguments": args, "id": tc_data["id"]},
                            "usage": usage_metadata,
                        }
                    )

            if current_content and not self._cancelled and conversational_graph:
                try:
                    parsed_json = json.loads(current_content.strip())
                    yield LLMResponse(
                        content="",
                        role=ChatRole.ASSISTANT,
                        metadata={"usage": usage_metadata, "graph_response": parsed_json}
                    )
                except json.JSONDecodeError:
                    yield LLMResponse(
                        content=current_content,
                        role=ChatRole.ASSISTANT,
                        metadata={"usage": usage_metadata}
                    )
        except Exception as e:
            if not self._cancelled:
                self.emit("error", e)
            raise
        finally:
            if response_stream is not None:
                try:
                    await response_stream.close()
                except Exception:
                    pass

    async def cancel_current_generation(self) -> None:
        self._cancelled = True

    async def aclose(self) -> None:
        """Cleanup resources. Only closes the underlying HTTP client if this instance owns it."""
        await self.cancel_current_generation()
        if self._owns_client and self._client:
            await self._client.close()
        await super().aclose()

Base class for LLM implementations.
Initialize the OpenAI LLM plugin.
Args
api_key: OpenAI API key. Falls back to OPENAI_API_KEY env var.
model: Chat model name. Defaults to "gpt-4o-mini".
base_url: Override the default OpenAI API base URL.
temperature: Sampling temperature. Defaults to 0.7.
tool_choice: Controls which (if any) tool is called. Defaults to "auto".
max_completion_tokens: Maximum tokens in the completion.
top_p: Nucleus sampling probability mass.
frequency_penalty: Penalise repeated tokens by frequency.
presence_penalty: Penalise tokens that have already appeared.
seed: Seed for deterministic sampling.
organization: OpenAI organisation ID.
project: OpenAI project ID.
parallel_tool_calls: Allow the model to call multiple tools in one turn.
timeout: Custom httpx.Timeout for the underlying HTTP client.
extra_headers: Additional HTTP headers forwarded to every API call.
extra_query: Additional query-string parameters forwarded to every API call.
extra_body: Additional JSON body fields forwarded to every API call.
client: Optional pre-built openai.AsyncOpenAI instance to use instead of creating a new one. Useful for sharing a client across instances or for testing. When provided, api_key, base_url, organization, project, timeout, and max_retries are ignored.
max_retries: Number of automatic retries on transient errors. Defaults to 0.
reasoning_effort: Controls reasoning depth for reasoning models. Supported values: "none", "low", "medium", "high". Defaults to None (uses the model's default). Only applied for reasoning / GPT-5 models.
verbosity: Controls output verbosity for reasoning / GPT-5 models. Supported values: "low", "medium", "high". Defaults to None.
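A minimal construction sketch (the model name and tuning values below are illustrative, not defaults you must use):

    import os
    from videosdk.plugins.openai.llm import OpenAILLM

    llm = OpenAILLM(
        api_key=os.getenv("OPENAI_API_KEY"),  # may be omitted; falls back to the env var
        model="gpt-4o-mini",
        temperature=0.3,
        max_completion_tokens=512,
        max_retries=2,
    )

To share one HTTP connection pool across several instances, pass a pre-built client instead; per the docstring, the connection-related parameters are then ignored:

    import openai

    shared_client = openai.AsyncOpenAI()
    llm_shared = OpenAILLM(client=shared_client)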
Ancestors
- videosdk.agents.llm.llm.LLM
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Static methods
def azure(*,
model: str = 'gpt-4o-mini',
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
temperature: float = 0.7,
tool_choice: ToolChoice = 'auto',
max_completion_tokens: int | None = None,
top_p: float | None = None,
frequency_penalty: float | None = None,
presence_penalty: float | None = None,
seed: int | None = None,
parallel_tool_calls: bool | None = None,
timeout: httpx.Timeout | None = None,
extra_headers: dict | None = None,
extra_query: dict | None = None,
extra_body: dict | None = None,
client: openai.AsyncAzureOpenAI | None = None,
max_retries: int = 0,
reasoning_effort: "Literal['none', 'low', 'medium', 'high'] | None" = 'none',
verbosity: "Literal['low', 'medium', 'high'] | None" = 'low') ‑> OpenAILLM-
Expand source code
@staticmethod
def azure(
    *,
    model: str = "gpt-4o-mini",
    azure_endpoint: str | None = None,
    azure_deployment: str | None = None,
    api_version: str | None = None,
    api_key: str | None = None,
    azure_ad_token: str | None = None,
    organization: str | None = None,
    project: str | None = None,
    base_url: str | None = None,
    temperature: float = 0.7,
    tool_choice: ToolChoice = "auto",
    max_completion_tokens: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    seed: int | None = None,
    parallel_tool_calls: bool | None = None,
    timeout: httpx.Timeout | None = None,
    extra_headers: dict | None = None,
    extra_query: dict | None = None,
    extra_body: dict | None = None,
    client: openai.AsyncAzureOpenAI | None = None,
    max_retries: int = 0,
    reasoning_effort: Literal["none", "low", "medium", "high"] | None = "none",
    verbosity: Literal["low", "medium", "high"] | None = "low",
) -> "OpenAILLM":
    """
    Create a new instance of Azure OpenAI LLM.

    Automatically infers the following from environment variables when not provided:
    - ``api_key`` from ``AZURE_OPENAI_API_KEY``
    - ``organization`` from ``OPENAI_ORG_ID``
    - ``project`` from ``OPENAI_PROJECT_ID``
    - ``azure_ad_token`` from ``AZURE_OPENAI_AD_TOKEN``
    - ``api_version`` from ``OPENAI_API_VERSION``
    - ``azure_endpoint`` from ``AZURE_OPENAI_ENDPOINT``
    - ``azure_deployment`` from ``AZURE_OPENAI_DEPLOYMENT`` (falls back to ``model``)

    Pass ``client`` to supply a pre-built ``openai.AsyncAzureOpenAI`` instance.
    When ``client`` is provided, connection/credential params are ignored.
    """
    if client is not None:
        instance = OpenAILLM(
            model=model,
            temperature=temperature,
            tool_choice=tool_choice,
            max_completion_tokens=max_completion_tokens,
            top_p=top_p,
            frequency_penalty=frequency_penalty,
            presence_penalty=presence_penalty,
            seed=seed,
            parallel_tool_calls=parallel_tool_calls,
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            client=client,
            reasoning_effort=reasoning_effort,
            verbosity=verbosity,
        )
        return instance

    azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
    azure_deployment = azure_deployment or os.getenv("AZURE_OPENAI_DEPLOYMENT")
    api_version = api_version or os.getenv("OPENAI_API_VERSION")
    api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
    azure_ad_token = azure_ad_token or os.getenv("AZURE_OPENAI_AD_TOKEN")
    organization = organization or os.getenv("OPENAI_ORG_ID")
    project = project or os.getenv("OPENAI_PROJECT_ID")

    if not azure_deployment:
        azure_deployment = model
    if not azure_endpoint:
        raise ValueError(
            "Azure endpoint must be provided either through azure_endpoint parameter "
            "or AZURE_OPENAI_ENDPOINT environment variable"
        )
    if not api_key and not azure_ad_token:
        raise ValueError("Either API key or Azure AD token must be provided")

    _timeout = timeout or httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0)
    azure_client = openai.AsyncAzureOpenAI(
        max_retries=max_retries,
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        api_version=api_version,
        api_key=api_key,
        azure_ad_token=azure_ad_token,
        organization=organization,
        project=project,
        base_url=base_url,
        timeout=_timeout,
    )
    instance = OpenAILLM(
        model=model,
        temperature=temperature,
        tool_choice=tool_choice,
        max_completion_tokens=max_completion_tokens,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        seed=seed,
        parallel_tool_calls=parallel_tool_calls,
        extra_headers=extra_headers,
        extra_query=extra_query,
        extra_body=extra_body,
        client=azure_client,
        reasoning_effort=reasoning_effort,
        verbosity=verbosity,
    )
    return instance

Create a new instance of Azure OpenAI LLM.
Automatically infers the following from environment variables when not provided:
- api_key from AZURE_OPENAI_API_KEY
- organization from OPENAI_ORG_ID
- project from OPENAI_PROJECT_ID
- azure_ad_token from AZURE_OPENAI_AD_TOKEN
- api_version from OPENAI_API_VERSION
- azure_endpoint from AZURE_OPENAI_ENDPOINT
- azure_deployment from AZURE_OPENAI_DEPLOYMENT (falls back to model)

Pass client to supply a pre-built openai.AsyncAzureOpenAI instance. When client is provided, connection/credential params are ignored.
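A hedged sketch of the factory; the endpoint, deployment, API version, and key values are placeholders, and each can instead come from the environment variables listed above:

    from videosdk.plugins.openai.llm import OpenAILLM

    azure_llm = OpenAILLM.azure(
        model="gpt-4o-mini",
        azure_endpoint="https://my-resource.openai.azure.com",  # or AZURE_OPENAI_ENDPOINT
        azure_deployment="gpt-4o-mini",                         # or AZURE_OPENAI_DEPLOYMENT (falls back to model)
        api_version="2024-08-01-preview",                       # or OPENAI_API_VERSION
        api_key="<azure-openai-key>",                           # or AZURE_OPENAI_API_KEY / azure_ad_token
    )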
Methods
async def aclose(self) -> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources. Only closes the underlying HTTP client if this instance owns it."""
    await self.cancel_current_generation()
    if self._owns_client and self._client:
        await self._client.close()
    await super().aclose()

Cleanup resources. Only closes the underlying HTTP client if this instance owns it.
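A minimal cleanup sketch (assumes OPENAI_API_KEY is set in the environment):

    import asyncio
    from videosdk.plugins.openai.llm import OpenAILLM

    async def main() -> None:
        llm = OpenAILLM()
        try:
            ...  # use llm.chat(...) here
        finally:
            await llm.aclose()  # releases the owned HTTP client; a caller-supplied client is left open

    asyncio.run(main())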
async def cancel_current_generation(self) -> None
Expand source code
async def cancel_current_generation(self) -> None:
    self._cancelled = True

Cancel the current LLM generation if active.
Raises
NotImplementedError: This method must be implemented by subclasses.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) -> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
Expand source code
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    conversational_graph: Any | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMResponse]:
    """
    Implement chat functionality using OpenAI's chat completion API.

    Args:
        messages: ChatContext containing conversation history.
        tools: Optional list of function tools available to the model.
        **kwargs: Additional arguments forwarded to the OpenAI API.

    Yields:
        LLMResponse objects containing the model's responses.
    """
    self._cancelled = False

    def _format_content(content: Union[str, List[ChatContent]]):
        if isinstance(content, str):
            return content
        formatted_parts = []
        for part in content:
            if isinstance(part, str):
                formatted_parts.append({"type": "text", "text": part})
            elif isinstance(part, ImageContent):
                image_url_data = {"url": part.to_data_url()}
                if part.inference_detail != "auto":
                    image_url_data["detail"] = part.inference_detail
                formatted_parts.append(
                    {
                        "type": "image_url",
                        "image_url": image_url_data,
                    }
                )
        return formatted_parts

    is_reasoning = self._is_reasoning_model()

    # Build the messages list using the modern tool_calls / tool role format.
    # FunctionCall → assistant turn with tool_calls; FunctionCallOutput → tool role.
    openai_messages = []
    i = 0
    items = messages.items
    while i < len(items):
        msg = items[i]
        if msg is None:
            i += 1
            continue
        if isinstance(msg, ChatMessage):
            role = msg.role.value
            if is_reasoning and role == "system":
                role = "developer"
            openai_messages.append({
                "role": role,
                "content": _format_content(msg.content),
                **({"name": msg.name} if hasattr(msg, "name") and msg.name else {}),
            })
            i += 1
        elif isinstance(msg, FunctionCall):
            # Collect all consecutive FunctionCall items (parallel tool calls) into one
            # assistant message with a tool_calls list.
            tool_calls = []
            while i < len(items) and isinstance(items[i], FunctionCall):
                fc = items[i]
                tool_calls.append({
                    "id": fc.call_id,
                    "type": "function",
                    "function": {
                        "name": fc.name,
                        "arguments": fc.arguments,
                    },
                })
                i += 1
            openai_messages.append({
                "role": "assistant",
                "content": None,
                "tool_calls": tool_calls,
            })
        elif isinstance(msg, FunctionCallOutput):
            openai_messages.append({
                "role": "tool",
                "tool_call_id": msg.call_id,
                "content": msg.output,
            })
            i += 1
        else:
            i += 1

    completion_params: dict = {
        "model": self.model,
        "messages": openai_messages,
        "stream": True,
        "stream_options": {"include_usage": True},
    }

    if is_reasoning:
        if self.max_completion_tokens is not None:
            completion_params["max_completion_tokens"] = self.max_completion_tokens
        if self.reasoning_effort is not None:
            completion_params["reasoning_effort"] = self.reasoning_effort
        if self.verbosity is not None:
            completion_params["text"] = {"format": {"type": "text"}, "verbosity": self.verbosity}
    else:
        completion_params["temperature"] = self.temperature
        if self.max_completion_tokens is not None:
            completion_params["max_completion_tokens"] = self.max_completion_tokens
        if self.top_p is not None:
            completion_params["top_p"] = self.top_p
        if self.frequency_penalty is not None:
            completion_params["frequency_penalty"] = self.frequency_penalty
        if self.presence_penalty is not None:
            completion_params["presence_penalty"] = self.presence_penalty
        if self.seed is not None:
            completion_params["seed"] = self.seed

    if conversational_graph:
        completion_params["response_format"] = {
            "type": "json_schema",
            "json_schema": {
                "name": "conversational_graph_response",
                "strict": True,
                "schema": conversational_graph_schema
            }
        }

    # Modern tools API (replaces deprecated functions/function_call)
    if tools:
        formatted_tools = []
        for tool in tools:
            if not is_function_tool(tool):
                continue
            try:
                tool_schema = build_openai_schema(tool)
                formatted_tools.append({"type": "function", "function": tool_schema})
            except Exception as e:
                self.emit("error", f"Failed to format tool {tool}: {e}")
                continue
        if formatted_tools:
            completion_params["tools"] = formatted_tools
            # tool_choice: "auto"|"required"|"none" or {"type":"function","function":{"name":"..."}}
            if isinstance(self.tool_choice, dict):
                completion_params["tool_choice"] = self.tool_choice
            else:
                completion_params["tool_choice"] = self.tool_choice
            if self.parallel_tool_calls is not None:
                completion_params["parallel_tool_calls"] = self.parallel_tool_calls

    # Pass-through overrides from caller
    completion_params.update(kwargs)

    # Passthrough extra headers / query / body
    create_kwargs: dict = {}
    if self.extra_headers:
        create_kwargs["extra_headers"] = self.extra_headers
    if self.extra_query:
        create_kwargs["extra_query"] = self.extra_query
    if self.extra_body:
        create_kwargs["extra_body"] = self.extra_body

    response_stream = None
    try:
        response_stream = await self._client.chat.completions.create(
            **completion_params, **create_kwargs
        )
        current_content = ""
        # Accumulate streamed tool call fragments keyed by delta index
        pending_tool_calls: dict[int, dict] = {}
        streaming_state = {
            "in_response": False,
            "response_start_index": -1,
            "yielded_content_length": 0
        }
        usage_metadata: dict = {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_tokens": 0,
            "prompt_cached_tokens": 0,
            "reasoning_tokens": 0,
            "request_id": None,
            "model": self.model,
        }

        async for chunk in response_stream:
            if self._cancelled:
                break
            if hasattr(chunk, 'usage') and chunk.usage is not None:
                usage_metadata["prompt_tokens"] = chunk.usage.prompt_tokens or 0
                usage_metadata["completion_tokens"] = chunk.usage.completion_tokens or 0
                usage_metadata["total_tokens"] = chunk.usage.total_tokens or 0
                usage_metadata["request_id"] = getattr(chunk, "id", None)
                usage_metadata["model"] = getattr(chunk, "model", self.model)
                if hasattr(chunk.usage, 'prompt_tokens_details') and chunk.usage.prompt_tokens_details:
                    usage_metadata["prompt_cached_tokens"] = getattr(
                        chunk.usage.prompt_tokens_details, 'cached_tokens', 0
                    ) or 0
                if hasattr(chunk.usage, 'completion_tokens_details') and chunk.usage.completion_tokens_details:
                    usage_metadata["reasoning_tokens"] = getattr(
                        chunk.usage.completion_tokens_details, 'reasoning_tokens', 0
                    ) or 0
                yield LLMResponse(content="", role=ChatRole.ASSISTANT, metadata={"usage": usage_metadata})

            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            finish_reason = chunk.choices[0].finish_reason

            # Accumulate tool call fragments per index
            if delta.tool_calls:
                for tc in delta.tool_calls:
                    idx = tc.index
                    if idx not in pending_tool_calls:
                        pending_tool_calls[idx] = {
                            "id": tc.id or "",
                            "name": (tc.function.name or "") if tc.function else "",
                            "arguments": (tc.function.arguments or "") if tc.function else "",
                        }
                    else:
                        if tc.function:
                            if tc.function.name:
                                pending_tool_calls[idx]["name"] += tc.function.name
                            if tc.function.arguments:
                                pending_tool_calls[idx]["arguments"] += tc.function.arguments

            # Emit all accumulated tool calls once the model signals it is done
            if finish_reason == "tool_calls" and pending_tool_calls:
                for tc_data in sorted(pending_tool_calls.values(), key=lambda x: x["id"]):
                    try:
                        args = json.loads(tc_data["arguments"])
                    except json.JSONDecodeError:
                        self.emit("error", f"Failed to parse tool call arguments: {tc_data['arguments']}")
                        args = {}
                    yield LLMResponse(
                        content="",
                        role=ChatRole.ASSISTANT,
                        metadata={
                            "function_call": {"name": tc_data["name"], "arguments": args, "id": tc_data["id"]},
                            "usage": usage_metadata,
                        }
                    )
                pending_tool_calls = {}
            elif delta.content is not None:
                current_content += delta.content
                if conversational_graph:
                    for content_chunk in conversational_graph.stream_conversational_graph_response(
                        current_content, streaming_state
                    ):
                        yield LLMResponse(
                            content=content_chunk,
                            role=ChatRole.ASSISTANT,
                            metadata={"usage": usage_metadata},
                        )
                else:
                    yield LLMResponse(
                        content=delta.content,
                        role=ChatRole.ASSISTANT,
                        metadata={"usage": usage_metadata},
                    )

        # Flush any tool calls not yet emitted (stream ended without explicit finish_reason)
        if pending_tool_calls and not self._cancelled:
            for tc_data in sorted(pending_tool_calls.values(), key=lambda x: x["id"]):
                try:
                    args = json.loads(tc_data["arguments"])
                except json.JSONDecodeError:
                    self.emit("error", f"Failed to parse tool call arguments: {tc_data['arguments']}")
                    args = {}
                yield LLMResponse(
                    content="",
                    role=ChatRole.ASSISTANT,
                    metadata={
                        "function_call": {"name": tc_data["name"], "arguments": args, "id": tc_data["id"]},
                        "usage": usage_metadata,
                    }
                )

        if current_content and not self._cancelled and conversational_graph:
            try:
                parsed_json = json.loads(current_content.strip())
                yield LLMResponse(
                    content="",
                    role=ChatRole.ASSISTANT,
                    metadata={"usage": usage_metadata, "graph_response": parsed_json}
                )
            except json.JSONDecodeError:
                yield LLMResponse(
                    content=current_content,
                    role=ChatRole.ASSISTANT,
                    metadata={"usage": usage_metadata}
                )
    except Exception as e:
        if not self._cancelled:
            self.emit("error", e)
        raise
    finally:
        if response_stream is not None:
            try:
                await response_stream.close()
            except Exception:
                pass

Implement chat functionality using OpenAI's chat completion API.
Args
messages: ChatContext containing conversation history.
tools: Optional list of function tools available to the model.
**kwargs: Additional arguments forwarded to the OpenAI API.
Yields
LLMResponse objects containing the model's responses.
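A hedged sketch of consuming the stream; chat_ctx is assumed to be a ChatContext populated elsewhere, since its construction API is not part of this module. Text arrives as incremental deltas in resp.content, while completed tool calls and usage figures arrive via resp.metadata:

    async def stream_reply(llm: OpenAILLM, chat_ctx) -> str:
        reply = ""
        async for resp in llm.chat(chat_ctx):
            call = resp.metadata.get("function_call") if resp.metadata else None
            if call:
                # A completed tool call was emitted; dispatch it here.
                continue
            reply += resp.content  # text deltas arrive incrementally
        return reply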