Module agents.utils

Functions

def build_gemini_schema(function_tool: FunctionTool) ‑> google.genai.types.FunctionDeclaration
Expand source code
def build_gemini_schema(function_tool: FunctionTool) -> types.FunctionDeclaration:
    """Build Gemini-compatible schema from a function tool"""
    tool_info = get_tool_info(function_tool)
    
    parameter_json_schema_for_gemini: Optional[dict[str, Any]] = None

    if tool_info.parameters_schema is not None:
         if tool_info.parameters_schema and tool_info.parameters_schema.get("properties", True) is not None:
            simplified_schema = simplify_gemini_schema(tool_info.parameters_schema)
            parameter_json_schema_for_gemini = simplified_schema
    else:
        openai_schema = build_openai_schema(function_tool) 

        if openai_schema.get("parameters") and openai_schema["parameters"].get("properties", True) is not None:
             simplified_schema = simplify_gemini_schema(openai_schema["parameters"])
             parameter_json_schema_for_gemini = simplified_schema

    func_declaration = types.FunctionDeclaration(
        name=tool_info.name, 
        description=tool_info.description or "", 
        parameters=parameter_json_schema_for_gemini 
    )
    return func_declaration

Build Gemini-compatible schema from a function tool

def build_mcp_schema(function_tool: FunctionTool) ‑> dict
Expand source code
def build_mcp_schema(function_tool: FunctionTool) -> dict:
    """Convert function tool to MCP schema"""
    tool_info = get_tool_info(function_tool)
    return {
        "name": tool_info.name,
        "description": tool_info.description,
        "parameters": build_pydantic_args_model(function_tool).model_json_schema()
    }

Convert function tool to MCP schema

def build_nova_sonic_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
Expand source code
def build_nova_sonic_schema(function_tool: FunctionTool) -> dict[str, Any]:
    """Build Amazon Nova Sonic-compatible schema from a function tool"""
    tool_info = get_tool_info(function_tool)

    params_schema_to_use: Optional[dict] = None

    if tool_info.parameters_schema is not None:
        params_schema_to_use = tool_info.parameters_schema
    else:
        model = build_pydantic_args_model(function_tool)
        params_schema_to_use = model.model_json_schema()
    

    final_params_schema_for_nova = params_schema_to_use if params_schema_to_use is not None else {"type": "object", "properties": {}}
    input_schema_json_string = json.dumps(final_params_schema_for_nova)

    description = tool_info.description or tool_info.name

    return {
        "toolSpec": {
            "name": tool_info.name,
            "description": description,
            "inputSchema": {
                "json": input_schema_json_string
            }
        }
    }

Build Amazon Nova Sonic-compatible schema from a function tool

def build_openai_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
Expand source code
def build_openai_schema(function_tool: FunctionTool) -> dict[str, Any]:
    """Build OpenAI-compatible schema from a function tool"""
    tool_info = get_tool_info(function_tool)
    
    params_schema_to_use: Optional[dict] = None

    if tool_info.parameters_schema is not None:
        params_schema_to_use = tool_info.parameters_schema
    else:
        model = build_pydantic_args_model(function_tool)
        params_schema_to_use = model.model_json_schema()

    final_params_schema = params_schema_to_use if params_schema_to_use is not None else {"type": "object", "properties": {}}

    return {
            "name": tool_info.name,
            "description": tool_info.description or "",
            "parameters": final_params_schema,
            "type": "function",
    }

Build OpenAI-compatible schema from a function tool

def build_pydantic_args_model(func: Callable[..., Any]) ‑> type[pydantic.main.BaseModel]
Expand source code
def build_pydantic_args_model(func: Callable[..., Any]) -> type[BaseModel]:
    """
    Dynamically construct a Pydantic BaseModel class representing all
    valid positional arguments of the given function, complete with types,
    default values, and docstring descriptions.
    """
    return ModelBuilder(func).construct()

Dynamically construct a Pydantic BaseModel class representing all valid positional arguments of the given function, complete with types, default values, and docstring descriptions.

