Module agents.utils

Functions

def build_gemini_schema(function_tool: FunctionTool) ‑> google.genai.types.FunctionDeclaration
Expand source code
def build_gemini_schema(function_tool: FunctionTool) -> types.FunctionDeclaration:
    """Build a Gemini-compatible FunctionDeclaration from a function tool.

    Prefers an explicit ``parameters_schema`` on the tool info (e.g. supplied by
    an MCP adapter); otherwise derives a JSON schema via the OpenAI schema
    builder. The schema is then simplified for Gemini; if simplification yields
    nothing usable, the declaration is emitted without parameters.

    Args:
        function_tool: A callable decorated with @function_tool.

    Returns:
        A google.genai FunctionDeclaration with name, description, and
        (optionally) a simplified parameter schema.
    """
    tool_info = get_tool_info(function_tool)

    # Pick the raw JSON schema: explicit tool-info schema wins, otherwise
    # fall back to the one derived from the function signature.
    raw_schema: Optional[dict[str, Any]] = tool_info.parameters_schema
    if raw_schema is None:
        raw_schema = build_openai_schema(function_tool).get("parameters")

    # Only simplify when a schema exists and "properties" is not explicitly
    # set to None (a missing "properties" key is acceptable).
    parameter_json_schema_for_gemini: Optional[dict[str, Any]] = None
    if raw_schema and raw_schema.get("properties", True) is not None:
        parameter_json_schema_for_gemini = simplify_gemini_schema(raw_schema)

    return types.FunctionDeclaration(
        name=tool_info.name,
        description=tool_info.description or "",
        parameters=parameter_json_schema_for_gemini,
    )

Build Gemini-compatible schema from a function tool

def build_mcp_schema(function_tool: FunctionTool) ‑> dict
Expand source code
def build_mcp_schema(function_tool: FunctionTool) -> dict:
    """Convert function tool to MCP schema"""
    info = get_tool_info(function_tool)
    args_model = build_pydantic_args_model(function_tool)
    return {
        "name": info.name,
        "description": info.description,
        "parameters": args_model.model_json_schema(),
    }

Convert function tool to MCP schema

def build_nova_sonic_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
Expand source code
def build_nova_sonic_schema(function_tool: FunctionTool) -> dict[str, Any]:
    """Build Amazon Nova Sonic-compatible schema from a function tool.

    Args:
        function_tool: A callable decorated with @function_tool.

    Returns:
        A toolSpec dict; Nova Sonic expects the parameter schema serialized
        as a JSON *string* under ``toolSpec.inputSchema.json``.
    """
    tool_info = get_tool_info(function_tool)

    # Explicit schema wins; otherwise derive one from the signature.
    # Both branches always produce a dict, so no empty-schema fallback is
    # needed (the original `if ... is not None else {...}` guard was dead code).
    if tool_info.parameters_schema is not None:
        params_schema = tool_info.parameters_schema
    else:
        params_schema = build_pydantic_args_model(function_tool).model_json_schema()

    return {
        "toolSpec": {
            "name": tool_info.name,
            # Description must be non-empty; fall back to the tool name.
            "description": tool_info.description or tool_info.name,
            "inputSchema": {
                "json": json.dumps(params_schema),
            },
        }
    }

Build Amazon Nova Sonic-compatible schema from a function tool

def build_openai_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
Expand source code
def build_openai_schema(function_tool: FunctionTool) -> dict[str, Any]:
    """Build OpenAI-compatible schema from a function tool.

    Args:
        function_tool: A callable decorated with @function_tool.

    Returns:
        A function-calling dict with name, description, parameters schema,
        and ``type: "function"``.
    """
    tool_info = get_tool_info(function_tool)

    # Explicit schema wins; otherwise derive one from the signature.
    # Both branches always yield a dict, so the former
    # `if ... is not None else {"type": "object", ...}` fallback was dead code.
    if tool_info.parameters_schema is not None:
        params_schema = tool_info.parameters_schema
    else:
        params_schema = build_pydantic_args_model(function_tool).model_json_schema()

    return {
        "name": tool_info.name,
        "description": tool_info.description or "",
        "parameters": params_schema,
        "type": "function",
    }

Build OpenAI-compatible schema from a function tool

