Module agents.utils

Functions

def build_gemini_schema(function_tool: FunctionTool) ‑> google.genai.types.FunctionDeclaration
Expand source code
def build_gemini_schema(function_tool: FunctionTool) -> types.FunctionDeclaration:
    """Build a Gemini ``FunctionDeclaration`` from a function tool.

    The tool's explicit ``parameters_schema`` is used when it is set;
    otherwise the ``"parameters"`` entry produced by ``build_openai_schema``
    is used. The selected schema is passed through ``simplify_gemini_schema``.
    ``parameters`` stays ``None`` when the selected schema is empty or its
    ``"properties"`` entry is explicitly ``None``.
    """
    tool_info = get_tool_info(function_tool)

    declared_schema = tool_info.parameters_schema
    if declared_schema is None:
        candidate_schema = build_openai_schema(function_tool).get("parameters")
    else:
        candidate_schema = declared_schema

    gemini_parameters: Optional[dict[str, Any]] = None
    # A missing "properties" key is acceptable (the default is True); only an
    # explicit ``None`` value or an empty schema suppresses the parameters.
    if candidate_schema and candidate_schema.get("properties", True) is not None:
        gemini_parameters = simplify_gemini_schema(candidate_schema)

    return types.FunctionDeclaration(
        name=tool_info.name,
        description=tool_info.description or "",
        parameters=gemini_parameters,
    )

Build Gemini-compatible schema from a function tool

def build_mcp_schema(function_tool: FunctionTool) ‑> dict
Expand source code
def build_mcp_schema(function_tool: FunctionTool) -> dict:
    """Convert a function tool to an MCP schema dictionary.

    The tool's explicit ``parameters_schema`` is preferred when it is set,
    matching ``build_openai_schema`` and ``build_nova_sonic_schema``. Tools
    created from an external JSON schema (whose Python signature may just be
    ``**kwargs``) therefore keep their declared parameters. Otherwise the
    schema is derived from the function signature via
    ``build_pydantic_args_model``.

    Args:
        function_tool: A callable decorated as a function tool.

    Returns:
        A dict with ``"name"``, ``"description"`` and ``"parameters"`` keys.

    Raises:
        ValueError: Propagated from ``get_tool_info`` when the object is not
            a function tool.
    """
    tool_info = get_tool_info(function_tool)

    parameters = tool_info.parameters_schema
    if parameters is None:
        parameters = build_pydantic_args_model(function_tool).model_json_schema()

    return {
        "name": tool_info.name,
        "description": tool_info.description,
        "parameters": parameters,
    }

Convert function tool to MCP schema

def build_nova_sonic_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
Expand source code
def build_nova_sonic_schema(function_tool: FunctionTool) -> dict[str, Any]:
    """Build an Amazon Nova Sonic ``toolSpec`` entry from a function tool.

    The parameter schema is the tool's explicit ``parameters_schema`` when
    set, otherwise the JSON schema of the generated Pydantic args model. It is
    embedded as a JSON *string* under ``inputSchema.json``. The description
    falls back to the tool name when the tool has none.
    """
    tool_info = get_tool_info(function_tool)

    schema: Optional[dict] = tool_info.parameters_schema
    if schema is None:
        schema = build_pydantic_args_model(function_tool).model_json_schema()
    if schema is None:
        # Defensive default: an object that takes no properties.
        schema = {"type": "object", "properties": {}}

    tool_spec = {
        "name": tool_info.name,
        "description": tool_info.description or tool_info.name,
        "inputSchema": {"json": json.dumps(schema)},
    }
    return {"toolSpec": tool_spec}

Build Amazon Nova Sonic-compatible schema from a function tool

def build_openai_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
Expand source code
def build_openai_schema(function_tool: FunctionTool) -> dict[str, Any]:
    """Build an OpenAI-style function schema from a function tool.

    ``"parameters"`` is the tool's explicit ``parameters_schema`` when set,
    otherwise the JSON schema of the generated Pydantic args model. A missing
    description is emitted as an empty string.
    """
    tool_info = get_tool_info(function_tool)

    schema: Optional[dict] = tool_info.parameters_schema
    if schema is None:
        schema = build_pydantic_args_model(function_tool).model_json_schema()
    if schema is None:
        # Defensive default: an object that takes no properties.
        schema = {"type": "object", "properties": {}}

    return {
        "name": tool_info.name,
        "description": tool_info.description or "",
        "parameters": schema,
        "type": "function",
    }

Build OpenAI-compatible schema from a function tool

def build_pipeline_config(*,
stt: Any | None,
llm: Any | None,
tts: Any | None,
vad: Any | None,
turn_detector: Any | None,
avatar: Any | None,
denoise: Any | None,
realtime_model: Any | None,
realtime_config_mode: str | None) ‑> PipelineConfig
Expand source code
def build_pipeline_config(
    *,
    stt: Any | None,
    llm: Any | None,
    tts: Any | None,
    vad: Any | None,
    turn_detector: Any | None,
    avatar: Any | None,
    denoise: Any | None,
    realtime_model: Any | None,
    realtime_config_mode: str | None,
) -> PipelineConfig:
    """
    Derive a ``PipelineConfig`` from the raw component references.

    Three things are computed: the set of active components (every reference
    that is not ``None``), the realtime mode (only when a realtime model is
    present), and the overall pipeline mode.
    """
    # Active components: anything that was actually supplied.
    supplied = (
        (PipelineComponent.STT, stt),
        (PipelineComponent.LLM, llm),
        (PipelineComponent.TTS, tts),
        (PipelineComponent.VAD, vad),
        (PipelineComponent.TURN_DETECTOR, turn_detector),
        (PipelineComponent.AVATAR, avatar),
        (PipelineComponent.DENOISE, denoise),
        (PipelineComponent.REALTIME_MODEL, realtime_model),
    )
    components: set[PipelineComponent] = {
        component for component, reference in supplied if reference is not None
    }

    # Realtime mode: an explicit config value wins; otherwise it is inferred
    # from which external STT/TTS components accompany the realtime model.
    realtime_mode: RealtimeMode | None = None
    if realtime_model is not None:
        if realtime_config_mode:
            realtime_mode = RealtimeMode(realtime_config_mode)
        elif stt is not None and tts is not None:
            realtime_mode = RealtimeMode.LLM_ONLY
        elif stt is not None:
            realtime_mode = RealtimeMode.HYBRID_STT
        elif tts is not None:
            realtime_mode = RealtimeMode.HYBRID_TTS
        else:
            realtime_mode = RealtimeMode.FULL_S2S

    # LLM_ONLY means the realtime model is used as a plain LLM, so the
    # pipeline is not treated as realtime.
    is_realtime = realtime_mode is not None and realtime_mode != RealtimeMode.LLM_ONLY

    # Pipeline mode.
    if is_realtime:
        pipeline_mode = PipelineMode.REALTIME
    else:
        has_stt, has_llm, has_tts = bool(stt), bool(llm), bool(tts)
        if has_stt and has_llm and has_tts:
            # All three core stages: "full" additionally needs VAD and a
            # turn detector.
            if vad and turn_detector:
                pipeline_mode = PipelineMode.FULL_CASCADING
            else:
                pipeline_mode = PipelineMode.PARTIAL_CASCADING
        else:
            by_presence = {
                (False, True, True): PipelineMode.LLM_TTS_ONLY,
                (True, True, False): PipelineMode.STT_LLM_ONLY,
                (False, True, False): PipelineMode.LLM_ONLY,
                (True, False, True): PipelineMode.STT_TTS_ONLY,
                (True, False, False): PipelineMode.STT_ONLY,
                (False, False, True): PipelineMode.TTS_ONLY,
            }
            pipeline_mode = by_presence.get(
                (has_stt, has_llm, has_tts), PipelineMode.PARTIAL_CASCADING
            )

    return PipelineConfig(
        pipeline_mode=pipeline_mode,
        realtime_mode=realtime_mode,
        is_realtime=is_realtime,
        active_components=frozenset(components),
    )

