Package videosdk.plugins.langchain

Sub-modules

videosdk.plugins.langchain.graph
videosdk.plugins.langchain.llm

Classes

class LangChainLLM (llm: BaseChatModel, tools: list | None = None, max_tool_iterations: int = 10)
Expand source code
class LangChainLLM(LLM):
    """VideoSDK LLM adapter for any LangChain ``BaseChatModel``.

    **Two usage modes:**

    Mode A — VideoSDK tools (``@function_tool`` methods on the Agent class):
        Pass no tools at init. VideoSDK's ContentGeneration automatically passes
        ``@function_tool`` methods to ``chat(tools=...)``.  This adapter converts
        them to LangChain stubs for schema binding, then emits tool call metadata
        back to VideoSDK which handles dispatch and re-calls ``chat()`` with the
        result — exactly like ``OpenAILLM`` and ``GoogleLLM``.

    Mode B — LangChain-native tools (Tavily, Wikipedia, custom ``@tool`` functions):
        Pass LangChain tools at init via ``tools=[...]``.  The full tool-calling
        loop (call model → execute tool → feed result → repeat) runs entirely
        inside this adapter.  The voice pipeline only sees the final text stream.

    Both modes can also be used together on the same instance.

    Args:
        llm: Any LangChain ``BaseChatModel`` instance.
        tools: Optional list of LangChain tools executed internally (Mode B).
        max_tool_iterations: Safety cap on consecutive internal tool-call rounds.

    Example — VideoSDK tools (Mode A)::

        from videosdk.plugins.langchain import LangChainLLM
        from langchain_openai import ChatOpenAI

        llm = LangChainLLM(llm=ChatOpenAI(model="gpt-4o-mini"))

        # Tools come from @function_tool methods on the Agent subclass —
        # no extra configuration needed here.

    Example — LangChain tools (Mode B)::

        from videosdk.plugins.langchain import LangChainLLM
        from langchain_openai import ChatOpenAI
        from langchain_community.tools.tavily_search import TavilySearchResults

        llm = LangChainLLM(
            llm=ChatOpenAI(model="gpt-4o-mini"),
            tools=[TavilySearchResults(max_results=3)],
        )
    """

    def __init__(
        self,
        llm: BaseChatModel,
        tools: list | None = None,
        max_tool_iterations: int = 10,
    ) -> None:
        super().__init__()
        self._base_llm = llm
        # LangChain-native tools executed internally by this adapter (Mode B).
        self._langchain_tools: list = tools or []
        self._max_tool_iterations = max_tool_iterations
        # Cooperative-cancellation flag: set by cancel_current_generation()
        # and observed at chunk boundaries inside the streaming loops.
        self._cancelled = False
        # Name -> tool lookup used to dispatch internal (Mode B) tool calls.
        self._lc_tool_map: dict[str, Any] = {t.name: t for t in self._langchain_tools}
        # Bind LC tools once up front so Mode B requests reuse the bound model
        # instead of re-binding on every chat() call.
        self._llm_with_lc_tools = (
            self._base_llm.bind_tools(self._langchain_tools)
            if self._langchain_tools
            else self._base_llm
        )

    def _convert_messages(self, ctx: ChatContext) -> List[BaseMessage]:
        """Convert a VideoSDK ChatContext into a LangChain message list."""
        lc_messages: List[BaseMessage] = []
        for item in ctx.items:
            if item is None:
                continue
            if isinstance(item, ChatMessage):
                content = item.content
                # Multi-part content: keep only the string parts, space-joined.
                if isinstance(content, list):
                    content = " ".join(str(c) for c in content if isinstance(c, str))
                if item.role == ChatRole.SYSTEM:
                    lc_messages.append(SystemMessage(content=content))
                elif item.role == ChatRole.USER:
                    lc_messages.append(HumanMessage(content=content))
                elif item.role == ChatRole.ASSISTANT:
                    lc_messages.append(AIMessage(content=content))
            elif isinstance(item, FunctionCall):
                # Arguments may arrive as a JSON string or an already-parsed object.
                try:
                    args = (
                        json.loads(item.arguments)
                        if isinstance(item.arguments, str)
                        else item.arguments
                    )
                except json.JSONDecodeError:
                    # Malformed JSON: degrade to empty args rather than failing.
                    args = {}
                lc_messages.append(
                    AIMessage(
                        content="",
                        tool_calls=[{"id": item.call_id, "name": item.name, "args": args, "type": "tool_call"}],
                    )
                )
            elif isinstance(item, FunctionCallOutput):
                lc_messages.append(
                    ToolMessage(content=item.output, tool_call_id=item.call_id, name=item.name)
                )
        return lc_messages

    async def _stream_round(
        self, llm: BaseChatModel, lc_messages: List[BaseMessage], **kwargs: Any
    ) -> AsyncIterator[AIMessageChunk]:
        """Stream one model call, yielding each ``AIMessageChunk`` as it arrives.

        Stops early (without raising) if ``self._cancelled`` is set; chunks that
        are not ``AIMessageChunk`` instances are silently skipped.
        """
        async for chunk in llm.astream(lc_messages, **kwargs):
            # Honor cancellation between chunks.
            if self._cancelled:
                return
            # Only assistant message chunks carry content / tool-call deltas.
            if not isinstance(chunk, AIMessageChunk):
                continue
            yield chunk


    async def _chat_videosdk_tools(
        self,
        lc_messages: List[BaseMessage],
        videosdk_tool_names: set[str],
        llm: BaseChatModel,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Single-shot stream: text chunks yielded, tool calls emitted as metadata.

        Args:
            lc_messages: Conversation converted to LangChain messages.
            videosdk_tool_names: Names of the VideoSDK ``@function_tool`` tools.
                NOTE(review): currently unused — every accumulated tool call is
                emitted as VideoSDK metadata below, including calls to
                LangChain-native tools when both modes are combined.  Confirm
                whether this set was meant to filter which calls go back to
                VideoSDK versus being executed internally.
            llm: Model already bound with the stub tool schemas.
            **kwargs: Forwarded to the model's ``astream`` call.

        Yields:
            Text ``LLMResponse`` chunks, then one metadata-only response per
            fully-assembled tool call.
        """
        # Streaming tool calls arrive as fragments keyed by index; id/name/args
        # strings are accumulated here until the stream ends.
        pending_tool_calls: dict[int, dict] = {}

        async for chunk in self._stream_round(llm, lc_messages, **kwargs):
            if chunk.content:
                yield LLMResponse(content=chunk.content, role=ChatRole.ASSISTANT)

            if chunk.tool_call_chunks:
                for tc in chunk.tool_call_chunks:
                    idx = tc.get("index", 0)
                    if idx not in pending_tool_calls:
                        pending_tool_calls[idx] = {
                            "id": tc.get("id") or "",
                            "name": tc.get("name") or "",
                            "args": tc.get("args") or "",
                        }
                    else:
                        # Later fragments append to name/args; the id is taken
                        # from the first fragment that provides one.
                        existing = pending_tool_calls[idx]
                        if tc.get("name"):
                            existing["name"] += tc["name"]
                        if tc.get("args"):
                            existing["args"] += tc["args"]
                        if tc.get("id") and not existing["id"]:
                            existing["id"] = tc["id"]

        # Emit each assembled tool call as metadata for VideoSDK to dispatch.
        for tc_data in pending_tool_calls.values():
            try:
                args = json.loads(tc_data["args"]) if tc_data["args"] else {}
            except json.JSONDecodeError:
                # Unparseable accumulated args: send an empty dict.
                args = {}
            yield LLMResponse(
                content="",
                role=ChatRole.ASSISTANT,
                metadata={"function_call": {"name": tc_data["name"], "arguments": args, "id": tc_data["id"]}},
            )

    async def _chat_lc_tools_loop(
        self,
        lc_messages: List[BaseMessage],
        llm: BaseChatModel,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Full internal tool-calling loop until a text-only response is produced.

        Each round streams one model call; if the round ends with pending tool
        calls, they are executed via ``self._lc_tool_map`` and their results are
        appended to ``lc_messages`` for the next round.  Note that if all
        ``max_tool_iterations`` rounds end in tool calls, the loop exits without
        producing a final text answer.
        """
        for iteration in range(self._max_tool_iterations):
            if self._cancelled:
                return

            full_content = ""
            # Same fragment-accumulation scheme as _chat_videosdk_tools.
            pending_tool_calls: dict[int, dict] = {}

            async for chunk in self._stream_round(llm, lc_messages, **kwargs):
                if chunk.content:
                    full_content += chunk.content
                    # Text is forwarded to the pipeline as it streams, even in
                    # rounds that also produce tool calls.
                    yield LLMResponse(content=chunk.content, role=ChatRole.ASSISTANT)
                if chunk.tool_call_chunks:
                    for tc in chunk.tool_call_chunks:
                        idx = tc.get("index", 0)
                        if idx not in pending_tool_calls:
                            pending_tool_calls[idx] = {
                                "id": tc.get("id") or "",
                                "name": tc.get("name") or "",
                                "args": tc.get("args") or "",
                            }
                        else:
                            existing = pending_tool_calls[idx]
                            if tc.get("name"):
                                existing["name"] += tc["name"]
                            if tc.get("args"):
                                existing["args"] += tc["args"]
                            if tc.get("id") and not existing["id"]:
                                existing["id"] = tc["id"]

            # Text-only round: the conversation is complete.
            if not pending_tool_calls:
                break

            parsed: list[dict] = []
            for tc_data in pending_tool_calls.values():
                try:
                    args = json.loads(tc_data["args"]) if tc_data["args"] else {}
                except json.JSONDecodeError:
                    args = {}
                parsed.append({"id": tc_data["id"], "name": tc_data["name"], "args": args, "type": "tool_call"})

            # Record the assistant turn (with its tool calls) before executing.
            lc_messages.append(AIMessage(content=full_content, tool_calls=parsed))

            for tc in parsed:
                if self._cancelled:
                    return
                tool = self._lc_tool_map.get(tc["name"])
                if tool is None:
                    # Unknown tool name: feed the error back to the model
                    # instead of raising, so it can recover.
                    result = f"Tool '{tc['name']}' not found."
                    logger.warning(result)
                else:
                    try:
                        result = await tool.ainvoke(tc["args"])
                    except Exception as exc:
                        # Tool failures are reported to the model as text.
                        result = f"Tool execution error: {exc}"
                        logger.error("Error executing tool %s: %s", tc["name"], exc)

                lc_messages.append(
                    ToolMessage(content=str(result), tool_call_id=tc["id"], name=tc["name"])
                )

            logger.debug("LangChainLLM tool iteration %d complete", iteration + 1)

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        conversational_graph: Any | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Stream a response from the LangChain model.

        - If VideoSDK ``@function_tool`` tools are passed (via ``tools``): the model
          receives their schemas, and any tool calls are emitted as ``LLMResponse``
          metadata for VideoSDK ContentGeneration to dispatch (Mode A).
        - If only LangChain-native tools were given at init (``tools`` is empty):
          the internal tool-calling loop runs until a text answer is produced (Mode B).

        Args:
            messages: VideoSDK ChatContext with full conversation history.
            tools: VideoSDK FunctionTools from the Agent's ``@function_tool`` methods.
            conversational_graph: Accepted for API compatibility; not used by LangChain.
            **kwargs: Forwarded to the underlying model's ``astream`` call.

        Yields:
            LLMResponse chunks — text content and/or tool call metadata.
        """
        # New generation: clear any cancellation left over from a prior call.
        self._cancelled = False
        lc_messages = self._convert_messages(messages)

        try:
            # Keep only entries VideoSDK recognises as function tools.
            videosdk_tools = [vt for vt in (tools or []) if is_function_tool(vt)]
            if videosdk_tools:
                videosdk_tool_names = set()
                stub_tools: list[StructuredTool] = []
                for vt in videosdk_tools:
                    info = get_tool_info(vt)
                    schema = build_openai_schema(vt)
                    videosdk_tool_names.add(info.name)
                    # Stubs carry only the schema; execution stays in VideoSDK.
                    stub_tools.append(
                        _make_stub_tool(
                            name=info.name,
                            description=info.description or info.name,
                            parameters=schema.get("parameters", {}),
                        )
                    )
                # Both VideoSDK stubs and LC-native tools are bound, so the
                # model may call either kind in this path.
                active_llm = self._base_llm.bind_tools(stub_tools + self._langchain_tools)
                async for chunk in self._chat_videosdk_tools(
                    lc_messages, videosdk_tool_names, active_llm, **kwargs
                ):
                    yield chunk
                return

            # Mode B: run the internal loop with the pre-bound model.
            async for chunk in self._chat_lc_tools_loop(
                lc_messages, self._llm_with_lc_tools, **kwargs
            ):
                yield chunk

        except Exception as exc:
            # Cancellation-triggered errors are expected; only surface others.
            if not self._cancelled:
                self.emit("error", exc)
            raise

    async def cancel_current_generation(self) -> None:
        """Request cancellation of the in-flight generation.

        Sets a flag that the streaming loops check at chunk boundaries;
        nothing is interrupted mid-chunk.
        """
        self._cancelled = True

    async def aclose(self) -> None:
        """Close the adapter; cleanup is delegated to the LLM base class."""
        await super().aclose()
VideoSDK LLM adapter for any LangChain BaseChatModel.

Two usage modes:

Mode A — VideoSDK tools (@function_tool methods on the Agent class): Pass no tools at init. VideoSDK's ContentGeneration automatically passes @function_tool methods to chat(tools=...). This adapter converts them to LangChain stubs for schema binding, then emits tool call metadata back to VideoSDK which handles dispatch and re-calls chat() with the result — exactly like OpenAILLM and GoogleLLM.

Mode B — LangChain-native tools (Tavily, Wikipedia, custom @tool functions): Pass LangChain tools at init via tools=[...]. The full tool-calling loop (call model → execute tool → feed result → repeat) runs entirely inside this adapter. The voice pipeline only sees the final text stream.

Both modes can also be used together on the same instance.

Args

llm
Any LangChain BaseChatModel instance.
tools
Optional list of LangChain tools executed internally (Mode B).
max_tool_iterations
Safety cap on consecutive internal tool-call rounds.

Example — VideoSDK tools (Mode A)::

from videosdk.plugins.langchain import LangChainLLM
from langchain_openai import ChatOpenAI

llm = LangChainLLM(llm=ChatOpenAI(model="gpt-4o-mini"))

# Tools come from @function_tool methods on the Agent subclass —
# no extra configuration needed here.

Example — LangChain tools (Mode B)::

from videosdk.plugins.langchain import LangChainLLM
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

llm = LangChainLLM(
    llm=ChatOpenAI(model="gpt-4o-mini"),
    tools=[TavilySearchResults(max_results=3)],
)

Initialize the LLM base class.

Ancestors

  • videosdk.agents.llm.llm.LLM
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the adapter; cleanup is delegated to the LLM base class."""
    await super().aclose()

Cleanup resources.

async def cancel_current_generation(self) ‑> None
Expand source code
async def cancel_current_generation(self) -> None:
    """Request cancellation of the in-flight generation.

    Sets a flag that the streaming loops check at chunk boundaries;
    nothing is interrupted mid-chunk.
    """
    self._cancelled = True

Cancel the current LLM generation if active.

Note

The abstract base-class declaration documents a NotImplementedError; this
concrete implementation never raises — it sets an internal cancellation flag
that the streaming loop checks at chunk boundaries.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
Expand source code
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    conversational_graph: Any | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMResponse]:
    """Stream a response from the LangChain model.

    - If VideoSDK ``@function_tool`` tools are passed (via ``tools``): the model
      receives their schemas, and any tool calls are emitted as ``LLMResponse``
      metadata for VideoSDK ContentGeneration to dispatch (Mode A).
    - If only LangChain-native tools were given at init (``tools`` is empty):
      the internal tool-calling loop runs until a text answer is produced (Mode B).

    Args:
        messages: VideoSDK ChatContext with full conversation history.
        tools: VideoSDK FunctionTools from the Agent's ``@function_tool`` methods.
        conversational_graph: Accepted for API compatibility; not used by LangChain.
        **kwargs: Forwarded to the underlying model's ``astream`` call.

    Yields:
        LLMResponse chunks — text content and/or tool call metadata.
    """
    # New generation: clear any cancellation left over from a prior call.
    self._cancelled = False
    lc_messages = self._convert_messages(messages)

    try:
        # Keep only entries VideoSDK recognises as function tools.
        videosdk_tools = [vt for vt in (tools or []) if is_function_tool(vt)]
        if videosdk_tools:
            videosdk_tool_names = set()
            stub_tools: list[StructuredTool] = []
            for vt in videosdk_tools:
                info = get_tool_info(vt)
                schema = build_openai_schema(vt)
                videosdk_tool_names.add(info.name)
                # Stubs carry only the schema; execution stays in VideoSDK.
                stub_tools.append(
                    _make_stub_tool(
                        name=info.name,
                        description=info.description or info.name,
                        parameters=schema.get("parameters", {}),
                    )
                )
            # Both VideoSDK stubs and LC-native tools are bound, so the
            # model may call either kind in this path.
            active_llm = self._base_llm.bind_tools(stub_tools + self._langchain_tools)
            async for chunk in self._chat_videosdk_tools(
                lc_messages, videosdk_tool_names, active_llm, **kwargs
            ):
                yield chunk
            return

        # Mode B: run the internal loop with the pre-bound model.
        async for chunk in self._chat_lc_tools_loop(
            lc_messages, self._llm_with_lc_tools, **kwargs
        ):
            yield chunk

    except Exception as exc:
        # Cancellation-triggered errors are expected; only surface others.
        if not self._cancelled:
            self.emit("error", exc)
        raise