def build_pipeline_config(*,
stt: Any | None,
llm: Any | None,
tts: Any | None,
vad: Any | None,
turn_detector: Any | None,
avatar: Any | None,
denoise: Any | None,
realtime_model: Any | None,
realtime_config_mode: str | None) ‑> PipelineConfig
Expand source code
def build_pipeline_config(
    *,
    stt: Any | None,
    llm: Any | None,
    tts: Any | None,
    vad: Any | None,
    turn_detector: Any | None,
    avatar: Any | None,
    denoise: Any | None,
    realtime_model: Any | None,
    realtime_config_mode: str | None,
) -> PipelineConfig:
    """
    Detect pipeline mode, realtime mode, and active components from
    the raw component references provided at Pipeline construction.
    """
    # Map every component slot to its reference, then keep the populated ones.
    slot_refs = {
        PipelineComponent.STT: stt,
        PipelineComponent.LLM: llm,
        PipelineComponent.TTS: tts,
        PipelineComponent.VAD: vad,
        PipelineComponent.TURN_DETECTOR: turn_detector,
        PipelineComponent.AVATAR: avatar,
        PipelineComponent.DENOISE: denoise,
        PipelineComponent.REALTIME_MODEL: realtime_model,
    }
    active = {slot for slot, ref in slot_refs.items() if ref is not None}

    # Realtime detection: an explicit config mode wins; otherwise infer the
    # sub-mode from which external STT/TTS components accompany the model.
    realtime_mode: RealtimeMode | None = None
    is_realtime = False

    if realtime_model is not None:
        if realtime_config_mode:
            realtime_mode = RealtimeMode(realtime_config_mode)
            is_realtime = realtime_mode != RealtimeMode.LLM_ONLY
        else:
            external_stt = stt is not None
            external_tts = tts is not None
            if external_stt and external_tts:
                realtime_mode, is_realtime = RealtimeMode.LLM_ONLY, False
            elif external_stt:
                realtime_mode, is_realtime = RealtimeMode.HYBRID_STT, True
            elif external_tts:
                realtime_mode, is_realtime = RealtimeMode.HYBRID_TTS, True
            else:
                realtime_mode, is_realtime = RealtimeMode.FULL_S2S, True

    # Cascading-mode detection; the chain is ordered most-specific first.
    if is_realtime:
        mode = PipelineMode.REALTIME
    elif stt and llm and tts and vad and turn_detector:
        mode = PipelineMode.FULL_CASCADING
    elif llm and tts and not stt:
        mode = PipelineMode.LLM_TTS_ONLY
    elif stt and llm and not tts:
        mode = PipelineMode.STT_LLM_ONLY
    elif llm and not stt and not tts:
        mode = PipelineMode.LLM_ONLY
    elif stt and tts and not llm:
        mode = PipelineMode.STT_TTS_ONLY
    elif stt and not llm and not tts:
        mode = PipelineMode.STT_ONLY
    elif tts and not llm and not stt:
        mode = PipelineMode.TTS_ONLY
    else:
        mode = PipelineMode.PARTIAL_CASCADING

    return PipelineConfig(
        pipeline_mode=mode,
        realtime_mode=realtime_mode,
        is_realtime=is_realtime,
        active_components=frozenset(active),
    )

Detect pipeline mode, realtime mode, and active components from the raw component references provided at Pipeline construction.

def build_pydantic_args_model(func: Callable[..., Any]) ‑> type[pydantic.main.BaseModel]
Expand source code
def build_pydantic_args_model(func: Callable[..., Any]) -> type[BaseModel]:
    """
    Dynamically construct a Pydantic BaseModel class representing all
    valid positional arguments of the given function, complete with types,
    default values, and docstring descriptions.
    """
    builder = ModelBuilder(func)
    return builder.construct()

Dynamically construct a Pydantic BaseModel class representing all valid positional arguments of the given function, complete with types, default values, and docstring descriptions.

async def cancel_and_wait(task: asyncio.Task | None) ‑> None
Expand source code
async def cancel_and_wait(task: asyncio.Task | None) -> None:
    """Cancel a task and wait for it to finish, suppressing CancelledError."""
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

Cancel a task and wait for it to finish, suppressing CancelledError.

