Module agents.graph_adapter
Functions
def prepare_strict_schema(schema_dict)-
Expand source code
def prepare_strict_schema(schema_dict):
    """Rewrite a JSON-schema dict in place so every object node is strict.

    For each dict node whose ``"type"`` equals ``"object"``:

    * ``"additionalProperties"`` is set to ``False``;
    * if the node has a ``"properties"`` entry, ``"required"`` is set to the
      list of all property names.

    The walk then descends into every dict value of the node, and into every
    dict found directly inside a list value.

    Args:
        schema_dict: The schema (or schema fragment) to process.

    Returns:
        The same object that was passed in. Dicts are mutated in place; any
        non-dict argument is returned untouched.
    """
    # Anything that is not a mapping has nothing to tighten.
    if not isinstance(schema_dict, dict):
        return schema_dict

    if schema_dict.get("type") == "object":
        schema_dict["additionalProperties"] = False
        if "properties" in schema_dict:
            schema_dict["required"] = list(schema_dict["properties"].keys())

    # Keys were added above, before iteration starts, so the mapping does not
    # change size while it is being walked.
    for child in schema_dict.values():
        if isinstance(child, dict):
            prepare_strict_schema(child)
        elif isinstance(child, list):
            for element in child:
                if isinstance(element, dict):
                    prepare_strict_schema(element)

    return schema_dict
Classes
class ConversationalGraphResponse (**data: Any)-
Expand source code
class ConversationalGraphResponse(BaseModel):
    """Structured LLM response format used when a conversational graph is active."""

    # NOTE(review): the docstring above and the ``description`` strings below
    # presumably end up in the ``model_json_schema()`` output that
    # ``GraphPipelineAdapter.get_response_schema`` returns -- treat them as
    # runtime text rather than free-form documentation; confirm before editing.

    # Required (``...`` means no default): the text the agent says to the user.
    response_to_user: str = Field(..., description="Response to the user by agent")
    # Optional: defaults to a new empty list for each instance.
    extracted_values: List[ExtractedField] = Field(
        default_factory=list,
        description="List of extracted values from the user input",
    )
