Module agents.graph_adapter

Functions

def prepare_strict_schema(schema_dict)
Expand source code
def prepare_strict_schema(schema_dict):
    if isinstance(schema_dict, dict):
        if schema_dict.get("type") == "object":
            schema_dict["additionalProperties"] = False
            if "properties" in schema_dict:
                all_props = list(schema_dict["properties"].keys())
                schema_dict["required"] = all_props
        
        for key, value in schema_dict.items():
            if isinstance(value, dict):
                prepare_strict_schema(value)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        prepare_strict_schema(item)
    return schema_dict

Classes

class ConversationalGraphResponse (**data: Any)
Expand source code
class ConversationalGraphResponse(BaseModel):
    """Structured LLM response format used when a conversational graph is active."""

    response_to_user: str = Field(..., description="Response to the user by agent")
    extracted_values: List[ExtractedField] = Field(
        default_factory=list,
        description="List of extracted values from the user input",
    )

Structured LLM response format used when a conversational graph is active.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var extracted_values : List[ExtractedField]
var model_config
var response_to_user : str
class ExtractedField (**data: Any)
Expand source code
class ExtractedField(BaseModel):
    key: str = Field(..., description="The name of the field")
    value: Union[str, int, float, bool] = Field(..., description="The value of the field")

Usage Documentation

Models

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
A dictionary containing metadata about generic Pydantic models. The origin and args items map to the [__origin__][genericalias.origin] and [__args__][genericalias.args] attributes of [generic aliases][types-genericalias], and the parameter item maps to the __parameter__ attribute of generic classes.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var key : str
var model_config
var value : str | int | float | bool
class GraphPipelineAdapter (graph: Any)
Expand source code
class GraphPipelineAdapter:
    """`ConversationalGraph` instance with the agent pipeline."""

    def __init__(self, graph: Any) -> None:
        self._graph = graph
        self._agent: Any = None
        self._callbacks_bound: bool = False
        self._conversational_graph_schema_cache = None

    @property
    def has_agent(self) -> bool:
        """Check whether an agent is already bound."""
        return self._agent is not None

    def set_agent(self, agent: Any) -> None:
        """Bind *agent* to the graph via callbacks"""
        if self._callbacks_bound:
            return
        self._agent = agent
        self._callbacks_bound = True

        async def _say_cb(message: str, interruptible: bool = True) -> None:
            if hasattr(agent, "session") and agent.session:
                await agent.session.say(message, interruptible=interruptible)

        async def _ask_cb(instruction: str, interruptible: bool = False) -> None:
            if hasattr(agent, "session") and agent.session:
                await agent.session.reply(
                    instruction, interruptible=interruptible, wait_for_playback=False,
                )

        def _hangup_cb() -> None:
            self._schedule_hangup(agent)

        self._graph.set_callbacks(say=_say_cb, ask=_ask_cb, hangup=_hangup_cb)
        try:
            room = agent.session._job_context.room
            sid = getattr(room, "_session_id", None)
            if sid:
                self._graph._session_id = sid
        except Exception:
            pass

    def _schedule_hangup(self, agent: Any) -> None:
        """Hang up exactly when the final TTS stream ends."""
        pipeline = (
            agent.session.pipeline
            if hasattr(agent, "session") and agent.session and hasattr(agent.session, "pipeline")
            else None
        )

        if pipeline and hasattr(pipeline, "on"):
            fired = False

            async def _on_tts_end():
                nonlocal fired
                if fired:
                    return
                fired = True
                if hasattr(agent, "hangup"):
                    try:
                        await agent.hangup()
                    except Exception as exc:
                        logger.warning("[GRAPH] agent.hangup() failed: %s", exc)

            pipeline.on("agent_turn_end")(_on_tts_end)
        else:
            asyncio.ensure_future(self._hangup_after_timer(agent))

    async def _hangup_after_timer(self, agent: Any) -> None:
        delay = getattr(self._graph.config, "hangup_delay", 4.0)
        await asyncio.sleep(delay)
        if hasattr(agent, "hangup"):
            try:
                await agent.hangup()
            except Exception as exc:
                logger.warning("[GRAPH] agent.hangup() failed: %s", exc)

    def get_system_instructions(self) -> str:
        """Return graph-specific system instructions to prepend to the agent prompt."""
        return self._graph.get_system_instructions()


    async def handle_input(self, text: str) -> tuple[str | None, bool]:
        """ Pre-process user text through the graph. """
        result = await self._graph.handle_input(text)
        if result == _END_STREAM:
            return None, True
        return result, False

    async def handle_decision(self, agent: Any, graph_response: str) -> tuple[str | None, bool]:
        """Post-process the LLM's structured response through the graph."""
        if not self._callbacks_bound and agent:
            self.set_agent(agent)

        result = await self._graph.handle_decision(graph_response)
        if result == _END_STREAM:
            return None, True
        return result, False

    def stream_conversational_graph_response(
        self, current_content: str, state: Dict[str, Any]
    ) -> Iterator[str]:
        """Stream parsed ``response_to_user`` characters for TTS."""
        return self._graph.stream_conversational_graph_response(current_content, state)

    @staticmethod
    def get_response_schema() -> dict:
        """Return the JSON schema that LLM providers use as ``response_format``."""
        return ConversationalGraphResponse.model_json_schema()

    def _get_graph_schema(self):
        """Get the prepared strict schema from the graph adapter, cached after first call."""
        if self._conversational_graph_schema_cache is None:
            self._conversational_graph_schema_cache = prepare_strict_schema(self.get_response_schema())
        return self._conversational_graph_schema_cache