def create_generic_mcp_adapter(tool_name: str,
tool_description: str | None,
input_schema: dict,
client_call_function: Callable) ‑> FunctionTool
Expand source code
def create_generic_mcp_adapter(
    tool_name: str, 
    tool_description: str | None, 
    input_schema: dict,
    client_call_function: Callable
) -> FunctionTool:
    """
    Create a generic adapter that converts an MCP tool to a framework FunctionTool.
    
    Args:
        tool_name: Name of the MCP tool
        tool_description: Description of the MCP tool (if available)
        input_schema: JSON schema for the tool's input parameters
        client_call_function: Function to call the tool on the MCP server
        
    Returns:
        A function tool that can be registered with the agent
    """
    # Pull the pieces of the JSON schema that drive validation and docs.
    required_params = input_schema.get('required', [])
    
    param_properties = input_schema.get('properties', {})
    
    # Synthesize an "Args:" section so the LLM sees parameter docs even when
    # the MCP server provided none; skip if the description already has one.
    docstring = tool_description or f"Call the {tool_name} tool"
    if param_properties and "Args:" not in docstring:
        param_docs = "\n\nArgs:\n"
        for param_name, param_info in param_properties.items():
            required = " (required)" if param_name in required_params else ""
            description = param_info.get('description', f"Parameter for {tool_name}")
            param_docs += f"    {param_name}{required}: {description}\n"
        docstring += param_docs
    
    if not param_properties:
        # Zero-parameter tool: always call the server with an empty dict.
        @function_tool(name=tool_name)
        async def no_param_tool() -> Any:
            return await client_call_function({})
        # Assign after decoration so the synthesized docstring wins over
        # whatever @wraps copied from the inner function.
        no_param_tool.__doc__ = docstring
        tool_info_no_param = get_tool_info(no_param_tool)
        # Expose the server-provided schema instead of an inferred one.
        tool_info_no_param.parameters_schema = input_schema
        return no_param_tool
    else:
        @function_tool(name=tool_name) 
        async def param_tool(**kwargs) -> Any:
            actual_kwargs = kwargs.copy() # Work with a copy

            # HACK: auto-fill a required 'instructions' field the model often
            # omits, but only when other parameters were actually supplied.
            # NOTE(review): presumably matches a server-side convention for
            # tools that take free-text instructions — confirm with the MCP
            # servers this adapter targets.
            if 'instructions' in required_params and 'instructions' not in actual_kwargs:
                other_params_provided = any(p in actual_kwargs for p in param_properties if p != 'instructions')
                if other_params_provided:
                    actual_kwargs['instructions'] = f"Execute tool {tool_name} with the provided parameters."

            
            # Fail fast with a descriptive error listing every missing
            # required parameter and its schema description, so the LLM can
            # retry with a corrected call.
            missing = [p for p in required_params if p not in actual_kwargs]
            if missing:
                missing_str = ", ".join(missing)
                param_details = []
                for param in missing:
                    param_info = param_properties.get(param, {})
                    desc = param_info.get('description', f"Parameter for {tool_name}")
                    param_details.append(f"'{param}': {desc}")
                
                param_help = "; ".join(param_details)
                raise ToolError(
                    f"Missing required parameters for {tool_name}: {missing_str}. "
                    f"Required parameters: {param_help}"
                )
            return await client_call_function(actual_kwargs)
        param_tool.__doc__ = docstring
        tool_info_param = get_tool_info(param_tool)
        # Expose the server-provided schema instead of an inferred one.
        tool_info_param.parameters_schema = input_schema
        return param_tool

Create a generic adapter that converts an MCP tool to a framework FunctionTool.

Args

tool_name
Name of the MCP tool
tool_description
Description of the MCP tool (if available)
input_schema
JSON schema for the tool's input parameters
client_call_function
Function to call the tool on the MCP server

Returns

A function tool that can be registered with the agent

def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None)
Expand source code
def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None):
    """Decorator to mark a function as a tool. Can be used with or without parentheses."""

    def decorate(target: Callable) -> FunctionTool:
        # Metadata attached to the wrapper; description comes from the docstring.
        info = FunctionToolInfo(
            name=name or target.__name__,
            description=target.__doc__,
        )

        if asyncio.iscoroutinefunction(target):
            @wraps(target)
            async def wrapper(*args, **kwargs):
                return await target(*args, **kwargs)
        else:
            @wraps(target)
            def wrapper(*args, **kwargs):
                return target(*args, **kwargs)

        setattr(wrapper, "_tool_info", info)
        return wrapper

    # Bare usage (@function_tool): `func` is the decorated callable itself.
    if func is None:
        return decorate
    return decorate(func)

Decorator to mark a function as a tool. Can be used with or without parentheses.

def get_tool_info(tool: FunctionTool) ‑> FunctionToolInfo
Expand source code
def get_tool_info(tool: FunctionTool) -> FunctionToolInfo:
    """Get the tool info from a function tool"""
    if not is_function_tool(tool):
        raise ValueError("Object is not a function tool")

    # Bound methods carry the metadata on the underlying function object.
    target = tool.__func__ if inspect.ismethod(tool) else tool
    return target._tool_info

Get the tool info from a function tool

async def graceful_cancel(*tasks: asyncio.Task) ‑> None
Expand source code
async def graceful_cancel(*tasks: asyncio.Task) -> None:
    """Simple utility to cancel tasks and wait for them to complete"""
    if not tasks:
        return

    # Request cancellation of everything still running.
    for pending in tasks:
        if not pending.done():
            pending.cancel()

    # Bounded wait; return_exceptions collects CancelledError instead of
    # raising, and a stubborn task is abandoned after the timeout.
    try:
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=0.5,
        )
    except asyncio.TimeoutError:
        pass