def create_generic_mcp_adapter(tool_name: str,
tool_description: str | None,
input_schema: dict,
client_call_function: Callable) ‑> FunctionTool
Expand source code
def create_generic_mcp_adapter(
    tool_name: str, 
    tool_description: str | None, 
    input_schema: dict,
    client_call_function: Callable
) -> FunctionTool:
    """
    Create a generic adapter that converts an MCP tool to a framework FunctionTool.
    
    Args:
        tool_name: Name of the MCP tool
        tool_description: Description of the MCP tool (if available)
        input_schema: JSON schema for the tool's input parameters
        client_call_function: Function to call the tool on the MCP server
        
    Returns:
        A function tool that can be registered with the agent
    """
    required_params = input_schema.get('required', [])
    
    param_properties = input_schema.get('properties', {})
    
    docstring = tool_description or f"Call the {tool_name} tool"
    if param_properties and "Args:" not in docstring:
        param_docs = "\n\nArgs:\n"
        for param_name, param_info in param_properties.items():
            required = " (required)" if param_name in required_params else ""
            description = param_info.get('description', f"Parameter for {tool_name}")
            param_docs += f"    {param_name}{required}: {description}\n"
        docstring += param_docs
    
    if not param_properties:
        @function_tool(name=tool_name)
        async def no_param_tool() -> Any:
            return await client_call_function({})
        no_param_tool.__doc__ = docstring
        tool_info_no_param = get_tool_info(no_param_tool)
        tool_info_no_param.parameters_schema = input_schema
        return no_param_tool
    else:
        @function_tool(name=tool_name) 
        async def param_tool(**kwargs) -> Any:
            actual_kwargs = kwargs.copy() # Work with a copy

            if 'instructions' in required_params and 'instructions' not in actual_kwargs:
                other_params_provided = any(p in actual_kwargs for p in param_properties if p != 'instructions')
                if other_params_provided:
                    actual_kwargs['instructions'] = f"Execute tool {tool_name} with the provided parameters."

            
            missing = [p for p in required_params if p not in actual_kwargs]
            if missing:
                missing_str = ", ".join(missing)
                param_details = []
                for param in missing:
                    param_info = param_properties.get(param, {})
                    desc = param_info.get('description', f"Parameter for {tool_name}")
                    param_details.append(f"'{param}': {desc}")
                
                param_help = "; ".join(param_details)
                raise ToolError(
                    f"Missing required parameters for {tool_name}: {missing_str}. "
                    f"Required parameters: {param_help}"
                )
            return await client_call_function(actual_kwargs)
        param_tool.__doc__ = docstring
        tool_info_param = get_tool_info(param_tool)
        tool_info_param.parameters_schema = input_schema
        return param_tool

Create a generic adapter that converts an MCP tool to a framework FunctionTool.

Args

tool_name
Name of the MCP tool
tool_description
Description of the MCP tool (if available)
input_schema
JSON schema for the tool's input parameters
client_call_function
Function to call the tool on the MCP server

Returns

A function tool that can be registered with the agent

def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None)
Expand source code
def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None):
    """Decorator to mark a function as a tool. Can be used with or without parentheses."""
    
    def create_wrapper(fn: Callable) -> FunctionTool:
        tool_info = FunctionToolInfo(
            name=name or fn.__name__,
            description=fn.__doc__
        )
        
        if asyncio.iscoroutinefunction(fn):
            @wraps(fn)
            async def async_wrapper(*args, **kwargs):
                return await fn(*args, **kwargs)
            
            setattr(async_wrapper, "_tool_info", tool_info)
            return async_wrapper
        else:
            @wraps(fn)
            def sync_wrapper(*args, **kwargs):
                return fn(*args, **kwargs)
            
            setattr(sync_wrapper, "_tool_info", tool_info)
            return sync_wrapper
    
    if func is None:
        return create_wrapper
    
    return create_wrapper(func)

Decorator to mark a function as a tool. Can be used with or without parentheses.

def get_tool_info(tool: FunctionTool) ‑> FunctionToolInfo
Expand source code
def get_tool_info(tool: FunctionTool) -> FunctionToolInfo:
    """Get the tool info from a function tool"""
    if not is_function_tool(tool):
        raise ValueError("Object is not a function tool")
    
    if inspect.ismethod(tool):
        tool = tool.__func__
    return getattr(tool, "_tool_info")

Get the tool info from a function tool

async def graceful_cancel(*tasks: asyncio.Task) ‑> None
Expand source code
async def graceful_cancel(*tasks: asyncio.Task) -> None:
    """Simple utility to cancel tasks and wait for them to complete"""
    if not tasks:
        return

    for task in tasks:
        if not task.done():
            task.cancel()
    
    try:
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=0.5
        )
    except asyncio.TimeoutError:
        pass

Simple utility to cancel tasks and wait for them to complete

def is_function_tool(obj: Any) ‑> bool
Expand source code
def is_function_tool(obj: Any) -> bool:
    """Check if an object is a function tool"""
    if inspect.ismethod(obj):
        obj = obj.__func__
    return hasattr(obj, "_tool_info")

