Module videosdk.plugins.langchain.llm

Classes

class LangChainLLM (llm: BaseChatModel, tools: list | None = None, max_tool_iterations: int = 10)
class LangChainLLM(LLM):
    """VideoSDK LLM adapter for any LangChain ``BaseChatModel``.

    **Two usage modes:**

    Mode A — VideoSDK tools (``@function_tool`` methods on the Agent class):
        Pass no tools at init. VideoSDK's ContentGeneration automatically passes
        ``@function_tool`` methods to ``chat(tools=...)``.  This adapter converts
        them to LangChain stubs for schema binding, then emits tool call metadata
        back to VideoSDK which handles dispatch and re-calls ``chat()`` with the
        result — exactly like ``OpenAILLM`` and ``GoogleLLM``.

    Mode B — LangChain-native tools (Tavily, Wikipedia, custom ``@tool`` functions):
        Pass LangChain tools at init via ``tools=[...]``.  The full tool-calling
        loop (call model → execute tool → feed result → repeat) runs entirely
        inside this adapter.  The voice pipeline only sees the final text stream.

    Both modes can also be used together on the same instance.

    Args:
        llm: Any LangChain ``BaseChatModel`` instance.
        tools: Optional list of LangChain tools executed internally (Mode B).
        max_tool_iterations: Safety cap on consecutive internal tool-call rounds.

    Example — VideoSDK tools (Mode A)::

        from videosdk.plugins.langchain import LangChainLLM
        from langchain_openai import ChatOpenAI

        llm = LangChainLLM(llm=ChatOpenAI(model="gpt-4o-mini"))

        # Tools come from @function_tool methods on the Agent subclass —
        # no extra configuration needed here.

    Example — LangChain tools (Mode B)::

        from videosdk.plugins.langchain import LangChainLLM
        from langchain_openai import ChatOpenAI
        from langchain_community.tools.tavily_search import TavilySearchResults

        llm = LangChainLLM(
            llm=ChatOpenAI(model="gpt-4o-mini"),
            tools=[TavilySearchResults(max_results=3)],
        )
    """

    def __init__(
        self,
        llm: BaseChatModel,
        tools: list | None = None,
        max_tool_iterations: int = 10,
    ) -> None:
        super().__init__()
        self._base_llm = llm
        self._langchain_tools: list = tools or []
        self._max_tool_iterations = max_tool_iterations
        self._cancelled = False
        self._lc_tool_map: dict[str, Any] = {t.name: t for t in self._langchain_tools}
        self._llm_with_lc_tools = (
            self._base_llm.bind_tools(self._langchain_tools)
            if self._langchain_tools
            else self._base_llm
        )

    def _convert_messages(self, ctx: ChatContext) -> List[BaseMessage]:
        """Convert a VideoSDK ChatContext into a LangChain message list."""
        lc_messages: List[BaseMessage] = []
        for item in ctx.items:
            if item is None:
                continue
            if isinstance(item, ChatMessage):
                content = item.content
                if isinstance(content, list):
                    content = " ".join(str(c) for c in content if isinstance(c, str))
                if item.role == ChatRole.SYSTEM:
                    lc_messages.append(SystemMessage(content=content))
                elif item.role == ChatRole.USER:
                    lc_messages.append(HumanMessage(content=content))
                elif item.role == ChatRole.ASSISTANT:
                    lc_messages.append(AIMessage(content=content))
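            # Replay earlier tool activity so the model sees the full history:
            # a FunctionCall becomes an assistant message carrying tool_calls,
            # and a FunctionCallOutput becomes the matching ToolMessage below.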
            elif isinstance(item, FunctionCall):
                try:
                    args = (
                        json.loads(item.arguments)
                        if isinstance(item.arguments, str)
                        else item.arguments
                    )
                except json.JSONDecodeError:
                    args = {}
                lc_messages.append(
                    AIMessage(
                        content="",
                        tool_calls=[{"id": item.call_id, "name": item.name, "args": args, "type": "tool_call"}],
                    )
                )
            elif isinstance(item, FunctionCallOutput):
                lc_messages.append(
                    ToolMessage(content=item.output, tool_call_id=item.call_id, name=item.name)
                )
        return lc_messages

    async def _stream_round(
        self, llm: BaseChatModel, lc_messages: List[BaseMessage], **kwargs: Any
    ):
        """Stream one model call. Yields (content_chunk, tool_call_chunks) per chunk."""
        async for chunk in llm.astream(lc_messages, **kwargs):
            if self._cancelled:
                return
            if not isinstance(chunk, AIMessageChunk):
                continue
            yield chunk


    async def _chat_videosdk_tools(
        self,
        lc_messages: List[BaseMessage],
        videosdk_tool_names: set[str],
        llm: BaseChatModel,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Single-shot stream: text chunks yielded, tool calls emitted as metadata."""
        pending_tool_calls: dict[int, dict] = {}

        async for chunk in self._stream_round(llm, lc_messages, **kwargs):
            if chunk.content:
                yield LLMResponse(content=chunk.content, role=ChatRole.ASSISTANT)

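            # Tool-call deltas stream as fragments keyed by index; accumulate
            # the id/name/args pieces until the stream completes.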
            if chunk.tool_call_chunks:
                for tc in chunk.tool_call_chunks:
                    idx = tc.get("index", 0)
                    if idx not in pending_tool_calls:
                        pending_tool_calls[idx] = {
                            "id": tc.get("id") or "",
                            "name": tc.get("name") or "",
                            "args": tc.get("args") or "",
                        }
                    else:
                        existing = pending_tool_calls[idx]
                        if tc.get("name"):
                            existing["name"] += tc["name"]
                        if tc.get("args"):
                            existing["args"] += tc["args"]
                        if tc.get("id") and not existing["id"]:
                            existing["id"] = tc["id"]

        for tc_data in pending_tool_calls.values():
            try:
                args = json.loads(tc_data["args"]) if tc_data["args"] else {}
            except json.JSONDecodeError:
                args = {}
            yield LLMResponse(
                content="",
                role=ChatRole.ASSISTANT,
                metadata={"function_call": {"name": tc_data["name"], "arguments": args, "id": tc_data["id"]}},
            )

    async def _chat_lc_tools_loop(
        self,
        lc_messages: List[BaseMessage],
        llm: BaseChatModel,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Full internal tool-calling loop until a text-only response is produced."""
        for iteration in range(self._max_tool_iterations):
            if self._cancelled:
                return

            full_content = ""
            pending_tool_calls: dict[int, dict] = {}

            async for chunk in self._stream_round(llm, lc_messages, **kwargs):
                if chunk.content:
                    full_content += chunk.content
                    yield LLMResponse(content=chunk.content, role=ChatRole.ASSISTANT)
                if chunk.tool_call_chunks:
                    for tc in chunk.tool_call_chunks:
                        idx = tc.get("index", 0)
                        if idx not in pending_tool_calls:
                            pending_tool_calls[idx] = {
                                "id": tc.get("id") or "",
                                "name": tc.get("name") or "",
                                "args": tc.get("args") or "",
                            }
                        else:
                            existing = pending_tool_calls[idx]
                            if tc.get("name"):
                                existing["name"] += tc["name"]
                            if tc.get("args"):
                                existing["args"] += tc["args"]
                            if tc.get("id") and not existing["id"]:
                                existing["id"] = tc["id"]

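            # No tool calls this round: the text streamed above is the final answer.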
            if not pending_tool_calls:
                break

            parsed: list[dict] = []
            for tc_data in pending_tool_calls.values():
                try:
                    args = json.loads(tc_data["args"]) if tc_data["args"] else {}
                except json.JSONDecodeError:
                    args = {}
                parsed.append({"id": tc_data["id"], "name": tc_data["name"], "args": args, "type": "tool_call"})

            lc_messages.append(AIMessage(content=full_content, tool_calls=parsed))

            for tc in parsed:
                if self._cancelled:
                    return
                tool = self._lc_tool_map.get(tc["name"])
                if tool is None:
                    result = f"Tool '{tc['name']}' not found."
                    logger.warning(result)
                else:
                    try:
                        result = await tool.ainvoke(tc["args"])
                    except Exception as exc:
                        result = f"Tool execution error: {exc}"
                        logger.error("Error executing tool %s: %s", tc["name"], exc)

                lc_messages.append(
                    ToolMessage(content=str(result), tool_call_id=tc["id"], name=tc["name"])
                )

            logger.debug("LangChainLLM tool iteration %d complete", iteration + 1)

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        conversational_graph: Any | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Stream a response from the LangChain model.

        - If VideoSDK ``@function_tool`` tools are passed (via ``tools``): the model
          receives their schemas, and any tool calls are emitted as ``LLMResponse``
          metadata for VideoSDK ContentGeneration to dispatch (Mode A).
        - If only LangChain-native tools were given at init (``tools`` is empty):
          the internal tool-calling loop runs until a text answer is produced (Mode B).

        Args:
            messages: VideoSDK ChatContext with full conversation history.
            tools: VideoSDK FunctionTools from the Agent's ``@function_tool`` methods.
            conversational_graph: Accepted for API compatibility; not used by LangChain.
            **kwargs: Forwarded to the underlying model's ``astream`` call.

        Yields:
            LLMResponse chunks — text content and/or tool call metadata.
        """
        self._cancelled = False
        lc_messages = self._convert_messages(messages)

        try:
            videosdk_tools = [vt for vt in (tools or []) if is_function_tool(vt)]
            if videosdk_tools:
                videosdk_tool_names = set()
                stub_tools: list[StructuredTool] = []
                for vt in videosdk_tools:
                    info = get_tool_info(vt)
                    schema = build_openai_schema(vt)
                    videosdk_tool_names.add(info.name)
                    stub_tools.append(
                        _make_stub_tool(
                            name=info.name,
                            description=info.description or info.name,
                            parameters=schema.get("parameters", {}),
                        )
                    )
                active_llm = self._base_llm.bind_tools(stub_tools + self._langchain_tools)
                async for chunk in self._chat_videosdk_tools(
                    lc_messages, videosdk_tool_names, active_llm, **kwargs
                ):
                    yield chunk
                return

            async for chunk in self._chat_lc_tools_loop(
                lc_messages, self._llm_with_lc_tools, **kwargs
            ):
                yield chunk

        except Exception as exc:
            if not self._cancelled:
                self.emit("error", exc)
            raise

    async def cancel_current_generation(self) -> None:
        self._cancelled = True

    async def aclose(self) -> None:
        await super().aclose()

VideoSDK LLM adapter for any LangChain BaseChatModel.

Two usage modes:

Mode A — VideoSDK tools (@function_tool methods on the Agent class): Pass no tools at init. VideoSDK's ContentGeneration automatically passes @function_tool methods to chat(tools=...). This adapter converts them to LangChain stubs for schema binding, then emits tool call metadata back to VideoSDK which handles dispatch and re-calls chat() with the result — exactly like OpenAILLM and GoogleLLM.

Mode B — LangChain-native tools (Tavily, Wikipedia, custom @tool functions): Pass LangChain tools at init via tools=[...]. The full tool-calling loop (call model → execute tool → feed result → repeat) runs entirely inside this adapter. The voice pipeline only sees the final text stream.

Both modes can also be used together on the same instance; see the combined example below.

Args

llm
Any LangChain BaseChatModel instance.
tools
Optional list of LangChain tools executed internally (Mode B).
max_tool_iterations
Safety cap on consecutive internal tool-call rounds.

Example — VideoSDK tools (Mode A):

from videosdk.plugins.langchain import LangChainLLM
from langchain_openai import ChatOpenAI

llm = LangChainLLM(llm=ChatOpenAI(model="gpt-4o-mini"))

# Tools come from @function_tool methods on the Agent subclass —
# no extra configuration needed here.

Example — LangChain tools (Mode B):

from videosdk.plugins.langchain import LangChainLLM
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

llm = LangChainLLM(
    llm=ChatOpenAI(model="gpt-4o-mini"),
    tools=[TavilySearchResults(max_results=3)],
)
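
Example — combined (Mode A + Mode B). A minimal sketch: the Assistant class, its
get_weather @function_tool method, and the videosdk.agents import path are
illustrative assumptions, not part of this module:

from videosdk.agents import Agent, function_tool
from videosdk.plugins.langchain import LangChainLLM
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

class Assistant(Agent):
    @function_tool
    async def get_weather(self, city: str) -> str:
        """Return the current weather for a city (hypothetical tool)."""
        return f"Sunny in {city}"

# Tavily executes inside the adapter (Mode B); get_weather is emitted as
# metadata and dispatched by VideoSDK's ContentGeneration (Mode A).
llm = LangChainLLM(
    llm=ChatOpenAI(model="gpt-4o-mini"),
    tools=[TavilySearchResults(max_results=3)],
)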

Initialize the LLM base class.

Ancestors

  • videosdk.agents.llm.llm.LLM
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    await super().aclose()

Cleanup resources.

async def cancel_current_generation(self) ‑> None
async def cancel_current_generation(self) -> None:
    self._cancelled = True

Cancel the current LLM generation if active.

Sets an internal cancellation flag; in-flight streams check it and stop at the next chunk boundary.
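
A minimal usage sketch, assuming an already-built ctx (a ChatContext) and a
plain asyncio consumer; cancellation just flips the internal flag, so the
stream ends at its next chunk:

import asyncio

async def barge_in(llm: LangChainLLM, ctx) -> None:
    async def consume() -> None:
        async for chunk in llm.chat(ctx):
            print(chunk.content, end="")

    task = asyncio.create_task(consume())
    await asyncio.sleep(1.0)               # e.g. the user starts speaking
    await llm.cancel_current_generation()  # sets the internal flag
    await task                             # stream stops at the next chunk
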
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    conversational_graph: Any | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMResponse]:
    """Stream a response from the LangChain model.

    - If VideoSDK ``@function_tool`` tools are passed (via ``tools``): the model
      receives their schemas, and any tool calls are emitted as ``LLMResponse``
      metadata for VideoSDK ContentGeneration to dispatch (Mode A).
    - If only LangChain-native tools were given at init (``tools`` is empty):
      the internal tool-calling loop runs until a text answer is produced (Mode B).

    Args:
        messages: VideoSDK ChatContext with full conversation history.
        tools: VideoSDK FunctionTools from the Agent's ``@function_tool`` methods.
        conversational_graph: Accepted for API compatibility; not used by LangChain.
        **kwargs: Forwarded to the underlying model's ``astream`` call.

    Yields:
        LLMResponse chunks — text content and/or tool call metadata.
    """
    self._cancelled = False
    lc_messages = self._convert_messages(messages)

    try:
        videosdk_tools = [vt for vt in (tools or []) if is_function_tool(vt)]
        if videosdk_tools:
            videosdk_tool_names = set()
            stub_tools: list[StructuredTool] = []
            for vt in videosdk_tools:
                info = get_tool_info(vt)
                schema = build_openai_schema(vt)
                videosdk_tool_names.add(info.name)
                stub_tools.append(
                    _make_stub_tool(
                        name=info.name,
                        description=info.description or info.name,
                        parameters=schema.get("parameters", {}),
                    )
                )
            active_llm = self._base_llm.bind_tools(stub_tools + self._langchain_tools)
            async for chunk in self._chat_videosdk_tools(
                lc_messages, videosdk_tool_names, active_llm, **kwargs
            ):
                yield chunk
            return

        async for chunk in self._chat_lc_tools_loop(
            lc_messages, self._llm_with_lc_tools, **kwargs
        ):
            yield chunk

    except Exception as exc:
        if not self._cancelled:
            self.emit("error", exc)
        raise

Stream a response from the LangChain model.

  • If VideoSDK @function_tool tools are passed (via tools): the model receives their schemas, and any tool calls are emitted as LLMResponse metadata for VideoSDK ContentGeneration to dispatch (Mode A).
  • If only LangChain-native tools were given at init (tools is empty): the internal tool-calling loop runs until a text answer is produced (Mode B).

Args

messages
VideoSDK ChatContext with full conversation history.
tools
VideoSDK FunctionTools from the Agent's @function_tool methods.
conversational_graph
Accepted for API compatibility; not used by LangChain.
**kwargs
Forwarded to the underlying model's astream call.

Yields

LLMResponse chunks — text content and/or tool call metadata.
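
A minimal consumption sketch — ctx and agent_tools are assumed to be supplied
by the caller, and plain text chunks are assumed to leave metadata unset; in
production VideoSDK's ContentGeneration drives this loop for you. Text arrives
in content, while Mode A tool calls arrive under the "function_call" key of
metadata:

async for chunk in llm.chat(ctx, tools=agent_tools):
    if chunk.metadata and "function_call" in chunk.metadata:
        call = chunk.metadata["function_call"]
        # VideoSDK would dispatch call["name"] with call["arguments"],
        # then re-invoke chat() with the tool result appended to ctx.
        print("tool call:", call["name"], call["arguments"])
    elif chunk.content:
        print(chunk.content, end="")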