Simple utility to cancel tasks and wait for them to complete

def is_function_tool(obj: Any) ‑> bool
Expand source code
def is_function_tool(obj: Any) -> bool:
    """Check if an object is a function tool"""
    # Bound methods carry the marker on the underlying function object.
    target = obj.__func__ if inspect.ismethod(obj) else obj
    return hasattr(target, "_tool_info")

Check if an object is a function tool

async def run_stt(audio_stream: AsyncIterator[bytes]) ‑> AsyncIterator[Any]
Expand source code
async def run_stt(audio_stream: AsyncIterator[bytes]) -> AsyncIterator[Any]:
    """
    Run STT on an audio stream.

    Delegates to the STT component's stream_transcribe method.

    Args:
        audio_stream: Async iterator of audio bytes

    Yields:
        SpeechEvent objects (with text, etc.)
    """
    # Imported lazily to avoid a circular import with the job module.
    from .job import get_current_job_context

    job_ctx = get_current_job_context()
    stt = job_ctx._pipeline.stt if job_ctx and job_ctx._pipeline else None
    if not stt:
        raise RuntimeError("No STT component available in current context")

    async for event in stt.stream_transcribe(audio_stream):
        yield event

Run STT on an audio stream.

Delegates to the STT component's stream_transcribe method.

Args

audio_stream
Async iterator of audio bytes

Yields

SpeechEvent objects (with text, etc.)

async def run_tts(text_stream: AsyncIterator[str]) ‑> AsyncIterator[bytes]
Expand source code
async def run_tts(text_stream: AsyncIterator[str]) -> AsyncIterator[bytes]:
    """
    Run TTS on a text stream.

    Delegates to the TTS component's stream_synthesize method.

    Args:
        text_stream: Async iterator of text

    Yields:
        Audio bytes
    """
    # Imported lazily to avoid a circular import with the job module.
    from .job import get_current_job_context

    job_ctx = get_current_job_context()
    tts = job_ctx._pipeline.tts if job_ctx and job_ctx._pipeline else None
    if not tts:
        raise RuntimeError("No TTS component available in current context")

    async for frame in tts.stream_synthesize(text_stream):
        yield frame

Run TTS on a text stream.

Delegates to the TTS component's stream_synthesize method.

Args

text_stream
Async iterator of text

Yields

Audio bytes

async def segment_text(chunks: AsyncIterator[str],
delimiters: str = '.?!,;:\n',
keep_delimiter: bool = True,
min_chars: int = 50,
min_words: int = 12,
max_buffer: int = 600) ‑> AsyncIterator[str]
Expand source code
async def segment_text(
    chunks: AsyncIterator[str],
    delimiters: str = ".?!,;:\n",
    keep_delimiter: bool = True,
    min_chars: int = 50,
    min_words: int = 12,
    max_buffer: int = 600,
) -> AsyncIterator[str]:
    """
    Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency.
    Yields segments while keeping the delimiter if requested.
    """
    pending = ""

    def earliest_delimiter(text: str) -> int:
        # Index of the first delimiter character in `text`, or -1 if none.
        hits = [pos for ch in delimiters if (pos := text.find(ch)) != -1]
        return min(hits) if hits else -1

    async for piece in chunks:
        if not piece:
            continue
        pending += piece

        while True:
            cut = earliest_delimiter(pending)
            if cut >= 0:
                end = cut + 1 if keep_delimiter else cut
                yield pending[:end]
                pending = pending[cut + 1:].lstrip()
                continue

            # No delimiter: keep buffering until the text grows large enough
            # to justify a soft (word-boundary) split.
            if len(pending) < max_buffer and len(pending.split()) < min_words * 2:
                break

            limit = max(min_chars, min(len(pending), max_buffer))
            split_at = pending.rfind(" ", 0, limit)
            if split_at == -1:
                # No space to break on; hard-cut at the limit.
                split_at = limit
            soft = pending[:split_at].rstrip()
            if soft:
                yield soft
            pending = pending[split_at:].lstrip()

    # Flush whatever remains once the stream ends.
    if pending:
        yield pending

Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency. Yields segments while keeping the delimiter if requested.