Detect pipeline mode, realtime mode, and active components from the raw component references provided at Pipeline construction.

def build_pydantic_args_model(func: Callable[..., Any]) ‑> type[pydantic.main.BaseModel]
Expand source code
def build_pydantic_args_model(func: Callable[..., Any]) -> type[BaseModel]:
    """
    Create a Pydantic ``BaseModel`` subclass describing the arguments of
    ``func``.

    The work is delegated entirely to ``ModelBuilder``; this function only
    instantiates the builder for ``func`` and returns what it constructs.
    """
    builder = ModelBuilder(func)
    return builder.construct()

Dynamically construct a Pydantic BaseModel class representing all valid positional arguments of the given function, complete with types, default values, and docstring descriptions.

async def cancel_and_wait(task: asyncio.Task | None) ‑> None
Expand source code
async def cancel_and_wait(task: asyncio.Task | None) -> None:
    """Cancel ``task`` and await its completion, swallowing ``CancelledError``.

    Does nothing when ``task`` is falsy or has already finished.
    """
    if task and not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancellation requested above.
            pass

Cancel a task and wait for it to finish, suppressing CancelledError.

def create_generic_mcp_adapter(tool_name: str,
tool_description: str | None,
input_schema: dict,
client_call_function: Callable) ‑> FunctionTool
Expand source code
def create_generic_mcp_adapter(
    tool_name: str, 
    tool_description: str | None, 
    input_schema: dict,
    client_call_function: Callable
) -> FunctionTool:
    """
    Create a generic adapter that converts an MCP tool to a framework FunctionTool.

    The generated tool carries ``input_schema`` as its ``parameters_schema``
    and the composed docstring as its description. ``function_tool`` records
    the wrapped function's ``__doc__`` at decoration time, when the inner
    function has no docstring yet, so the description is written onto the
    tool info explicitly instead of relying on ``__doc__`` alone.

    Args:
        tool_name: Name of the MCP tool
        tool_description: Description of the MCP tool (if available)
        input_schema: JSON schema for the tool's input parameters
        client_call_function: Function to call the tool on the MCP server;
            it is awaited with a single dict of arguments

    Returns:
        A function tool that can be registered with the agent

    Raises:
        ToolError: Raised by the generated tool (not by this factory) when a
            call omits parameters listed in the schema's ``required``.
    """
    # ``or`` also covers schemas that carry an explicit null for these keys.
    required_params = input_schema.get('required') or []
    param_properties = input_schema.get('properties') or {}

    docstring = tool_description or f"Call the {tool_name} tool"
    if param_properties and "Args:" not in docstring:
        arg_lines = []
        for param_name, param_info in param_properties.items():
            required = " (required)" if param_name in required_params else ""
            description = param_info.get('description', f"Parameter for {tool_name}")
            arg_lines.append(f"    {param_name}{required}: {description}\n")
        docstring += "\n\nArgs:\n" + "".join(arg_lines)

    if not param_properties:
        @function_tool(name=tool_name)
        async def no_param_tool() -> Any:
            return await client_call_function({})
        no_param_tool.__doc__ = docstring
        tool_info_no_param = get_tool_info(no_param_tool)
        # Keep the tool info in sync with the docstring assigned above.
        tool_info_no_param.description = docstring
        tool_info_no_param.parameters_schema = input_schema
        return no_param_tool
    else:
        @function_tool(name=tool_name) 
        async def param_tool(**kwargs) -> Any:
            actual_kwargs = kwargs.copy()  # never mutate the caller's dict

            # Some MCP tools require a free-form 'instructions' argument; when
            # the caller supplied other parameters but not that one, fill in a
            # generic value rather than failing the call.
            if 'instructions' in required_params and 'instructions' not in actual_kwargs:
                other_params_provided = any(p in actual_kwargs for p in param_properties if p != 'instructions')
                if other_params_provided:
                    actual_kwargs['instructions'] = f"Execute tool {tool_name} with the provided parameters."

            missing = [p for p in required_params if p not in actual_kwargs]
            if missing:
                missing_str = ", ".join(missing)
                param_details = []
                for param in missing:
                    param_info = param_properties.get(param, {})
                    desc = param_info.get('description', f"Parameter for {tool_name}")
                    param_details.append(f"'{param}': {desc}")
                
                param_help = "; ".join(param_details)
                raise ToolError(
                    f"Missing required parameters for {tool_name}: {missing_str}. "
                    f"Required parameters: {param_help}"
                )
            return await client_call_function(actual_kwargs)
        param_tool.__doc__ = docstring
        tool_info_param = get_tool_info(param_tool)
        # Keep the tool info in sync with the docstring assigned above.
        tool_info_param.description = docstring
        tool_info_param.parameters_schema = input_schema
        return param_tool

Create a generic adapter that converts an MCP tool to a framework FunctionTool.

Args

tool_name
Name of the MCP tool
tool_description
Description of the MCP tool (if available)
input_schema
JSON schema for the tool's input parameters
client_call_function
Function to call the tool on the MCP server

Returns

A function tool that can be registered with the agent

