Module agents.utils
Functions
def build_gemini_schema(function_tool: FunctionTool) -> google.genai.types.FunctionDeclaration

    def build_gemini_schema(function_tool: FunctionTool) -> types.FunctionDeclaration:
        """Build Gemini-compatible schema from a function tool"""
        tool_info = get_tool_info(function_tool)

        parameter_json_schema_for_gemini: Optional[dict[str, Any]] = None
        if tool_info.parameters_schema is not None:
            if tool_info.parameters_schema and tool_info.parameters_schema.get("properties", True) is not None:
                simplified_schema = simplify_gemini_schema(tool_info.parameters_schema)
                parameter_json_schema_for_gemini = simplified_schema
        else:
            openai_schema = build_openai_schema(function_tool)
            if openai_schema.get("parameters") and openai_schema["parameters"].get("properties", True) is not None:
                simplified_schema = simplify_gemini_schema(openai_schema["parameters"])
                parameter_json_schema_for_gemini = simplified_schema

        func_declaration = types.FunctionDeclaration(
            name=tool_info.name,
            description=tool_info.description or "",
            parameters=parameter_json_schema_for_gemini
        )
        return func_declaration

Build Gemini-compatible schema from a function tool
def build_mcp_schema(function_tool: FunctionTool) -> dict

    def build_mcp_schema(function_tool: FunctionTool) -> dict:
        """Convert function tool to MCP schema"""
        tool_info = get_tool_info(function_tool)
        return {
            "name": tool_info.name,
            "description": tool_info.description,
            "parameters": build_pydantic_args_model(function_tool).model_json_schema()
        }

Convert function tool to MCP schema
def build_nova_sonic_schema(function_tool: FunctionTool) -> dict[str, Any]

    def build_nova_sonic_schema(function_tool: FunctionTool) -> dict[str, Any]:
        """Build Amazon Nova Sonic-compatible schema from a function tool"""
        tool_info = get_tool_info(function_tool)

        params_schema_to_use: Optional[dict] = None
        if tool_info.parameters_schema is not None:
            params_schema_to_use = tool_info.parameters_schema
        else:
            model = build_pydantic_args_model(function_tool)
            params_schema_to_use = model.model_json_schema()

        final_params_schema_for_nova = params_schema_to_use if params_schema_to_use is not None else {"type": "object", "properties": {}}
        input_schema_json_string = json.dumps(final_params_schema_for_nova)
        description = tool_info.description or tool_info.name

        return {
            "toolSpec": {
                "name": tool_info.name,
                "description": description,
                "inputSchema": {
                    "json": input_schema_json_string
                }
            }
        }

Build Amazon Nova Sonic-compatible schema from a function tool
def build_openai_schema(function_tool: FunctionTool) -> dict[str, Any]

    def build_openai_schema(function_tool: FunctionTool) -> dict[str, Any]:
        """Build OpenAI-compatible schema from a function tool"""
        tool_info = get_tool_info(function_tool)

        params_schema_to_use: Optional[dict] = None
        if tool_info.parameters_schema is not None:
            params_schema_to_use = tool_info.parameters_schema
        else:
            model = build_pydantic_args_model(function_tool)
            params_schema_to_use = model.model_json_schema()

        final_params_schema = params_schema_to_use if params_schema_to_use is not None else {"type": "object", "properties": {}}

        return {
            "name": tool_info.name,
            "description": tool_info.description or "",
            "parameters": final_params_schema,
            "type": "function",
        }

Build OpenAI-compatible schema from a function tool
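Example (illustrative sketch only; the weather tool is made up, and the import path is assumed from the module name above):

    from agents.utils import function_tool, build_openai_schema

    @function_tool
    async def get_weather(city: str) -> str:
        """Get the current weather for a city."""
        return f"Sunny in {city}"

    schema = build_openai_schema(get_weather)
    # schema["name"] == "get_weather"
    # schema["description"] == "Get the current weather for a city."
    # schema["parameters"] is the JSON schema derived from the signature
    # schema["type"] == "function"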
def build_pipeline_config(*, stt: Any | None, llm: Any | None, tts: Any | None, vad: Any | None, turn_detector: Any | None, avatar: Any | None, denoise: Any | None, realtime_model: Any | None, realtime_config_mode: str | None) -> PipelineConfig

    def build_pipeline_config(
        *,
        stt: Any | None,
        llm: Any | None,
        tts: Any | None,
        vad: Any | None,
        turn_detector: Any | None,
        avatar: Any | None,
        denoise: Any | None,
        realtime_model: Any | None,
        realtime_config_mode: str | None,
    ) -> PipelineConfig:
        """
        Detect pipeline mode, realtime mode, and active components from the raw
        component references provided at Pipeline construction.
        """
        # Build active component set
        components: set[PipelineComponent] = set()
        if stt is not None:
            components.add(PipelineComponent.STT)
        if llm is not None:
            components.add(PipelineComponent.LLM)
        if tts is not None:
            components.add(PipelineComponent.TTS)
        if vad is not None:
            components.add(PipelineComponent.VAD)
        if turn_detector is not None:
            components.add(PipelineComponent.TURN_DETECTOR)
        if avatar is not None:
            components.add(PipelineComponent.AVATAR)
        if denoise is not None:
            components.add(PipelineComponent.DENOISE)
        if realtime_model is not None:
            components.add(PipelineComponent.REALTIME_MODEL)

        # Detect realtime mode
        realtime_mode: RealtimeMode | None = None
        is_realtime = False
        if realtime_model is not None:
            has_external_stt = stt is not None
            has_external_tts = tts is not None
            if realtime_config_mode:
                realtime_mode = RealtimeMode(realtime_config_mode)
                is_realtime = (realtime_mode != RealtimeMode.LLM_ONLY)
            elif has_external_stt and has_external_tts:
                realtime_mode = RealtimeMode.LLM_ONLY
                is_realtime = False
            elif has_external_stt:
                realtime_mode = RealtimeMode.HYBRID_STT
                is_realtime = True
            elif has_external_tts:
                realtime_mode = RealtimeMode.HYBRID_TTS
                is_realtime = True
            else:
                realtime_mode = RealtimeMode.FULL_S2S
                is_realtime = True

        # Detect pipeline mode
        if is_realtime:
            pipeline_mode = PipelineMode.REALTIME
        elif stt and llm and tts and vad and turn_detector:
            pipeline_mode = PipelineMode.FULL_CASCADING
        elif llm and tts and not stt:
            pipeline_mode = PipelineMode.LLM_TTS_ONLY
        elif stt and llm and not tts:
            pipeline_mode = PipelineMode.STT_LLM_ONLY
        elif llm and not stt and not tts:
            pipeline_mode = PipelineMode.LLM_ONLY
        elif stt and tts and not llm:
            pipeline_mode = PipelineMode.STT_TTS_ONLY
        elif stt and not llm and not tts:
            pipeline_mode = PipelineMode.STT_ONLY
        elif tts and not llm and not stt:
            pipeline_mode = PipelineMode.TTS_ONLY
        else:
            pipeline_mode = PipelineMode.PARTIAL_CASCADING

        return PipelineConfig(
            pipeline_mode=pipeline_mode,
            realtime_mode=realtime_mode,
            is_realtime=is_realtime,
            active_components=frozenset(components),
        )

Detect pipeline mode, realtime mode, and active components from the raw component references provided at Pipeline construction.
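Example (illustrative sketch; the function only checks which references are non-None, so plain placeholder objects are enough to see the detection logic):

    cfg = build_pipeline_config(
        stt=object(), llm=object(), tts=object(),
        vad=object(), turn_detector=object(),
        avatar=None, denoise=None,
        realtime_model=None, realtime_config_mode=None,
    )
    assert cfg.pipeline_mode is PipelineMode.FULL_CASCADING
    assert not cfg.is_realtime
    assert cfg.has_component(PipelineComponent.STT)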
def build_pydantic_args_model(func: Callable[..., Any]) -> type[pydantic.main.BaseModel]

    def build_pydantic_args_model(func: Callable[..., Any]) -> type[BaseModel]:
        """
        Dynamically construct a Pydantic BaseModel class representing all valid
        positional arguments of the given function, complete with types, default
        values, and docstring descriptions.
        """
        return ModelBuilder(func).construct()

Dynamically construct a Pydantic BaseModel class representing all valid positional arguments of the given function, complete with types, default values, and docstring descriptions.
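Example (the transfer function below is hypothetical, for illustration only):

    def transfer(amount: float, currency: str = "USD") -> None:
        """Transfer money.

        Args:
            amount: Amount to transfer.
            currency: ISO currency code.
        """

    ArgsModel = build_pydantic_args_model(transfer)
    # ArgsModel is a Pydantic model named "TransferArgs"; its JSON schema marks
    # "amount" as required and gives "currency" the default "USD".
    print(ArgsModel.model_json_schema())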
async def cancel_and_wait(task: asyncio.Task | None) -> None

    async def cancel_and_wait(task: asyncio.Task | None) -> None:
        """Cancel a task and wait for it to finish, suppressing CancelledError."""
        if not task or task.done():
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

Cancel a task and wait for it to finish, suppressing CancelledError.
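Example (minimal usage sketch):

    import asyncio

    async def main() -> None:
        task = asyncio.create_task(asyncio.sleep(60))
        # ... later, typically during shutdown:
        await cancel_and_wait(task)  # returns once the task is fully cancelled

    asyncio.run(main())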
def create_generic_mcp_adapter(tool_name: str, tool_description: str | None, input_schema: dict, client_call_function: Callable) -> FunctionTool

    def create_generic_mcp_adapter(
        tool_name: str,
        tool_description: str | None,
        input_schema: dict,
        client_call_function: Callable
    ) -> FunctionTool:
        """
        Create a generic adapter that converts an MCP tool to a framework FunctionTool.

        Args:
            tool_name: Name of the MCP tool
            tool_description: Description of the MCP tool (if available)
            input_schema: JSON schema for the tool's input parameters
            client_call_function: Function to call the tool on the MCP server

        Returns:
            A function tool that can be registered with the agent
        """
        required_params = input_schema.get('required', [])
        param_properties = input_schema.get('properties', {})

        docstring = tool_description or f"Call the {tool_name} tool"
        if param_properties and "Args:" not in docstring:
            param_docs = "\n\nArgs:\n"
            for param_name, param_info in param_properties.items():
                required = " (required)" if param_name in required_params else ""
                description = param_info.get('description', f"Parameter for {tool_name}")
                param_docs += f" {param_name}{required}: {description}\n"
            docstring += param_docs

        if not param_properties:
            @function_tool(name=tool_name)
            async def no_param_tool() -> Any:
                return await client_call_function({})

            no_param_tool.__doc__ = docstring
            tool_info_no_param = get_tool_info(no_param_tool)
            tool_info_no_param.parameters_schema = input_schema
            return no_param_tool
        else:
            @function_tool(name=tool_name)
            async def param_tool(**kwargs) -> Any:
                actual_kwargs = kwargs.copy()  # Work with a copy

                if 'instructions' in required_params and 'instructions' not in actual_kwargs:
                    other_params_provided = any(p in actual_kwargs for p in param_properties if p != 'instructions')
                    if other_params_provided:
                        actual_kwargs['instructions'] = f"Execute tool {tool_name} with the provided parameters."

                missing = [p for p in required_params if p not in actual_kwargs]
                if missing:
                    missing_str = ", ".join(missing)
                    param_details = []
                    for param in missing:
                        param_info = param_properties.get(param, {})
                        desc = param_info.get('description', f"Parameter for {tool_name}")
                        param_details.append(f"'{param}': {desc}")
                    param_help = "; ".join(param_details)
                    raise ToolError(
                        f"Missing required parameters for {tool_name}: {missing_str}. "
                        f"Required parameters: {param_help}"
                    )

                return await client_call_function(actual_kwargs)

            param_tool.__doc__ = docstring
            tool_info_param = get_tool_info(param_tool)
            tool_info_param.parameters_schema = input_schema
            return param_tool

Create a generic adapter that converts an MCP tool to a framework FunctionTool.
Args
tool_name: Name of the MCP tool
tool_description: Description of the MCP tool (if available)
input_schema: JSON schema for the tool's input parameters
client_call_function: Function to call the tool on the MCP server
Returns
A function tool that can be registered with the agent
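Example (illustrative sketch; the echo tool schema and the echo_call coroutine are made-up stand-ins for a real MCP client):

    import asyncio

    async def echo_call(arguments: dict) -> dict:
        return {"echo": arguments}

    adapter = create_generic_mcp_adapter(
        tool_name="echo",
        tool_description="Echo the provided arguments back.",
        input_schema={
            "type": "object",
            "properties": {"text": {"type": "string", "description": "Text to echo"}},
            "required": ["text"],
        },
        client_call_function=echo_call,
    )
    # adapter is a FunctionTool; calling it without "text" raises ToolError.
    result = asyncio.run(adapter(text="hello"))   # {"echo": {"text": "hello"}}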
def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None)

    def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None):
        """Decorator to mark a function as a tool. Can be used with or without parentheses."""
        def create_wrapper(fn: Callable) -> FunctionTool:
            tool_info = FunctionToolInfo(
                name=name or fn.__name__,
                description=fn.__doc__
            )

            if asyncio.iscoroutinefunction(fn):
                @wraps(fn)
                async def async_wrapper(*args, **kwargs):
                    return await fn(*args, **kwargs)
                setattr(async_wrapper, "_tool_info", tool_info)
                return async_wrapper
            else:
                @wraps(fn)
                def sync_wrapper(*args, **kwargs):
                    return fn(*args, **kwargs)
                setattr(sync_wrapper, "_tool_info", tool_info)
                return sync_wrapper

        if func is None:
            return create_wrapper
        return create_wrapper(func)

Decorator to mark a function as a tool. Can be used with or without parentheses.
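Example (both decorator forms; the tools themselves are made up):

    @function_tool
    def add(a: int, b: int) -> int:
        """Add two integers."""
        return a + b

    @function_tool(name="lookup_user")
    async def lookup(user_id: str) -> dict:
        """Look up a user by id."""
        return {"id": user_id}

    assert is_function_tool(add)
    assert get_tool_info(lookup).name == "lookup_user"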
def get_tool_info(tool: FunctionTool) -> FunctionToolInfo

    def get_tool_info(tool: FunctionTool) -> FunctionToolInfo:
        """Get the tool info from a function tool"""
        if not is_function_tool(tool):
            raise ValueError("Object is not a function tool")
        if inspect.ismethod(tool):
            tool = tool.__func__
        return getattr(tool, "_tool_info")

Get the tool info from a function tool
async def graceful_cancel(*tasks: asyncio.Task) -> None

    async def graceful_cancel(*tasks: asyncio.Task) -> None:
        """Simple utility to cancel tasks and wait for them to complete"""
        if not tasks:
            return

        for task in tasks:
            if not task.done():
                task.cancel()

        try:
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=0.5
            )
        except asyncio.TimeoutError:
            pass

Simple utility to cancel tasks and wait for them to complete
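Example (minimal sketch; unlike cancel_and_wait, this cancels several tasks at once and waits at most 0.5 s for them to finish):

    import asyncio

    async def main() -> None:
        tasks = [asyncio.create_task(asyncio.sleep(60)) for _ in range(3)]
        await graceful_cancel(*tasks)

    asyncio.run(main())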
def is_function_tool(obj: Any) -> bool

    def is_function_tool(obj: Any) -> bool:
        """Check if an object is a function tool"""
        if inspect.ismethod(obj):
            obj = obj.__func__
        return hasattr(obj, "_tool_info")

Check if an object is a function tool
async def run_stt(audio_stream: AsyncIterator[bytes]) -> AsyncIterator[Any]

    async def run_stt(audio_stream: AsyncIterator[bytes]) -> AsyncIterator[Any]:
        """
        Run STT on an audio stream.

        Delegates to the STT component's stream_transcribe method.

        Args:
            audio_stream: Async iterator of audio bytes

        Yields:
            SpeechEvent objects (with text, etc.)
        """
        from .job import get_current_job_context
        ctx = get_current_job_context()
        if not ctx or not ctx._pipeline or not ctx._pipeline.stt:
            raise RuntimeError("No STT component available in current context")
        async for event in ctx._pipeline.stt.stream_transcribe(audio_stream):
            yield event

Run STT on an audio stream.
Delegates to the STT component's stream_transcribe method.
Args
audio_stream: Async iterator of audio bytes
Yields
SpeechEvent objects (with text, etc.)
async def run_tts(text_stream: AsyncIterator[str]) -> AsyncIterator[bytes]

    async def run_tts(text_stream: AsyncIterator[str]) -> AsyncIterator[bytes]:
        """
        Run TTS on a text stream.

        Delegates to the TTS component's stream_synthesize method.

        Args:
            text_stream: Async iterator of text

        Yields:
            Audio bytes
        """
        from .job import get_current_job_context
        ctx = get_current_job_context()
        if not ctx or not ctx._pipeline or not ctx._pipeline.tts:
            raise RuntimeError("No TTS component available in current context")
        async for frame in ctx._pipeline.tts.stream_synthesize(text_stream):
            yield frame

Run TTS on a text stream.
Delegates to the TTS component's stream_synthesize method.
Args
text_stream: Async iterator of text
Yields
Audio bytes
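Example (conceptual sketch only: run_stt and run_tts require an active job context with configured STT and TTS components, and llm_reply is a made-up placeholder for whatever turns transcripts into response text):

    async def respond(audio_in: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
        async def text_stream() -> AsyncIterator[str]:
            async for speech_event in run_stt(audio_in):
                async for token in llm_reply(speech_event):  # hypothetical LLM call
                    yield token

        async for audio_chunk in run_tts(segment_text(text_stream())):
            yield audio_chunk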
async def segment_text(chunks: AsyncIterator[str], delimiters: str = '.?!,;:\n', keep_delimiter: bool = True, min_chars: int = 50, min_words: int = 12, max_buffer: int = 600) -> AsyncIterator[str]

    async def segment_text(
        chunks: AsyncIterator[str],
        delimiters: str = ".?!,;:\n",
        keep_delimiter: bool = True,
        min_chars: int = 50,
        min_words: int = 12,
        max_buffer: int = 600,
    ) -> AsyncIterator[str]:
        """
        Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency.
        Yields segments while keeping the delimiter if requested.
        """
        buffer = ""

        def words_count(s: str) -> int:
            return len(s.split())

        def find_first_delim_index(s: str) -> int:
            indices = [i for d in delimiters if (i := s.find(d)) != -1]
            return min(indices) if indices else -1

        async for chunk in chunks:
            if not chunk:
                continue
            buffer += chunk

            while True:
                di = find_first_delim_index(buffer)
                if di != -1:
                    seg = buffer[: di + (1 if keep_delimiter else 0)]
                    yield seg
                    buffer = buffer[di + 1 :].lstrip()
                    continue
                else:
                    if len(buffer) >= max_buffer or words_count(buffer) >= (min_words * 2):
                        target = max(min_chars, min(len(buffer), max_buffer))
                        cut_idx = buffer.rfind(" ", 0, target)
                        if cut_idx == -1:
                            cut_idx = target
                        seg = buffer[:cut_idx].rstrip()
                        if seg:
                            yield seg
                        buffer = buffer[cut_idx:].lstrip()
                        continue
                break

        if buffer:
            yield buffer

Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency. Yields segments while keeping the delimiter if requested.
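Example (runnable sketch; the chunks imitate an incremental LLM token stream):

    import asyncio

    async def tokens():
        for piece in ["Hello there", ", how are you today? ",
                      "I hope the demo ", "is going well. Let's keep chatting"]:
            yield piece

    async def main() -> None:
        async for segment in segment_text(tokens()):
            print(repr(segment))
        # Segments end at the first delimiter found (kept by default); any
        # remaining buffered text is flushed as a final segment.

    asyncio.run(main())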
def simplify_gemini_schema(schema: dict[str, Any]) -> dict[str, typing.Any] | None

    def simplify_gemini_schema(schema: dict[str, Any]) -> dict[str, Any] | None:
        """
        Transforms a JSON Schema into Gemini compatible format.
        """
        TYPE_MAPPING: dict[str, types.Type] = {
            "string": types.Type.STRING,
            "number": types.Type.NUMBER,
            "integer": types.Type.INTEGER,
            "boolean": types.Type.BOOLEAN,
            "array": types.Type.ARRAY,
            "object": types.Type.OBJECT,
        }
        SUPPORTED_TYPES = set(TYPE_MAPPING.keys())
        FIELDS_TO_REMOVE = ("title", "default", "additionalProperties", "$defs", "$schema")

        def process_node(node: dict[str, Any]) -> dict[str, Any] | None:
            new_node = node.copy()

            for field in FIELDS_TO_REMOVE:
                if field in new_node:
                    del new_node[field]

            json_type = new_node.get("type")
            if json_type in SUPPORTED_TYPES:
                new_node["type"] = TYPE_MAPPING[json_type]

            node_type = new_node.get("type")
            if node_type == types.Type.OBJECT:
                if "properties" in new_node:
                    new_props = {}
                    for key, prop in new_node["properties"].items():
                        simplified = process_node(prop)
                        if simplified is not None:
                            new_props[key] = simplified
                    new_node["properties"] = new_props
                    if not new_props:
                        return None
                else:
                    return None
            elif node_type == types.Type.ARRAY:
                if "items" in new_node:
                    simplified_items = process_node(new_node["items"])
                    if simplified_items is not None:
                        new_node["items"] = simplified_items
                    else:
                        del new_node["items"]

            return new_node

        result = process_node(schema)
        if result and result.get("type") == types.Type.OBJECT and not result.get("properties"):
            return None
        return result

Transforms a JSON Schema into Gemini compatible format.
Classes
class AgentState (*args, **kwds)

    @enum.unique
    class AgentState(enum.Enum):
        """Represents the current state of the agent in a conversation session."""
        STARTING = "starting"
        IDLE = "idle"
        SPEAKING = "speaking"
        LISTENING = "listening"
        THINKING = "thinking"
        CLOSING = "closing"

Represents the current state of the agent in a conversation session.
Ancestors
- enum.Enum
Class variables
var CLOSING
var IDLE
var LISTENING
var SPEAKING
var STARTING
var THINKING
class AsyncIteratorQueue

    class AsyncIteratorQueue:
        """An async iterator backed by an asyncio.Queue, allowing producers to put items
        and consumers to async-iterate until closed."""

        def __init__(self):
            self.queue = asyncio.Queue()
            self.closed = False

        async def put(self, item):
            if not self.closed:
                await self.queue.put(item)

        def close(self):
            self.closed = True
            self.queue.put_nowait(None)

        def __aiter__(self):
            return self

        async def __anext__(self):
            item = await self.queue.get()
            if item is None:
                raise StopAsyncIteration
            return item

An async iterator backed by an asyncio.Queue, allowing producers to put items and consumers to async-iterate until closed.
Methods
def close(self)

    def close(self):
        self.closed = True
        self.queue.put_nowait(None)

async def put(self, item)

    async def put(self, item):
        if not self.closed:
            await self.queue.put(item)
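Example (runnable sketch; one producer fills the queue, one consumer iterates until close()):

    import asyncio

    async def main() -> None:
        queue = AsyncIteratorQueue()

        async def produce() -> None:
            for text in ("hello", "world"):
                await queue.put(text)
            queue.close()  # unblocks the consumer and ends iteration

        async def consume() -> None:
            async for item in queue:
                print(item)

        await asyncio.gather(produce(), consume())

    asyncio.run(main())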
class AudioByteStream (sample_rate: int, num_channels: int, samples_per_channel: int)

    class AudioByteStream:
        """
        Buffers raw PCM bytes and emits fixed-size AudioFrame objects.

        Useful for converting variable-size chunks (e.g. data-channel payloads)
        into the steady frame cadence that AvatarSynchronizer expects.
        """

        def __init__(self, sample_rate: int, num_channels: int, samples_per_channel: int):
            import numpy as _np
            self._np = _np
            self._sample_rate = sample_rate
            self._num_channels = num_channels
            self._samples_per_channel = samples_per_channel
            self._sample_width = 2
            self._bytes_per_frame = samples_per_channel * num_channels * self._sample_width
            self._buffer = bytearray()

        def push(self, frame_data: bytes) -> list:
            from av import AudioFrame

            self._buffer.extend(frame_data)
            frames = []
            while len(self._buffer) >= self._bytes_per_frame:
                chunk = bytes(self._buffer[: self._bytes_per_frame])
                del self._buffer[: self._bytes_per_frame]
                arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(
                    self._num_channels, self._samples_per_channel
                )
                frame = AudioFrame.from_ndarray(
                    arr,
                    format="s16",
                    layout="mono" if self._num_channels == 1 else "stereo",
                )
                frame.sample_rate = self._sample_rate
                frames.append(frame)
            return frames

        def flush(self) -> list:
            from av import AudioFrame

            if not self._buffer:
                return []
            chunk = bytes(self._buffer)
            self._buffer.clear()
            missing = self._bytes_per_frame - len(chunk)
            if missing > 0:
                chunk += bytes(missing)
            arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(
                self._samples_per_channel, self._num_channels
            )
            frame = AudioFrame.from_ndarray(
                arr.T,
                format="s16",
                layout="mono" if self._num_channels == 1 else "stereo",
            )
            frame.sample_rate = self._sample_rate
            return [frame]

Buffers raw PCM bytes and emits fixed-size AudioFrame objects.
Useful for converting variable-size chunks (e.g. data-channel payloads) into the steady frame cadence that AvatarSynchronizer expects.
Methods
def flush(self) -> list

    def flush(self) -> list:
        from av import AudioFrame

        if not self._buffer:
            return []
        chunk = bytes(self._buffer)
        self._buffer.clear()
        missing = self._bytes_per_frame - len(chunk)
        if missing > 0:
            chunk += bytes(missing)
        arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(
            self._samples_per_channel, self._num_channels
        )
        frame = AudioFrame.from_ndarray(
            arr.T,
            format="s16",
            layout="mono" if self._num_channels == 1 else "stereo",
        )
        frame.sample_rate = self._sample_rate
        return [frame]

def push(self, frame_data: bytes) -> list

    def push(self, frame_data: bytes) -> list:
        from av import AudioFrame

        self._buffer.extend(frame_data)
        frames = []
        while len(self._buffer) >= self._bytes_per_frame:
            chunk = bytes(self._buffer[: self._bytes_per_frame])
            del self._buffer[: self._bytes_per_frame]
            arr = self._np.frombuffer(chunk, dtype=self._np.int16).reshape(
                self._num_channels, self._samples_per_channel
            )
            frame = AudioFrame.from_ndarray(
                arr,
                format="s16",
                layout="mono" if self._num_channels == 1 else "stereo",
            )
            frame.sample_rate = self._sample_rate
            frames.append(frame)
        return frames
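Example (usage sketch; requires numpy and PyAV, which the class imports lazily; 20 ms frames at 16 kHz mono are 320 samples = 640 bytes per frame):

    stream = AudioByteStream(sample_rate=16000, num_channels=1, samples_per_channel=320)

    frames = stream.push(b"\x00\x00" * 500)   # 1000 bytes -> one full 640-byte frame
    print(len(frames))                        # 1; the remaining 360 bytes stay buffered
    frames += stream.flush()                  # pads the leftover bytes with silence
    print(len(frames))                        # 2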
class FieldBuilder (param: inspect.Parameter, type_processor: TypeProcessor, description: str | None)

    class FieldBuilder:
        """Builds a Pydantic field tuple from a function parameter, its processed type,
        and docstring description."""

        def __init__(self, param: inspect.Parameter, type_processor: TypeProcessor, description: str | None):
            self.param = param
            self.type_processor = type_processor
            self.description = description

        def build(self) -> tuple[Any, FieldInfo]:
            field_info = self.type_processor.field_info
            self._apply_default_if_needed(field_info)
            self._apply_description_if_needed(field_info)
            return (self.type_processor.base_type, field_info)

        def _apply_default_if_needed(self, field_info: FieldInfo) -> None:
            if (self.param.default is not inspect.Parameter.empty
                    and field_info.default is PydanticUndefined):
                field_info.default = self.param.default

        def _apply_description_if_needed(self, field_info: FieldInfo) -> None:
            if field_info.description is None and self.description is not None:
                field_info.description = self.description

Builds a Pydantic field tuple from a function parameter, its processed type, and docstring description.
Methods
def build(self) -> tuple[typing.Any, pydantic.fields.FieldInfo]

    def build(self) -> tuple[Any, FieldInfo]:
        field_info = self.type_processor.field_info
        self._apply_default_if_needed(field_info)
        self._apply_description_if_needed(field_info)
        return (self.type_processor.base_type, field_info)
class FunctionTool (*args, **kwargs)

    @runtime_checkable
    class FunctionTool(Protocol):
        @property
        @abstractmethod
        def _tool_info(self) -> "FunctionToolInfo": ...

        def __call__(self, *args: Any, **kwargs: Any) -> Any: ...

Base class for protocol classes.

Protocol classes are defined as::

    class Proto(Protocol):
        def meth(self) -> int: ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

    class C:
        def meth(self) -> int:
            return 0

    def func(x: Proto) -> int:
        return x.meth()

    func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

    class GenProto[T](Protocol):
        def meth(self) -> T: ...

Ancestors
- typing.Protocol
- typing.Generic
class FunctionToolInfo (name: str, description: str | None = None, parameters_schema: Optional[dict] = None)

    @dataclass
    class FunctionToolInfo:
        """Holds metadata about a function tool, including its name, description, and parameter schema."""
        name: str
        description: str | None = None
        parameters_schema: Optional[dict] = None

Holds metadata about a function tool, including its name, description, and parameter schema.
Instance variables
var description : str | None
var name : str
var parameters_schema : dict | None
class ModelBuilder (func: Callable[..., Any])

    class ModelBuilder:
        """Constructs a Pydantic BaseModel from a function's signature, type hints, and docstring."""

        def __init__(self, func: Callable[..., Any]):
            self.func = func
            self.sig = inspect.signature(func)
            self.hints = get_type_hints(func, include_extras=True)
            self.docs = parse_from_object(func)
            self._field_registry = {}

        def construct(self) -> type[BaseModel]:
            self._register_all_fields()
            return create_model(self._generate_class_name(), **self._field_registry)

        def _register_all_fields(self) -> None:
            for name, param in self.sig.parameters.items():
                if self._should_skip_param(name):
                    continue
                self._register_field(name, param)

        def _should_skip_param(self, name: str) -> bool:
            return name in ("self", "cls") or name not in self.hints

        def _register_field(self, name: str, param: inspect.Parameter) -> None:
            type_info = TypeProcessor(self.hints[name])
            field_builder = FieldBuilder(param, type_info, self._find_doc_description(name))
            self._field_registry[name] = field_builder.build()

        def _find_doc_description(self, param_name: str) -> str | None:
            return next(
                (p.description for p in self.docs.params if p.arg_name == param_name),
                None
            )

        def _generate_class_name(self) -> str:
            return "".join(part.title() for part in self.func.__name__.split("_")) + "Args"

Constructs a Pydantic BaseModel from a function's signature, type hints, and docstring.
Methods
def construct(self) -> type[pydantic.main.BaseModel]

    def construct(self) -> type[BaseModel]:
        self._register_all_fields()
        return create_model(self._generate_class_name(), **self._field_registry)
class PipelineComponent (*args, **kwds)

    @enum.unique
    class PipelineComponent(enum.Enum):
        """Identifiers for each component slot in the pipeline."""
        STT = "stt"
        LLM = "llm"
        TTS = "tts"
        VAD = "vad"
        TURN_DETECTOR = "turn_detector"
        AVATAR = "avatar"
        DENOISE = "denoise"
        REALTIME_MODEL = "realtime_model"

Identifiers for each component slot in the pipeline.
Ancestors
- enum.Enum
Class variables
var AVATAR
var DENOISE
var LLM
var REALTIME_MODEL
var STT
var TTS
var TURN_DETECTOR
var VAD
class PipelineConfig (pipeline_mode: PipelineMode, realtime_mode: RealtimeMode | None, is_realtime: bool, active_components: frozenset[PipelineComponent])

    @dataclass(frozen=True)
    class PipelineConfig:
        """
        Immutable snapshot of the pipeline's detected configuration.

        Computed once during Pipeline.__init__ and accessible everywhere via
        pipeline.config.
        """
        pipeline_mode: PipelineMode
        realtime_mode: RealtimeMode | None
        is_realtime: bool
        active_components: frozenset[PipelineComponent]

        def has_component(self, component: PipelineComponent) -> bool:
            """Check whether a specific component is present."""
            return component in self.active_components

        @property
        def component_names(self) -> list[str]:
            """Return sorted list of active component value strings (for metrics/logging)."""
            return sorted(c.value for c in self.active_components)

Immutable snapshot of the pipeline's detected configuration. Computed once during Pipeline.__init__ and accessible everywhere via pipeline.config.
Instance variables
var active_components : frozenset[PipelineComponent]

prop component_names : list[str]

    @property
    def component_names(self) -> list[str]:
        """Return sorted list of active component value strings (for metrics/logging)."""
        return sorted(c.value for c in self.active_components)

Return sorted list of active component value strings (for metrics/logging).

var is_realtime : bool
var pipeline_mode : PipelineMode
var realtime_mode : RealtimeMode | None
Methods
def has_component(self, component: PipelineComponent) -> bool

    def has_component(self, component: PipelineComponent) -> bool:
        """Check whether a specific component is present."""
        return component in self.active_components

Check whether a specific component is present.
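Example (small sketch of the convenience accessors, using a hand-built config):

    cfg = PipelineConfig(
        pipeline_mode=PipelineMode.STT_LLM_ONLY,
        realtime_mode=None,
        is_realtime=False,
        active_components=frozenset({PipelineComponent.STT, PipelineComponent.LLM}),
    )
    print(cfg.component_names)                       # ['llm', 'stt']
    print(cfg.has_component(PipelineComponent.TTS))  # False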
class PipelineMode (*args, **kwds)

    @enum.unique
    class PipelineMode(enum.Enum):
        """The overall pipeline architecture mode based on which components are present."""
        REALTIME = "realtime"
        FULL_CASCADING = "full_cascading"
        LLM_TTS_ONLY = "llm_tts_only"
        STT_LLM_ONLY = "stt_llm_only"
        LLM_ONLY = "llm_only"
        STT_ONLY = "stt_only"
        TTS_ONLY = "tts_only"
        STT_TTS_ONLY = "stt_tts_only"
        HYBRID = "hybrid"
        PARTIAL_CASCADING = "partial_cascading"

The overall pipeline architecture mode based on which components are present.
Ancestors
- enum.Enum
Class variables
var FULL_CASCADING
var HYBRID
var LLM_ONLY
var LLM_TTS_ONLY
var PARTIAL_CASCADING
var REALTIME
var STT_LLM_ONLY
var STT_ONLY
var STT_TTS_ONLY
var TTS_ONLY
class RawFunctionTool (*args, **kwargs)

    class RawFunctionTool(Protocol):
        """Protocol for raw function tool without framework wrapper"""
        def __call__(self, *args: Any, **kwargs: Any) -> Any: ...

Protocol for raw function tool without framework wrapper
Ancestors
- typing.Protocol
- typing.Generic
class RealtimeMode (*args, **kwds)

    @enum.unique
    class RealtimeMode(enum.Enum):
        """The realtime sub-mode when a RealtimeBaseModel is used as the LLM."""
        FULL_S2S = "full_s2s"
        HYBRID_STT = "hybrid_stt"
        HYBRID_TTS = "hybrid_tts"
        LLM_ONLY = "llm_only"

The realtime sub-mode when a RealtimeBaseModel is used as the LLM.
Ancestors
- enum.Enum
Class variables
var FULL_S2S
var HYBRID_STT
var HYBRID_TTS
var LLM_ONLY
class ToolError (*args, **kwargs)

    class ToolError(Exception):
        """Exception raised when a tool execution fails"""
        pass

Exception raised when a tool execution fails
Ancestors
- builtins.Exception
- builtins.BaseException
class TypeProcessor (hint: Any)

    class TypeProcessor:
        """Extracts the base type and Pydantic FieldInfo from a possibly Annotated type hint."""

        def __init__(self, hint: Any):
            self.original_hint = hint
            self.base_type = hint
            self.field_info = Field()
            self._process_annotation()

        def _process_annotation(self) -> None:
            if get_origin(self.original_hint) is Annotated:
                args = get_args(self.original_hint)
                self.base_type = args[0]
                self._extract_field_info_from_args(args[1:])

        def _extract_field_info_from_args(self, args: tuple) -> None:
            for arg in args:
                if isinstance(arg, FieldInfo):
                    self.field_info = arg
                    break

Extracts the base type and Pydantic FieldInfo from a possibly Annotated type hint.
class UserState (*args, **kwds)

    @enum.unique
    class UserState(enum.Enum):
        """Represents the current state of the user in a conversation session."""
        IDLE = "idle"
        SPEAKING = "speaking"
        LISTENING = "listening"

Represents the current state of the user in a conversation session.
Ancestors
- enum.Enum
Class variables
var IDLE
var LISTENING
var SPEAKING