def simplify_gemini_schema(schema: dict[str, Any]) ‑> dict[str, typing.Any] | None
Expand source code
def simplify_gemini_schema(schema: dict[str, Any]) -> dict[str, Any] | None:
    """
    Transforms a JSON Schema into Gemini compatible format.
    """

    # JSON-schema type strings mapped to the Gemini enum equivalents.
    type_map: dict[str, types.Type] = {
        "string": types.Type.STRING,
        "number": types.Type.NUMBER,
        "integer": types.Type.INTEGER,
        "boolean": types.Type.BOOLEAN,
        "array": types.Type.ARRAY,
        "object": types.Type.OBJECT,
    }
    # Keys Gemini does not accept and that are stripped at every level.
    drop_keys = ("title", "default", "additionalProperties", "$defs", "$schema")

    def convert(node: dict[str, Any]) -> dict[str, Any] | None:
        # Copy the node minus the unsupported keys; never mutate the input.
        out = {k: v for k, v in node.items() if k not in drop_keys}

        declared = out.get("type")
        if declared in type_map:
            out["type"] = type_map[declared]

        kind = out.get("type")
        if kind == types.Type.OBJECT:
            # Objects must carry at least one usable property to survive.
            if "properties" not in out:
                return None
            kept: dict[str, Any] = {}
            for prop_name, prop_node in out["properties"].items():
                converted_prop = convert(prop_node)
                if converted_prop is not None:
                    kept[prop_name] = converted_prop
            if not kept:
                return None
            out["properties"] = kept
        elif kind == types.Type.ARRAY and "items" in out:
            converted_items = convert(out["items"])
            if converted_items is None:
                del out["items"]
            else:
                out["items"] = converted_items

        return out

    converted = convert(schema)
    # A top-level object that lost all of its properties is unusable.
    if converted and converted.get("type") == types.Type.OBJECT and not converted.get("properties"):
        return None
    return converted

Transforms a JSON Schema into Gemini compatible format.

Classes

class AgentState (*args, **kwds)
Expand source code
@enum.unique
class AgentState(enum.Enum):
    """Represents the current state of the agent in a conversation session.

    Values are lowercase strings, suitable for serialization and logging.
    """
    STARTING = "starting"
    IDLE = "idle"
    SPEAKING = "speaking"
    LISTENING = "listening"
    THINKING = "thinking"
    CLOSING = "closing"

Represents the current state of the agent in a conversation session.

Ancestors

  • enum.Enum

Class variables

var CLOSING
var IDLE
var LISTENING
var SPEAKING
var STARTING
var THINKING
class AsyncIteratorQueue
Expand source code
class AsyncIteratorQueue:
    """An async iterator backed by an asyncio.Queue, allowing producers to put items and consumers to async-iterate until closed."""

    def __init__(self):
        # None is reserved as the close sentinel, so producers must not put None.
        self.queue = asyncio.Queue()
        self.closed = False

    async def put(self, item):
        """Enqueue an item; silently dropped once the queue is closed."""
        if self.closed:
            return
        await self.queue.put(item)

    def close(self):
        """Mark the queue closed and wake a blocked consumer via the sentinel."""
        self.closed = True
        self.queue.put_nowait(None)

    def __aiter__(self):
        return self

    async def __anext__(self):
        nxt = await self.queue.get()
        if nxt is None:
            # Sentinel reached: the producer closed the stream.
            raise StopAsyncIteration
        return nxt

An async iterator backed by an asyncio.Queue, allowing producers to put items and consumers to async-iterate until closed.

Methods

def close(self)
Expand source code
def close(self):
    self.closed = True
    self.queue.put_nowait(None)
async def put(self, item)
Expand source code
async def put(self, item):
    if not self.closed:
        await self.queue.put(item)
class AudioByteStream (sample_rate: int, num_channels: int, samples_per_channel: int)
Expand source code
class AudioByteStream:
    """
    Buffers raw PCM bytes and emits fixed-size AudioFrame objects.

    Useful for converting variable-size chunks (e.g. data-channel payloads)
    into the steady frame cadence that AvatarSynchronizer expects.
    """

    def __init__(self, sample_rate: int, num_channels: int, samples_per_channel: int):
        # numpy is imported lazily so the module can load without it installed.
        import numpy as _np
        self._np = _np
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._samples_per_channel = samples_per_channel
        self._sample_width = 2  # s16 PCM: 2 bytes per sample
        self._bytes_per_frame = samples_per_channel * num_channels * self._sample_width
        self._buffer = bytearray()

    def push(self, frame_data: bytes) -> list:
        """Append raw PCM bytes and return every complete AudioFrame now available."""
        from av import AudioFrame
        self._buffer.extend(frame_data)
        frames = []
        while len(self._buffer) >= self._bytes_per_frame:
            chunk = bytes(self._buffer[: self._bytes_per_frame])
            del self._buffer[: self._bytes_per_frame]
            # NOTE(review): this reshape treats the bytes as planar
            # (channels, samples), while flush() below reshapes as
            # (samples, channels) and transposes — the interleaved
            # interpretation. The two agree for mono but differ for stereo;
            # confirm the expected byte layout and make them consistent.
            arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(
                self._num_channels, self._samples_per_channel
            )
            frame = AudioFrame.from_ndarray(
                arr,
                format="s16",
                layout="mono" if self._num_channels == 1 else "stereo",
            )
            frame.sample_rate = self._sample_rate
            frames.append(frame)
        return frames

    def flush(self) -> list:
        """Emit any buffered remainder as one final frame, zero-padded (silence) to full size."""
        from av import AudioFrame
        if not self._buffer:
            return []
        chunk = bytes(self._buffer)
        self._buffer.clear()
        missing = self._bytes_per_frame - len(chunk)
        if missing > 0:
            # Pad with zero bytes (silence) up to one full frame.
            chunk += bytes(missing)
        arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(
            self._samples_per_channel, self._num_channels
        )
        frame = AudioFrame.from_ndarray(
            arr.T,
            format="s16",
            layout="mono" if self._num_channels == 1 else "stereo",
        )
        frame.sample_rate = self._sample_rate
        return [frame]