def format_metrics(raw: dict) ‑> dict
Expand source code
def format_metrics(raw: dict) -> dict:
    """Shape a raw metrics dict into the transport payload.

    The payload always has ``latency`` and ``speech`` sections. On the first
    call (tracked by the ``_is_first`` attribute stored on this function) it
    also carries ``providers`` and ``systemInstructions``. Entries whose value
    is ``None`` or an empty dict are dropped before returning.
    """
    if not hasattr(format_metrics, "_is_first"):
        format_metrics._is_first = True

    def clean(data):
        # Recursively drop None / {} values; non-dict values pass through.
        if not isinstance(data, dict):
            return data
        pruned = {}
        for key, value in data.items():
            if value is not None and value != {}:
                pruned[key] = clean(value)
        return pruned

    uses_realtime_provider = bool(raw.get("realtimeProviderClass"))

    latency = {"e2eLatency": raw.get("e2eLatency"), "ttfb": raw.get("ttfb")}
    if not uses_realtime_provider:
        # Per-stage latencies are only reported without a realtime provider.
        for metric in ("sttLatency", "ttft", "eouLatency"):
            latency[metric] = raw.get(metric)

    # The last matching timeline event wins for each speaker.
    user_text = None
    agent_text = None
    for event in raw.get("timeline", []):
        event_type = event.get("eventType")
        if event_type == "user_speech":
            user_text = event.get("text")
        elif event_type == "agent_speech":
            agent_text = event.get("text")

    # Fall back to the top-level fields when the timeline gave nothing.
    if not user_text:
        user_text = raw.get("user_speech")
    if not agent_text:
        agent_text = raw.get("agent_speech")

    speech = {"user": None, "agent": None}
    if user_text:
        speech["user"] = {"eventType": "user_speech", "text": user_text}
    if agent_text:
        speech["agent"] = {"eventType": "agent_speech", "text": agent_text}

    payload = {"latency": latency, "speech": speech}

    if format_metrics._is_first:
        if uses_realtime_provider:
            providers = {
                "providerClass": raw.get("realtimeProviderClass"),
                "modelName": raw.get("realtimeModelName"),
            }
        else:
            provider_keys = (
                "llmProviderClass",
                "llmModelName",
                "sttProviderClass",
                "sttModelName",
                "ttsProviderClass",
                "ttsModelName",
            )
            providers = {key: raw.get(key) for key in provider_keys}

        payload["providers"] = providers
        payload["systemInstructions"] = raw.get("systemInstructions")
        format_metrics._is_first = False

    return clean(payload)
def format_provider_class(obj: Any) ‑> str
Expand source code
def format_provider_class(obj: Any) -> str:
    """Return the display name used in metrics for a provider component.

    ``None`` maps to an empty string. An object whose class lives under
    ``videosdk.agents.inference`` and which exposes a string ``provider``
    attribute is reported as ``Videosdk-<provider>``, so metrics can tell the
    inference path apart from direct plugin usage. Anything else is reported
    by its class name (e.g. ``DeepgramSTT``).
    """
    if obj is None:
        return ""

    component_cls = obj.__class__
    provider = getattr(obj, "provider", None)
    if isinstance(provider, str):
        if component_cls.__module__.startswith("videosdk.agents.inference"):
            return f"Videosdk-{provider}"
    return component_cls.__name__

Display name for a provider component used in metrics.

Inference components (videosdk.agents.inference.*) carry a provider attribute identifying the underlying gateway provider ("deepgram", "google", "cartesia", …). For those we surface Videosdk-<provider> so metrics distinguish the inference path from direct plugin usage. All other components fall back to their actual class name (e.g. DeepgramSTT).

def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None)
Expand source code
def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None):
    """Mark a function as a tool; usable as ``@function_tool`` or ``@function_tool(name=...)``.

    The decorated callable is wrapped (async functions stay async) and the
    wrapper receives a ``_tool_info`` attribute holding a ``FunctionToolInfo``
    built from the given ``name`` (or the function's own name) and the
    function's docstring as it is at decoration time.
    """

    def create_wrapper(fn: Callable) -> FunctionTool:
        info = FunctionToolInfo(
            name=name or fn.__name__,
            description=fn.__doc__,
        )

        if asyncio.iscoroutinefunction(fn):
            @wraps(fn)
            async def wrapper(*args, **kwargs):
                return await fn(*args, **kwargs)
        else:
            @wraps(fn)
            def wrapper(*args, **kwargs):
                return fn(*args, **kwargs)

        setattr(wrapper, "_tool_info", info)
        return wrapper

    # Bare ``@function_tool`` passes the function directly; the parenthesised
    # form passes nothing and expects a decorator back.
    return create_wrapper if func is None else create_wrapper(func)

Decorator to mark a function as a tool. Can be used with or without parentheses.

def generate_videosdk_token(api_key: str = '', secret: str = '', *, ttl_seconds: int = 3600) ‑> str
Expand source code
def generate_videosdk_token(
    api_key: str = "",
    secret: str = "",
    *,
    ttl_seconds: int = 3600,
) -> str:
    """
    Create a signed (HS256) VideoSDK JWT.

    Args:
        api_key (str): VideoSDK API key; the ``VIDEOSDK_API_KEY`` environment
            variable is used when this is empty.
        secret (str): VideoSDK secret key; the ``VIDEOSDK_SECRET_KEY``
            environment variable is used when this is empty.
        ttl_seconds (int): Lifetime of the token in seconds (default: 3600).

    Returns:
        str: The encoded JWT.

    Raises:
        ImportError: If PyJWT is not installed.
        ValueError: If the API key or the secret cannot be resolved.
    """
    try:
        import jwt
    except ImportError as exc:
        raise ImportError(
            "PyJWT is required. Install it using: pip install PyJWT"
        ) from exc

    resolved_key = api_key or os.getenv("VIDEOSDK_API_KEY", "")
    resolved_secret = secret or os.getenv("VIDEOSDK_SECRET_KEY", "")
    if not (resolved_key and resolved_secret):
        raise ValueError("VIDEOSDK_API_KEY and VIDEOSDK_SECRET_KEY are not set")

    issued_at = int(time.time())
    claims = {
        "apikey": resolved_key,
        "permissions": ["allow_join"],
        "version": 2,
        "iat": issued_at,
        "exp": issued_at + ttl_seconds,
    }

    encoded = jwt.encode(claims, resolved_secret, algorithm="HS256")
    # The encoder's result may be bytes; always hand back text.
    return encoded.decode("utf-8") if isinstance(encoded, bytes) else encoded

Generate a pre-signed VideoSDK JWT token.

Args

api_key : str
Your VideoSDK API key. Falls back to VIDEOSDK_API_KEY env.
secret : str
Your VideoSDK secret key. Falls back to VIDEOSDK_SECRET_KEY env.
ttl_seconds : int
Token validity in seconds (default: 3600).

Returns

str
Signed JWT token.
def get_tool_info(tool: FunctionTool) ‑> FunctionToolInfo
Expand source code
def get_tool_info(tool: FunctionTool) -> FunctionToolInfo:
    """Return the ``_tool_info`` attached to a function tool.

    Bound methods are unwrapped to their underlying function first.

    Raises:
        ValueError: If ``tool`` is not a function tool.
    """
    if not is_function_tool(tool):
        raise ValueError("Object is not a function tool")

    target = tool.__func__ if inspect.ismethod(tool) else tool
    return target._tool_info