Create a new model by parsing and validating input data from keyword arguments.
Raises [`ValidationError`][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var extracted_values : List[ExtractedField]
var model_config
var response_to_user : str
class ExtractedField (**data: Any)-
Expand source code
class ExtractedField(BaseModel):
    # One key/value pair; ``ConversationalGraphResponse.extracted_values``
    # holds a list of these.
    # No class docstring is added here on purpose. NOTE(review): pydantic
    # presumably copies a model docstring into the generated JSON schema, so
    # adding one could change what is sent as ``response_format`` -- confirm.

    # Required: the name of the field.
    key: str = Field(..., description="The name of the field")
    # Required: a scalar value -- string, integer, float or boolean.
    value: Union[str, int, float, bool] = Field(..., description="The value of the field")
A base class for creating Pydantic models.
Attributes
`__class_vars__` - The names of the class variables defined on the model.
`__private_attributes__` - Metadata about the private attributes of the model.
`__signature__` - The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
`__pydantic_complete__` - Whether model building is completed, or if there are still undefined fields.
`__pydantic_core_schema__` - The core schema of the model.
`__pydantic_custom_init__` - Whether the model has a custom `__init__` function.
`__pydantic_decorators__` - Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
`__pydantic_generic_metadata__` - A dictionary containing metadata about generic Pydantic models. The `origin` and `args` items map to the [`__origin__`][genericalias.origin] and [`__args__`][genericalias.args] attributes of [generic aliases][types-genericalias], and the `parameter` item maps to the `__parameter__` attribute of generic classes.
`__pydantic_parent_namespace__` - Parent namespace of the model, used for automatic rebuilding of models.
`__pydantic_post_init__` - The name of the post-init method for the model, if defined.
`__pydantic_root_model__` - Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
`__pydantic_serializer__` - The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
`__pydantic_validator__` - The `pydantic-core` `SchemaValidator` used to validate instances of the model.
`__pydantic_fields__` - A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
`__pydantic_computed_fields__` - A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
`__pydantic_extra__` - A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
`__pydantic_fields_set__` - The names of fields explicitly set during instantiation.
`__pydantic_private__` - Values of private attributes set on the model instance.
Create a new model by parsing and validating input data from keyword arguments.
Raises [`ValidationError`][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var key : str
var model_config
var value : str | int | float | bool
class GraphPipelineAdapter (graph: Any)-
Expand source code
class GraphPipelineAdapter: """`ConversationalGraph` instance with the agent pipeline.""" def __init__(self, graph: Any) -> None: self._graph = graph self._agent: Any = None self._callbacks_bound: bool = False self._conversational_graph_schema_cache = None @property def has_agent(self) -> bool: """Check whether an agent is already bound.""" return self._agent is not None def set_agent(self, agent: Any) -> None: """Bind *agent* to the graph via callbacks""" if self._callbacks_bound: return self._agent = agent self._callbacks_bound = True async def _say_cb(message: str, interruptible: bool = True) -> None: if hasattr(agent, "session") and agent.session: await agent.session.say(message, interruptible=interruptible) async def _ask_cb(instruction: str, interruptible: bool = False) -> None: if hasattr(agent, "session") and agent.session: await agent.session.reply( instruction, interruptible=interruptible, wait_for_playback=False, ) def _hangup_cb() -> None: self._schedule_hangup(agent) self._graph.set_callbacks(say=_say_cb, ask=_ask_cb, hangup=_hangup_cb) try: room = agent.session._job_context.room sid = getattr(room, "_session_id", None) if sid: self._graph._session_id = sid except Exception: pass def _schedule_hangup(self, agent: Any) -> None: """Hang up exactly when the final TTS stream ends.""" pipeline = ( agent.session.pipeline if hasattr(agent, "session") and agent.session and hasattr(agent.session, "pipeline") else None ) if pipeline and hasattr(pipeline, "on"): fired = False async def _on_tts_end(): nonlocal fired if fired: return fired = True if hasattr(agent, "hangup"): try: await agent.hangup() except Exception as exc: logger.warning("[GRAPH] agent.hangup() failed: %s", exc) pipeline.on("agent_turn_end")(_on_tts_end) else: asyncio.ensure_future(self._hangup_after_timer(agent)) async def _hangup_after_timer(self, agent: Any) -> None: delay = getattr(self._graph.config, "hangup_delay", 4.0) await asyncio.sleep(delay) if hasattr(agent, "hangup"): try: await 
agent.hangup() except Exception as exc: logger.warning("[GRAPH] agent.hangup() failed: %s", exc) def get_system_instructions(self) -> str: """Return graph-specific system instructions to prepend to the agent prompt.""" return self._graph.get_system_instructions() async def handle_input(self, text: str) -> tuple[str | None, bool]: """ Pre-process user text through the graph. """ result = await self._graph.handle_input(text) if result == _END_STREAM: return None, True return result, False async def handle_decision(self, agent: Any, graph_response: str) -> tuple[str | None, bool]: """Post-process the LLM's structured response through the graph.""" if not self._callbacks_bound and agent: self.set_agent(agent) result = await self._graph.handle_decision(graph_response) if result == _END_STREAM: return None, True return result, False def stream_conversational_graph_response( self, current_content: str, state: Dict[str, Any] ) -> Iterator[str]: """Stream parsed ``response_to_user`` characters for TTS.""" return self._graph.stream_conversational_graph_response(current_content, state) @staticmethod def get_response_schema() -> dict: """Return the JSON schema that LLM providers use as ``response_format``.""" return ConversationalGraphResponse.model_json_schema() def _get_graph_schema(self): """Get the prepared strict schema from the graph adapter, cached after first call.""" if self._conversational_graph_schema_cache is None: self._conversational_graph_schema_cache = prepare_strict_schema(self.get_response_schema()) return self._conversational_graph_schema_cacheConversationalGraphinstance with the agent pipeline.Static methods
def get_response_schema() ‑> dict-
Expand source code
@staticmethod def get_response_schema() -> dict: """Return the JSON schema that LLM providers use as ``response_format``.""" return ConversationalGraphResponse.model_json_schema()Return the JSON schema that LLM providers use as
response_format.
Instance variables
prop has_agent : bool-
Expand source code
@property def has_agent(self) -> bool: """Check whether an agent is already bound.""" return self._agent is not NoneCheck whether an agent is already bound.
Methods
def get_system_instructions(self) ‑> str-
Expand source code
def get_system_instructions(self) -> str: """Return graph-specific system instructions to prepend to the agent prompt.""" return self._graph.get_system_instructions()Return graph-specific system instructions to prepend to the agent prompt.
async def handle_decision(self, agent: Any, graph_response: str) ‑> tuple[str | None, bool]-
Expand source code
async def handle_decision(self, agent: Any, graph_response: str) -> tuple[str | None, bool]: """Post-process the LLM's structured response through the graph.""" if not self._callbacks_bound and agent: self.set_agent(agent) result = await self._graph.handle_decision(graph_response) if result == _END_STREAM: return None, True return result, FalsePost-process the LLM's structured response through the graph.
async def handle_input(self, text: str) ‑> tuple[str | None, bool]-
Expand source code
async def handle_input(self, text: str) -> tuple[str | None, bool]: """ Pre-process user text through the graph. """ result = await self._graph.handle_input(text) if result == _END_STREAM: return None, True return result, FalsePre-process user text through the graph.
def set_agent(self, agent: Any) ‑> None-
Expand source code
def set_agent(self, agent: Any) -> None: """Bind *agent* to the graph via callbacks""" if self._callbacks_bound: return self._agent = agent self._callbacks_bound = True async def _say_cb(message: str, interruptible: bool = True) -> None: if hasattr(agent, "session") and agent.session: await agent.session.say(message, interruptible=interruptible) async def _ask_cb(instruction: str, interruptible: bool = False) -> None: if hasattr(agent, "session") and agent.session: await agent.session.reply( instruction, interruptible=interruptible, wait_for_playback=False, ) def _hangup_cb() -> None: self._schedule_hangup(agent) self._graph.set_callbacks(say=_say_cb, ask=_ask_cb, hangup=_hangup_cb) try: room = agent.session._job_context.room sid = getattr(room, "_session_id", None) if sid: self._graph._session_id = sid except Exception: passBind agent to the graph via callbacks
def stream_conversational_graph_response(self, current_content: str, state: Dict[str, Any]) ‑> Iterator[str]-
Expand source code
def stream_conversational_graph_response( self, current_content: str, state: Dict[str, Any] ) -> Iterator[str]: """Stream parsed ``response_to_user`` characters for TTS.""" return self._graph.stream_conversational_graph_response(current_content, state)Stream parsed
`response_to_user` characters for TTS.