Buffers raw PCM bytes and emits fixed-size AudioFrame objects.

Useful for converting variable-size chunks (e.g. data-channel payloads) into the steady frame cadence that AvatarSynchronizer expects.

Methods

def flush(self) ‑> list
Expand source code
def flush(self) -> list:
    from av import AudioFrame
    if not self._buffer:
        return []
    chunk = bytes(self._buffer)
    self._buffer.clear()
    missing = self._bytes_per_frame - len(chunk)
    if missing > 0:
        chunk += bytes(missing)
    arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(
        self._samples_per_channel, self._num_channels
    )
    frame = AudioFrame.from_ndarray(
        arr.T,
        format="s16",
        layout="mono" if self._num_channels == 1 else "stereo",
    )
    frame.sample_rate = self._sample_rate
    return [frame]
def push(self, frame_data: bytes) ‑> list
Expand source code
def push(self, frame_data: bytes) -> list:
    from av import AudioFrame
    self._buffer.extend(frame_data)
    frames = []
    while len(self._buffer) >= self._bytes_per_frame:
        chunk = bytes(self._buffer[: self._bytes_per_frame])
        del self._buffer[: self._bytes_per_frame]
        arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(
            self._num_channels, self._samples_per_channel
        )
        frame = AudioFrame.from_ndarray(
            arr,
            format="s16",
            layout="mono" if self._num_channels == 1 else "stereo",
        )
        frame.sample_rate = self._sample_rate
        frames.append(frame)
    return frames
class FieldBuilder (param: inspect.Parameter,
type_processor: TypeProcessor,
description: str | None)
Expand source code
class FieldBuilder:
    """Builds a Pydantic field tuple from a function parameter, its processed type, and docstring description."""

    def __init__(self, param: inspect.Parameter, type_processor: TypeProcessor, description: str | None):
        self.param = param
        self.type_processor = type_processor
        self.description = description

    def build(self) -> tuple[Any, FieldInfo]:
        info = self.type_processor.field_info
        # The signature default is applied only when the FieldInfo has none.
        has_sig_default = self.param.default is not inspect.Parameter.empty
        if has_sig_default and info.default is PydanticUndefined:
            info.default = self.param.default
        # The docstring description fills in only when none is already set.
        if info.description is None and self.description is not None:
            info.description = self.description
        return (self.type_processor.base_type, info)

Builds a Pydantic field tuple from a function parameter, its processed type, and docstring description.

Methods

def build(self) ‑> tuple[typing.Any, pydantic.fields.FieldInfo]
Expand source code
def build(self) -> tuple[Any, FieldInfo]:
    field_info = self.type_processor.field_info
    self._apply_default_if_needed(field_info)
    self._apply_description_if_needed(field_info)
    return (self.type_processor.base_type, field_info)
class FunctionTool (*args, **kwargs)
Expand source code
@runtime_checkable
class FunctionTool(Protocol):
    """Structural type for a callable carrying framework tool metadata.

    Any callable exposing a `_tool_info` attribute (as attached by the
    @function_tool decorator) satisfies this protocol at runtime.
    """
    @property
    @abstractmethod
    def _tool_info(self) -> "FunctionToolInfo": ...
    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...

Ancestors

  • typing.Protocol
  • typing.Generic
class FunctionToolInfo (name: str,
description: str | None = None,
parameters_schema: Optional[dict] = None)
Expand source code
@dataclass
class FunctionToolInfo:
    """Holds metadata about a function tool, including its name, description, and parameter schema."""
    # Tool name exposed to the model (decorator `name=` override or the function's __name__).
    name: str
    # Human/model-readable description; sourced from the function docstring when decorated.
    description: str | None = None
    # Explicit JSON schema for parameters (e.g. set by MCP adapters); when None,
    # schema builders derive one from the function signature instead.
    parameters_schema: dict | None = None