Check if an object is a function tool

async def segment_text(chunks: AsyncIterator[str],
delimiters: str = '.?!,;:\n',
keep_delimiter: bool = True,
min_chars: int = 50,
min_words: int = 12,
max_buffer: int = 600) ‑> AsyncIterator[str]
Expand source code
async def segment_text(
    chunks: AsyncIterator[str],
    delimiters: str = ".?!,;:\n",
    keep_delimiter: bool = True,
    min_chars: int = 50,
    min_words: int = 12,
    max_buffer: int = 600,
) -> AsyncIterator[str]:
    """
    Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency.
    Yields segments while keeping the delimiter if requested.
    """
    buffer = ""

    def words_count(s: str) -> int:
        return len(s.split())

    def find_first_delim_index(s: str) -> int:
        indices = [i for d in delimiters if (i := s.find(d)) != -1]
        return min(indices) if indices else -1

    async for chunk in chunks:
        if not chunk:
            continue
        buffer += chunk

        while True:
            di = find_first_delim_index(buffer)
            if di != -1:
                seg = buffer[: di + (1 if keep_delimiter else 0)]
                yield seg
                buffer = buffer[di + 1 :].lstrip()
                continue
            else:
                if len(buffer) >= max_buffer or words_count(buffer) >= (min_words * 2):
                    target = max(min_chars, min(len(buffer), max_buffer))
                    cut_idx = buffer.rfind(" ", 0, target)
                    if cut_idx == -1:
                        cut_idx = target
                    seg = buffer[:cut_idx].rstrip()
                    if seg:
                        yield seg
                    buffer = buffer[cut_idx:].lstrip()
                    continue
                break

    if buffer:
        yield buffer

Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency. Yields segments while keeping the delimiter if requested.

def simplify_gemini_schema(schema: dict[str, Any]) ‑> dict[str, typing.Any] | None
Expand source code
def simplify_gemini_schema(schema: dict[str, Any]) -> dict[str, Any] | None:
    """
    Transforms a JSON Schema into Gemini compatible format.
    """

    TYPE_MAPPING: dict[str, types.Type] = {
        "string": types.Type.STRING,
        "number": types.Type.NUMBER,
        "integer": types.Type.INTEGER,
        "boolean": types.Type.BOOLEAN,
        "array": types.Type.ARRAY,
        "object": types.Type.OBJECT,
    }

    SUPPORTED_TYPES = set(TYPE_MAPPING.keys())
    FIELDS_TO_REMOVE = ("title", "default", "additionalProperties", "$defs")

    def process_node(node: dict[str, Any]) -> dict[str, Any] | None:
        new_node = node.copy()
        for field in FIELDS_TO_REMOVE:
            if field in new_node:
                del new_node[field]

        json_type = new_node.get("type")
        if json_type in SUPPORTED_TYPES:
            new_node["type"] = TYPE_MAPPING[json_type]

        node_type = new_node.get("type")
        if node_type == types.Type.OBJECT:
            if "properties" in new_node:
                new_props = {}
                for key, prop in new_node["properties"].items():
                    simplified = process_node(prop)
                    if simplified is not None:
                        new_props[key] = simplified
                new_node["properties"] = new_props
                if not new_props:
                    return None
            else:
                return None
        elif node_type == types.Type.ARRAY:
            if "items" in new_node:
                simplified_items = process_node(new_node["items"])
                if simplified_items is not None:
                    new_node["items"] = simplified_items
                else:
                    del new_node["items"]

        return new_node

    result = process_node(schema)
    if result and result.get("type") == types.Type.OBJECT and not result.get("properties"):
        return None
    return result

Transforms a JSON Schema into Gemini compatible format.

Classes

class AgentState (*args, **kwds)
Expand source code
@enum.unique
class AgentState(enum.Enum):
    STARTING = "starting"
    IDLE = "idle"
    SPEAKING = "speaking"
    LISTENING = "listening"
    THINKING = "thinking"
    CLOSING = "closing"

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var CLOSING
var IDLE
var LISTENING
var SPEAKING
var STARTING
var THINKING
class FieldBuilder (param: inspect.Parameter,
type_processor: TypeProcessor,
description: str | None)
Expand source code
class FieldBuilder:
    def __init__(self, param: inspect.Parameter, type_processor: TypeProcessor, description: str | None):
        self.param = param
        self.type_processor = type_processor
        self.description = description
    
    def build(self) -> tuple[Any, FieldInfo]:
        field_info = self.type_processor.field_info
        self._apply_default_if_needed(field_info)
        self._apply_description_if_needed(field_info)
        return (self.type_processor.base_type, field_info)
    
    def _apply_default_if_needed(self, field_info: FieldInfo) -> None:
        if (self.param.default is not inspect.Parameter.empty and 
            field_info.default is PydanticUndefined):
            field_info.default = self.param.default
    
    def _apply_description_if_needed(self, field_info: FieldInfo) -> None:
        if field_info.description is None and self.description is not None:
            field_info.description = self.description

