Package videosdk.plugins.langchain
Sub-modules
videosdk.plugins.langchain.graph
videosdk.plugins.langchain.llm
Classes
class LangChainLLM (llm: BaseChatModel, tools: list | None = None, max_tool_iterations: int = 10)-
Expand source code
class LangChainLLM(LLM):
    """VideoSDK LLM adapter for any LangChain ``BaseChatModel``.

    **Two usage modes:**

    Mode A — VideoSDK tools (``@function_tool`` methods on the Agent class):
        Pass no tools at init. VideoSDK's ContentGeneration automatically
        passes ``@function_tool`` methods to ``chat(tools=...)``. This adapter
        converts them to LangChain stubs for schema binding, then emits tool
        call metadata back to VideoSDK which handles dispatch and re-calls
        ``chat()`` with the result — exactly like ``OpenAILLM`` and ``GoogleLLM``.

    Mode B — LangChain-native tools (Tavily, Wikipedia, custom ``@tool``
        functions): Pass LangChain tools at init via ``tools=[...]``. The full
        tool-calling loop (call model → execute tool → feed result → repeat)
        runs entirely inside this adapter. The voice pipeline only sees the
        final text stream.

    Both modes can also be used together on the same instance.

    Args:
        llm: Any LangChain ``BaseChatModel`` instance.
        tools: Optional list of LangChain tools executed internally (Mode B).
        max_tool_iterations: Safety cap on consecutive internal tool-call rounds.

    Example — VideoSDK tools (Mode A)::

        from videosdk.plugins.langchain import LangChainLLM
        from langchain_openai import ChatOpenAI

        llm = LangChainLLM(llm=ChatOpenAI(model="gpt-4o-mini"))
        # Tools come from @function_tool methods on the Agent subclass —
        # no extra configuration needed here.

    Example — LangChain tools (Mode B)::

        from videosdk.plugins.langchain import LangChainLLM
        from langchain_openai import ChatOpenAI
        from langchain_community.tools.tavily_search import TavilySearchResults

        llm = LangChainLLM(
            llm=ChatOpenAI(model="gpt-4o-mini"),
            tools=[TavilySearchResults(max_results=3)],
        )
    """

    def __init__(
        self,
        llm: BaseChatModel,
        tools: list | None = None,
        max_tool_iterations: int = 10,
    ) -> None:
        super().__init__()
        self._base_llm = llm
        self._langchain_tools: list = tools or []
        self._max_tool_iterations = max_tool_iterations
        # Cooperative cancellation flag checked by the streaming loops.
        self._cancelled = False
        # name -> tool lookup used for internal (Mode B) dispatch.
        self._lc_tool_map: dict[str, Any] = {t.name: t for t in self._langchain_tools}
        # Pre-bind LangChain-native tools once; reused for every Mode B call.
        self._llm_with_lc_tools = (
            self._base_llm.bind_tools(self._langchain_tools)
            if self._langchain_tools
            else self._base_llm
        )

    def _convert_messages(self, ctx: ChatContext) -> List[BaseMessage]:
        """Convert a VideoSDK ChatContext into a LangChain message list."""
        lc_messages: List[BaseMessage] = []
        for item in ctx.items:
            if item is None:
                continue
            if isinstance(item, ChatMessage):
                content = item.content
                # Multi-part content is flattened to one space-joined string;
                # non-string parts are dropped.
                if isinstance(content, list):
                    content = " ".join(str(c) for c in content if isinstance(c, str))
                if item.role == ChatRole.SYSTEM:
                    lc_messages.append(SystemMessage(content=content))
                elif item.role == ChatRole.USER:
                    lc_messages.append(HumanMessage(content=content))
                elif item.role == ChatRole.ASSISTANT:
                    lc_messages.append(AIMessage(content=content))
            elif isinstance(item, FunctionCall):
                # Arguments may arrive as a JSON string or an already-parsed
                # mapping; unparseable JSON degrades to empty args.
                try:
                    args = (
                        json.loads(item.arguments)
                        if isinstance(item.arguments, str)
                        else item.arguments
                    )
                except json.JSONDecodeError:
                    args = {}
                lc_messages.append(
                    AIMessage(
                        content="",
                        tool_calls=[{"id": item.call_id, "name": item.name, "args": args, "type": "tool_call"}],
                    )
                )
            elif isinstance(item, FunctionCallOutput):
                lc_messages.append(
                    ToolMessage(content=item.output, tool_call_id=item.call_id, name=item.name)
                )
        return lc_messages

    async def _stream_round(
        self, llm: BaseChatModel, lc_messages: List[BaseMessage], **kwargs: Any
    ):
        """Stream one model call, yielding each ``AIMessageChunk`` as it arrives.

        Non-AIMessageChunk items are skipped; the stream stops early when
        cancellation is requested.
        """
        async for chunk in llm.astream(lc_messages, **kwargs):
            if self._cancelled:
                return
            if not isinstance(chunk, AIMessageChunk):
                continue
            yield chunk

    async def _chat_videosdk_tools(
        self,
        lc_messages: List[BaseMessage],
        videosdk_tool_names: set[str],
        llm: BaseChatModel,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Single-shot stream: text chunks yielded, tool calls emitted as metadata."""
        # Partial tool calls stream in fragments keyed by index; accumulate
        # id/name/args pieces until the stream ends.
        pending_tool_calls: dict[int, dict] = {}
        async for chunk in self._stream_round(llm, lc_messages, **kwargs):
            if chunk.content:
                yield LLMResponse(content=chunk.content, role=ChatRole.ASSISTANT)
            if chunk.tool_call_chunks:
                for tc in chunk.tool_call_chunks:
                    idx = tc.get("index", 0)
                    if idx not in pending_tool_calls:
                        pending_tool_calls[idx] = {
                            "id": tc.get("id") or "",
                            "name": tc.get("name") or "",
                            "args": tc.get("args") or "",
                        }
                    else:
                        existing = pending_tool_calls[idx]
                        if tc.get("name"):
                            existing["name"] += tc["name"]
                        if tc.get("args"):
                            existing["args"] += tc["args"]
                        if tc.get("id") and not existing["id"]:
                            existing["id"] = tc["id"]
        # Emit completed tool calls as metadata; VideoSDK ContentGeneration
        # dispatches them and re-invokes chat() with the results (Mode A).
        for tc_data in pending_tool_calls.values():
            try:
                args = json.loads(tc_data["args"]) if tc_data["args"] else {}
            except json.JSONDecodeError:
                args = {}
            yield LLMResponse(
                content="",
                role=ChatRole.ASSISTANT,
                metadata={"function_call": {"name": tc_data["name"], "arguments": args, "id": tc_data["id"]}},
            )

    async def _chat_lc_tools_loop(
        self,
        lc_messages: List[BaseMessage],
        llm: BaseChatModel,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Full internal tool-calling loop until a text-only response is produced."""
        for iteration in range(self._max_tool_iterations):
            if self._cancelled:
                return
            full_content = ""
            pending_tool_calls: dict[int, dict] = {}
            async for chunk in self._stream_round(llm, lc_messages, **kwargs):
                if chunk.content:
                    full_content += chunk.content
                    yield LLMResponse(content=chunk.content, role=ChatRole.ASSISTANT)
                if chunk.tool_call_chunks:
                    for tc in chunk.tool_call_chunks:
                        idx = tc.get("index", 0)
                        if idx not in pending_tool_calls:
                            pending_tool_calls[idx] = {
                                "id": tc.get("id") or "",
                                "name": tc.get("name") or "",
                                "args": tc.get("args") or "",
                            }
                        else:
                            existing = pending_tool_calls[idx]
                            if tc.get("name"):
                                existing["name"] += tc["name"]
                            if tc.get("args"):
                                existing["args"] += tc["args"]
                            if tc.get("id") and not existing["id"]:
                                existing["id"] = tc["id"]
            # No tool calls this round => final text answer; stop looping.
            if not pending_tool_calls:
                break
            parsed: list[dict] = []
            for tc_data in pending_tool_calls.values():
                try:
                    args = json.loads(tc_data["args"]) if tc_data["args"] else {}
                except json.JSONDecodeError:
                    args = {}
                parsed.append({"id": tc_data["id"], "name": tc_data["name"], "args": args, "type": "tool_call"})
            # Record the assistant turn (with tool calls) before appending
            # tool results, matching the transcript shape models expect.
            lc_messages.append(AIMessage(content=full_content, tool_calls=parsed))
            for tc in parsed:
                if self._cancelled:
                    return
                tool = self._lc_tool_map.get(tc["name"])
                if tool is None:
                    result = f"Tool '{tc['name']}' not found."
                    logger.warning(result)
                else:
                    try:
                        result = await tool.ainvoke(tc["args"])
                    except Exception as exc:
                        # Feed the error text back to the model instead of
                        # aborting the whole generation.
                        result = f"Tool execution error: {exc}"
                        logger.error("Error executing tool %s: %s", tc["name"], exc)
                lc_messages.append(
                    ToolMessage(content=str(result), tool_call_id=tc["id"], name=tc["name"])
                )
            logger.debug("LangChainLLM tool iteration %d complete", iteration + 1)

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        conversational_graph: Any | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Stream a response from the LangChain model.

        - If VideoSDK ``@function_tool`` tools are passed (via ``tools``): the
          model receives their schemas, and any tool calls are emitted as
          ``LLMResponse`` metadata for VideoSDK ContentGeneration to dispatch
          (Mode A).
        - If only LangChain-native tools were given at init (``tools`` is
          empty): the internal tool-calling loop runs until a text answer is
          produced (Mode B).

        Args:
            messages: VideoSDK ChatContext with full conversation history.
            tools: VideoSDK FunctionTools from the Agent's ``@function_tool`` methods.
            conversational_graph: Accepted for API compatibility; not used by LangChain.
            **kwargs: Forwarded to the underlying model's ``astream`` call.

        Yields:
            LLMResponse chunks — text content and/or tool call metadata.
        """
        self._cancelled = False
        lc_messages = self._convert_messages(messages)
        try:
            videosdk_tools = [vt for vt in (tools or []) if is_function_tool(vt)]
            if videosdk_tools:
                videosdk_tool_names = set()
                stub_tools: list[StructuredTool] = []
                for vt in videosdk_tools:
                    info = get_tool_info(vt)
                    schema = build_openai_schema(vt)
                    videosdk_tool_names.add(info.name)
                    # Stub tools exist only so bind_tools() can hand the
                    # schema to the model; they are never executed locally.
                    stub_tools.append(
                        _make_stub_tool(
                            name=info.name,
                            description=info.description or info.name,
                            parameters=schema.get("parameters", {}),
                        )
                    )
                active_llm = self._base_llm.bind_tools(stub_tools + self._langchain_tools)
                async for chunk in self._chat_videosdk_tools(
                    lc_messages, videosdk_tool_names, active_llm, **kwargs
                ):
                    yield chunk
                return
            async for chunk in self._chat_lc_tools_loop(
                lc_messages, self._llm_with_lc_tools, **kwargs
            ):
                yield chunk
        except Exception as exc:
            # Deliberate cancellations are not surfaced as errors.
            if not self._cancelled:
                self.emit("error", exc)
            raise

    async def cancel_current_generation(self) -> None:
        """Cancel the current LLM generation if active."""
        self._cancelled = True

    async def aclose(self) -> None:
        """Cleanup resources."""
        await super().aclose()
VideoSDK LLM adapter for any LangChain
BaseChatModel.Two usage modes:
Mode A — VideoSDK tools (
`@function_tool` methods on the Agent class): Pass no tools at init. VideoSDK's ContentGeneration automatically passes `@function_tool` methods to `chat(tools=...)`. This adapter converts them to LangChain stubs for schema binding, then emits tool call metadata back to VideoSDK which handles dispatch and re-calls `chat()` with the result — exactly like `OpenAILLM` and `GoogleLLM`.
Mode B — LangChain-native tools (Tavily, Wikipedia, custom
`@tool` functions): Pass LangChain tools at init via `tools=[...]`. The full tool-calling loop (call model → execute tool → feed result → repeat) runs entirely inside this adapter. The voice pipeline only sees the final text stream.
Both modes can also be used together on the same instance.
Args
llm- Any LangChain
BaseChatModelinstance. tools- Optional list of LangChain tools executed internally (Mode B).
max_tool_iterations- Safety cap on consecutive internal tool-call rounds.
Example — VideoSDK tools (Mode A)::
from videosdk.plugins.langchain import LangChainLLM from langchain_openai import ChatOpenAI llm = LangChainLLM(llm=ChatOpenAI(model="gpt-4o-mini")) # Tools come from @function_tool methods on the Agent subclass — # no extra configuration needed here.Example — LangChain tools (Mode B)::
from videosdk.plugins.langchain import LangChainLLM from langchain_openai import ChatOpenAI from langchain_community.tools.tavily_search import TavilySearchResults llm = LangChainLLM( llm=ChatOpenAI(model="gpt-4o-mini"), tools=[TavilySearchResults(max_results=3)], )Initialize the LLM base class.
Ancestors
- videosdk.agents.llm.llm.LLM
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources."""
    await super().aclose()
async def cancel_current_generation(self) ‑> None-
Expand source code
async def cancel_current_generation(self) -> None:
    """Cancel the current LLM generation if active."""
    # Cooperative cancellation: the streaming loops poll this flag and stop.
    self._cancelled = True
Raises
NotImplementedError - documented on the abstract base class only; this subclass overrides the method, so it does not actually raise.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]-
Expand source code
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    conversational_graph: Any | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMResponse]:
    """Stream a response from the LangChain model.

    - If VideoSDK ``@function_tool`` tools are passed (via ``tools``): the
      model receives their schemas, and any tool calls are emitted as
      ``LLMResponse`` metadata for VideoSDK ContentGeneration to dispatch
      (Mode A).
    - If only LangChain-native tools were given at init (``tools`` is
      empty): the internal tool-calling loop runs until a text answer is
      produced (Mode B).

    Args:
        messages: VideoSDK ChatContext with full conversation history.
        tools: VideoSDK FunctionTools from the Agent's ``@function_tool`` methods.
        conversational_graph: Accepted for API compatibility; not used by LangChain.
        **kwargs: Forwarded to the underlying model's ``astream`` call.

    Yields:
        LLMResponse chunks — text content and/or tool call metadata.
    """
    self._cancelled = False
    lc_messages = self._convert_messages(messages)
    try:
        videosdk_tools = [vt for vt in (tools or []) if is_function_tool(vt)]
        if videosdk_tools:
            videosdk_tool_names = set()
            stub_tools: list[StructuredTool] = []
            for vt in videosdk_tools:
                info = get_tool_info(vt)
                schema = build_openai_schema(vt)
                videosdk_tool_names.add(info.name)
                # Stub tools exist only so bind_tools() can hand the schema
                # to the model; they are never executed locally.
                stub_tools.append(
                    _make_stub_tool(
                        name=info.name,
                        description=info.description or info.name,
                        parameters=schema.get("parameters", {}),
                    )
                )
            active_llm = self._base_llm.bind_tools(stub_tools + self._langchain_tools)
            async for chunk in self._chat_videosdk_tools(
                lc_messages, videosdk_tool_names, active_llm, **kwargs
            ):
                yield chunk
            return
        async for chunk in self._chat_lc_tools_loop(
            lc_messages, self._llm_with_lc_tools, **kwargs
        ):
            yield chunk
    except Exception as exc:
        # Deliberate cancellations are not surfaced as errors.
        if not self._cancelled:
            self.emit("error", exc)
        raise