Holds metadata about a function tool, including its name, description, and parameter schema.

Instance variables

var description : str | None
var name : str
var parameters_schema : dict | None
class ModelBuilder (func: Callable[..., Any])
Expand source code
class ModelBuilder:
    """Constructs a Pydantic BaseModel from a function's signature, type hints, and docstring."""

    def __init__(self, func: Callable[..., Any]):
        self.func = func
        self.sig = inspect.signature(func)
        # include_extras keeps Annotated metadata available to TypeProcessor.
        self.hints = get_type_hints(func, include_extras=True)
        self.docs = parse_from_object(func)
        self._field_registry = {}

    def construct(self) -> type[BaseModel]:
        """Build and return the dynamically created args model class."""
        self._register_all_fields()
        return create_model(self._generate_class_name(), **self._field_registry)

    def _register_all_fields(self) -> None:
        for param_name, param in self.sig.parameters.items():
            if not self._should_skip_param(param_name):
                self._register_field(param_name, param)

    def _should_skip_param(self, name: str) -> bool:
        # Ignore bound-method receivers and any parameter lacking a type hint.
        return name in ("self", "cls") or name not in self.hints

    def _register_field(self, name: str, param: inspect.Parameter) -> None:
        processor = TypeProcessor(self.hints[name])
        builder = FieldBuilder(param, processor, self._find_doc_description(name))
        self._field_registry[name] = builder.build()

    def _find_doc_description(self, param_name: str) -> str | None:
        # First matching docstring param wins; None when undocumented.
        for doc_param in self.docs.params:
            if doc_param.arg_name == param_name:
                return doc_param.description
        return None

    def _generate_class_name(self) -> str:
        # e.g. get_weather -> GetWeatherArgs
        parts = self.func.__name__.split("_")
        return "".join(p.title() for p in parts) + "Args"

Constructs a Pydantic BaseModel from a function's signature, type hints, and docstring.

Methods

def construct(self) ‑> type[pydantic.main.BaseModel]
Expand source code
def construct(self) -> type[BaseModel]:
    self._register_all_fields()
    return create_model(self._generate_class_name(), **self._field_registry)
class PipelineComponent (*args, **kwds)
Expand source code
@enum.unique
class PipelineComponent(enum.Enum):
    """Identifiers for each component slot in the pipeline.

    Values are the lowercase slot names used in metrics and logging.
    """
    STT = "stt"
    LLM = "llm"
    TTS = "tts"
    VAD = "vad"
    TURN_DETECTOR = "turn_detector"
    AVATAR = "avatar"
    DENOISE = "denoise"
    REALTIME_MODEL = "realtime_model"

Identifiers for each component slot in the pipeline.

Ancestors

  • enum.Enum

Class variables

var AVATAR
var DENOISE
var LLM
var REALTIME_MODEL
var STT
var TTS
var TURN_DETECTOR
var VAD
class PipelineConfig (pipeline_mode: PipelineMode,
realtime_mode: RealtimeMode | None,
is_realtime: bool,
active_components: frozenset[PipelineComponent])
Expand source code
@dataclass(frozen=True)
class PipelineConfig:
    """
    Immutable snapshot of the pipeline's detected configuration.
    Computed once during Pipeline.__init__ and accessible everywhere via pipeline.config.
    """
    # Overall architecture mode detected from the configured components.
    pipeline_mode: PipelineMode
    # Realtime sub-mode; None when no realtime model is configured.
    realtime_mode: RealtimeMode | None
    # True when the pipeline runs through a realtime model path.
    is_realtime: bool
    # Set of component slots that were provided at construction.
    active_components: frozenset[PipelineComponent]

    def has_component(self, component: PipelineComponent) -> bool:
        """Check whether a specific component is present."""
        return component in self.active_components

    @property
    def component_names(self) -> list[str]:
        """Return sorted list of active component value strings (for metrics/logging)."""
        return sorted(c.value for c in self.active_components)

Immutable snapshot of the pipeline's detected configuration. Computed once during `Pipeline.__init__` and accessible everywhere via `pipeline.config`.

Instance variables

var active_components : frozenset[PipelineComponent]
prop component_names : list[str]
Expand source code
@property
def component_names(self) -> list[str]:
    """Return sorted list of active component value strings (for metrics/logging).

    Sorting gives deterministic output, since frozenset iteration order
    is arbitrary.
    """
    return sorted(c.value for c in self.active_components)

Return sorted list of active component value strings (for metrics/logging).