Get the tool info from a function tool

async def graceful_cancel(*tasks: asyncio.Task) ‑> None
Expand source code
async def graceful_cancel(*tasks: asyncio.Task) -> None:
    """Cancel the given tasks and wait up to 0.5 s for them to finish.

    Tasks that are already done are not cancelled. Exceptions raised by the
    tasks are collected rather than propagated, and a timeout while waiting
    is ignored.
    """
    if not tasks:
        return

    still_running = [task for task in tasks if not task.done()]
    for task in still_running:
        task.cancel()

    all_finished = asyncio.gather(*tasks, return_exceptions=True)
    try:
        await asyncio.wait_for(all_finished, timeout=0.5)
    except asyncio.TimeoutError:
        # Best effort: give up waiting once the grace period has elapsed.
        pass

Simple utility to cancel tasks and wait for them to complete

def is_function_tool(obj: Any) ‑> bool
Expand source code
def is_function_tool(obj: Any) -> bool:
    """Return True when ``obj`` (or, for a bound method, its function) has a ``_tool_info`` attribute."""
    target = obj.__func__ if inspect.ismethod(obj) else obj
    return hasattr(target, "_tool_info")

Check if an object is a function tool

def reset_metrics_formatter_state() ‑> None
Expand source code
def reset_metrics_formatter_state() -> None:
    """Re-arm the one-shot ``_is_first`` flag on ``format_metrics``.

    After this call the next ``format_metrics`` payload includes providers
    and systemInstructions again. Intended for a mid-room pipeline change
    (agent swap), where the new agent's provider info and instructions must be
    re-published rather than suppressed by the flag latched on the first turn.
    """
    setattr(format_metrics, "_is_first", True)

Reset the one-shot flag in format_metrics() so providers and systemInstructions are re-included in the next transport payload.

Used when the pipeline changes mid-room (agent swap) — the new agent's provider info and instructions need to be re-published instead of being suppressed by the flag that latched during the first turn.

def resolve_videosdk_auth_token(explicit: Optional[str] = None) ‑> str | None
Expand source code
def resolve_videosdk_auth_token(explicit: Optional[str] = None) -> Optional[str]:
    """
    Resolve the VideoSDK auth token, trying in order:
      1. ``explicit`` (from RoomOptions / WorkerOptions)
      2. the ``VIDEOSDK_AUTH_TOKEN`` environment variable
      3. a previously generated token held in the module-level cache
      4. a JWT generated from ``VIDEOSDK_API_KEY`` + ``VIDEOSDK_SECRET_KEY``

    A freshly generated token is cached and also written back to
    ``os.environ["VIDEOSDK_AUTH_TOKEN"]`` so that sub-modules reading the env
    var directly (inference/, metrics/analytics.py, knowledge_base/base.py)
    pick it up without needing a refactor.

    Returns None when neither the API key nor the secret is configured.
    """
    global _generated_token_cache

    # ``or`` short-circuits, so later sources are consulted only if needed.
    known_token = (
        explicit
        or os.getenv("VIDEOSDK_AUTH_TOKEN")
        or _generated_token_cache
    )
    if known_token:
        return known_token

    api_key = os.getenv("VIDEOSDK_API_KEY")
    secret = os.getenv("VIDEOSDK_SECRET_KEY")
    if not (api_key or secret):
        return None

    _generated_token_cache = generate_videosdk_token(api_key or "", secret or "")
    os.environ["VIDEOSDK_AUTH_TOKEN"] = _generated_token_cache
    return _generated_token_cache

Resolve the VideoSDK auth token with priority: 1. explicit (from RoomOptions / WorkerOptions) 2. VIDEOSDK_AUTH_TOKEN env var 3. JWT generated from VIDEOSDK_API_KEY + VIDEOSDK_SECRET_KEY

When path 3 is taken, the generated token is written back to os.environ["VIDEOSDK_AUTH_TOKEN"] so sub-modules that read the env var directly (inference/, metrics/analytics.py, knowledge_base/base.py) pick it up without needing a refactor.

Returns None if nothing is available.

async def run_stt(audio_stream: AsyncIterator[bytes]) ‑> AsyncIterator[Any]
Expand source code
async def run_stt(audio_stream: AsyncIterator[bytes]) -> AsyncIterator[Any]:
    """
    Transcribe an audio stream with the current job's STT component.

    Every event produced by the component's ``stream_transcribe`` method is
    re-yielded unchanged.

    Args:
        audio_stream: Async iterator of audio bytes

    Yields:
        SpeechEvent objects (with text, etc.)

    Raises:
        RuntimeError: If there is no job context, pipeline, or STT component.
    """
    from .job import get_current_job_context

    ctx = get_current_job_context()
    pipeline = ctx._pipeline if ctx else None
    stt = pipeline.stt if pipeline else None
    if not stt:
        raise RuntimeError("No STT component available in current context")

    async for event in stt.stream_transcribe(audio_stream):
        yield event

Run STT on an audio stream.

Delegates to the STT component's stream_transcribe method.

Args

audio_stream
Async iterator of audio bytes

Yields

SpeechEvent objects (with text, etc.)

async def run_tts(text_stream: AsyncIterator[str]) ‑> AsyncIterator[bytes]
Expand source code
async def run_tts(text_stream: AsyncIterator[str]) -> AsyncIterator[bytes]:
    """
    Synthesize a text stream with the current job's TTS component.

    Every frame produced by the component's ``stream_synthesize`` method is
    re-yielded unchanged.

    Args:
        text_stream: Async iterator of text

    Yields:
        Audio bytes

    Raises:
        RuntimeError: If there is no job context, pipeline, or TTS component.
    """
    from .job import get_current_job_context

    ctx = get_current_job_context()
    pipeline = ctx._pipeline if ctx else None
    tts = pipeline.tts if pipeline else None
    if not tts:
        raise RuntimeError("No TTS component available in current context")

    async for frame in tts.stream_synthesize(text_stream):
        yield frame

Run TTS on a text stream.

Delegates to the TTS component's stream_synthesize method.

Args

text_stream
Async iterator of text

Yields

Audio bytes

