Module videosdk.plugins.langchain.graph
Classes
class LangGraphLLM (graph: Any,
output_node: str | None = None,
config: dict | None = None,
stream_mode: str | list[str] = 'messages',
subgraphs: bool = False,
context: Any | None = None)-
Expand source code
class LangGraphLLM(LLM):
    """VideoSDK LLM adapter that wraps a compiled LangGraph StateGraph.

    The entire graph — its nodes, edges, tool nodes, conditional routing,
    and internal state — runs as the "LLM" from VideoSDK's perspective.
    The STT → LLM → TTS pipeline stays unchanged; this adapter bridges
    VideoSDK's ChatContext into the graph's message state and streams the
    graph's AI output back as ``LLMResponse`` chunks.

    Args:
        graph: A compiled LangGraph graph (``StateGraph.compile()``).
            Must accept ``{"messages": list[BaseMessage]}`` as input and
            use ``MessagesState`` (or a compatible schema).
        output_node: If provided, only text chunks emitted by this node
            name are forwarded to the voice pipeline. Use this to
            restrict output to a dedicated synthesis/response node and
            suppress intermediate researcher/planner node text. Only
            applies when ``stream_mode="messages"``. Defaults to ``None``
            (all AI text chunks are forwarded).
        config: Optional LangGraph ``RunnableConfig`` dict (e.g. for
            thread IDs, recursion limits, or custom callbacks).
        stream_mode: LangGraph streaming mode. ``"messages"`` (default)
            streams ``AIMessageChunk`` tokens; ``"custom"`` streams
            arbitrary objects emitted by graph nodes via ``graph.send()``.
            Pass a list to enable both simultaneously.
        subgraphs: If ``True``, stream tokens from nested subgraphs as
            well as the top-level graph. Requires LangGraph >= 0.2.
        context: Optional LangGraph 2.0 context object injected into the
            graph at runtime. Useful for passing dependencies (database
            connections, session objects) that should not live in state.

    Example — basic usage::

        from videosdk.plugins.langchain import LangGraphLLM

        graph = build_my_research_graph()  # returns CompiledStateGraph
        llm = LangGraphLLM(graph=graph, output_node="synthesizer")

    Example — nested subgraphs::

        llm = LangGraphLLM(graph=graph, subgraphs=True)

    Example — custom streaming (nodes emit strings via graph.send)::

        llm = LangGraphLLM(graph=graph, stream_mode="custom")
    """

    def __init__(
        self,
        graph: Any,
        output_node: str | None = None,
        config: dict | None = None,
        stream_mode: str | list[str] = "messages",
        subgraphs: bool = False,
        context: Any | None = None,
    ) -> None:
        super().__init__()
        # Validate every requested mode up front so misconfiguration fails
        # loudly at construction time rather than mid-conversation.
        modes = {stream_mode} if isinstance(stream_mode, str) else set(stream_mode)
        unsupported = modes - _SUPPORTED_MODES
        if unsupported:
            raise ValueError(
                f"Unsupported stream_mode(s): {unsupported}. "
                f"Supported: {_SUPPORTED_MODES}"
            )
        self._graph = graph
        self._output_node = output_node
        self._config = config or {}
        self._stream_mode = stream_mode
        self._subgraphs = subgraphs
        self._context = context
        # Cooperative cancellation flag checked between streamed chunks.
        self._cancelled = False

    def _convert_messages(self, ctx: ChatContext) -> List[BaseMessage]:
        """Convert a VideoSDK ChatContext into a LangChain message list."""
        lc_messages: List[BaseMessage] = []
        for item in ctx.items:
            if item is None:
                continue
            if isinstance(item, ChatMessage):
                content = item.content
                if isinstance(content, list):
                    # Flatten multi-part content; non-string parts (e.g.
                    # image payloads) are dropped for the text pipeline.
                    content = " ".join(
                        str(c) for c in content if isinstance(c, str)
                    )
                if item.role == ChatRole.SYSTEM:
                    lc_messages.append(SystemMessage(content=content))
                elif item.role == ChatRole.USER:
                    lc_messages.append(HumanMessage(content=content))
                elif item.role == ChatRole.ASSISTANT:
                    lc_messages.append(AIMessage(content=content))
            elif isinstance(item, FunctionCall):
                try:
                    args = (
                        json.loads(item.arguments)
                        if isinstance(item.arguments, str)
                        else item.arguments
                    )
                except json.JSONDecodeError:
                    # Malformed tool arguments: keep the call record but
                    # degrade to empty args rather than failing the turn.
                    args = {}
                lc_messages.append(
                    AIMessage(
                        content="",
                        tool_calls=[
                            {
                                "id": item.call_id,
                                "name": item.name,
                                "args": args,
                                "type": "tool_call",
                            }
                        ],
                    )
                )
            elif isinstance(item, FunctionCallOutput):
                lc_messages.append(
                    ToolMessage(
                        content=item.output,
                        tool_call_id=item.call_id,
                        name=item.name,
                    )
                )
        return lc_messages

    def _extract_message_chunk(self, item: Any) -> BaseMessageChunk | str | None:
        """Normalise a raw item from ``graph.astream(stream_mode='messages')``.

        LangGraph can yield tokens in several shapes depending on version
        and whether ``subgraphs=True`` is used:

        - ``(token, metadata)`` — standard single-graph
        - ``(namespace, (token, metadata))`` — with subgraphs=True
        - ``(mode, (token, metadata))`` — multi-mode list variant
        - ``(namespace, mode, (token, metadata))`` — future-proof extension
        """
        if isinstance(item, (BaseMessageChunk, str)):
            return item
        if not isinstance(item, tuple):
            return None
        if len(item) == 2 and not isinstance(item[1], tuple):
            return item[0]
        if len(item) == 2 and isinstance(item[1], tuple):
            inner = item[1]
            if len(inner) == 2:
                return inner[0]
        if len(item) == 3 and isinstance(item[2], tuple):
            inner = item[2]
            if len(inner) == 2:
                return inner[0]
        return None

    def _extract_metadata(self, item: Any) -> dict:
        """Best-effort extraction of the LangGraph metadata dict from a raw
        stream item, mirroring the shapes handled by _extract_message_chunk.

        Fix: the previous inline ``data[1]`` lookup only matched the flat
        ``(token, metadata)`` shape, so ``output_node`` filtering silently
        dropped every token when ``subgraphs=True`` produced nested tuples.
        """
        if not isinstance(item, tuple):
            return {}
        if len(item) == 2:
            if isinstance(item[1], dict):
                return item[1]
            if (
                isinstance(item[1], tuple)
                and len(item[1]) == 2
                and isinstance(item[1][1], dict)
            ):
                return item[1][1]
        if (
            len(item) == 3
            and isinstance(item[2], tuple)
            and len(item[2]) == 2
            and isinstance(item[2][1], dict)
        ):
            return item[2][1]
        return {}

    def _message_item_to_response(self, item: Any) -> LLMResponse | None:
        """Shared messages-mode handling for one raw stream item.

        Extracts the token, keeps only non-empty ``AIMessageChunk`` content,
        applies the optional ``output_node`` filter, and converts to an
        ``LLMResponse`` (or ``None`` to discard). Previously this logic was
        duplicated between the multi-mode and single-mode branches of chat().
        """
        token = self._extract_message_chunk(item)
        if token is None:
            return None
        if not isinstance(token, AIMessageChunk) or not token.content:
            return None
        if self._output_node is not None:
            meta = self._extract_metadata(item)
            if meta.get("langgraph_node", "") != self._output_node:
                return None
        return self._chunk_to_response(token)

    def _chunk_to_response(self, raw: Any) -> LLMResponse | None:
        """Convert a raw token/payload to an LLMResponse, or None to discard."""
        content: str | None = None
        if isinstance(raw, str):
            content = raw
        elif isinstance(raw, BaseMessageChunk):
            content = raw.content if isinstance(raw.content, str) else None
        elif isinstance(raw, dict):
            c = raw.get("content")
            content = c if isinstance(c, str) else None
        elif hasattr(raw, "content"):
            c = raw.content
            content = c if isinstance(c, str) else None
        if not content:
            return None
        return LLMResponse(content=content, role=ChatRole.ASSISTANT)

    async def chat(
        self,
        messages: ChatContext,
        tools: list | None = None,
        conversational_graph: Any | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Run the LangGraph graph and stream its AI text output.

        The full conversation history from ``messages`` is injected as the
        initial ``{"messages": [...]}`` graph state. The graph runs through
        all its nodes and this method streams the output back to the
        VideoSDK voice pipeline.

        Args:
            messages: VideoSDK ChatContext with full conversation history.
            tools: Ignored — tool execution is handled inside the graph.
            conversational_graph: Accepted for API compatibility; not used.
            **kwargs: Forwarded to ``graph.astream()``.

        Yields:
            LLMResponse text chunks as the graph streams its output.
        """
        self._cancelled = False
        lc_messages = self._convert_messages(messages)
        run_config = self._config or None
        is_multi_mode = isinstance(self._stream_mode, list)
        try:
            astream_kwargs: dict[str, Any] = {
                "stream_mode": self._stream_mode,
            }
            if run_config:
                astream_kwargs["config"] = run_config
            if self._subgraphs:
                astream_kwargs["subgraphs"] = True
            if self._context is not None:
                astream_kwargs["context"] = self._context
            astream_kwargs.update(kwargs)
            try:
                aiter = self._graph.astream({"messages": lc_messages}, **astream_kwargs)
            except TypeError:
                # Older LangGraph versions reject subgraphs/context kwargs;
                # retry with only the universally supported ones.
                safe_kwargs: dict[str, Any] = {"stream_mode": self._stream_mode}
                if run_config:
                    safe_kwargs["config"] = run_config
                safe_kwargs.update(kwargs)
                aiter = self._graph.astream({"messages": lc_messages}, **safe_kwargs)
            async for item in aiter:
                if self._cancelled:
                    return
                # Multi-mode streaming yields (mode, payload) pairs.
                if is_multi_mode and isinstance(item, tuple) and len(item) == 2:
                    mode, data = item
                    if not isinstance(mode, str):
                        continue
                    if mode == "custom":
                        resp = self._chunk_to_response(data)
                        if resp:
                            yield resp
                    elif mode == "messages":
                        resp = self._message_item_to_response(data)
                        if resp:
                            yield resp
                    continue
                # Single-mode streaming: the item is the payload itself.
                if self._stream_mode == "custom":
                    resp = self._chunk_to_response(item)
                    if resp:
                        yield resp
                elif self._stream_mode == "messages":
                    resp = self._message_item_to_response(item)
                    if resp:
                        yield resp
        except Exception as exc:
            # Surface real failures to listeners, but stay quiet when the
            # stream was deliberately cancelled mid-generation.
            if not self._cancelled:
                self.emit("error", exc)
            raise

    async def cancel_current_generation(self) -> None:
        """Cancel the current LLM generation if active."""
        self._cancelled = True

    async def aclose(self) -> None:
        """Cleanup resources."""
        await super().aclose()
The entire graph — its nodes, edges, tool nodes, conditional routing, and internal state — runs as the "LLM" from VideoSDK's perspective. The STT → LLM → TTS pipeline stays unchanged; this adapter bridges VideoSDK's ChatContext into the graph's message state and streams the graph's AI output back as
``LLMResponse`` chunks.

Args
graph - A compiled LangGraph graph (``StateGraph.compile()``). Must accept
``{"messages": list[BaseMessage]}`` as input and use ``MessagesState`` (or a
compatible schema).
output_node - If provided, only text chunks emitted by this node
name are forwarded to the voice pipeline.
Use this to
restrict output to a dedicated synthesis/response node and
suppress intermediate researcher/planner node text.
Only applies when
``stream_mode="messages"``. Defaults to ``None`` (all AI text chunks are forwarded).
config - Optional LangGraph ``RunnableConfig`` dict (e.g. for thread IDs,
recursion limits, or custom callbacks).
stream_mode - LangGraph streaming mode. ``"messages"`` (default) streams
``AIMessageChunk`` tokens; ``"custom"`` streams arbitrary objects emitted by
graph nodes via ``graph.send()``. Pass a list to enable both simultaneously.
subgraphs - If ``True``, stream tokens from nested subgraphs as well as the
top-level graph. Requires LangGraph ≥ 0.2.
context - Optional LangGraph 2.0 context object injected into the graph at
runtime. Useful for passing dependencies (database connections, session
objects) that should not live in state.
Example — basic usage::
    from videosdk.plugins.langchain import LangGraphLLM

    graph = build_my_research_graph()  # returns CompiledStateGraph
    llm = LangGraphLLM(graph=graph, output_node="synthesizer")

Example — nested subgraphs::

    llm = LangGraphLLM(graph=graph, subgraphs=True)

Example — custom streaming (nodes emit strings via graph.send)::

    llm = LangGraphLLM(graph=graph, stream_mode="custom")

Initialize the LLM base class.
Ancestors
- videosdk.agents.llm.llm.LLM
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    await super().aclose()

Cleanup resources.
async def cancel_current_generation(self) ‑> None-
Expand source code
async def cancel_current_generation(self) -> None:
    self._cancelled = True

Cancel the current LLM generation if active.
Raises
NotImplementedError- This method must be implemented by subclasses.
async def chat(self,
messages: ChatContext,
tools: list | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]-
Expand source code
async def chat( self, messages: ChatContext, tools: list | None = None, conversational_graph: Any | None = None, **kwargs: Any, ) -> AsyncIterator[LLMResponse]: """Run the LangGraph graph and stream its AI text output. The full conversation history from ``messages`` is injected as the initial ``{"messages": [...]}`` graph state. The graph runs through all its nodes and this method streams the output back to the VideoSDK voice pipeline. Args: messages: VideoSDK ChatContext with full conversation history. tools: Ignored — tool execution is handled inside the graph. conversational_graph: Accepted for API compatibility; not used. **kwargs: Forwarded to ``graph.astream()``. Yields: LLMResponse text chunks as the graph streams its output. """ self._cancelled = False lc_messages = self._convert_messages(messages) run_config = self._config or None is_multi_mode = isinstance(self._stream_mode, list) try: astream_kwargs: dict[str, Any] = { "stream_mode": self._stream_mode, } if run_config: astream_kwargs["config"] = run_config if self._subgraphs: astream_kwargs["subgraphs"] = True if self._context is not None: astream_kwargs["context"] = self._context astream_kwargs.update(kwargs) try: aiter = self._graph.astream({"messages": lc_messages}, **astream_kwargs) except TypeError: safe_kwargs: dict[str, Any] = {"stream_mode": self._stream_mode} if run_config: safe_kwargs["config"] = run_config safe_kwargs.update(kwargs) aiter = self._graph.astream({"messages": lc_messages}, **safe_kwargs) async for item in aiter: if self._cancelled: return if is_multi_mode and isinstance(item, tuple) and len(item) == 2: mode, data = item if not isinstance(mode, str): continue if mode == "custom": resp = self._chunk_to_response(data) if resp: yield resp elif mode == "messages": token = self._extract_message_chunk(data) if token is None: continue if not isinstance(token, AIMessageChunk) or not token.content: continue if self._output_node is not None: meta = data[1] if isinstance(data, tuple) and 
len(data) == 2 else {} node = meta.get("langgraph_node", "") if isinstance(meta, dict) else "" if node != self._output_node: continue resp = self._chunk_to_response(token) if resp: yield resp continue if self._stream_mode == "custom": resp = self._chunk_to_response(item) if resp: yield resp elif self._stream_mode == "messages": token = self._extract_message_chunk(item) if token is None: continue if not isinstance(token, AIMessageChunk) or not token.content: continue if self._output_node is not None: meta = item[1] if isinstance(item, tuple) and len(item) == 2 else {} node = meta.get("langgraph_node", "") if isinstance(meta, dict) else "" if node != self._output_node: continue resp = self._chunk_to_response(token) if resp: yield resp except Exception as exc: if not self._cancelled: self.emit("error", exc) raiseRun the LangGraph graph and stream its AI text output.
The full conversation history from
``messages`` is injected as the initial ``{"messages": [...]}`` graph state.
The graph runs through all its nodes and this method streams the output back
to the VideoSDK voice pipeline.

Args
messages- VideoSDK ChatContext with full conversation history.
tools- Ignored — tool execution is handled inside the graph.
conversational_graph- Accepted for API compatibility; not used.
**kwargs - Forwarded to ``graph.astream()``.
Yields
LLMResponse text chunks as the graph streams its output.