var is_realtime : bool
var pipeline_mode : PipelineMode
var realtime_mode : RealtimeMode | None

Methods

def has_component(self,
component: PipelineComponent) ‑> bool
Expand source code
def has_component(self, component: PipelineComponent) -> bool:
    """Check whether a specific component is present.

    Simple membership test against the frozenset of active components.
    """
    return component in self.active_components

Check whether a specific component is present.

class PipelineMode (*args, **kwds)
Expand source code
@enum.unique
class PipelineMode(enum.Enum):
    """The overall pipeline architecture mode based on which components are present.

    Member names describe the active component combination (e.g.
    STT_LLM_ONLY = only STT and LLM configured); REALTIME indicates a
    realtime model drives the pipeline. NOTE(review): the exact
    detection rules live in the config builder — confirm there.
    """
    REALTIME = "realtime"
    FULL_CASCADING = "full_cascading"
    LLM_TTS_ONLY = "llm_tts_only"
    STT_LLM_ONLY = "stt_llm_only"
    LLM_ONLY = "llm_only"
    STT_ONLY = "stt_only"
    TTS_ONLY = "tts_only"
    STT_TTS_ONLY = "stt_tts_only"
    HYBRID = "hybrid"
    PARTIAL_CASCADING = "partial_cascading"

The overall pipeline architecture mode based on which components are present.

Ancestors

  • enum.Enum

Class variables

var FULL_CASCADING
var HYBRID
var LLM_ONLY
var LLM_TTS_ONLY
var PARTIAL_CASCADING
var REALTIME
var STT_LLM_ONLY
var STT_ONLY
var STT_TTS_ONLY
var TTS_ONLY
class RawFunctionTool (*args, **kwargs)
Expand source code
class RawFunctionTool(Protocol):
    """Structural type for a bare callable used as a tool.

    Matches any callable accepting arbitrary positional/keyword
    arguments, before any framework wrapper is applied.
    """

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        ...

Protocol for raw function tool without framework wrapper

Ancestors

  • typing.Protocol
  • typing.Generic
class RealtimeMode (*args, **kwds)
Expand source code
@enum.unique
class RealtimeMode(enum.Enum):
    """The realtime sub-mode when a RealtimeBaseModel is used as the LLM."""
    FULL_S2S = "full_s2s"      # full speech-to-speech: model handles audio in and out
    HYBRID_STT = "hybrid_stt"  # presumably an external STT feeds the realtime model — confirm
    HYBRID_TTS = "hybrid_tts"  # presumably an external TTS voices the realtime model — confirm
    LLM_ONLY = "llm_only"      # realtime model used purely as a text LLM

The realtime sub-mode when a RealtimeBaseModel is used as the LLM.

Ancestors

  • enum.Enum

Class variables

var FULL_S2S
var HYBRID_STT
var HYBRID_TTS
var LLM_ONLY
class ToolError (*args, **kwargs)
Expand source code
class ToolError(Exception):
    """Raised when a tool execution fails.

    Lets callers distinguish tool failures from unrelated exceptions.
    """

Exception raised when a tool execution fails

Ancestors

  • builtins.Exception
  • builtins.BaseException
class TypeProcessor (hint: Any)
Expand source code
class TypeProcessor:
    """Splits a (possibly Annotated) type hint into base type + Pydantic FieldInfo.

    After construction, ``base_type`` holds the unwrapped type and
    ``field_info`` holds the first FieldInfo found in the Annotated
    metadata (or a default ``Field()`` when none is present).
    """

    def __init__(self, hint: Any):
        self.original_hint = hint
        self.base_type = hint
        self.field_info = Field()  # default until metadata says otherwise
        self._process_annotation()

    def _process_annotation(self) -> None:
        # Only Annotated[T, meta...] hints need unwrapping.
        if get_origin(self.original_hint) is not Annotated:
            return
        base, *metadata = get_args(self.original_hint)
        self.base_type = base
        self._extract_field_info_from_args(tuple(metadata))

    def _extract_field_info_from_args(self, args: tuple) -> None:
        # Keep the first FieldInfo among the metadata, if any.
        found = next((arg for arg in args if isinstance(arg, FieldInfo)), None)
        if found is not None:
            self.field_info = found

Extracts the base type and Pydantic FieldInfo from a possibly Annotated type hint.

class UserState (*args, **kwds)
Expand source code
@enum.unique
class UserState(enum.Enum):
    """Represents the current state of the user in a conversation session."""
    IDLE = "idle"            # no user activity
    SPEAKING = "speaking"    # the user is speaking
    LISTENING = "listening"  # the user is listening

Represents the current state of the user in a conversation session.

Ancestors

  • enum.Enum

Class variables

var IDLE
var LISTENING
var SPEAKING