async def segment_text(chunks: AsyncIterator[str],
delimiters: str = '.?!,;:\n',
keep_delimiter: bool = True,
min_chars: int = 50,
min_words: int = 12,
max_buffer: int = 600) ‑> AsyncIterator[str]
Expand source code
async def segment_text(
    chunks: AsyncIterator[str],
    delimiters: str = ".?!,;:\n",
    keep_delimiter: bool = True,
    min_chars: int = 50,
    min_words: int = 12,
    max_buffer: int = 600,
) -> AsyncIterator[str]:
    """
    Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency.

    Incoming chunks are accumulated in a buffer. Whenever the buffer contains
    one of ``delimiters``, everything up to the first delimiter is emitted.
    When there is no delimiter but the buffer has reached ``max_buffer``
    characters or ``2 * min_words`` words, it is cut at the last space before
    a target length (or at the target itself when no space is found).
    Whatever remains when the stream ends is emitted as a final segment.
    Empty segments are never yielded.

    Args:
        chunks: Async iterator of text pieces. ``FlushMarker`` items and empty
            chunks are skipped.
        delimiters: Characters that end a segment.
        keep_delimiter: Whether the delimiter stays attached to the segment.
        min_chars: Lower bound for the soft-cut target length.
        min_words: A soft cut is considered at twice this many words.
        max_buffer: Buffer length that forces a soft cut.

    Yields:
        Non-empty text segments.
    """
    buffer = ""

    def words_count(s: str) -> int:
        return len(s.split())

    def find_first_delim_index(s: str) -> int:
        indices = [i for d in delimiters if (i := s.find(d)) != -1]
        return min(indices) if indices else -1

    from .tts.tts import FlushMarker

    async for chunk in chunks:
        if isinstance(chunk, FlushMarker) or not chunk:
            continue
        buffer += chunk

        while buffer:
            # Stopping on an empty buffer keeps the soft-boundary branch from
            # spinning forever when min_words or max_buffer is <= 0.
            di = find_first_delim_index(buffer)
            if di != -1:
                seg = buffer[: di + (1 if keep_delimiter else 0)]
                # With keep_delimiter=False a delimiter at index 0 produces an
                # empty slice; skip it, as the soft-boundary branch does.
                if seg:
                    yield seg
                buffer = buffer[di + 1 :].lstrip()
                continue

            if len(buffer) >= max_buffer or words_count(buffer) >= (min_words * 2):
                target = max(min_chars, min(len(buffer), max_buffer))
                # Prefer cutting at a word boundary before the target length.
                cut_idx = buffer.rfind(" ", 0, target)
                if cut_idx == -1:
                    cut_idx = target
                seg = buffer[:cut_idx].rstrip()
                if seg:
                    yield seg
                buffer = buffer[cut_idx:].lstrip()
                continue
            break

    if buffer:
        yield buffer

Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency. Yields segments while keeping the delimiter if requested.

def simplify_gemini_schema(schema: dict[str, Any]) ‑> dict[str, typing.Any] | None
Expand source code
def simplify_gemini_schema(schema: dict[str, Any]) -> dict[str, Any] | None:
    """
    Transforms a JSON Schema into Gemini compatible format.

    For every node the function removes ``title``, ``default``,
    ``additionalProperties``, ``$defs`` and ``$schema``, and maps the JSON
    ``type`` string to the matching ``types.Type`` member. Object nodes
    without any usable property are dropped (``None``); array nodes lose their
    ``items`` entry when it cannot be simplified.

    Beyond that straightforward mapping:
      * A union ``type`` list such as ``["string", "null"]`` is reduced to its
        first non-null entry and ``nullable`` is set when ``"null"`` was
        listed. Unions of several non-null types are narrowed to the first
        one, which is lossy.
      * ``required`` is pruned to the properties that survive simplification,
        so it never names a property that was dropped.
      * Property nodes that are not dicts (e.g. boolean schemas) are dropped
        instead of raising.

    Args:
        schema: JSON Schema dict to convert.

    Returns:
        The simplified schema, or ``None`` when nothing usable remains.
    """

    TYPE_MAPPING: dict[str, types.Type] = {
        "string": types.Type.STRING,
        "number": types.Type.NUMBER,
        "integer": types.Type.INTEGER,
        "boolean": types.Type.BOOLEAN,
        "array": types.Type.ARRAY,
        "object": types.Type.OBJECT,
    }

    FIELDS_TO_REMOVE = ("title", "default", "additionalProperties", "$defs", "$schema")

    def process_node(node: dict[str, Any]) -> dict[str, Any] | None:
        if not isinstance(node, dict):
            return None

        new_node = {k: v for k, v in node.items() if k not in FIELDS_TO_REMOVE}

        json_type = new_node.get("type")
        if isinstance(json_type, list):
            # JSON Schema allows a list of types; a set-membership test on a
            # list would raise TypeError (unhashable), so collapse it first.
            non_null = [t for t in json_type if t != "null"]
            if "null" in json_type:
                new_node["nullable"] = True
            if non_null:
                json_type = non_null[0]
                new_node["type"] = json_type
            else:
                json_type = None
                del new_node["type"]

        if isinstance(json_type, str) and json_type in TYPE_MAPPING:
            new_node["type"] = TYPE_MAPPING[json_type]

        node_type = new_node.get("type")
        if node_type == types.Type.OBJECT:
            if "properties" not in new_node:
                return None

            new_props = {}
            for key, prop in new_node["properties"].items():
                simplified = process_node(prop)
                if simplified is not None:
                    new_props[key] = simplified
            if not new_props:
                return None
            new_node["properties"] = new_props

            required = new_node.get("required")
            if isinstance(required, list):
                kept = [name for name in required if name in new_props]
                # Only touch "required" when a listed property was dropped.
                if len(kept) != len(required):
                    if kept:
                        new_node["required"] = kept
                    else:
                        del new_node["required"]
        elif node_type == types.Type.ARRAY:
            if "items" in new_node:
                simplified_items = process_node(new_node["items"])
                if simplified_items is not None:
                    new_node["items"] = simplified_items
                else:
                    del new_node["items"]

        return new_node

    result = process_node(schema)
    if result and result.get("type") == types.Type.OBJECT and not result.get("properties"):
        return None
    return result

Transforms a JSON Schema into Gemini compatible format.

Classes

class AgentState (*args, **kwds)
Expand source code
@enum.unique
class AgentState(enum.Enum):
    """Represents the current state of the agent in a conversation session.

    Each member's value is the lowercase form of its name. ``@enum.unique``
    makes a duplicated value an error when the class is defined.
    """
    STARTING = "starting"
    IDLE = "idle"
    SPEAKING = "speaking"
    LISTENING = "listening"
    THINKING = "thinking"
    CLOSING = "closing"

Represents the current state of the agent in a conversation session.

Ancestors

  • enum.Enum

Class variables

var CLOSING
var IDLE
var LISTENING
var SPEAKING
var STARTING
var THINKING
class AsyncIteratorQueue
Expand source code
class AsyncIteratorQueue:
    """An async iterator backed by an asyncio.Queue, allowing producers to put items and consumers to async-iterate until closed.

    ``None`` is used internally as the end-of-stream sentinel, so a ``None``
    item also ends iteration. Items put after ``close()`` are silently
    discarded. Once the stream is closed and drained, further iteration ends
    immediately instead of waiting on the empty queue.
    """
    def __init__(self):
        self.queue = asyncio.Queue()
        self.closed = False

    async def put(self, item):
        """Enqueue ``item`` unless the queue has already been closed."""
        if not self.closed:
            await self.queue.put(item)

    def close(self):
        """Mark the queue closed and enqueue the end-of-stream sentinel."""
        self.closed = True
        self.queue.put_nowait(None)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # close() always enqueues a sentinel after setting the flag, so
        # "closed and empty" means the sentinel was already consumed. Waiting
        # on get() here would block forever.
        if self.closed and self.queue.empty():
            raise StopAsyncIteration
        item = await self.queue.get()
        if item is None:
            raise StopAsyncIteration
        return item

