Package conversational_graph
Public interface layer for the VideoSDK Conversational Graph engine.
Import everything you need from a single entry point:
```python
from videosdk.conversational_graph import (
    ConversationalGraph, GraphConfig, GraphState,
    Route, END, START, Interrupt, HumanInLoop,
    Context, State, Extractor, CollectResult,
    GraphHook, LatencyProfilerHook,
    BaseCheckpointer, CheckpointHook,
    InMemorySaver, FileSaver, MongoDBSaver, VideoSDKCheckpointer,
    GraphMoment,
)
```
Classes
class ConversationalGraph(data_model: Type[BaseModel] | None = None, config: GraphConfig | None = None)

Deterministic conversational graph engine. Orchestrates a state machine of node functions, handling transitions, state extraction, interrupts, human-in-the-loop pauses, and checkpointing.
Parameters
`data_model` : `Type[BaseModel] | None`
    Pydantic model defining the state schema (a subclass of `GraphState`).
`config` : `GraphConfig | None`
    Graph configuration. Uses defaults when `None`.
Example
```python
from videosdk.conversational_graph import (
    ConversationalGraph, GraphConfig, GraphState,
    Route, END, START, InMemorySaver,
)
from pydantic import Field

class MyState(GraphState):
    name: str = Field(None, description="User's name")
    age: int = Field(None, description="User's age")

async def welcome(state, ctx):
    return "Hello! What is your name?"

async def collect_age(state, ctx):
    result = await ctx.extractor.collect(fields=["age"], prompt="How old are you?")
    if result.success:
        return Route("done")

async def done(state, ctx):
    await ctx.say(f"Thanks {state.name}, you are {state.age} years old!")
    return END

graph = (
    ConversationalGraph(data_model=MyState, config=GraphConfig(name="demo"))
    .add_start_node("welcome", welcome)
    .add_node("collect_age", collect_age)
    .add_end_node("done", done)
    .add_transition(START, "welcome", "collect_age", "done")
)
```

Graph Definition Methods
All definition methods return `self` for chaining.

def add_node(self, name: str, func: Callable) -> ConversationalGraph
    Register a node function. `func` signature: `(state, ctx) -> ...`
def add_start_node(self, name: str, func: Callable) -> ConversationalGraph
    Register a node and mark it as the graph's entry point.
def add_end_node(self, name: str, func: Callable) -> ConversationalGraph
    Register a terminal node (no outgoing edges).
def add_transition(self, *nodes) -> ConversationalGraph
    Add a linear chain of edges through the given nodes in order.
def add_conditional_transition(self, from_node: str, mapping: dict[str, str], decide: Callable) -> ConversationalGraph
    Add a conditional (branching) edge. `decide` receives `(state)` and returns a key from `mapping`.
def add_hook(self, hook: GraphHook) -> ConversationalGraph
    Register a lifecycle extension hook.
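The `decide` callable for a conditional edge just maps state to a key in the mapping; returning anything else raises `InvalidTransitionError` at runtime. A minimal sketch, with illustrative field and node names, using a plain dict in place of the state proxy:

```python
# Sketch of a decide function for add_conditional_transition.
# It receives the current state and must return a key present in the mapping.
def decide_by_income(state) -> str:
    # A plain dict stands in for the state proxy here; in a real node the
    # engine passes an attribute-access State object.
    income = state.get("income") or 0
    return "high_value" if income >= 100_000 else "standard"

# The mapping then routes each key to a target node, e.g.:
# graph.add_conditional_transition(
#     "collect_income",
#     mapping={"high_value": "vip_flow", "standard": "normal_flow"},
#     decide=decide_by_income,
# )
```

Keeping `decide` a pure function of the state makes the branch trivially unit-testable outside the graph.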
Compilation & Runtime
async def compile(self, user_config: dict | None = None, session_id: str | None = None) -> ConversationalGraph
    Initialise all runtime components. Must be called before `handle_input`. Idempotent: safe to call multiple times.
def set_callbacks(self, say: Callable | None = None, ask: Callable | None = None, hangup: Callable | None = None) -> None
    Register callbacks that bridge the graph to the hosting environment (agent pipeline, tests, etc.). Set automatically when using `GraphPipelineAdapter`.
async def handle_input(self, user_text: str) -> str
    Process a user message. Returns an enriched prompt for the LLM.
async def handle_decision(self, graph_response: Any) -> str | None
    Process the LLM's structured response and advance the state machine.
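The `say` / `ask` / `hangup` callbacks are plain async callables, so for tests they can be replaced with doubles that capture output instead of driving TTS or an LLM. A sketch (the fake callbacks and their recording list are illustrative, not part of the library):

```python
import asyncio

# Test-double callbacks that record output instead of speaking it.
# In production these are wired to the agent pipeline; GraphPipelineAdapter
# registers real equivalents automatically.
spoken: list[str] = []

async def fake_say(message: str, interruptible: bool = True) -> None:
    spoken.append(message)          # would be spoken verbatim via TTS

async def fake_ask(instruction: str, interruptible: bool = False) -> str:
    spoken.append(instruction)      # would be naturalised by the LLM first
    return instruction

async def fake_hangup() -> None:
    spoken.append("<hangup>")

# graph.set_callbacks(say=fake_say, ask=fake_ask, hangup=fake_hangup)
asyncio.run(fake_say("Hello!"))
```

This keeps conversation flows assertable in unit tests without any audio or LLM stack attached.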
Pause / Resume (Human-in-the-Loop)
async def resume_with_human_input(self, payload: dict | None = None) -> str
    Resume a paused graph with external human input.
async def resume(self, thread_id: str | None = None, update: dict[str, Any] | None = None) -> ConversationalGraph
    Restore the graph to the latest checkpoint for a thread.
Checkpoint / Replay
async def get_moments(self) -> list[GraphMoment]
    Return all checkpoint moments for the current thread.
async def get_state(self, thread_id: str | None = None, moment_id: str | None = None, *, before: str | None = None, after: str | None = None) -> GraphMoment | None
    Return a state moment for a thread. With no filters, returns the latest.
async def update_state(self, updates: dict[str, Any], thread_id: str | None = None, as_node: str | None = None) -> GraphMoment | None
    Apply updates on top of the latest checkpoint and save a new moment.
async def get_state_history(self, thread_id: str | None = None) -> list[GraphMoment]
    Return all checkpoint moments for a thread, oldest first.
async def replay(self, moment_id: str | None = None, thread_id: str | None = None, update: dict[str, Any] | None = None) -> ConversationalGraph
    Restore the graph to a prior checkpoint (time-travel).
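Conceptually, time-travel restores the moment with the requested id and discards everything recorded after it. A sketch of that selection over a history list, with plain dicts standing in for `GraphMoment` objects and illustrative ids:

```python
# Time-travel sketch: pick the moment with the requested id out of an
# oldest-first history, and truncate the history to that point.
history = [
    {"moment_id": "m1", "step": 0, "current_node": "welcome"},
    {"moment_id": "m2", "step": 1, "current_node": "collect_age"},
    {"moment_id": "m3", "step": 2, "current_node": "done"},
]

def replay_to(history: list[dict], moment_id: str) -> tuple[dict, list[dict]]:
    idx = next(i for i, m in enumerate(history) if m["moment_id"] == moment_id)
    return history[idx], history[: idx + 1]

moment, truncated = replay_to(history, "m2")
```

The real `replay()` additionally restores `current_node`, pause flags, and collected state from the selected moment.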
Inspection
def get_nodes(self) -> list[str]
    Return all registered node names.
def get_transitions(self) -> dict[str, list[str]]
    Return the edge adjacency map.
def visualize(self) -> None
    Print the full graph routing to stdout. Parses `Route()` targets from node source code to resolve dynamic branches.
def get_graph_status(self) -> str
    Return a detailed string describing the current graph status.
Properties
var current_node : str | None
    Name of the currently executing node.
var is_ended : bool
    Whether the graph has terminated.
var is_paused : bool
    Whether the graph is paused (human-in-the-loop).
var pause_reason : str | None
    Reason provided to `HumanInLoop`.
var thread_id : str | None
    Active thread identifier.
Thread Management
def set_thread(self, thread_id: str) -> None
    Override the active thread_id (resets checkpoint counters).
async def get_or_create_thread(self, user_id: str) -> str
    Return the existing thread_id for this graph and user, or create one.
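The thread lookup amounts to a lazily-populated registry keyed by user and graph identity. A standalone sketch of that behaviour (the registry, key shape, and short-id format are assumptions for illustration):

```python
import uuid

# Sketch of get_or_create_thread semantics: one stable thread id per
# (user_id, graph_id) pair, created lazily on first use.
_threads: dict[tuple[str, str], str] = {}

def get_or_create_thread(user_id: str, graph_id: str) -> str:
    key = (user_id, graph_id)
    if key not in _threads:
        _threads[key] = uuid.uuid4().hex[:8]  # short random id, illustrative
    return _threads[key]

t1 = get_or_create_thread("user-42", "loan_v1")
t2 = get_or_create_thread("user-42", "loan_v1")  # same thread comes back
```

The `ConversationalGraph` method takes only `user_id` because the graph's own `graph_id` supplies the second half of the key; `BaseCheckpointer.get_or_create_thread` takes both explicitly.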
class GraphConfig(name: str = 'conversational_graph',
                  graph_id: str = <random UUID>,
                  debug: bool = False,
                  max_retries: int = 10,
                  hangup_delay: float = 4.0,
                  stream: bool = False,
                  checkpointer: BaseCheckpointer | None = None)
Configuration for a `ConversationalGraph` instance.

Parameters

`name` : `str`
    Human-readable name for the graph (used in logs/traces). Defaults to `"conversational_graph"`.
`graph_id` : `str`
    Unique version identifier (e.g. `"loan_v1.2"`). Defaults to a random UUID.
`debug` : `bool`
    When `True`, emit verbose DEBUG logs for every StateMachine step. Defaults to `False`.
`max_retries` : `int`
    Maximum times a node may return `Interrupt` before the engine ends the workflow with a warning. Defaults to `10`.
`hangup_delay` : `float`
    Seconds to wait before hanging up after the graph ends. Defaults to `4.0`.
`stream` : `bool`
    When `True`, a `LatencyProfilerHook` is automatically attached at compile time. The profiler instance is accessible via `graph.latency_profiler`. Defaults to `False`.
`checkpointer` : `BaseCheckpointer | None`
    Optional checkpoint backend. When set, the graph auto-saves a `GraphMoment` after every turn and exposes `get_state` / `update_state` / `get_state_history`.
Example
```python
config = GraphConfig(
    name="loan_workflow",
    graph_id="loan_v2",
    stream=True,
    checkpointer=InMemorySaver(),
)
graph = ConversationalGraph(data_model=LoanState, config=config)
```

class GraphState(**data)
Base Pydantic model for defining a graph's state schema.
Subclass `GraphState` and declare fields with type annotations and `Field(description=...)` to define what data the graph collects.

```python
from pydantic import Field
from videosdk.conversational_graph import GraphState

class LoanState(GraphState):
    name: str = Field(None, description="Applicant's full name")
    income: float = Field(None, description="Annual income in USD")
    loan_amount: float = Field(None, description="Requested loan amount")
```

Pass the subclass to `ConversationalGraph(data_model=LoanState)`.

class Route(target: str, update: dict | None = None)
Route execution to another node, with optional state updates.
Return a `Route` from a node function to force an immediate transition to the target node. The optional `update` dict is merged into `states` before the target node runs.

Parameters
`target` : `str`
    Name of the destination node.
`update` : `dict | None`
    State updates to apply before transitioning.
Example
```python
async def income_node(state, ctx):
    if state.employment_status == "unemployed":
        return Route("reject_node", update={"income": 0})
    return "Please tell me your annual income."
```

END

Singleton action. Return it from a node function to terminate the graph.

```python
async def goodbye_node(state, ctx):
    await ctx.say("Thank you, goodbye!")
    return END
```

START

Singleton action used in `add_transition()` to mark the graph entry point.

```python
graph.add_transition(START, "welcome", "collect_info", END)
```

class Interrupt(say: str, id: str | None = None)
Pause graph execution and deliver a prompt to the user.
When a node returns `Interrupt`, the graph stays on the current node and re-executes it on the next user input.

Parameters
`say` : `str`
    The message to present to the user.
`id` : `str | None`
    Optional identifier for the interrupt.
Example
```python
async def confirm_node(state, ctx):
    if state.confirmed is None:
        return Interrupt("Could you please confirm your details?")
    return Route("next_step")
```

class HumanInLoop(reason: str, say: str | None = None, timeout: float | None = None)
Pause the graph and wait for external human input.
The graph holds at the current node until `graph.resume_with_human_input(payload)` is called from external code (API callback, webhook, etc.). When resumed, the same node re-runs with the payload available via `ctx.human_input`.

Parameters

`reason` : `str`
    Why the graph is pausing (stored as `pause_reason`).
`say` : `str | None`
    Optional message to send while waiting.
`timeout` : `float | None`
    Seconds before auto-resuming. `None` = wait forever.
Example
```python
async def review_node(state, ctx):
    if ctx.human_input is None:
        return HumanInLoop(
            reason="manager_approval",
            say="Your application is under review.",
            timeout=300.0,
        )
    decision = ctx.human_input.get("approved")
    return Route("approved" if decision else "rejected")
```

class Context
Execution context passed as the second argument to node functions.
Provides access to states, the agent, extraction helpers, conversation history, and human-in-the-loop input.
Example
```python
async def greet_node(state, ctx):
    await ctx.say("Welcome! Let me help you today.")
    return "What is your name?"
```

Instance variables

var states : dict[str, Any]
    Collected field values.
var current_node : str
    Name of the currently executing node.
var execution_history : list[dict]
    Ordered log of node visits.
var last_user_message : str | None
    The most recent user utterance.
var human_input : dict[str, Any] | None
    Payload provided by an external human via `graph.resume_with_human_input()`.
var extractor : Extractor
    Per-turn extractor helper for intent matching and data collection.
Methods
async def ask(self, instruction: str, interruptible: bool = False) -> str
    Pass `instruction` to the LLM for naturalisation via a registered callback.
async def say(self, message: str, interruptible: bool = True) -> None
    Speak `message` verbatim via TTS (bypasses the LLM).
class State

Attribute-style proxy over `ctx.states`.

The `state` object is passed as the first argument to every node function. Read and write fields using dotted attribute access.

```python
async def my_node(state, ctx):
    if state.employment_status == "unemployed":
        state.income = 0
        return Route("reject")
    return "What is your annual income?"
```

Methods

def to_dict(self) -> dict
    Return a plain dict copy of all current state values.
class TurnContext

Ephemeral per-turn state container, accessible as `ctx.turn`.

Instance variables

var reply_triggered : bool
    `True` if `ctx.say()` or `ctx.ask()` fired this turn.
var is_decision_rerun : bool
    `True` when the node is re-running inside `apply_decision()` after the LLM extracted new values.
var pending_collect_schema
    Pydantic model set by `extractor.collect()`.
var pending_collect_prompt : str | None
    Instruction passed to `collect()`.
var pending_say_message : str | None
    Buffered verbatim message from `say()`.
var validation_errors : dict[str, str]
    Field-to-error-message mapping from the last turn.
var semantic_extract_cache : dict
    Per-turn embedding cache.
class Extractor

Per-turn extractor helper for intent classification and structured data collection.

Access via `ctx.extractor` inside a node function.

Methods
async def match_intent(self, intents: dict[str, str]) -> str

Classify the current user utterance against a set of named intents.

Returns the best-matching intent key, or `"unknown"` if none match above the similarity threshold.

Parameters

`intents` : `dict[str, str]`
    Mapping of `{intent_key: description}`.
Example
```python
intent = await ctx.extractor.match_intent({
    "book": "User wants to book or reserve",
    "cancel": "User wants to cancel a booking",
    "query": "User wants to check status",
})
```

async def collect(self, schema: Type[BaseModel] | None = None, prompt: str = '', fields: list[str] | None = None) -> CollectResult
Collect structured data from the user's utterance.
Either `schema` or `fields` must be provided.

Parameters

`schema` : `Type[BaseModel] | None`
    A Pydantic model whose fields define what to extract.
`prompt` : `str`
    Instruction for the LLM when extraction fails.
`fields` : `list[str] | None`
    Field names from the global state model to extract. A dynamic Pydantic model is built from these.
Returns
`CollectResult` with `.extracted` (model instance), `.raw` (original text), and `.success` (`True` when all required fields are populated).

Example

```python
result = await ctx.extractor.collect(
    fields=["name", "email"],
    prompt="Could you please provide your name and email?",
)
if result.success:
    return Route("next_step")
```
class CollectResult

Result of an `extractor.collect()` call.

Instance variables

var extracted : BaseModel | None
    Pydantic model instance with extracted values. Fields that were not extracted are `None`.
var raw : str
    The original user message text.
var success : bool
    `True` when all required fields have been populated.
class GraphHook

Abstract base class for graph lifecycle hooks.

Subclass `GraphHook` and override only the events your extension needs. All methods are `async` and default to no-ops.

Example

```python
class LoggingHook(GraphHook):
    async def on_node_enter(self, node, ctx):
        print(f"Entering: {node}")

    async def on_state_update(self, updates, ctx):
        print(f"State updated: {updates}")

graph.add_hook(LoggingHook())
```

Methods
async def on_node_enter(self, node: str, ctx: Context) -> None
    Called when a node is about to execute.
async def on_node_exit(self, node: str, result: Any, ctx: Context) -> None
    Called after a node returns.
async def on_state_update(self, updates: dict[str, Any], ctx: Context) -> None
    Called whenever state fields are updated.
async def on_state_machine_advance(self, from_node: str | None, to_node: str | None, ctx: Context) -> None
    Called on every state machine transition.
async def on_interrupt(self, node: str, say: str, retry_count: int, ctx: Context) -> None
    Called when a node returns `Interrupt`.
async def on_human_in_loop(self, node: str, reason: str, ctx: Context) -> None
    Called when a node returns `HumanInLoop`.
async def on_resume(self, payload: Any, ctx: Context) -> None
    Called when the graph resumes from a pause.
async def on_end(self, ctx: Context) -> None
    Called when the graph terminates.
class LatencyProfilerHook(live_print: bool = True, slow_node_ms: float = 0, event_sink: Callable | None = None)
Production-grade hook that measures wall-clock time for every node execution, transition, turn, interrupt, and state update.
Includes percentile stats (p50/p95/p99), configurable slow-node warnings, thread-safe accumulation, hot-path detection, and a pluggable event sink for forwarding data to external APM/tracing systems.
Automatically attached when `GraphConfig(stream=True)` is set; the profiler is then accessible via `graph.latency_profiler`.

Parameters

`live_print` : `bool`
    Emit INFO-level log lines for every lifecycle event. Defaults to `True`.
`slow_node_ms` : `float`
    Millisecond threshold; any node exceeding it triggers a WARNING log. `0` disables. Defaults to `0`.
`event_sink` : `Callable | None`
    `(event_name, payload_dict) -> None` callback invoked for every profiler event, for forwarding to Datadog / OpenTelemetry / custom backends.
Example
```python
# Automatic via stream=True:
graph = ConversationalGraph(
    data_model=MyState,
    config=GraphConfig(stream=True),
)
await graph.compile()
graph.latency_profiler.print_summary()

# Manual with custom settings:
profiler = LatencyProfilerHook(slow_node_ms=500.0)
graph.add_hook(profiler)
```

Methods
def reset(self) -> None
    Clear all accumulated data. Safe to call between runs.
def record_func_timing(self, node: str, func_ms: float, resolve_ms: float) -> None
    Record the function-versus-resolve timing split for a node.
def get_analysis(self) -> dict[str, Any]
    Return a structured analysis with percentile breakdowns, hot-path detection, and bottleneck analysis.
def dump(self, filepath: str = 'latency_report.json') -> str
    Write the analysis to a JSON file and return the path.
def print_summary(self) -> None
    Print a human-readable latency summary to stdout.
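An `event_sink` is any `(event_name, payload_dict)` callable, so the simplest useful sink just buffers events for later forwarding. A sketch; the buffer, the `"node_exit"` event name, and the payload fields are illustrative assumptions, not documented profiler output:

```python
# Minimal event sink: collect profiler events in memory. A real sink would
# forward each event to Datadog / OpenTelemetry / a custom backend instead.
events: list[tuple[str, dict]] = []

def buffer_sink(event_name: str, payload: dict) -> None:
    events.append((event_name, payload))

# Wire it up (profiler usage as documented above):
# profiler = LatencyProfilerHook(slow_node_ms=500.0, event_sink=buffer_sink)
# graph.add_hook(profiler)

# Since the sink is a plain callable, it can be exercised directly:
buffer_sink("node_exit", {"node": "welcome", "duration_ms": 12.5})
```

Keeping the sink side-effect-free apart from the append makes it safe under the profiler's thread-safe accumulation.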
class GraphMoment

Immutable snapshot of graph state captured at each turn boundary.

Moments form a linked list via `parent_moment_id`, giving a complete, replayable history of a conversation thread.

Instance variables
var moment_id : str
    Unique identifier for this moment.
var thread_id : str
    Conversation thread this moment belongs to.
var step : int
    Sequential step number within the thread.
var state : dict[str, Any]
    Collected state values at this point.
var current_node : str | None
    Node that was active when captured.
var is_ended : bool
    Whether the graph had terminated.
var is_paused : bool
    Whether the graph was paused (human-in-the-loop).
var execution_history : list[dict]
    Node visit log up to this point.
var metadata : dict[str, Any]
    Arbitrary metadata.
var parent_moment_id : str | None
    ID of the preceding moment.
var created_at : str
    ISO-8601 UTC timestamp.
var session_id : str | None
    Session identifier (if available).
var user_message : str | None
    User message that triggered this moment.
var ai_message : str | None
    AI response at this moment.
var next_node : str | None
    Node the graph transitioned to after this moment.
var parent_node : str | None
    Node that preceded this moment.
var duration_ms : float | None
    Turn duration in milliseconds.
Methods
def to_dict(self) -> dict
    Serialise to a plain dictionary.
classmethod from_dict(data: dict) -> GraphMoment
    Deserialise from a dictionary.
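Because moments link through `parent_moment_id`, an unordered bag of serialised moments can always be rebuilt into thread order by walking the chain back from the tip. A sketch with plain dicts standing in for `GraphMoment` and illustrative ids:

```python
# Rebuild oldest-first history from a linked list of moment dicts.
moments = {
    "m3": {"moment_id": "m3", "parent_moment_id": "m2", "step": 2},
    "m1": {"moment_id": "m1", "parent_moment_id": None, "step": 0},
    "m2": {"moment_id": "m2", "parent_moment_id": "m1", "step": 1},
}

def chain(moments: dict, tip_id: str) -> list[dict]:
    out, cur = [], moments.get(tip_id)
    while cur is not None:
        out.append(cur)
        parent = cur["parent_moment_id"]
        cur = moments.get(parent) if parent else None
    return list(reversed(out))  # oldest first, like get_state_history

history = chain(moments, "m3")
```

This is the property that makes `replay()` and `get_state_history()` possible regardless of the storage backend's native ordering.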
class BaseCheckpointer

Abstract base class for checkpoint persistence backends.

Implement this to create custom storage backends. Moments are keyed by `(thread_id, moment_id)`.

Methods
async def put(self, moment: GraphMoment) -> None
    Abstract. Persist a moment.
async def get(self, thread_id: str, moment_id: str | None = None, *, before: str | None = None, after: str | None = None) -> GraphMoment | None
    Abstract. Retrieve a moment. With no filters, returns the latest.
async def get_history(self, thread_id: str) -> list[GraphMoment]
    Abstract. Return all moments for a thread, oldest first.
async def delete(self, thread_id: str) -> None
    Abstract. Remove all moments for a thread.
async def get_or_create_thread(self, user_id: str, graph_id: str) -> str
    Return the existing thread_id for (user_id, graph_id) or create a new one.
class CheckpointHook(thread_id: str | None = None)

Lifecycle hook that captures a `GraphMoment` after each node execution.

Moments are accumulated in `self.moments` and can be inspected or persisted after the conversation ends.

Parameters

`thread_id` : `str | None`
    Thread identifier. Defaults to a short UUID.
Example
```python
hook = CheckpointHook()
graph.add_hook(hook)
# ... run the graph ...
for moment in hook.get_moments():
    print(moment.current_node, moment.state)
```

Instance variables

var moments : list[GraphMoment]
    Accumulated graph moments.
Methods
def get_moments(self) -> list[GraphMoment]
    Return a copy of all captured moments.
def get_moment(self, moment_id: str) -> GraphMoment | None
    Find a specific moment by its checkpoint ID.
def to_list(self) -> list[dict]
    Serialise all moments to a list of dicts.
def summary(self) -> str
    Return a human-readable summary of all captured moments.
class InMemorySaver
In-memory checkpoint saver. Good for development and testing.
All data is lost when the process exits.
```python
graph = ConversationalGraph(
    data_model=MyState,
    config=GraphConfig(checkpointer=InMemorySaver()),
)
```

class FileSaver(directory: str = '/tmp/conv_graph_checkpoints')
File-based checkpoint saver. Persists moments as one JSON file per thread.
Parameters
`directory` : `str`
    Directory path for JSON files. Defaults to `"/tmp/conv_graph_checkpoints"`.

```python
graph = ConversationalGraph(
    data_model=MyState,
    config=GraphConfig(checkpointer=FileSaver("./checkpoints")),
)
```

class MongoDBSaver(uri: str = 'mongodb://localhost:27017', db_name: str = 'conv_graph', collection_name: str = 'checkpoints', client=None)
MongoDB checkpoint saver. Requires the `motor` package.

Parameters

`uri` : `str`
    MongoDB connection URI. Defaults to `"mongodb://localhost:27017"`.
`db_name` : `str`
    Database name. Defaults to `"conv_graph"`.
`collection_name` : `str`
    Collection name. Defaults to `"checkpoints"`.
`client`
    Optional pre-existing `AsyncIOMotorClient` instance.
```python
graph = ConversationalGraph(
    data_model=MyState,
    config=GraphConfig(
        checkpointer=MongoDBSaver(uri="mongodb://db:27017", db_name="myapp"),
    ),
)
```

class VideoSDKCheckpointer(base_url: str, token: str, timeout: float = 10.0)
REST API checkpoint saver backed by VideoSDK. Requires the `httpx` package.

Parameters

`base_url` : `str`
    VideoSDK API base URL.
`token` : `str`
    Authentication token. Also read from the `VIDEOSDK_AUTH_TOKEN` environment variable.
`timeout` : `float`
    Request timeout in seconds. Defaults to `10.0`.
class GraphError(*args, **kwargs)

Base exception for all ConversationalGraph errors.

Catch this to handle any graph-related error in a single clause.

```python
try:
    await graph.handle_input(text)
except GraphError as exc:
    ...
```

class GraphNotCompiledError(*args, **kwargs)

Raised when a method that requires `compile()` is called before the graph has been compiled.
class GraphAlreadyEndedError(*args, **kwargs)

Raised when input is sent to a graph whose execution has already terminated (reached an `END` node).
class GraphPausedError(*args, **kwargs)

Raised when an operation requires the graph to be running but the graph is currently paused (waiting for human-in-the-loop input).
class GraphResumeError(*args, **kwargs)

Raised when `resume()` is called on a graph that is not paused.
class GraphRecursionError(*args, **kwargs)

Raised when node execution depth exceeds the configured recursion limit (`GraphConfig.max_retries`).
class GraphValidationError(*args, **kwargs)

Raised when the graph definition fails topology validation (e.g. missing nodes, dangling edges, unreachable start node).
class NodeNotFoundError(*args, **kwargs)

Raised when a node referenced by a `Route`, transition, or `start_node` does not exist in the graph.
class NodeExecutionError(*args, **kwargs)

Raised when a node function raises an unhandled exception during execution.

Instance variables

var node_name : str
    Name of the node that failed.
var cause : Exception
    The original exception raised by the node function.
class InvalidTransitionError(*args, **kwargs)

Raised when a conditional edge's `decide()` function returns a key not present in its mapping.

Instance variables

var node_name : str
    Source node of the conditional edge.
var key : str
    The unmapped key that was returned.
var valid_keys : list[str]
    List of valid keys defined in the mapping.
class CheckpointNotFoundError(*args, **kwargs)

Raised when a requested checkpoint ID does not exist for the given thread.
class CheckpointReplayError(*args, **kwargs)

Raised when replay is attempted without a configured checkpointer or with an invalid checkpoint.
class CheckpointBackendError(*args, **kwargs)

Raised when the checkpoint backend (HTTP, MongoDB, etc.) encounters an error during a storage operation.