Package conversational_graph

Public interface layer for the VideoSDK Conversational Graph engine.

Import everything you need from a single entry point:

from videosdk.conversational_graph import (
    ConversationalGraph, GraphConfig, GraphState,
    Route, END, START, Interrupt, HumanInLoop,
    Context, State, Extractor, CollectResult,
    GraphHook, LatencyProfilerHook,
    BaseCheckpointer, CheckpointHook,
    InMemorySaver, FileSaver, MongoDBSaver, VideoSDKCheckpointer,
    GraphMoment,
)

Classes

class ConversationalGraph (data_model: Type[BaseModel] | None = None,
config: GraphConfig | None = None)

Deterministic conversational graph engine. Orchestrates a state machine of node functions, handling transitions, state extraction, interrupts, human-in-the-loop pauses, and checkpointing.

Parameters

data_model : Type[BaseModel] | None
Pydantic model defining the state schema (subclass of GraphState).
config : GraphConfig | None
Graph configuration. Uses defaults when None.

Example

from videosdk.conversational_graph import (
    ConversationalGraph, GraphConfig, GraphState,
    Route, END, START, InMemorySaver,
)
from pydantic import Field

class MyState(GraphState):
    name: str = Field(None, description="User's name")
    age: int  = Field(None, description="User's age")

async def welcome(state, ctx):
    return "Hello! What is your name?"

async def collect_age(state, ctx):
    result = await ctx.extractor.collect(fields=["age"], prompt="How old are you?")
    if result.success:
        return Route("done")

async def done(state, ctx):
    await ctx.say(f"Thanks {state.name}, you are {state.age} years old!")
    return END

graph = (
    ConversationalGraph(data_model=MyState, config=GraphConfig(name="demo"))
    .add_start_node("welcome", welcome)
    .add_node("collect_age", collect_age)
    .add_end_node("done", done)
    .add_transition(START, "welcome", "collect_age", "done")
)

Graph Definition Methods

All definition methods return self for chaining.

def add_node(self, name: str, func: Callable) ‑> ConversationalGraph

Register a node function. func signature: (state, ctx) -> ...

def add_start_node(self, name: str, func: Callable) ‑> ConversationalGraph

Register a node and mark it as the graph's entry point.

def add_end_node(self, name: str, func: Callable) ‑> ConversationalGraph

Register a terminal node (no outgoing edges).

def add_transition(self, *nodes) ‑> ConversationalGraph

Add sequential edges. START and END are valid markers.

graph.add_transition(START, "welcome", "collect", END)

def add_conditional_transition(self, from_node: str, mapping: dict[str, str], decide: Callable) ‑> ConversationalGraph

Add a conditional (branching) edge. decide receives (state) and returns a key from mapping.
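For example, a routing function can branch on a collected field. The sketch below uses hypothetical field and node names; decide must return a key present in mapping, otherwise InvalidTransitionError is raised. SimpleNamespace stands in for the graph's state proxy so the function can be exercised directly:

```python
from types import SimpleNamespace  # stand-in for the graph's state proxy in this sketch

def route_by_income(state):
    # Must return a key present in the mapping passed to
    # add_conditional_transition; anything else raises InvalidTransitionError.
    return "eligible" if (state.income or 0) >= 50_000 else "ineligible"

print(route_by_income(SimpleNamespace(income=80_000)))  # eligible
print(route_by_income(SimpleNamespace(income=None)))    # ineligible
```

Wired into a graph (node names hypothetical): graph.add_conditional_transition("check_income", {"eligible": "make_offer", "ineligible": "reject"}, route_by_income)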

def add_hook(self, hook: GraphHook) ‑> ConversationalGraph

Register a lifecycle extension hook.

Compilation & Runtime

async def compile(self, user_config: dict | None = None, session_id: str | None = None) ‑> ConversationalGraph

Initialise all runtime components. Must be called before handle_input. Idempotent — safe to call multiple times.

def set_callbacks(self, say: Callable | None = None, ask: Callable | None = None, hangup: Callable | None = None) ‑> None

Register callbacks that bridge the graph to the hosting environment (agent pipeline, tests, etc.). Set automatically when using GraphPipelineAdapter.
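A sketch of wiring the graph to a custom host. The callback bodies are illustrative, and the signatures shown here are an assumption modelled on ctx.say / ctx.ask; check your adapter for the exact contract:

```python
async def say(message: str, interruptible: bool = True) -> None:
    print(f"TTS: {message}")               # forward to your TTS layer

async def ask(instruction: str, interruptible: bool = False) -> str:
    return f"LLM reply to: {instruction}"  # forward to your LLM

async def hangup() -> None:
    print("call ended")                    # tear down the call/session

graph.set_callbacks(say=say, ask=ask, hangup=hangup)
```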

async def handle_input(self, user_text: str) ‑> str

Process a user message. Returns an enriched prompt for the LLM.

async def handle_decision(self, graph_response: Any) ‑> str | None

Process the LLM's structured response and advance the state machine.
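Together, handle_input and handle_decision form one conversational turn. A minimal host loop might look like this, where llm.complete is a hypothetical stand-in for whatever LLM client the host uses:

```python
async def run_turn(graph, user_text: str):
    # 1. Feed the user message in; get back an enriched prompt for the LLM.
    prompt = await graph.handle_input(user_text)
    # 2. Call the host's LLM (hypothetical client).
    response = await llm.complete(prompt)
    # 3. Hand the structured response back to advance the state machine.
    return await graph.handle_decision(response)
```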

Pause / Resume (Human-in-the-Loop)

async def resume_with_human_input(self, payload: dict | None = None) ‑> str

Resume a paused graph with external human input.
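The node-side pattern is shown under HumanInLoop below; on the host side, an approval webhook might release the pause like this (the payload keys are whatever the paused node reads from ctx.human_input):

```python
# e.g. inside an approval webhook handler (sketch)
if graph.is_paused:
    prompt = await graph.resume_with_human_input({"approved": True})
```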

async def resume(self, thread_id: str | None = None, update: dict[str, Any] | None = None) ‑> ConversationalGraph

Restore the graph to the latest checkpoint for a thread.

Checkpoint / Replay

async def get_moments(self) ‑> list[GraphMoment]

Get all checkpoint moments for the current thread.

async def get_state(self, thread_id: str | None = None, moment_id: str | None = None, *, before: str | None = None, after: str | None = None) ‑> GraphMoment | None

Return a state moment for a thread. With no filters, returns the latest.

async def update_state(self, updates: dict[str, Any], thread_id: str | None = None, as_node: str | None = None) ‑> GraphMoment | None

Apply updates on top of the latest checkpoint and save a new moment.

async def get_state_history(self, thread_id: str | None = None) ‑> list[GraphMoment]

Return all checkpoint moments for a thread, oldest first.

async def replay(self, moment_id: str | None = None, thread_id: str | None = None, update: dict[str, Any] | None = None) ‑> ConversationalGraph

Restore the graph to a prior checkpoint (time-travel).
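A sketch of the checkpoint APIs used together. This requires GraphConfig(checkpointer=...) to be set; the field name below is hypothetical:

```python
history = await graph.get_state_history()  # all moments, oldest first
latest = await graph.get_state()           # newest moment, or None

# Patch the latest checkpoint and persist a new moment:
await graph.update_state({"income": 75_000})

# Time-travel back to the first recorded moment:
if history:
    await graph.replay(moment_id=history[0].moment_id)
```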

Inspection

def get_nodes(self) ‑> list[str]

Return all registered node names.

def get_transitions(self) ‑> dict[str, list[str]]

Return the edge adjacency map.

def visualize(self) ‑> None

Print the full graph routing to stdout. Parses Route() targets from node source code to resolve dynamic branches.

def get_graph_status(self) ‑> str

Return a detailed string of the current graph status.
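For example, inspecting the demo graph defined earlier:

```python
print(graph.get_nodes())        # e.g. ['welcome', 'collect_age', 'done']
print(graph.get_transitions())  # e.g. {'welcome': ['collect_age'], ...}
graph.visualize()               # prints routing, resolving Route() targets
print(graph.get_graph_status())
```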

Properties

var current_node : str | None

Name of the currently executing node.

var is_ended : bool

Whether the graph has terminated.

var is_paused : bool

Whether the graph is paused (human-in-the-loop).

var pause_reason : str | None

Reason provided to HumanInLoop.

var thread_id : str | None

Active thread identifier.

Thread Management

def set_thread(self, thread_id: str) ‑> None

Override the active thread_id (resets checkpoint counters).

async def get_or_create_thread(self, user_id: str) ‑> str

Return the existing thread_id for this graph + user, or create one.
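One plausible pattern for pinning a returning user to their existing conversation (this sketch assumes a configured checkpointer backs the thread lookup):

```python
# Look up (or create) the thread for this graph + user, then make it active.
thread_id = await graph.get_or_create_thread(user_id="user_42")
graph.set_thread(thread_id)
```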

class GraphConfig (name: str = 'conversational_graph',
graph_id: str = <random UUID>,
debug: bool = False,
max_retries: int = 10,
hangup_delay: float = 4.0,
stream: bool = False,
checkpointer: BaseCheckpointer | None = None)

Configuration for a ConversationalGraph instance.

Parameters

name : str
Human-readable name for the graph (used in logs/traces). Defaults to "conversational_graph".
graph_id : str
Unique version identifier (e.g. "loan_v1.2"). Defaults to a random UUID.
debug : bool
When True, emit verbose DEBUG logs for every StateMachine step. Defaults to False.
max_retries : int
Maximum times a node may return Interrupt before the engine ends the workflow with a warning. Defaults to 10.
hangup_delay : float
Seconds to wait before hanging up after the graph ends. Defaults to 4.0.
stream : bool
When True, a LatencyProfilerHook is automatically attached at compile time. The profiler instance is accessible via graph.latency_profiler. Defaults to False.
checkpointer : BaseCheckpointer | None
Optional checkpoint backend. When set, the graph auto-saves a GraphMoment after every turn and exposes get_state / update_state / get_state_history.

Example

config = GraphConfig(
    name="loan_workflow",
    graph_id="loan_v2",
    stream=True,
    checkpointer=InMemorySaver(),
)
graph = ConversationalGraph(data_model=LoanState, config=config)

class GraphState (**data)

Base Pydantic model for defining a graph's state schema.

Subclass GraphState and declare fields with type annotations and Field(description=...) to define what data the graph collects.

from pydantic import Field
from videosdk.conversational_graph import GraphState

class LoanState(GraphState):
    name: str        = Field(None, description="Applicant's full name")
    income: float    = Field(None, description="Annual income in USD")
    loan_amount: float = Field(None, description="Requested loan amount")

Pass the subclass to ConversationalGraph(data_model=LoanState).

class Route (target: str,
update: dict | None = None)

Route execution to another node, with optional state updates.

Return a Route from a node function to force an immediate transition to the target node. The optional update dict is merged into ctx.states before the target node runs.

Parameters

target : str
Name of the destination node.
update : dict | None
State updates to apply before transitioning.

Example

async def income_node(state, ctx):
    if state.employment_status == "unemployed":
        return Route("reject_node", update={"income": 0})
    return "Please tell me your annual income."

END

Singleton action. Return from a node function to terminate the graph.

async def goodbye_node(state, ctx):
    await ctx.say("Thank you, goodbye!")
    return END

START

Singleton action used in add_transition() to mark the graph entry point.

graph.add_transition(START, "welcome", "collect_info", END)

class Interrupt (say: str,
id: str | None = None)

Pause graph execution and deliver a prompt to the user.

When a node returns Interrupt, the graph stays on the current node and re-executes it on the next user input.

Parameters

say : str
The message to present to the user.
id : str | None
Optional identifier for the interrupt.

Example

async def confirm_node(state, ctx):
    if state.confirmed is None:
        return Interrupt("Could you please confirm your details?")
    return Route("next_step")

class HumanInLoop (reason: str,
say: str | None = None,
timeout: float | None = None)

Pause the graph and wait for external human input.

The graph holds at the current node until graph.resume_with_human_input(payload) is called from external code (API callback, webhook, etc.). When resumed, the same node re-runs with the payload available via ctx.human_input.

Parameters

reason : str
Why the graph is pausing (stored as pause_reason).
say : str | None
Optional message to send while waiting.
timeout : float | None
Seconds before auto-resuming. None = wait forever.

Example

async def review_node(state, ctx):
    if ctx.human_input is None:
        return HumanInLoop(
            reason="manager_approval",
            say="Your application is under review.",
            timeout=300.0,
        )
    decision = ctx.human_input.get("approved")
    return Route("approved" if decision else "rejected")

class Context

Execution context passed as the second argument to node functions.

Provides access to states, the agent, extraction helpers, conversation history, and human-in-the-loop input.

Example

async def greet_node(state, ctx):
    await ctx.say("Welcome! Let me help you today.")
    return "What is your name?"

Instance variables

var states : dict[str, Any]

Collected field values.

var current_node : str

Name of the currently executing node.

var execution_history : list[dict]

Ordered log of node visits.

var last_user_message : str | None

The most recent user utterance.

var human_input : dict[str, Any] | None

Payload provided by an external human via graph.resume_with_human_input().

var extractor : Extractor

Per-turn extractor helper for intent matching and data collection.

Methods

async def ask(self, instruction: str, interruptible: bool = False) ‑> str

Pass instruction to the LLM for naturalisation via a registered callback.

async def say(self, message: str, interruptible: bool = True) ‑> None

Speak message verbatim via TTS (bypasses the LLM).

class State

Attribute-style proxy over ctx.states.

The state object is passed as the first argument to every node function. Read and write fields using dotted attribute access.

async def my_node(state, ctx):
    if state.employment_status == "unemployed":
        state.income = 0
        return Route("reject")
    return "What is your annual income?"

Methods

def to_dict(self) ‑> dict

Return a plain dict copy of all current state values.

class TurnContext

Ephemeral per-turn state container, accessible as ctx.turn.

Instance variables

var reply_triggered : bool

True if ctx.say() or ctx.ask() fired this turn.

var is_decision_rerun : bool

True when the node is re-running inside apply_decision() after the LLM extracted new values.

var pending_collect_schema

Pydantic model set by extractor.collect().

var pending_collect_prompt : str | None

Instruction passed to collect().

var pending_say_message : str | None

Buffered verbatim message from say().

var validation_errors : dict[str, str]

Field → error message from last turn.

var semantic_extract_cache : dict

Per-turn embedding cache.
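Node code can consult these flags, for example to avoid replying twice in one turn or to surface validation failures from the previous turn. A sketch (node name, messages, and target node are hypothetical):

```python
async def confirm_details(state, ctx):
    if ctx.turn.validation_errors:
        # Surface the first field -> error pair from the last turn.
        field, error = next(iter(ctx.turn.validation_errors.items()))
        return Interrupt(f"There was a problem with {field}: {error}")
    if not ctx.turn.reply_triggered:
        await ctx.say("Let me confirm those details.")
    return Route("next_step")
```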

class Extractor

Per-turn extractor helper for intent classification and structured data collection.

Access via ctx.extractor inside a node function.

Methods

async def match_intent(self, intents: dict[str, str]) ‑> str

Classify the current user utterance against a set of named intents.

Returns the best-matching intent key, or "unknown" if none match above the similarity threshold.

Parameters

intents : dict[str, str]
Mapping of {intent_key: description}.

Example

intent = await ctx.extractor.match_intent({
    "book":   "User wants to book or reserve",
    "cancel": "User wants to cancel a booking",
    "query":  "User wants to check status",
})

async def collect(self, schema: Type[BaseModel] | None = None, prompt: str = '', fields: list[str] | None = None) ‑> CollectResult

Collect structured data from the user's utterance.

Either schema or fields must be provided.

Parameters

schema : Type[BaseModel] | None
A Pydantic model whose fields define what to extract.
prompt : str
Instruction for the LLM when extraction fails.
fields : list[str] | None
Field names from the global state model to extract. A dynamic Pydantic model is built from these.

Returns

CollectResult with .extracted (model instance), .raw (original text), and .success (True when all required fields are populated).

Example

result = await ctx.extractor.collect(
    fields=["name", "email"],
    prompt="Could you please provide your name and email?",
)
if result.success:
    return Route("next_step")

class CollectResult

Result of an extractor.collect() call.

Instance variables

var extracted : BaseModel | None

Pydantic model instance with extracted values. Fields that were not extracted are None.

var raw : str

The original user message text.

var success : bool

True when all required fields have been populated.

class GraphHook

Abstract base class for graph lifecycle hooks.

Subclass GraphHook and override only the events your extension needs. All methods are async and default to no-ops.

Example

class LoggingHook(GraphHook):
    async def on_node_enter(self, node, ctx):
        print(f"Entering: {node}")

    async def on_state_update(self, updates, ctx):
        print(f"State updated: {updates}")

graph.add_hook(LoggingHook())

Methods

async def on_node_enter(self, node: str, ctx: Context) ‑> None

Called when a node is about to execute.

async def on_node_exit(self, node: str, result: Any, ctx: Context) ‑> None

Called after a node returns.

async def on_state_update(self, updates: dict[str, Any], ctx: Context) ‑> None

Called whenever state fields are updated.

async def on_state_machine_advance(self, from_node: str | None, to_node: str | None, ctx: Context) ‑> None

Called on every state machine transition.

async def on_interrupt(self, node: str, say: str, retry_count: int, ctx: Context) ‑> None

Called when a node returns Interrupt.

async def on_human_in_loop(self, node: str, reason: str, ctx: Context) ‑> None

Called when a node returns HumanInLoop.

async def on_resume(self, payload: Any, ctx: Context) ‑> None

Called when the graph resumes from a pause.

async def on_end(self, ctx: Context) ‑> None

Called when the graph terminates.

class LatencyProfilerHook (live_print: bool = True,
slow_node_ms: float = 0,
event_sink: Callable | None = None)

Production-grade hook that measures wall-clock time for every node execution, transition, turn, interrupt, and state update.

Includes percentile stats (p50/p95/p99), configurable slow-node warnings, thread-safe accumulation, hot-path detection, and a pluggable event sink for forwarding data to external APM/tracing systems.

Automatically attached when GraphConfig(stream=True) is set — the profiler is then accessible via graph.latency_profiler.

Parameters

live_print : bool
Emit INFO-level log lines for every lifecycle event. Defaults to True.
slow_node_ms : float
Millisecond threshold — any node exceeding this triggers a WARNING log. 0 disables. Defaults to 0.
event_sink : Callable | None
(event_name, payload_dict) -> None callback invoked for every profiler event, for forwarding to Datadog / OpenTelemetry / custom backends.

Example

# Automatic via stream=True:
graph = ConversationalGraph(
    data_model=MyState,
    config=GraphConfig(stream=True),
)
await graph.compile()
graph.latency_profiler.print_summary()

# Manual with custom settings:
profiler = LatencyProfilerHook(slow_node_ms=500.0)
graph.add_hook(profiler)

Methods

def reset(self) ‑> None

Clear all accumulated data. Safe between runs.

def record_func_timing(self, node: str, func_ms: float, resolve_ms: float) ‑> None

Record function vs resolve timing split for a node.

def get_analysis(self) ‑> dict[str, Any]

Structured analysis with percentile breakdowns, hot-path, and bottleneck analysis.

def dump(self, filepath: str = 'latency_report.json') ‑> str

Write the analysis to a JSON file and return the path.

def print_summary(self) ‑> None

Print a human-readable latency summary to stdout.

class GraphMoment

Immutable snapshot of graph state captured at each turn boundary.

Moments form a linked list via parent_moment_id, giving a complete, replayable history of a conversation thread.

Instance variables

var moment_id : str

Unique identifier for this moment.

var thread_id : str

Conversation thread this moment belongs to.

var step : int

Sequential step number within the thread.

var state : dict[str, Any]

Collected state values at this point.

var current_node : str | None

Node that was active when captured.

var is_ended : bool

Whether the graph had terminated.

var is_paused : bool

Whether the graph was paused (human-in-the-loop).

var execution_history : list[dict]

Node visit log up to this point.

var metadata : dict[str, Any]

Arbitrary metadata.

var parent_moment_id : str | None

ID of the preceding moment.

var created_at : str

ISO-8601 UTC timestamp.

var session_id : str | None

Session identifier (if available).

var user_message : str | None

User message that triggered this moment.

var ai_message : str | None

AI response at this moment.

var next_node : str | None

Node the graph transitioned to after this moment.

var parent_node : str | None

Node that preceded this moment.

var duration_ms : float | None

Turn duration in milliseconds.

Methods

def to_dict(self) ‑> dict

Serialise to a plain dictionary.

classmethod from_dict(data: dict) ‑> GraphMoment

Deserialise from a dictionary.
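The two methods round-trip, which is what the file and database savers rely on:

```python
payload = moment.to_dict()                 # JSON-serialisable dict
restored = GraphMoment.from_dict(payload)  # equivalent moment
assert restored.moment_id == moment.moment_id
```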

class BaseCheckpointer

Abstract base class for checkpoint persistence backends.

Implement this to create custom storage backends. Moments are keyed by (thread_id, moment_id).

Methods

async def put(self, moment: GraphMoment) ‑> None

Abstract. Persist a moment.

async def get(self, thread_id: str, moment_id: str | None = None, *, before: str | None = None, after: str | None = None) ‑> GraphMoment | None

Abstract. Retrieve a moment. With no filters, returns the latest.

async def get_history(self, thread_id: str) ‑> list[GraphMoment]

Abstract. Return all moments for a thread, oldest first.

async def delete(self, thread_id: str) ‑> None

Abstract. Remove all moments for a thread.

async def get_or_create_thread(self, user_id: str, graph_id: str) ‑> str

Return the existing thread_id for (user_id, graph_id) or create a new one.
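A minimal custom backend only needs the four abstract methods. The in-memory sketch below ignores the before/after filters and leaves get_or_create_thread to the base class for brevity:

```python
from videosdk.conversational_graph import BaseCheckpointer, GraphMoment

class DictSaver(BaseCheckpointer):
    """Keeps every moment in a per-thread Python list (sketch)."""

    def __init__(self) -> None:
        self._threads: dict[str, list[GraphMoment]] = {}

    async def put(self, moment: GraphMoment) -> None:
        self._threads.setdefault(moment.thread_id, []).append(moment)

    async def get(self, thread_id, moment_id=None, *, before=None, after=None):
        moments = self._threads.get(thread_id, [])
        if moment_id is None:
            return moments[-1] if moments else None  # latest when unfiltered
        return next((m for m in moments if m.moment_id == moment_id), None)

    async def get_history(self, thread_id):
        return list(self._threads.get(thread_id, []))  # oldest first

    async def delete(self, thread_id) -> None:
        self._threads.pop(thread_id, None)
```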

class CheckpointHook (thread_id: str | None = None)

Lifecycle hook that captures a GraphMoment after each node execution.

Moments are accumulated in self.moments and can be inspected or persisted after the conversation ends.

Parameters

thread_id : str | None
Thread identifier. Defaults to a short UUID.

Example

hook = CheckpointHook()
graph.add_hook(hook)

# ... run the graph ...

for moment in hook.get_moments():
    print(moment.current_node, moment.state)

Instance variables

var moments : list[GraphMoment]

Accumulated graph moments.

Methods

def get_moments(self) ‑> list[GraphMoment]

Return a copy of all captured moments.

def get_moment(self, moment_id: str) ‑> GraphMoment | None

Find a specific moment by its checkpoint ID.

def to_list(self) ‑> list[dict]

Serialise all moments to a list of dicts.

def summary(self) ‑> str

Human-readable summary of all captured moments.

class InMemorySaver

In-memory checkpoint saver. Good for development and testing.

All data is lost when the process exits.

graph = ConversationalGraph(
    data_model=MyState,
    config=GraphConfig(checkpointer=InMemorySaver()),
)

class FileSaver (directory: str = '/tmp/conv_graph_checkpoints')

File-based checkpoint saver. Persists moments as one JSON file per thread.

Parameters

directory : str
Directory path for JSON files. Defaults to "/tmp/conv_graph_checkpoints".

graph = ConversationalGraph(
    data_model=MyState,
    config=GraphConfig(checkpointer=FileSaver("./checkpoints")),
)

class MongoDBSaver (uri: str = 'mongodb://localhost:27017',
db_name: str = 'conv_graph',
collection_name: str = 'checkpoints',
client=None)

MongoDB checkpoint saver. Requires the motor package.

Parameters

uri : str
MongoDB connection URI. Defaults to "mongodb://localhost:27017".
db_name : str
Database name. Defaults to "conv_graph".
collection_name : str
Collection name. Defaults to "checkpoints".
client
Optional pre-existing AsyncIOMotorClient instance.

graph = ConversationalGraph(
    data_model=MyState,
    config=GraphConfig(
        checkpointer=MongoDBSaver(uri="mongodb://db:27017", db_name="myapp"),
    ),
)

class VideoSDKCheckpointer (base_url: str,
token: str,
timeout: float = 10.0)

REST API checkpoint saver via VideoSDK. Requires the httpx package.

Parameters

base_url : str
VideoSDK API base URL.
token : str
Authentication token. Also reads the VIDEOSDK_AUTH_TOKEN environment variable.
timeout : float
Request timeout in seconds. Defaults to 10.0.
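Wired in like the other savers. The base URL below is a placeholder, substitute your real VideoSDK endpoint:

```python
import os

graph = ConversationalGraph(
    data_model=MyState,
    config=GraphConfig(
        checkpointer=VideoSDKCheckpointer(
            base_url="https://api.example.com",  # placeholder endpoint
            token=os.environ["VIDEOSDK_AUTH_TOKEN"],
        ),
    ),
)
```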

class GraphError (*args, **kwargs)

Base exception for all ConversationalGraph errors.

Catch this to handle any graph-related error in a single clause.

try:
    await graph.handle_input(text)
except GraphError as exc:
    ...

class GraphNotCompiledError (*args, **kwargs)

Raised when a method that requires compile() is called before the graph has been compiled.

Ancestors

GraphError

class GraphAlreadyEndedError (*args, **kwargs)

Raised when input is sent to a graph whose execution has already terminated (reached an END node).

Ancestors

GraphError

class GraphPausedError (*args, **kwargs)

Raised when an operation requires the graph to be running but the graph is currently paused (waiting for human-in-the-loop input).

Ancestors

GraphError

class GraphResumeError (*args, **kwargs)

Raised when resume() is called on a graph that is not paused.

Ancestors

GraphError

class GraphRecursionError (*args, **kwargs)

Raised when node execution depth exceeds the configured recursion limit (GraphConfig.max_retries).

Ancestors

GraphError

class GraphValidationError (*args, **kwargs)

Raised when the graph definition fails topology validation (e.g. missing nodes, dangling edges, unreachable start node).

Ancestors

GraphError

class NodeNotFoundError (*args, **kwargs)

Raised when a node referenced by a Route, transition, or start_node does not exist in the graph.

Ancestors

GraphError

class NodeExecutionError (*args, **kwargs)

Raised when a node function raises an unhandled exception during execution.

Ancestors

GraphError

Instance variables

var node_name : str

Name of the node that failed.

var cause : Exception

The original exception raised by the node function.

class InvalidTransitionError (*args, **kwargs)

Raised when a conditional edge's decide() function returns a key not present in its mapping.

Ancestors

GraphError

Instance variables

var node_name : str

Source node of the conditional edge.

var key : str

The unmapped key that was returned.

var valid_keys : list[str]

List of valid keys defined in the mapping.

class CheckpointNotFoundError (*args, **kwargs)

Raised when a requested checkpoint ID does not exist for the given thread.

Ancestors

GraphError

class CheckpointReplayError (*args, **kwargs)

Raised when replay is attempted without a configured checkpointer or with an invalid checkpoint.

Ancestors

GraphError

class CheckpointBackendError (*args, **kwargs)

Raised when the checkpoint backend (HTTP, MongoDB, etc.) encounters an error during a storage operation.

Ancestors

GraphError