An async iterator backed by an asyncio.Queue, allowing producers to put items and consumers to async-iterate until closed.

Methods

def close(self)
Expand source code
def close(self):
    """Mark the queue closed and enqueue ``None`` as the end-of-stream sentinel."""
    self.closed = True
    end_of_stream = None
    self.queue.put_nowait(end_of_stream)
async def put(self, item)
Expand source code
async def put(self, item):
    """Enqueue ``item``; the call is a no-op once the queue is closed."""
    if self.closed:
        return
    await self.queue.put(item)
class AudioByteStream (sample_rate: int, num_channels: int, samples_per_channel: int)
Expand source code
class AudioByteStream:
    """
    Buffers raw PCM bytes and emits fixed-size AudioFrame objects.

    Useful for converting variable-size chunks (e.g. data-channel payloads)
    into the steady frame cadence that AvatarSynchronizer expects.

    Samples are treated as 16-bit integers (2 bytes per sample). Frames are
    created in the packed ``s16`` format, for which PyAV expects an array of
    shape ``(1, samples * channels)``; ``push`` and ``flush`` both build the
    array that way.
    """

    def __init__(self, sample_rate: int, num_channels: int, samples_per_channel: int):
        # numpy is imported lazily and kept on the instance.
        import numpy as _np
        self._np = _np
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._samples_per_channel = samples_per_channel
        self._sample_width = 2  
        self._bytes_per_frame = samples_per_channel * num_channels * self._sample_width
        self._buffer = bytearray()

    def _packed_samples(self, chunk: bytes):
        """Return ``chunk`` as an int16 array of shape ``(1, total_samples)``."""
        return self._np.frombuffer(chunk, dtype=self._np.int16).reshape(1, -1)

    def _build_frame(self, chunk: bytes):
        """Wrap one frame's worth of PCM bytes in an ``av.AudioFrame``."""
        from av import AudioFrame
        frame = AudioFrame.from_ndarray(
            self._packed_samples(chunk),
            format="s16",
            layout="mono" if self._num_channels == 1 else "stereo",
        )
        frame.sample_rate = self._sample_rate
        return frame

    def push(self, frame_data: bytes) -> list:
        """Append ``frame_data`` and return every complete frame now available.

        Bytes that do not fill a whole frame stay buffered for the next call.
        """
        self._buffer.extend(frame_data)
        frames = []
        while len(self._buffer) >= self._bytes_per_frame:
            chunk = bytes(self._buffer[: self._bytes_per_frame])
            del self._buffer[: self._bytes_per_frame]
            frames.append(self._build_frame(chunk))
        return frames

    def flush(self) -> list:
        """Emit the buffered remainder as one zero-padded frame.

        Returns an empty list when nothing is buffered.
        """
        if not self._buffer:
            return []
        chunk = bytes(self._buffer)
        self._buffer.clear()
        missing = self._bytes_per_frame - len(chunk)
        if missing > 0:
            # Pad with silence so the final frame has the standard size.
            chunk += bytes(missing)
        return [self._build_frame(chunk)]

Buffers raw PCM bytes and emits fixed-size AudioFrame objects.

Useful for converting variable-size chunks (e.g. data-channel payloads) into the steady frame cadence that AvatarSynchronizer expects.

Methods

def flush(self) ‑> list
Expand source code
def flush(self) -> list:
    """Emit whatever is left in the buffer as one final, zero-padded frame.

    Returns:
        An empty list when nothing is buffered. Otherwise a one-element list
        holding an ``av.AudioFrame`` built from the remaining bytes, padded
        with zero bytes up to ``_bytes_per_frame``. The buffer is empty
        afterwards.
    """
    if not self._buffer:
        return []

    # Imported after the guard so an empty flush never needs PyAV.
    from av import AudioFrame

    chunk = bytes(self._buffer)
    self._buffer.clear()
    missing = self._bytes_per_frame - len(chunk)
    if missing > 0:
        # bytes(n) is n zero bytes.
        chunk += bytes(missing)
    # "s16" is a packed format: from_ndarray requires a single row of shape
    # (1, samples * channels). A (channels, samples) array is only valid for
    # mono and raises ValueError for stereo.
    arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(1, -1)
    frame = AudioFrame.from_ndarray(
        arr,
        format="s16",
        # NOTE(review): any channel count other than 1 is labelled "stereo";
        # confirm callers only ever use mono or stereo.
        layout="mono" if self._num_channels == 1 else "stereo",
    )
    frame.sample_rate = self._sample_rate
    return [frame]
def push(self, frame_data: bytes) ‑> list
Expand source code
def push(self, frame_data: bytes) -> list:
    """Append raw PCM bytes and return every complete frame now available.

    Args:
        frame_data: Raw sample bytes. They are decoded as 16-bit integers and
            handed to PyAV as packed ``s16`` audio.

    Returns:
        A list of ``av.AudioFrame`` objects, one per full ``_bytes_per_frame``
        slice of the buffer, oldest first. The list is empty when less than
        one frame is buffered; leftover bytes stay in the buffer for the next
        call.
    """
    self._buffer.extend(frame_data)
    if len(self._buffer) < self._bytes_per_frame:
        return []

    # Imported only once a frame can actually be built.
    from av import AudioFrame

    # NOTE(review): any channel count other than 1 is labelled "stereo";
    # confirm callers only ever use mono or stereo.
    layout = "mono" if self._num_channels == 1 else "stereo"
    frames = []
    while len(self._buffer) >= self._bytes_per_frame:
        chunk = bytes(self._buffer[: self._bytes_per_frame])
        del self._buffer[: self._bytes_per_frame]
        # "s16" is a packed format: from_ndarray requires a single row of
        # shape (1, samples * channels). Reshaping to (channels, samples) is
        # only valid for mono and raises ValueError for stereo.
        arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(1, -1)
        frame = AudioFrame.from_ndarray(arr, format="s16", layout=layout)
        frame.sample_rate = self._sample_rate
        frames.append(frame)
    return frames