Methods

def build(self) ‑> tuple[typing.Any, pydantic.fields.FieldInfo]
Expand source code
def build(self) -> tuple[Any, FieldInfo]:
    field_info = self.type_processor.field_info
    self._apply_default_if_needed(field_info)
    self._apply_description_if_needed(field_info)
    return (self.type_processor.base_type, field_info)
class FunctionTool (*args, **kwargs)
Expand source code
@runtime_checkable
class FunctionTool(Protocol):
    @property
    @abstractmethod
    def _tool_info(self) -> "FunctionToolInfo": ...
    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...

Ancestors

  • typing.Protocol
  • typing.Generic
class FunctionToolInfo (name: str,
description: str | None = None,
parameters_schema: Optional[dict] = None)
Expand source code
@dataclass
class FunctionToolInfo:
    name: str
    description: str | None = None
    parameters_schema: Optional[dict] = None

FunctionToolInfo(name: 'str', description: 'str | None' = None, parameters_schema: 'Optional[dict]' = None)

Instance variables

var description : str | None
var name : str
var parameters_schema : dict | None
class ModelBuilder (func: Callable[..., Any])
Expand source code
class ModelBuilder:
    def __init__(self, func: Callable[..., Any]):
        self.func = func
        self.sig = inspect.signature(func)
        self.hints = get_type_hints(func, include_extras=True)
        self.docs = parse_from_object(func)
        self._field_registry = {}
    
    def construct(self) -> type[BaseModel]:
        self._register_all_fields()
        return create_model(self._generate_class_name(), **self._field_registry)
    
    def _register_all_fields(self) -> None:
        for name, param in self.sig.parameters.items():
            if self._should_skip_param(name):
                continue
            self._register_field(name, param)
    
    def _should_skip_param(self, name: str) -> bool:
        return name in ("self", "cls") or name not in self.hints
    
    def _register_field(self, name: str, param: inspect.Parameter) -> None:
        type_info = TypeProcessor(self.hints[name])
        field_builder = FieldBuilder(param, type_info, self._find_doc_description(name))
        self._field_registry[name] = field_builder.build()
    
    def _find_doc_description(self, param_name: str) -> str | None:
        return next(
            (p.description for p in self.docs.params if p.arg_name == param_name),
            None
        )
    
    def _generate_class_name(self) -> str:
        return "".join(part.title() for part in self.func.__name__.split("_")) + "Args"

Methods

def construct(self) ‑> type[pydantic.main.BaseModel]
Expand source code
def construct(self) -> type[BaseModel]:
    self._register_all_fields()
    return create_model(self._generate_class_name(), **self._field_registry)
class RawFunctionTool (*args, **kwargs)
Expand source code
class RawFunctionTool(Protocol):
    """Protocol for raw function tool without framework wrapper"""
    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...

Protocol for raw function tool without framework wrapper

Ancestors

  • typing.Protocol
  • typing.Generic
class ToolError (*args, **kwargs)
Expand source code
class ToolError(Exception):
    """Exception raised when a tool execution fails"""
    pass    

Exception raised when a tool execution fails

Ancestors

  • builtins.Exception
  • builtins.BaseException
class TypeProcessor (hint: Any)
Expand source code
class TypeProcessor:
    def __init__(self, hint: Any):
        self.original_hint = hint
        self.base_type = hint
        self.field_info = Field()
        self._process_annotation()
    
    def _process_annotation(self) -> None:
        if get_origin(self.original_hint) is Annotated:
            args = get_args(self.original_hint)
            self.base_type = args[0]
            self._extract_field_info_from_args(args[1:])
    
    def _extract_field_info_from_args(self, args: tuple) -> None:
        for arg in args:
            if isinstance(arg, FieldInfo):
                self.field_info = arg
                break
class UserState (*args, **kwds)
Expand source code
@enum.unique
class UserState(enum.Enum):
    IDLE = "idle"
    SPEAKING = "speaking"
    LISTENING = "listening"

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var IDLE
var LISTENING
var SPEAKING