Stream a response from the LangChain model.
- If VideoSDK
@function_tooltools are passed (viatools): the model receives their schemas, and any tool calls are emitted asLLMResponsemetadata for VideoSDK ContentGeneration to dispatch (Mode A). - If only LangChain-native tools were given at init (
toolsis empty): the internal tool-calling loop runs until a text answer is produced (Mode B).
Args
messages- VideoSDK ChatContext with full conversation history.
tools- VideoSDK FunctionTools from the Agent's
@function_toolmethods. conversational_graph- Accepted for API compatibility; not used by LangChain.
**kwargs- Forwarded to the underlying model's
astreamcall.
Yields
LLMResponse chunks — text content and/or tool call metadata.
- If VideoSDK
class LangGraphLLM (graph: Any,
output_node: str | None = None,
config: dict | None = None,
stream_mode: str | list[str] = 'messages',
subgraphs: bool = False,
context: Any | None = None)-
Expand source code
class LangGraphLLM(LLM):
    """VideoSDK LLM adapter that wraps a compiled LangGraph StateGraph.

    The entire graph — its nodes, edges, tool nodes, conditional routing, and
    internal state — runs as the "LLM" from VideoSDK's perspective. The
    STT → LLM → TTS pipeline stays unchanged; this adapter bridges VideoSDK's
    ChatContext into the graph's message state and streams the graph's AI
    output back as ``LLMResponse`` chunks.

    Args:
        graph: A compiled LangGraph graph (``StateGraph.compile()``). Must
            accept ``{"messages": list[BaseMessage]}`` as input and use
            ``MessagesState`` (or a compatible schema).
        output_node: If provided, only text chunks emitted by this node name
            are forwarded to the voice pipeline. Use this to restrict output
            to a dedicated synthesis/response node and suppress intermediate
            researcher/planner node text. Only applies when
            ``stream_mode="messages"``. Defaults to ``None`` (all AI text
            chunks are forwarded).
        config: Optional LangGraph ``RunnableConfig`` dict (e.g. for thread
            IDs, recursion limits, or custom callbacks).
        stream_mode: LangGraph streaming mode. ``"messages"`` (default)
            streams ``AIMessageChunk`` tokens; ``"custom"`` streams arbitrary
            objects emitted by graph nodes via ``graph.send()``. Pass a list
            to enable both simultaneously.
        subgraphs: If ``True``, stream tokens from nested subgraphs as well
            as the top-level graph. Requires LangGraph ≥ 0.2.
        context: Optional LangGraph 2.0 context object injected into the
            graph at runtime. Useful for passing dependencies (database
            connections, session objects) that should not live in state.

    Example — basic usage::

        from videosdk.plugins.langchain import LangGraphLLM

        graph = build_my_research_graph()  # returns CompiledStateGraph
        llm = LangGraphLLM(graph=graph, output_node="synthesizer")

    Example — nested subgraphs::

        llm = LangGraphLLM(graph=graph, subgraphs=True)

    Example — custom streaming (nodes emit strings via graph.send)::

        llm = LangGraphLLM(graph=graph, stream_mode="custom")
    """

    def __init__(
        self,
        graph: Any,
        output_node: str | None = None,
        config: dict | None = None,
        stream_mode: str | list[str] = "messages",
        subgraphs: bool = False,
        context: Any | None = None,
    ) -> None:
        super().__init__()
        # Validate requested stream mode(s) up front so misconfiguration
        # fails at construction time rather than mid-call.
        modes = {stream_mode} if isinstance(stream_mode, str) else set(stream_mode)
        unsupported = modes - _SUPPORTED_MODES
        if unsupported:
            raise ValueError(
                f"Unsupported stream_mode(s): {unsupported}. "
                f"Supported: {_SUPPORTED_MODES}"
            )
        self._graph = graph
        self._output_node = output_node
        self._config = config or {}
        self._stream_mode = stream_mode
        self._subgraphs = subgraphs
        self._context = context
        # Cooperative cancellation flag checked by the streaming loop.
        self._cancelled = False

    def _convert_messages(self, ctx: ChatContext) -> List[BaseMessage]:
        """Convert a VideoSDK ChatContext into a LangChain message list."""
        lc_messages: List[BaseMessage] = []
        for item in ctx.items:
            if item is None:
                continue
            if isinstance(item, ChatMessage):
                content = item.content
                # Multi-part content is flattened to one space-joined string;
                # non-string parts are dropped.
                if isinstance(content, list):
                    content = " ".join(
                        str(c) for c in content if isinstance(c, str)
                    )
                if item.role == ChatRole.SYSTEM:
                    lc_messages.append(SystemMessage(content=content))
                elif item.role == ChatRole.USER:
                    lc_messages.append(HumanMessage(content=content))
                elif item.role == ChatRole.ASSISTANT:
                    lc_messages.append(AIMessage(content=content))
            elif isinstance(item, FunctionCall):
                # Arguments may arrive as a JSON string or an already-parsed
                # mapping; unparseable JSON degrades to empty args.
                try:
                    args = (
                        json.loads(item.arguments)
                        if isinstance(item.arguments, str)
                        else item.arguments
                    )
                except json.JSONDecodeError:
                    args = {}
                lc_messages.append(
                    AIMessage(
                        content="",
                        tool_calls=[
                            {
                                "id": item.call_id,
                                "name": item.name,
                                "args": args,
                                "type": "tool_call",
                            }
                        ],
                    )
                )
            elif isinstance(item, FunctionCallOutput):
                lc_messages.append(
                    ToolMessage(
                        content=item.output,
                        tool_call_id=item.call_id,
                        name=item.name,
                    )
                )
        return lc_messages

    def _extract_message_chunk(self, item: Any) -> BaseMessageChunk | str | None:
        """Normalise a raw item from ``graph.astream(stream_mode='messages')``.

        LangGraph can yield tokens in several shapes depending on version and
        whether ``subgraphs=True`` is used:

        - ``(token, metadata)`` — standard single-graph
        - ``(namespace, (token, metadata))`` — with subgraphs=True
        - ``(mode, (token, metadata))`` — multi-mode list variant
        - ``(namespace, mode, (token, metadata))`` — future-proof extension
        """
        if isinstance(item, (BaseMessageChunk, str)):
            return item
        if not isinstance(item, tuple):
            return None
        if len(item) == 2 and not isinstance(item[1], tuple):
            return item[0]
        if len(item) == 2 and isinstance(item[1], tuple):
            inner = item[1]
            if len(inner) == 2:
                return inner[0]
        if len(item) == 3 and isinstance(item[2], tuple):
            inner = item[2]
            if len(inner) == 2:
                return inner[0]
        return None

    def _chunk_to_response(self, raw: Any) -> LLMResponse | None:
        """Convert a raw token/payload to an LLMResponse, or None to discard."""
        content: str | None = None
        if isinstance(raw, str):
            content = raw
        elif isinstance(raw, BaseMessageChunk):
            content = raw.content if isinstance(raw.content, str) else None
        elif isinstance(raw, dict):
            c = raw.get("content")
            content = c if isinstance(c, str) else None
        elif hasattr(raw, "content"):
            c = raw.content
            content = c if isinstance(c, str) else None
        # Empty or non-string payloads are discarded.
        if not content:
            return None
        return LLMResponse(content=content, role=ChatRole.ASSISTANT)

    async def chat(
        self,
        messages: ChatContext,
        tools: list | None = None,
        conversational_graph: Any | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        """Run the LangGraph graph and stream its AI text output.

        The full conversation history from ``messages`` is injected as the
        initial ``{"messages": [...]}`` graph state. The graph runs through
        all its nodes and this method streams the output back to the VideoSDK
        voice pipeline.

        Args:
            messages: VideoSDK ChatContext with full conversation history.
            tools: Ignored — tool execution is handled inside the graph.
            conversational_graph: Accepted for API compatibility; not used.
            **kwargs: Forwarded to ``graph.astream()``.

        Yields:
            LLMResponse text chunks as the graph streams its output.
        """
        self._cancelled = False
        lc_messages = self._convert_messages(messages)
        run_config = self._config or None
        is_multi_mode = isinstance(self._stream_mode, list)
        try:
            astream_kwargs: dict[str, Any] = {
                "stream_mode": self._stream_mode,
            }
            if run_config:
                astream_kwargs["config"] = run_config
            if self._subgraphs:
                astream_kwargs["subgraphs"] = True
            if self._context is not None:
                astream_kwargs["context"] = self._context
            astream_kwargs.update(kwargs)
            try:
                aiter = self._graph.astream({"messages": lc_messages}, **astream_kwargs)
            except TypeError:
                # Presumably an older LangGraph that rejects subgraphs/context
                # kwargs — retry with the minimal, widely-supported set.
                safe_kwargs: dict[str, Any] = {"stream_mode": self._stream_mode}
                if run_config:
                    safe_kwargs["config"] = run_config
                safe_kwargs.update(kwargs)
                aiter = self._graph.astream({"messages": lc_messages}, **safe_kwargs)
            async for item in aiter:
                if self._cancelled:
                    return
                # Multi-mode streams yield (mode, data) pairs.
                if is_multi_mode and isinstance(item, tuple) and len(item) == 2:
                    mode, data = item
                    if not isinstance(mode, str):
                        continue
                    if mode == "custom":
                        resp = self._chunk_to_response(data)
                        if resp:
                            yield resp
                    elif mode == "messages":
                        token = self._extract_message_chunk(data)
                        if token is None:
                            continue
                        if not isinstance(token, AIMessageChunk) or not token.content:
                            continue
                        if self._output_node is not None:
                            # Filter by the emitting node via stream metadata.
                            meta = data[1] if isinstance(data, tuple) and len(data) == 2 else {}
                            node = meta.get("langgraph_node", "") if isinstance(meta, dict) else ""
                            if node != self._output_node:
                                continue
                        resp = self._chunk_to_response(token)
                        if resp:
                            yield resp
                    continue
                # Single-mode streams.
                if self._stream_mode == "custom":
                    resp = self._chunk_to_response(item)
                    if resp:
                        yield resp
                elif self._stream_mode == "messages":
                    token = self._extract_message_chunk(item)
                    if token is None:
                        continue
                    if not isinstance(token, AIMessageChunk) or not token.content:
                        continue
                    if self._output_node is not None:
                        meta = item[1] if isinstance(item, tuple) and len(item) == 2 else {}
                        node = meta.get("langgraph_node", "") if isinstance(meta, dict) else ""
                        if node != self._output_node:
                            continue
                    resp = self._chunk_to_response(token)
                    if resp:
                        yield resp
        except Exception as exc:
            # Deliberate cancellations are not surfaced as errors.
            if not self._cancelled:
                self.emit("error", exc)
            raise

    async def cancel_current_generation(self) -> None:
        """Cancel the current LLM generation if active."""
        self._cancelled = True

    async def aclose(self) -> None:
        """Cleanup resources."""
        await super().aclose()
VideoSDK LLM adapter that wraps a compiled LangGraph StateGraph.
The entire graph — its nodes, edges, tool nodes, conditional routing, and internal state — runs as the "LLM" from VideoSDK's perspective. The STT → LLM → TTS pipeline stays unchanged; this adapter bridges VideoSDK's ChatContext into the graph's message state and streams the graph's AI output back as
LLMResponsechunks.Args
graph- A compiled LangGraph graph (
StateGraph.compile()). Must accept{"messages": list[BaseMessage]}as input and useMessagesState(or a compatible schema). output_node- If provided, only text chunks emitted by this node
name are forwarded to the voice pipeline.
Use this to
restrict output to a dedicated synthesis/response node and
suppress intermediate researcher/planner node text.
Only applies when
stream_mode="messages". Defaults toNone(all AI text chunks are forwarded). config- Optional LangGraph
RunnableConfigdict (e.g. for thread IDs, recursion limits, or custom callbacks). stream_mode- LangGraph streaming mode.
"messages"(default) streamsAIMessageChunktokens;"custom"streams arbitrary objects emitted by graph nodes viagraph.send(). Pass a list to enable both simultaneously. subgraphs- If
True, stream tokens from nested subgraphs as well as the top-level graph. Requires LangGraph ≥ 0.2. context- Optional LangGraph 2.0 context object injected into the graph at runtime. Useful for passing dependencies (database connections, session objects) that should not live in state.
Example — basic usage::
from videosdk.plugins.langchain import LangGraphLLM graph = build_my_research_graph() # returns CompiledStateGraph llm = LangGraphLLM(graph=graph, output_node="synthesizer")Example — nested subgraphs::
llm = LangGraphLLM(graph=graph, subgraphs=True)Example — custom streaming (nodes emit strings via graph.send)::
llm = LangGraphLLM(graph=graph, stream_mode="custom")Initialize the LLM base class.
Ancestors
- videosdk.agents.llm.llm.LLM
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources."""
    await super().aclose()
async def cancel_current_generation(self) ‑> None-
Expand source code
async def cancel_current_generation(self) -> None:
    """Cancel the current LLM generation if active."""
    # Cooperative cancellation: the streaming loop polls this flag and stops.
    self._cancelled = True
Raises
NotImplementedError - documented on the abstract base class only; this subclass overrides the method, so it does not actually raise.
async def chat(self,
messages: ChatContext,
tools: list | None = None,
conversational_graph: Any | None = None,
**kwargs: Any) ‑> AsyncIterator[videosdk.agents.llm.llm.LLMResponse]-
Expand source code
async def chat(
    self,
    messages: ChatContext,
    tools: list | None = None,
    conversational_graph: Any | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMResponse]:
    """Run the LangGraph graph and stream its AI text output.

    The full conversation history from ``messages`` is injected as the
    initial ``{"messages": [...]}`` graph state. The graph runs through all
    its nodes and this method streams the output back to the VideoSDK voice
    pipeline.

    Args:
        messages: VideoSDK ChatContext with full conversation history.
        tools: Ignored — tool execution is handled inside the graph.
        conversational_graph: Accepted for API compatibility; not used.
        **kwargs: Forwarded to ``graph.astream()``.

    Yields:
        LLMResponse text chunks as the graph streams its output.
    """
    self._cancelled = False
    lc_messages = self._convert_messages(messages)
    run_config = self._config or None
    is_multi_mode = isinstance(self._stream_mode, list)
    try:
        astream_kwargs: dict[str, Any] = {
            "stream_mode": self._stream_mode,
        }
        if run_config:
            astream_kwargs["config"] = run_config
        if self._subgraphs:
            astream_kwargs["subgraphs"] = True
        if self._context is not None:
            astream_kwargs["context"] = self._context
        astream_kwargs.update(kwargs)
        try:
            aiter = self._graph.astream({"messages": lc_messages}, **astream_kwargs)
        except TypeError:
            # Presumably an older LangGraph that rejects subgraphs/context
            # kwargs — retry with the minimal, widely-supported set.
            safe_kwargs: dict[str, Any] = {"stream_mode": self._stream_mode}
            if run_config:
                safe_kwargs["config"] = run_config
            safe_kwargs.update(kwargs)
            aiter = self._graph.astream({"messages": lc_messages}, **safe_kwargs)
        async for item in aiter:
            if self._cancelled:
                return
            # Multi-mode streams yield (mode, data) pairs.
            if is_multi_mode and isinstance(item, tuple) and len(item) == 2:
                mode, data = item
                if not isinstance(mode, str):
                    continue
                if mode == "custom":
                    resp = self._chunk_to_response(data)
                    if resp:
                        yield resp
                elif mode == "messages":
                    token = self._extract_message_chunk(data)
                    if token is None:
                        continue
                    if not isinstance(token, AIMessageChunk) or not token.content:
                        continue
                    if self._output_node is not None:
                        # Filter by the emitting node via stream metadata.
                        meta = data[1] if isinstance(data, tuple) and len(data) == 2 else {}
                        node = meta.get("langgraph_node", "") if isinstance(meta, dict) else ""
                        if node != self._output_node:
                            continue
                    resp = self._chunk_to_response(token)
                    if resp:
                        yield resp
                continue
            # Single-mode streams.
            if self._stream_mode == "custom":
                resp = self._chunk_to_response(item)
                if resp:
                    yield resp
            elif self._stream_mode == "messages":
                token = self._extract_message_chunk(item)
                if token is None:
                    continue
                if not isinstance(token, AIMessageChunk) or not token.content:
                    continue
                if self._output_node is not None:
                    meta = item[1] if isinstance(item, tuple) and len(item) == 2 else {}
                    node = meta.get("langgraph_node", "") if isinstance(meta, dict) else ""
                    if node != self._output_node:
                        continue
                resp = self._chunk_to_response(token)
                if resp:
                    yield resp
    except Exception as exc:
        # Deliberate cancellations are not surfaced as errors.
        if not self._cancelled:
            self.emit("error", exc)
        raise
Run the LangGraph graph and stream its AI text output.
The full conversation history from
messagesis injected as the initial{"messages": [...]}graph state. The graph runs through all its nodes and this method streams the output back to the VideoSDK voice pipeline.Args
messages- VideoSDK ChatContext with full conversation history.
tools- Ignored — tool execution is handled inside the graph.
conversational_graph- Accepted for API compatibility; not used.
**kwargs- Forwarded to
graph.astream().
Yields
LLMResponse text chunks as the graph streams its output.