class FieldBuilder (param: inspect.Parameter,
type_processor: TypeProcessor,
description: str | None)
Expand source code
class FieldBuilder:
    """Builds a Pydantic field tuple from a function parameter, its processed type, and docstring description.

    The result of :meth:`build` is a ``(type, FieldInfo)`` pair. The
    ``FieldInfo`` starts from the one held by the type processor and is
    completed with the parameter's default value and the docstring
    description when the ``FieldInfo`` does not already define them.
    """

    def __init__(self, param: inspect.Parameter, type_processor: TypeProcessor, description: str | None):
        """Store the inputs; no work happens until :meth:`build`.

        Args:
            param: The signature parameter the field is built for.
            type_processor: Supplies ``base_type`` and ``field_info``.
            description: Docstring text for the parameter, or ``None``.
        """
        self.param = param
        self.type_processor = type_processor
        self.description = description

    def build(self) -> tuple[Any, FieldInfo]:
        """Return ``(base_type, field_info)`` for this parameter.

        The type processor's ``FieldInfo`` is copied before it is completed.
        When the hint is ``Annotated[T, Field(...)]`` that ``FieldInfo`` is the
        very object stored in the annotation, so it is shared by every
        function using the same alias. Mutating it in place would leak one
        function's default or description into the others.
        """
        import copy  # local import keeps this block self-contained

        field_info = copy.copy(self.type_processor.field_info)
        self._apply_default_if_needed(field_info)
        self._apply_description_if_needed(field_info)
        return (self.type_processor.base_type, field_info)

    def _apply_default_if_needed(self, field_info: FieldInfo) -> None:
        """Use the signature default unless the FieldInfo already sets one."""
        if (self.param.default is not inspect.Parameter.empty and
            field_info.default is PydanticUndefined):
            field_info.default = self.param.default

    def _apply_description_if_needed(self, field_info: FieldInfo) -> None:
        """Use the docstring description unless the FieldInfo already has one."""
        if field_info.description is None and self.description is not None:
            field_info.description = self.description

Builds a Pydantic field tuple from a function parameter, its processed type, and docstring description.

Methods

def build(self) ‑> tuple[typing.Any, pydantic.fields.FieldInfo]
Expand source code
def build(self) -> tuple[Any, FieldInfo]:
    """Return ``(base_type, field_info)`` for this parameter.

    The type processor's ``FieldInfo`` is copied before it is completed. When
    the hint is ``Annotated[T, Field(...)]`` that ``FieldInfo`` is the very
    object stored in the annotation, so it is shared by every function using
    the same alias. Mutating it in place would leak one function's default or
    description into the others.
    """
    import copy  # local import keeps this block self-contained

    field_info = copy.copy(self.type_processor.field_info)
    self._apply_default_if_needed(field_info)
    self._apply_description_if_needed(field_info)
    return (self.type_processor.base_type, field_info)
class FunctionTool (*args, **kwargs)
Expand source code
@runtime_checkable
class FunctionTool(Protocol):
    """Structural type for a callable that carries tool metadata.

    An object satisfies this protocol when it is callable and exposes a
    ``_tool_info`` attribute. Because the class is ``@runtime_checkable``,
    ``isinstance(obj, FunctionTool)`` works at runtime, but it only checks
    that the attributes exist, not their types or signatures.
    """
    @property
    @abstractmethod
    # Metadata describing the tool, as a FunctionToolInfo.
    def _tool_info(self) -> "FunctionToolInfo": ...
    # Invoking the tool; arguments and return value are unconstrained here.
    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...

Ancestors

  • typing.Protocol
  • typing.Generic
class FunctionToolInfo (name: str,
description: str | None = None,
parameters_schema: Optional[dict] = None)
Expand source code
@dataclass
class FunctionToolInfo:
    """Holds metadata about a function tool, including its name, description, and parameter schema.

    Only ``name`` is required; the other two fields default to ``None``.
    """
    # Tool name; the schema builders pass it through as the declared name.
    name: str
    # Optional human-readable description. build_gemini_schema substitutes ""
    # when this is None.
    description: str | None = None
    # Optional ready-made schema dict for the parameters. When this is None,
    # build_gemini_schema derives the parameters via build_openai_schema.
    parameters_schema: Optional[dict] = None

Holds metadata about a function tool, including its name, description, and parameter schema.

Instance variables

var description : str | None
var name : str
var parameters_schema : dict | None
class ModelBuilder (func: Callable[..., Any])
Expand source code
class ModelBuilder:
    """Turn a function's signature, type hints and docstring into a Pydantic model.

    Every parameter that has a type hint (other than ``self``/``cls``) becomes
    one field of the generated model. The model is named after the function:
    snake_case converted to TitleCase with an ``Args`` suffix.
    """

    def __init__(self, func: Callable[..., Any]):
        self.func = func
        self.sig = inspect.signature(func)
        # include_extras keeps Annotated[...] metadata available to TypeProcessor.
        self.hints = get_type_hints(func, include_extras=True)
        self.docs = parse_from_object(func)
        self._field_registry = {}

    def construct(self) -> type[BaseModel]:
        """Register all eligible parameters and create the model class."""
        self._register_all_fields()
        model_name = self._generate_class_name()
        return create_model(model_name, **self._field_registry)

    def _register_all_fields(self) -> None:
        """Walk the signature in order, registering each non-skipped parameter."""
        for name, param in self.sig.parameters.items():
            if not self._should_skip_param(name):
                self._register_field(name, param)

    def _should_skip_param(self, name: str) -> bool:
        """Return True for ``self``/``cls`` and for parameters without a type hint."""
        if name in ("self", "cls"):
            return True
        return name not in self.hints

    def _register_field(self, name: str, param: inspect.Parameter) -> None:
        """Build the field tuple for one parameter and store it under its name."""
        processor = TypeProcessor(self.hints[name])
        doc_text = self._find_doc_description(name)
        self._field_registry[name] = FieldBuilder(param, processor, doc_text).build()

    def _find_doc_description(self, param_name: str) -> str | None:
        """Return the first docstring description for ``param_name``, else None."""
        for doc_param in self.docs.params:
            if doc_param.arg_name == param_name:
                return doc_param.description
        return None

    def _generate_class_name(self) -> str:
        """Derive the model name, e.g. ``get_user`` -> ``GetUserArgs``."""
        words = self.func.__name__.split("_")
        return "".join(map(str.title, words)) + "Args"

Constructs a Pydantic BaseModel from a function's signature, type hints, and docstring.

Methods

def construct(self) ‑> type[pydantic.main.BaseModel]
Expand source code
def construct(self) -> type[BaseModel]:
    """Register all eligible parameters and create the model class."""
    self._register_all_fields()
    model_name = self._generate_class_name()
    return create_model(model_name, **self._field_registry)
class PipelineComponent (*args, **kwds)
Expand source code
@enum.unique
class PipelineComponent(enum.Enum):
    """Identifiers for each component slot in the pipeline.

    Members are collected in ``PipelineConfig.active_components``; their
    string values are what ``PipelineConfig.component_names`` returns for
    metrics/logging.
    """
    # @enum.unique makes class creation fail if two members share a value.
    STT = "stt"
    LLM = "llm"
    TTS = "tts"
    VAD = "vad"
    TURN_DETECTOR = "turn_detector"
    AVATAR = "avatar"
    DENOISE = "denoise"
    REALTIME_MODEL = "realtime_model"