ConversationalGraph instance with the agent pipeline.

Static methods

def get_response_schema() ‑> dict
Expand source code
@staticmethod
def get_response_schema() -> dict:
    """Return the JSON schema that LLM providers use as ``response_format``."""
    return ConversationalGraphResponse.model_json_schema()

Return the JSON schema that LLM providers use as response_format.

Instance variables

prop has_agent : bool
Expand source code
@property
def has_agent(self) -> bool:
    """Check whether an agent is already bound."""
    return self._agent is not None

Check whether an agent is already bound.

Methods

def get_system_instructions(self) ‑> str
Expand source code
def get_system_instructions(self) -> str:
    """Return graph-specific system instructions to prepend to the agent prompt."""
    return self._graph.get_system_instructions()

Return graph-specific system instructions to prepend to the agent prompt.

async def handle_decision(self, agent: Any, graph_response: str) ‑> tuple[str | None, bool]
Expand source code
async def handle_decision(self, agent: Any, graph_response: str) -> tuple[str | None, bool]:
    """Post-process the LLM's structured response through the graph."""
    if not self._callbacks_bound and agent:
        self.set_agent(agent)

    result = await self._graph.handle_decision(graph_response)
    if result == _END_STREAM:
        return None, True
    return result, False

Post-process the LLM's structured response through the graph.

async def handle_input(self, text: str) ‑> tuple[str | None, bool]
Expand source code
async def handle_input(self, text: str) -> tuple[str | None, bool]:
    """ Pre-process user text through the graph. """
    result = await self._graph.handle_input(text)
    if result == _END_STREAM:
        return None, True
    return result, False

Pre-process user text through the graph.

def set_agent(self, agent: Any) ‑> None
Expand source code
def set_agent(self, agent: Any) -> None:
    """Bind *agent* to the graph via callbacks"""
    if self._callbacks_bound:
        return
    self._agent = agent
    self._callbacks_bound = True

    async def _say_cb(message: str, interruptible: bool = True) -> None:
        if hasattr(agent, "session") and agent.session:
            await agent.session.say(message, interruptible=interruptible)

    async def _ask_cb(instruction: str, interruptible: bool = False) -> None:
        if hasattr(agent, "session") and agent.session:
            await agent.session.reply(
                instruction, interruptible=interruptible, wait_for_playback=False,
            )

    def _hangup_cb() -> None:
        self._schedule_hangup(agent)

    self._graph.set_callbacks(say=_say_cb, ask=_ask_cb, hangup=_hangup_cb)
    try:
        room = agent.session._job_context.room
        sid = getattr(room, "_session_id", None)
        if sid:
            self._graph._session_id = sid
    except Exception:
        pass

Bind agent to the graph via callbacks

def stream_conversational_graph_response(self, current_content: str, state: Dict[str, Any]) ‑> Iterator[str]
Expand source code
def stream_conversational_graph_response(
    self, current_content: str, state: Dict[str, Any]
) -> Iterator[str]:
    """Stream parsed ``response_to_user`` characters for TTS."""
    return self._graph.stream_conversational_graph_response(current_content, state)

Stream parsed response_to_user characters for TTS.