Stream a response from the LangChain model.

  • If VideoSDK @function_tool tools are passed (via tools): the model receives their schemas, and any tool calls are emitted as LLMResponse metadata for VideoSDK ContentGeneration to dispatch (Mode A).
  • If only LangChain-native tools were given at init (tools is empty): the internal tool-calling loop runs until a text answer is produced (Mode B).

Args

messages
VideoSDK ChatContext with full conversation history.
tools
VideoSDK FunctionTools from the Agent's @function_tool methods.
conversational_graph
Accepted for API compatibility; not used by LangChain.
**kwargs
Forwarded to the underlying model's astream call.

Yields

LLMResponse chunks — text content and/or tool call metadata.

class LangGraphLLM (graph: Any,
output_node: str | None = None,
config: dict | None = None,
stream_mode: str | list[str] = 'messages',
subgraphs: bool = False,
context: Any | None = None)
Expand source code
class LangGraphLLM(LLM):
    """VideoSDK LLM adapter that wraps a compiled LangGraph StateGraph.

    The entire graph — its nodes, edges, tool nodes, conditional routing,
    and internal state — runs as the "LLM" from VideoSDK's perspective.
    The STT → LLM → TTS pipeline stays unchanged; this adapter bridges
    VideoSDK's ChatContext into the graph's message state and streams the
    graph's AI output back as ``LLMResponse`` chunks.

    Args:
        graph: A compiled LangGraph graph (``StateGraph.compile()``).
            Must accept ``{"messages": list[BaseMessage]}`` as input and
            use ``MessagesState`` (or a compatible schema).
        output_node: If provided, only text chunks emitted by this node
            name are forwarded to the voice pipeline.  Use this to
            restrict output to a dedicated synthesis/response node and
            suppress intermediate researcher/planner node text.
            Only applies when ``stream_mode="messages"``.
            Defaults to ``None`` (all AI text chunks are forwarded).
        config: Optional LangGraph ``RunnableConfig`` dict (e.g. for
            thread IDs, recursion limits, or custom callbacks).
        stream_mode: LangGraph streaming mode.  ``"messages"`` (default)
            streams ``AIMessageChunk`` tokens; ``"custom"`` streams
            arbitrary objects emitted by graph nodes via ``graph.send()``.
            Pass a list to enable both simultaneously.
        subgraphs: If ``True``, stream tokens from nested subgraphs as
            well as the top-level graph.  Requires LangGraph ≥ 0.2.
        context: Optional LangGraph 2.0 context object injected into the
            graph at runtime.  Useful for passing dependencies (database
            connections, session objects) that should not live in state.

    Example — basic usage::

        from videosdk.plugins.langchain import LangGraphLLM

        graph = build_my_research_graph()   # returns CompiledStateGraph
        llm = LangGraphLLM(graph=graph, output_node="synthesizer")

    Example — nested subgraphs::

        llm = LangGraphLLM(graph=graph, subgraphs=True)

    Example — custom streaming (nodes emit strings via graph.send)::

        llm = LangGraphLLM(graph=graph, stream_mode="custom")
    """

    def __init__(
        self,
        graph: Any,
        output_node: str | None = None,
        config: dict | None = None,
        stream_mode: str | list[str] = "messages",
        subgraphs: bool = False,
        context: Any | None = None,
    ) -> None:
        super().__init__()
        # Validate requested mode(s) eagerly so misconfiguration fails at
        # construction time, not mid-call.
        modes = {stream_mode} if isinstance(stream_mode, str) else set(stream_mode)
        unsupported = modes - _SUPPORTED_MODES
        if unsupported:
            raise ValueError(
                f"Unsupported stream_mode(s): {unsupported}. "
                f"Supported: {_SUPPORTED_MODES}"
            )
        self._graph = graph
        self._output_node = output_node
        self._config = config or {}
        # Kept in original form (str or list); list form enables multi-mode.
        self._stream_mode = stream_mode
        self._subgraphs = subgraphs
        self._context = context
        # Cooperative-cancellation flag checked between streamed items.
        self._cancelled = False

    def _convert_messages(self, ctx: ChatContext) -> List[BaseMessage]:
        """Convert a VideoSDK ChatContext into a LangChain message list."""
        lc_messages: List[BaseMessage] = []

        for item in ctx.items:
            if item is None:
                continue

            if isinstance(item, ChatMessage):
                content = item.content
                # Multi-part content: keep only the string parts, space-joined.
                if isinstance(content, list):
                    content = " ".join(
                        str(c) for c in content if isinstance(c, str)
                    )
                if item.role == ChatRole.SYSTEM:
                    lc_messages.append(SystemMessage(content=content))
                elif item.role == ChatRole.USER:
                    lc_messages.append(HumanMessage(content=content))
                elif item.role == ChatRole.ASSISTANT:
                    lc_messages.append(AIMessage(content=content))

            elif isinstance(item, FunctionCall):
                # Arguments may arrive as a JSON string or already parsed.
                try:
                    args = (
                        json.loads(item.arguments)
                        if isinstance(item.arguments, str)
                        else item.arguments
                    )
                except json.JSONDecodeError:
                    # Malformed JSON: degrade to empty args rather than failing.
                    args = {}
                lc_messages.append(
                    AIMessage(
                        content="",
                        tool_calls=[
                            {
                                "id": item.call_id,
                                "name": item.name,
                                "args": args,
                                "type": "tool_call",
                            }
                        ],
                    )
                )

            elif isinstance(item, FunctionCallOutput):
                lc_messages.append(
                    ToolMessage(
                        content=item.output,
                        tool_call_id=item.call_id,
                        name=item.name,
                    )
                )

        return lc_messages


    def _extract_message_chunk(self, item: Any) -> BaseMessageChunk | str | None:
        """Normalise a raw item from ``graph.astream(stream_mode='messages')``.

        LangGraph can yield tokens in several shapes depending on version and
        whether ``subgraphs=True`` is used:

        - ``(token, metadata)``                     — standard single-graph
        - ``(namespace, (token, metadata))``         — with subgraphs=True
        - ``(mode, (token, metadata))``              — multi-mode list variant
        - ``(namespace, mode, (token, metadata))``   — future-proof extension

        Returns:
            The token, or ``None`` when the shape is not recognised.
        """
        # Already a bare token.
        if isinstance(item, (BaseMessageChunk, str)):
            return item

        if not isinstance(item, tuple):
            return None

        # (token, metadata): metadata is not itself a tuple.
        if len(item) == 2 and not isinstance(item[1], tuple):
            return item[0]

        # (namespace/mode, (token, metadata)): unwrap one level.
        if len(item) == 2 and isinstance(item[1], tuple):
            inner = item[1]
            if len(inner) == 2:
                return inner[0]

        # (namespace, mode, (token, metadata)): unwrap the third element.
        if len(item) == 3 and isinstance(item[2], tuple):
            inner = item[2]
            if len(inner) == 2:
                return inner[0]

        return None

    def _chunk_to_response(self, raw: Any) -> LLMResponse | None:
        """Convert a raw token/payload to an LLMResponse, or None to discard.

        Accepts plain strings, message chunks, dicts with a ``content`` key,
        or any object exposing a string ``content`` attribute.  Empty or
        non-string content is discarded.
        """
        content: str | None = None

        if isinstance(raw, str):
            content = raw
        elif isinstance(raw, BaseMessageChunk):
            content = raw.content if isinstance(raw.content, str) else None
        elif isinstance(raw, dict):
            c = raw.get("content")
            content = c if isinstance(c, str) else None
        elif hasattr(raw, "content"):
            c = raw.content
            content = c if isinstance(c, str) else None

        if not content:
            return None
        return LLMResponse(content=content, role=ChatRole.ASSISTANT)


    async def chat(
        self,
        messages: ChatContext,
        tools: list | None = None,
        conversational_graph: Any | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Run the LangGraph graph and stream its AI text output.

        The full conversation history from ``messages`` is injected as the
        initial ``{"messages": [...]}`` graph state.  The graph runs through
        all its nodes and this method streams the output back to the
        VideoSDK voice pipeline.

        Args:
            messages: VideoSDK ChatContext with full conversation history.
            tools: Ignored — tool execution is handled inside the graph.
            conversational_graph: Accepted for API compatibility; not used.
            **kwargs: Forwarded to ``graph.astream()``.

        Yields:
            LLMResponse text chunks as the graph streams its output.
        """
        # New generation: clear any cancellation left over from a prior call.
        self._cancelled = False
        lc_messages = self._convert_messages(messages)

        run_config = self._config or None
        is_multi_mode = isinstance(self._stream_mode, list)

        try:

            astream_kwargs: dict[str, Any] = {
                "stream_mode": self._stream_mode,
            }
            if run_config:
                astream_kwargs["config"] = run_config
            if self._subgraphs:
                astream_kwargs["subgraphs"] = True
            if self._context is not None:
                astream_kwargs["context"] = self._context
            astream_kwargs.update(kwargs)

            try:
                aiter = self._graph.astream({"messages": lc_messages}, **astream_kwargs)
            except TypeError:
                # Compatibility fallback: retry without the subgraphs/context
                # kwargs for graph versions whose astream() rejects them.
                safe_kwargs: dict[str, Any] = {"stream_mode": self._stream_mode}
                if run_config:
                    safe_kwargs["config"] = run_config
                safe_kwargs.update(kwargs)
                aiter = self._graph.astream({"messages": lc_messages}, **safe_kwargs)

            async for item in aiter:
                if self._cancelled:
                    return

                # Multi-mode streaming: items arrive as (mode, data) pairs.
                if is_multi_mode and isinstance(item, tuple) and len(item) == 2:
                    mode, data = item
                    if not isinstance(mode, str):
                        continue
                    if mode == "custom":
                        resp = self._chunk_to_response(data)
                        if resp:
                            yield resp
                    elif mode == "messages":
                        token = self._extract_message_chunk(data)
                        if token is None:
                            continue
                        # Only AI text tokens are forwarded to the pipeline.
                        if not isinstance(token, AIMessageChunk) or not token.content:
                            continue
                        if self._output_node is not None:
                            # Filter by the emitting node, read from the
                            # (token, metadata) pair's metadata dict.
                            meta = data[1] if isinstance(data, tuple) and len(data) == 2 else {}
                            node = meta.get("langgraph_node", "") if isinstance(meta, dict) else ""
                            if node != self._output_node:
                                continue
                        resp = self._chunk_to_response(token)
                        if resp:
                            yield resp
                    continue

                if self._stream_mode == "custom":
                    resp = self._chunk_to_response(item)
                    if resp:
                        yield resp

                elif self._stream_mode == "messages":
                    token = self._extract_message_chunk(item)
                    if token is None:
                        continue
                    # Only AI text tokens are forwarded to the pipeline.
                    if not isinstance(token, AIMessageChunk) or not token.content:
                        continue
                    if self._output_node is not None:
                        # NOTE(review): this assumes item is (token, metadata)
                        # with a dict metadata.  With subgraphs=True the shape
                        # is (namespace, (token, metadata)), so meta resolves
                        # to {} and every chunk fails the filter — confirm
                        # whether output_node + subgraphs is meant to work.
                        meta = item[1] if isinstance(item, tuple) and len(item) == 2 else {}
                        node = meta.get("langgraph_node", "") if isinstance(meta, dict) else ""
                        if node != self._output_node:
                            continue
                    resp = self._chunk_to_response(token)
                    if resp:
                        yield resp

        except Exception as exc:
            # Cancellation-triggered errors are expected; only surface others.
            if not self._cancelled:
                self.emit("error", exc)
            raise

    async def cancel_current_generation(self) -> None:
        """Request cancellation of the in-flight graph run.

        Sets a flag checked between streamed items; nothing is interrupted
        mid-item.
        """
        self._cancelled = True

    async def aclose(self) -> None:
        """Close the adapter; cleanup is delegated to the LLM base class."""
        await super().aclose()

VideoSDK LLM adapter that wraps a compiled LangGraph StateGraph.

The entire graph — its nodes, edges, tool nodes, conditional routing, and internal state — runs as the "LLM" from VideoSDK's perspective. The STT → LLM → TTS pipeline stays unchanged; this adapter bridges VideoSDK's ChatContext into the graph's message state and streams the graph's AI output back as LLMResponse chunks.

Args

graph
A compiled LangGraph graph (StateGraph.compile()). Must accept {"messages": list[BaseMessage]} as input and use MessagesState (or a compatible schema).
output_node
If provided, only text chunks emitted by this node name are forwarded to the voice pipeline. Use this to restrict output to a dedicated synthesis/response node and suppress intermediate researcher/planner node text. Only applies when stream_mode="messages". Defaults to None (all AI text chunks are forwarded).
config
Optional LangGraph RunnableConfig dict (e.g. for thread IDs, recursion limits, or custom callbacks).
stream_mode
LangGraph streaming mode. "messages" (default) streams AIMessageChunk tokens; "custom" streams arbitrary objects emitted by graph nodes via graph.send(). Pass a list to enable both simultaneously.
subgraphs
If True, stream tokens from nested subgraphs as well as the top-level graph. Requires LangGraph ≥ 0.2.
context
Optional LangGraph 2.0 context object injected into the graph at runtime. Useful for passing dependencies (database connections, session objects) that should not live in state.

Example — basic usage::

from videosdk.plugins.langchain import LangGraphLLM

graph = build_my_research_graph()   # returns CompiledStateGraph
llm = LangGraphLLM(graph=graph, output_node="synthesizer")

Example — nested subgraphs::

llm = LangGraphLLM(graph=graph, subgraphs=True)

Example — custom streaming (nodes emit strings via graph.send)::

llm = LangGraphLLM(graph=graph, stream_mode="custom")

Initialize the LLM base class.

Ancestors

  • videosdk.agents.llm.llm.LLM
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the adapter; cleanup is delegated to the LLM base class."""
    await super().aclose()

Cleanup resources.

async def cancel_current_generation(self) ‑> None
Expand source code
async def cancel_current_generation(self) -> None:
    """Request cancellation of any in-flight generation.

    Sets an internal flag that the ``chat()`` streaming loop checks on
    every chunk, so the stream returns cleanly at its next iteration.
    Safe to call when no generation is active. Does not raise.
    """
    self._cancelled = True

Cancel the current LLM generation if active.

Unlike the abstract base-class method (which raises NotImplementedError), this override does not raise: it sets an internal cancellation flag that the active chat() stream checks on each chunk, so streaming stops at the next iteration.
async def chat(self,
messages: ChatContext,
tools: list | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]
Expand source code
async def chat(
    self,
    messages: ChatContext,
    tools: list | None = None,
    conversational_graph: Any | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMResponse]:
    """Run the LangGraph graph and stream its AI text output.

    The full conversation history from ``messages`` is injected as the
    initial ``{"messages": [...]}`` graph state.  The graph runs through
    all its nodes and this method streams the output back to the
    VideoSDK voice pipeline.

    Args:
        messages: VideoSDK ChatContext with full conversation history.
        tools: Ignored — tool execution is handled inside the graph.
        conversational_graph: Accepted for API compatibility; not used.
        **kwargs: Forwarded to ``graph.astream()``.

    Yields:
        LLMResponse text chunks as the graph streams its output.

    Raises:
        Exception: Any error from the graph run is re-raised after an
            ``"error"`` event is emitted (skipped if the run was cancelled).
    """
    self._cancelled = False
    lc_messages = self._convert_messages(messages)

    run_config = self._config or None
    is_multi_mode = isinstance(self._stream_mode, list)

    try:
        # Build the full-featured astream() call first; older LangGraph
        # versions that reject subgraphs/context are handled below.
        astream_kwargs: dict[str, Any] = {
            "stream_mode": self._stream_mode,
        }
        if run_config:
            astream_kwargs["config"] = run_config
        if self._subgraphs:
            astream_kwargs["subgraphs"] = True
        if self._context is not None:
            astream_kwargs["context"] = self._context
        astream_kwargs.update(kwargs)

        try:
            aiter = self._graph.astream({"messages": lc_messages}, **astream_kwargs)
        except TypeError:
            # Compatibility fallback: retry with only the keywords every
            # supported LangGraph version accepts.
            safe_kwargs: dict[str, Any] = {"stream_mode": self._stream_mode}
            if run_config:
                safe_kwargs["config"] = run_config
            safe_kwargs.update(kwargs)
            aiter = self._graph.astream({"messages": lc_messages}, **safe_kwargs)

        async for item in aiter:
            if self._cancelled:
                return

            # Multi-mode streaming yields (mode_name, payload) tuples.
            if is_multi_mode and isinstance(item, tuple) and len(item) == 2:
                mode, data = item
                if not isinstance(mode, str):
                    continue
                if mode == "custom":
                    resp = self._chunk_to_response(data)
                    if resp:
                        yield resp
                elif mode == "messages":
                    token = self._streamable_message_token(data)
                    if token is not None:
                        resp = self._chunk_to_response(token)
                        if resp:
                            yield resp
                continue

            if self._stream_mode == "custom":
                resp = self._chunk_to_response(item)
                if resp:
                    yield resp

            elif self._stream_mode == "messages":
                token = self._streamable_message_token(item)
                if token is not None:
                    resp = self._chunk_to_response(token)
                    if resp:
                        yield resp

    except Exception as exc:
        if not self._cancelled:
            self.emit("error", exc)
        raise


def _streamable_message_token(self, payload: Any) -> "AIMessageChunk | None":
    """Return the forwardable AI token from a ``messages``-mode payload.

    Returns ``None`` for non-AI chunks, chunks with empty content, and —
    when ``output_node`` is configured — tokens emitted by any other node.
    Shared by the single-mode and multi-mode branches of ``chat()``.
    """
    token = self._extract_message_chunk(payload)
    # isinstance() also rejects a None return for non-message payloads.
    if not isinstance(token, AIMessageChunk) or not token.content:
        return None
    if self._output_node is not None:
        # "messages" payloads arrive as (chunk, metadata) tuples; the
        # emitting node's name lives under the "langgraph_node" key.
        meta = payload[1] if isinstance(payload, tuple) and len(payload) == 2 else {}
        node = meta.get("langgraph_node", "") if isinstance(meta, dict) else ""
        if node != self._output_node:
            return None
    return token

Run the LangGraph graph and stream its AI text output.

The full conversation history from messages is injected as the initial {"messages": [...]} graph state. The graph runs through all its nodes and this method streams the output back to the VideoSDK voice pipeline.

Args

messages
VideoSDK ChatContext with full conversation history.
tools
Ignored — tool execution is handled inside the graph.
conversational_graph
Accepted for API compatibility; not used.
**kwargs
Forwarded to graph.astream().

Yields

LLMResponse text chunks as the graph streams its output.