Identifiers for each component slot in the pipeline.

Ancestors

  • enum.Enum

Class variables

var AVATAR
var DENOISE
var LLM
var REALTIME_MODEL
var STT
var TTS
var TURN_DETECTOR
var VAD
class PipelineConfig (pipeline_mode: PipelineMode,
realtime_mode: RealtimeMode | None,
is_realtime: bool,
active_components: frozenset[PipelineComponent])
Expand source code
@dataclass(frozen=True)
class PipelineConfig:
    """
    Read-only record of the configuration detected for a pipeline.

    Computed once during Pipeline.__init__ and exposed as pipeline.config.
    The dataclass is frozen, so fields cannot be reassigned after creation.
    """
    pipeline_mode: PipelineMode
    realtime_mode: RealtimeMode | None
    is_realtime: bool
    active_components: frozenset[PipelineComponent]

    def has_component(self, component: PipelineComponent) -> bool:
        """Return True when ``component`` is one of the active components."""
        present = component in self.active_components
        return present

    @property
    def component_names(self) -> list[str]:
        """Active component values as an ascending list (for metrics/logging)."""
        names = [member.value for member in self.active_components]
        names.sort()
        return names

Immutable snapshot of the pipeline's detected configuration. Computed once during Pipeline.__init__ and accessible everywhere via pipeline.config.

Instance variables

var active_components : frozenset[PipelineComponent]
prop component_names : list[str]
Expand source code
@property
def component_names(self) -> list[str]:
    """Active component values as an ascending list (for metrics/logging)."""
    names = [member.value for member in self.active_components]
    names.sort()
    return names

Return sorted list of active component value strings (for metrics/logging).

var is_realtime : bool
var pipeline_mode : PipelineMode
var realtime_mode : RealtimeMode | None

Methods

def has_component(self,
component: PipelineComponent) ‑> bool
Expand source code
def has_component(self, component: PipelineComponent) -> bool:
    """Return True when ``component`` is one of the active components."""
    present = component in self.active_components
    return present

Check whether a specific component is present.

class PipelineMode (*args, **kwds)
Expand source code
@enum.unique
class PipelineMode(enum.Enum):
    """The overall pipeline architecture mode based on which components are present.

    Stored on ``PipelineConfig.pipeline_mode``. Each member's value is the
    lowercase form of its name.
    """
    # @enum.unique makes class creation fail if two members share a value.
    REALTIME = "realtime"
    FULL_CASCADING = "full_cascading"
    LLM_TTS_ONLY = "llm_tts_only"
    STT_LLM_ONLY = "stt_llm_only"
    LLM_ONLY = "llm_only"
    STT_ONLY = "stt_only"
    TTS_ONLY = "tts_only"
    STT_TTS_ONLY = "stt_tts_only"
    HYBRID = "hybrid"
    PARTIAL_CASCADING = "partial_cascading"

The overall pipeline architecture mode based on which components are present.

Ancestors

  • enum.Enum

Class variables

var FULL_CASCADING
var HYBRID
var LLM_ONLY
var LLM_TTS_ONLY
var PARTIAL_CASCADING
var REALTIME
var STT_LLM_ONLY
var STT_ONLY
var STT_TTS_ONLY
var TTS_ONLY
class RawFunctionTool (*args, **kwargs)
Expand source code
class RawFunctionTool(Protocol):
    """Protocol for raw function tool without framework wrapper.

    The only requirement is ``__call__``, so any callable matches it
    structurally. The class is not ``@runtime_checkable``, so it is meant for
    static type checking rather than ``isinstance`` checks.
    """
    # Arguments and return value are deliberately unconstrained.
    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...

Protocol for raw function tool without framework wrapper

Ancestors

  • typing.Protocol
  • typing.Generic
class RealtimeMode (*args, **kwds)
Expand source code
@enum.unique
class RealtimeMode(enum.Enum):
    """The realtime sub-mode when a RealtimeBaseModel is used as the LLM.

    Stored on ``PipelineConfig.realtime_mode``, which is typed as optional
    (``RealtimeMode | None``).
    """
    # @enum.unique makes class creation fail if two members share a value.
    FULL_S2S = "full_s2s"
    HYBRID_STT = "hybrid_stt"
    HYBRID_TTS = "hybrid_tts"
    # Same name and value as PipelineMode.LLM_ONLY, but a distinct member of a
    # different enum; the two never compare equal.
    LLM_ONLY = "llm_only"

The realtime sub-mode when a RealtimeBaseModel is used as the LLM.

Ancestors

  • enum.Enum

Class variables

var FULL_S2S
var HYBRID_STT
var HYBRID_TTS
var LLM_ONLY
class ToolError (*args, **kwargs)
Expand source code
class ToolError(Exception):
    """Raised to signal that executing a tool failed."""

Exception raised when a tool execution fails

Ancestors

  • builtins.Exception
  • builtins.BaseException
class TypeProcessor (hint: Any)
Expand source code
class TypeProcessor:
    """Split a type hint into its base type and a Pydantic ``FieldInfo``.

    For a plain hint, ``base_type`` is the hint itself and ``field_info`` is a
    fresh ``Field()``. For ``Annotated[T, ...]``, ``base_type`` becomes ``T``
    and ``field_info`` becomes the first ``FieldInfo`` found in the metadata,
    if there is one.
    """

    def __init__(self, hint: Any):
        self.original_hint = hint
        self.base_type = hint
        self.field_info = Field()
        self._process_annotation()

    def _process_annotation(self) -> None:
        """Unwrap ``Annotated`` hints; leave every other hint untouched."""
        if get_origin(self.original_hint) is not Annotated:
            return
        inner_type, *metadata = get_args(self.original_hint)
        self.base_type = inner_type
        self._extract_field_info_from_args(tuple(metadata))

    def _extract_field_info_from_args(self, args: tuple) -> None:
        """Adopt the first ``FieldInfo`` among ``args``; keep the default if none."""
        found = next((item for item in args if isinstance(item, FieldInfo)), None)
        if found is not None:
            self.field_info = found

Extracts the base type and Pydantic FieldInfo from a possibly Annotated type hint.

class UserState (*args, **kwds)
Expand source code
@enum.unique
class UserState(enum.Enum):
    """Represents the current state of the user in a conversation session.

    Each member's value is the lowercase form of its name.
    """
    # @enum.unique makes class creation fail if two members share a value.
    IDLE = "idle"
    SPEAKING = "speaking"
    LISTENING = "listening"

Represents the current state of the user in a conversation session.

Ancestors

  • enum.Enum

Class variables

var IDLE
var LISTENING
var SPEAKING