Package agents
Sub-modules
agents.a2a
agents.agent
agents.agent_session
agents.backend
-
Backend communication module for VideoSDK Agents …
agents.cascading_pipeline
agents.cli
agents.console_mode
agents.conversation_flow
agents.debug
agents.denoise
agents.eou
agents.event_bus
agents.event_emitter
agents.execution
-
Execution module for VideoSDK Agents …
agents.images
agents.init_config
-
Agent initialization configuration utilities …
agents.job
agents.llm
agents.mcp
agents.metrics
agents.pipeline
agents.realtime_base_model
agents.realtime_pipeline
agents.room
agents.stt
agents.tts
agents.utils
agents.vad
agents.worker
Functions
def build_gemini_schema(function_tool: FunctionTool) ‑> google.genai.types.FunctionDeclaration
-
Expand source code
def build_gemini_schema(function_tool: FunctionTool) -> types.FunctionDeclaration:
    """Build Gemini-compatible schema from a function tool"""
    tool_info = get_tool_info(function_tool)
    parameter_json_schema_for_gemini: Optional[dict[str, Any]] = None
    if tool_info.parameters_schema is not None:
        if tool_info.parameters_schema and tool_info.parameters_schema.get("properties", True) is not None:
            simplified_schema = simplify_gemini_schema(tool_info.parameters_schema)
            parameter_json_schema_for_gemini = simplified_schema
    else:
        openai_schema = build_openai_schema(function_tool)
        if openai_schema.get("parameters") and openai_schema["parameters"].get("properties", True) is not None:
            simplified_schema = simplify_gemini_schema(openai_schema["parameters"])
            parameter_json_schema_for_gemini = simplified_schema
    func_declaration = types.FunctionDeclaration(
        name=tool_info.name,
        description=tool_info.description or "",
        parameters=parameter_json_schema_for_gemini
    )
    return func_declaration
Build Gemini-compatible schema from a function tool
def build_nova_sonic_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
-
Expand source code
def build_nova_sonic_schema(function_tool: FunctionTool) -> dict[str, Any]:
    """Build Amazon Nova Sonic-compatible schema from a function tool"""
    tool_info = get_tool_info(function_tool)
    params_schema_to_use: Optional[dict] = None
    if tool_info.parameters_schema is not None:
        params_schema_to_use = tool_info.parameters_schema
    else:
        model = build_pydantic_args_model(function_tool)
        params_schema_to_use = model.model_json_schema()
    final_params_schema_for_nova = params_schema_to_use if params_schema_to_use is not None else {"type": "object", "properties": {}}
    input_schema_json_string = json.dumps(final_params_schema_for_nova)
    description = tool_info.description or tool_info.name
    return {
        "toolSpec": {
            "name": tool_info.name,
            "description": description,
            "inputSchema": {
                "json": input_schema_json_string
            }
        }
    }
Build Amazon Nova Sonic-compatible schema from a function tool
def build_openai_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
-
Expand source code
def build_openai_schema(function_tool: FunctionTool) -> dict[str, Any]:
    """Build OpenAI-compatible schema from a function tool"""
    tool_info = get_tool_info(function_tool)
    params_schema_to_use: Optional[dict] = None
    if tool_info.parameters_schema is not None:
        params_schema_to_use = tool_info.parameters_schema
    else:
        model = build_pydantic_args_model(function_tool)
        params_schema_to_use = model.model_json_schema()
    final_params_schema = params_schema_to_use if params_schema_to_use is not None else {"type": "object", "properties": {}}
    return {
        "name": tool_info.name,
        "description": tool_info.description or "",
        "parameters": final_params_schema,
        "type": "function",
    }
Build OpenAI-compatible schema from a function tool
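A minimal sketch of how a decorated tool maps onto these schema builders, assuming the package is importable as videosdk.agents; the get_weather tool below is a hypothetical example, not part of the package:
from videosdk.agents import function_tool, build_openai_schema, build_gemini_schema

@function_tool
async def get_weather(city: str) -> dict:
    """Get the current weather for a city."""  # hypothetical example tool
    return {"city": city, "forecast": "sunny"}

openai_schema = build_openai_schema(get_weather)  # {"name": "get_weather", "type": "function", ...}
gemini_decl = build_gemini_schema(get_weather)    # google.genai.types.FunctionDeclaration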
def encode(frame: av.VideoFrame,
options: EncodeOptions) ‑> bytes
-
Expand source code
def encode(frame: av.VideoFrame, options: EncodeOptions) -> bytes:
    """Encode with optimized pipeline"""
    img = frame.to_image()
    if options.resize_options:
        img = img.resize(
            (options.resize_options.width, options.resize_options.height),
            resample=PILImage.Resampling.LANCZOS
        )
    buffer = io.BytesIO()
    img.save(buffer,
        format=options.format,
        quality=options.quality,
        optimize=True,
        subsampling=0,
        qtables="web_high"
    )
    return buffer.getvalue()
Encode with optimized pipeline
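A sketch of encoding a synthetic frame; the EncodeOptions/ResizeOptions constructor arguments are assumptions based on the fields used in the source above (format, quality, resize_options.width/height):
import av
import numpy as np

rgb = np.zeros((480, 640, 3), dtype=np.uint8)             # blank test frame
frame = av.VideoFrame.from_ndarray(rgb, format="rgb24")
opts = EncodeOptions(                                      # assumed constructor
    format="JPEG",
    quality=90,
    resize_options=ResizeOptions(width=320, height=240),  # assumed constructor
)
jpeg_bytes = encode(frame, opts)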
def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None)
-
Expand source code
def function_tool(func: Optional[Callable] = None, *, name: Optional[str] = None):
    """Decorator to mark a function as a tool. Can be used with or without parentheses."""
    def create_wrapper(fn: Callable) -> FunctionTool:
        tool_info = FunctionToolInfo(
            name=name or fn.__name__,
            description=fn.__doc__
        )
        if asyncio.iscoroutinefunction(fn):
            @wraps(fn)
            async def async_wrapper(*args, **kwargs):
                return await fn(*args, **kwargs)
            setattr(async_wrapper, "_tool_info", tool_info)
            return async_wrapper
        else:
            @wraps(fn)
            def sync_wrapper(*args, **kwargs):
                return fn(*args, **kwargs)
            setattr(sync_wrapper, "_tool_info", tool_info)
            return sync_wrapper
    if func is None:
        return create_wrapper
    return create_wrapper(func)
Decorator to mark a function as a tool. Can be used with or without parentheses.
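For illustration, a sketch of both decorator forms; the tool bodies are hypothetical:
@function_tool
async def lookup_order(order_id: str) -> dict:
    """Look up an order by its ID."""   # docstring becomes the tool description
    return {"order_id": order_id, "status": "shipped"}

@function_tool(name="current_time")
def now() -> str:
    """Return the current UTC time."""  # explicit name overrides the function name
    from datetime import datetime, timezone
    return datetime.now(timezone.utc).isoformat()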
def get_tool_info(tool: FunctionTool) ‑> FunctionToolInfo
-
Expand source code
def get_tool_info(tool: FunctionTool) -> FunctionToolInfo:
    """Get the tool info from a function tool"""
    if not is_function_tool(tool):
        raise ValueError("Object is not a function tool")
    if inspect.ismethod(tool):
        tool = tool.__func__
    return getattr(tool, "_tool_info")
Get the tool info from a function tool
def is_function_tool(obj: Any) ‑> bool
-
Expand source code
def is_function_tool(obj: Any) -> bool:
    """Check if an object is a function tool"""
    if inspect.ismethod(obj):
        obj = obj.__func__
    return hasattr(obj, "_tool_info")
Check if an object is a function tool
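Used together, the two helpers let calling code discover and describe tools, e.g. with the lookup_order example above:
if is_function_tool(lookup_order):
    info = get_tool_info(lookup_order)
    print(info.name, info.description)   # "lookup_order", "Look up an order by its ID."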
async def segment_text(chunks: AsyncIterator[str],
delimiters: str = '.?!,;:\n',
keep_delimiter: bool = True,
min_chars: int = 50,
min_words: int = 12,
max_buffer: int = 600) ‑> AsyncIterator[str]
-
Expand source code
async def segment_text(
    chunks: AsyncIterator[str],
    delimiters: str = ".?!,;:\n",
    keep_delimiter: bool = True,
    min_chars: int = 50,
    min_words: int = 12,
    max_buffer: int = 600,
) -> AsyncIterator[str]:
    """
    Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency.
    Yields segments while keeping the delimiter if requested.
    """
    buffer = ""

    def words_count(s: str) -> int:
        return len(s.split())

    def find_first_delim_index(s: str) -> int:
        indices = [i for d in delimiters if (i := s.find(d)) != -1]
        return min(indices) if indices else -1

    async for chunk in chunks:
        if not chunk:
            continue
        buffer += chunk
        while True:
            di = find_first_delim_index(buffer)
            if di != -1:
                seg = buffer[: di + (1 if keep_delimiter else 0)]
                yield seg
                buffer = buffer[di + 1 :].lstrip()
                continue
            else:
                if len(buffer) >= max_buffer or words_count(buffer) >= (min_words * 2):
                    target = max(min_chars, min(len(buffer), max_buffer))
                    cut_idx = buffer.rfind(" ", 0, target)
                    if cut_idx == -1:
                        cut_idx = target
                    seg = buffer[:cut_idx].rstrip()
                    if seg:
                        yield seg
                    buffer = buffer[cut_idx:].lstrip()
                    continue
            break
    if buffer:
        yield buffer
Segment an async stream of text on delimiters or soft boundaries to reduce TTS latency. Yields segments while keeping the delimiter if requested.
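A small sketch of feeding a streaming text source through segment_text before handing segments to TTS; the chunk generator is a stand-in, not a package API:
import asyncio

async def fake_llm_chunks():
    # stand-in for a streaming LLM response
    for piece in ["Hello there. ", "Here is a longer sentence ",
                  "that will be flushed on a soft boundary, eventually."]:
        yield piece

async def main():
    async for segment in segment_text(fake_llm_chunks(), min_chars=20, min_words=4):
        print(repr(segment))   # each segment can go to TTS as soon as it is ready

asyncio.run(main())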
def setup_logging(level=20)
-
Expand source code
def setup_logging(level=logging.INFO):
    """Setup logging configuration for videosdk-agents."""
    # Create a formatter
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Setup console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)

    # Get the logger for videosdk.agents
    logger = logging.getLogger("videosdk.agents")
    logger.setLevel(level)

    # Remove existing handlers to avoid duplicates
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)

    # Add our handler
    logger.addHandler(console_handler)

    # Prevent propagation to root logger to avoid duplicate messages
    logger.propagate = False

    return logger
Setup logging configuration for videosdk-agents.
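Typical usage is a single call at startup, e.g.:
import logging

logger = setup_logging(level=logging.DEBUG)   # returns the configured "videosdk.agents" logger
logger.debug("worker starting")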
Classes
class A2AMessage (from_agent: str,
to_agent: str,
type: str,
content: Dict[str, Any],
id: str = <factory>,
timestamp: float = <factory>,
metadata: Dict[str, Any] | None = None)
-
Expand source code
@dataclass
class A2AMessage:
    """
    Message format for agent-to-agent communication.

    Attributes:
        from_agent (str): ID of the agent sending the message.
        to_agent (str): ID of the agent receiving the message.
        type (str): Type/category of the message (e.g., "model_query", "model_response").
        content (Dict[str, Any]): The actual message content and data.
        id (str): Unique identifier for the message. Auto-generated if not provided.
        timestamp (float): Unix timestamp when the message was created.
        metadata (Optional[Dict[str, Any]]): Additional message metadata.
    """
    from_agent: str
    to_agent: str
    type: str
    content: Dict[str, Any]
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    metadata: Optional[Dict[str, Any]] = None
Message format for agent-to-agent communication.
Attributes
from_agent
:str
- ID of the agent sending the message.
to_agent
:str
- ID of the agent receiving the message.
type
:str
- Type/category of the message (e.g., "model_query", "model_response").
content
:Dict[str, Any]
- The actual message content and data.
id
:str
- Unique identifier for the message. Auto-generated if not provided.
timestamp
:float
- Unix timestamp when the message was created.
metadata
:Optional[Dict[str, Any]]
- Additional message metadata.
Instance variables
var content : Dict[str, Any]
var from_agent : str
var id : str
var metadata : Dict[str, Any] | None
var timestamp : float
var to_agent : str
var type : str
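A minimal construction sketch; the agent IDs and content are placeholders:
msg = A2AMessage(
    from_agent="triage-agent",        # placeholder IDs
    to_agent="billing-agent",
    type="model_query",
    content={"query": "What is the refund policy?"},
)
# id and timestamp are auto-generated by the dataclass defaults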
class Agent (instructions: str,
tools: List[FunctionTool] = None,
agent_id: str = None,
mcp_servers: List[MCPServiceProvider] = None)
-
Expand source code
class Agent(EventEmitter[Literal["agent_started"]], ABC): """ Abstract base class for creating custom agents. Inherits from EventEmitter to handle agent events and state updates. """ def __init__(self, instructions: str, tools: List[FunctionTool] = None, agent_id: str = None, mcp_servers: List[MCPServiceProvider] = None): super().__init__() self._tools = tools self._llm = None self._stt = None self._tts = None self.chat_context = ChatContext.empty() self.instructions = instructions self._tools = tools if tools else [] self._mcp_servers = mcp_servers if mcp_servers else [] self._mcp_initialized = False self._register_class_tools() self.register_tools() self.a2a = A2AProtocol(self) self._agent_card = None self.id = agent_id or str(uuid.uuid4()) self.mcp_manager = MCPToolManager() def _register_class_tools(self) -> None: """Internal Method: Register all function tools defined in the class""" for name, attr in inspect.getmembers(self): if is_function_tool(attr): self._tools.append(attr) @property def instructions(self) -> str: """Get the instructions for the agent""" return self._instructions @instructions.setter def instructions(self, value: str) -> None: """Set the instructions for the agent""" self._instructions = value self.chat_context.add_message( role=ChatRole.SYSTEM, content=value ) @property def tools(self) -> List[FunctionTool]: """Get the tools for the agent""" return self._tools def register_tools(self) -> None: """Internal Method: Register external function tools for the agent""" for tool in self._tools: if not is_function_tool(tool): raise ValueError(f"Tool {tool.__name__ if hasattr(tool, '__name__') else tool} is not a valid FunctionTool") async def initialize_mcp(self) -> None: """Internal Method: Initialize the agent, including any MCP server if provided.""" if self._mcp_servers and not self._mcp_initialized: for server in self._mcp_servers: await self.add_server(server) self._mcp_initialized = True async def add_server(self, mcp_server: MCPServiceProvider) -> None: """Internal Method: Initialize the MCP server and register the tools""" await self.mcp_manager.add_mcp_server(mcp_server) self._tools.extend(self.mcp_manager.tools) @abstractmethod async def on_enter(self) -> None: """Called when session starts, to be implemented in your custom agent implementation.""" pass async def register_a2a(self, card: AgentCard) -> None: """ Register the agent for A2A communication""" self._agent_card = card await self.a2a.register(card) async def unregister_a2a(self) -> None: """Unregister the agent from A2A communication""" await self.a2a.unregister() self._agent_card = None @abstractmethod async def on_exit(self) -> None: """Called when session ends, to be implemented in your custom agent implementation.""" pass
Abstract base class for creating custom agents. Inherits from EventEmitter to handle agent events and state updates.
Ancestors
- EventEmitter
- typing.Generic
- abc.ABC
Instance variables
prop instructions : str
-
Expand source code
@property
def instructions(self) -> str:
    """Get the instructions for the agent"""
    return self._instructions
Get the instructions for the agent
prop tools : List[FunctionTool]
-
Expand source code
@property
def tools(self) -> List[FunctionTool]:
    """Get the tools for the agent"""
    return self._tools
Get the tools for the agent
Methods
async def add_server(self, mcp_server: MCPServiceProvider) ‑> None
-
Expand source code
async def add_server(self, mcp_server: MCPServiceProvider) -> None:
    """Internal Method: Initialize the MCP server and register the tools"""
    await self.mcp_manager.add_mcp_server(mcp_server)
    self._tools.extend(self.mcp_manager.tools)
Internal Method: Initialize the MCP server and register the tools
async def initialize_mcp(self) ‑> None
-
Expand source code
async def initialize_mcp(self) -> None:
    """Internal Method: Initialize the agent, including any MCP server if provided."""
    if self._mcp_servers and not self._mcp_initialized:
        for server in self._mcp_servers:
            await self.add_server(server)
        self._mcp_initialized = True
Internal Method: Initialize the agent, including any MCP server if provided.
async def on_enter(self) ‑> None
-
Expand source code
@abstractmethod
async def on_enter(self) -> None:
    """Called when session starts, to be implemented in your custom agent implementation."""
    pass
Called when session starts, to be implemented in your custom agent implementation.
async def on_exit(self) ‑> None
-
Expand source code
@abstractmethod
async def on_exit(self) -> None:
    """Called when session ends, to be implemented in your custom agent implementation."""
    pass
Called when session ends, to be implemented in your custom agent implementation.
async def register_a2a(self,
card: AgentCard) ‑> None
-
Expand source code
async def register_a2a(self, card: AgentCard) -> None:
    """Register the agent for A2A communication"""
    self._agent_card = card
    await self.a2a.register(card)
Register the agent for A2A communication
def register_tools(self) ‑> None
-
Expand source code
def register_tools(self) -> None:
    """Internal Method: Register external function tools for the agent"""
    for tool in self._tools:
        if not is_function_tool(tool):
            raise ValueError(f"Tool {tool.__name__ if hasattr(tool, '__name__') else tool} is not a valid FunctionTool")
Internal Method: Register external function tools for the agent
async def unregister_a2a(self) ‑> None
-
Expand source code
async def unregister_a2a(self) -> None:
    """Unregister the agent from A2A communication"""
    await self.a2a.unregister()
    self._agent_card = None
Unregister the agent from A2A communication
class AgentCard (id: str,
name: str,
domain: str,
capabilities: List[str],
description: str,
metadata: Dict[str, Any] | None = None)
-
Expand source code
@dataclass
class AgentCard:
    """
    Represents an agent's capabilities and identity for agent-to-agent communication.

    Attributes:
        id (str): Unique identifier for the agent. Auto-generated if not provided.
        name (str): Human-readable name of the agent.
        domain (str): The domain or category this agent specializes in.
        capabilities (List[str]): List of capabilities this agent can perform.
        description (str): Detailed description of the agent's purpose and functionality.
        metadata (Optional[Dict[str, Any]]): Additional custom metadata for the agent.
    """
    id: str
    name: str
    domain: str
    capabilities: List[str]
    description: str
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """
        Internal method: Automatically generates a UUID if no ID is provided.
        """
        if not self.id:
            self.id = str(uuid.uuid4())
Represents an agent's capabilities and identity for agent-to-agent communication.
Attributes
id
:str
- Unique identifier for the agent. Auto-generated if not provided.
name
:str
- Human-readable name of the agent.
domain
:str
- The domain or category this agent specializes in.
capabilities
:List[str]
- List of capabilities this agent can perform.
description
:str
- Detailed description of the agent's purpose and functionality.
metadata
:Optional[Dict[str, Any]]
- Additional custom metadata for the agent.
Instance variables
var capabilities : List[str]
var description : str
var domain : str
var id : str
var metadata : Dict[str, Any] | None
var name : str
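A sketch of describing an agent and registering it for A2A communication; the card values are placeholders, and my_agent stands for an Agent subclass instance:
card = AgentCard(
    id="",                                  # empty id is auto-filled with a UUID in __post_init__
    name="Billing Agent",
    domain="billing",
    capabilities=["invoice_lookup", "refund_status"],
    description="Answers billing and refund questions.",
)
await my_agent.register_a2a(card)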
class AgentSession (agent: Agent,
pipeline: Pipeline,
conversation_flow: Optional[ConversationFlow] = None,
wake_up: Optional[int] = None)
-
Expand source code
class AgentSession: """ Manages an agent session with its associated conversation flow and pipeline. """ def __init__( self, agent: Agent, pipeline: Pipeline, conversation_flow: Optional[ConversationFlow] = None, wake_up: Optional[int] = None, ) -> None: """ Initialize an agent session. Args: agent: Instance of an Agent class that handles the core logic pipeline: Pipeline instance to process the agent's operations conversation_flow: ConversationFlow instance to manage conversation state wake_up: Time in seconds after which to trigger wake-up callback if no speech detected """ self.agent = agent self.pipeline = pipeline self.conversation_flow = conversation_flow self.agent.session = self self.wake_up = wake_up self.on_wake_up: Optional[Callable[[], None] | Callable[[], Any]] = None self._wake_up_task: Optional[asyncio.Task] = None self._wake_up_timer_active = False self._closed: bool = False if hasattr(self.pipeline, 'set_agent'): self.pipeline.set_agent(self.agent) if ( hasattr(self.pipeline, "set_conversation_flow") and self.conversation_flow is not None ): self.pipeline.set_conversation_flow(self.conversation_flow) if hasattr(self.pipeline, 'set_wake_up_callback'): self.pipeline.set_wake_up_callback(self._reset_wake_up_timer) try: job_ctx = get_current_job_context() if job_ctx: job_ctx.add_shutdown_callback(self.close) except Exception: pass def _start_wake_up_timer(self) -> None: if self.wake_up is not None and self.on_wake_up is not None: self._wake_up_timer_active = True self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop()) def _reset_wake_up_timer(self) -> None: if self.wake_up is not None and self.on_wake_up is not None: if self._wake_up_task and not self._wake_up_task.done(): self._wake_up_task.cancel() if self._wake_up_timer_active: self._wake_up_task = asyncio.create_task(self._wake_up_timer_loop()) def _cancel_wake_up_timer(self) -> None: if self._wake_up_task and not self._wake_up_task.done(): self._wake_up_task.cancel() self._wake_up_timer_active = False async def _wake_up_timer_loop(self) -> None: try: await asyncio.sleep(self.wake_up) if self._wake_up_timer_active and self.on_wake_up: if asyncio.iscoroutinefunction(self.on_wake_up): await self.on_wake_up() else: self.on_wake_up() except asyncio.CancelledError: pass async def start(self, **kwargs: Any) -> None: """ Start the agent session. This will: 1. Initialize the agent (including MCP tools if configured) 2. Call the agent's on_enter hook 3. Start the pipeline processing 4. 
Start wake-up timer if configured (but only if callback is set) Args: **kwargs: Additional arguments to pass to the pipeline start method """ await self.agent.initialize_mcp() if isinstance(self.pipeline, RealTimePipeline): await realtime_metrics_collector.start_session(self.agent, self.pipeline) else: traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: config_attributes = { "system_instructions": self.agent.instructions, "function_tools": [ get_tool_info(tool).name for tool in ( [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools] if self.agent.mcp_manager else self.agent.tools ) ] if self.agent.tools else [], "mcp_tools": [ get_tool_info(tool).name for tool in self.agent.mcp_manager.tools ] if self.agent.mcp_manager else [], "pipeline": self.pipeline.__class__.__name__, **({ "stt_provider": self.pipeline.stt.__class__.__name__ if self.pipeline.stt else None, "tts_provider": self.pipeline.tts.__class__.__name__ if self.pipeline.tts else None, "llm_provider": self.pipeline.llm.__class__.__name__ if self.pipeline.llm else None, "vad_provider": self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else None, "eou_provider": self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None, "stt_model": self.pipeline.get_component_configs()['stt'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.stt else None, "llm_model": self.pipeline.get_component_configs()['llm'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.llm else None, "tts_model": self.pipeline.get_component_configs()['tts'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.tts else None, "vad_model": self.pipeline.get_component_configs()['vad'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'vad') and self.pipeline.vad else None, "eou_model": self.pipeline.get_component_configs()['eou'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None } if self.pipeline.__class__.__name__ == "CascadingPipeline" else {}), } start_time = time.perf_counter() config_attributes["start_time"] = start_time await traces_flow_manager.start_agent_session_config(config_attributes) await traces_flow_manager.start_agent_session({"start_time": start_time}) if self.pipeline.__class__.__name__ == "CascadingPipeline": configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {} cascading_metrics_collector.set_provider_info( llm_provider=self.pipeline.llm.__class__.__name__ if self.pipeline.llm else "", llm_model=configs.get('llm', {}).get('model', "") if self.pipeline.llm else "", stt_provider=self.pipeline.stt.__class__.__name__ if self.pipeline.stt else "", stt_model=configs.get('stt', {}).get('model', "") if self.pipeline.stt else "", tts_provider=self.pipeline.tts.__class__.__name__ if self.pipeline.tts else "", tts_model=configs.get('tts', {}).get('model', "") if self.pipeline.tts else "", vad_provider=self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "", vad_model=configs.get('vad', {}).get('model', "") if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "", eou_provider=self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 
'turn_detector') and self.pipeline.turn_detector else "", eou_model=configs.get('eou', {}).get('model', "") if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else "" ) if hasattr(self.pipeline, 'set_agent'): self.pipeline.set_agent(self.agent) await self.pipeline.start() await self.agent.on_enter() global_event_emitter.emit("AGENT_STARTED", {"session": self}) if self.on_wake_up is not None: self._start_wake_up_timer() async def say(self, message: str) -> None: """ Send an initial message to the agent. """ if not isinstance(self.pipeline, RealTimePipeline): traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: traces_flow_manager.agent_say_called(message) self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message) await self.pipeline.send_message(message) async def close(self) -> None: """ Close the agent session. """ if self._closed: logger.info("Agent session already closed") return self._closed = True if isinstance(self.pipeline, RealTimePipeline): realtime_metrics_collector.finalize_session() traces_flow_manager = realtime_metrics_collector.traces_flow_manager if traces_flow_manager: start_time = time.perf_counter() await traces_flow_manager.start_agent_session_closed({"start_time": start_time}) traces_flow_manager.end_agent_session_closed() else: traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: start_time = time.perf_counter() await traces_flow_manager.start_agent_session_closed({"start_time": start_time}) traces_flow_manager.end_agent_session_closed() self._cancel_wake_up_timer() await self.agent.on_exit() await self.pipeline.cleanup() async def leave(self) -> None: """ Leave the agent session. """ await self.pipeline.leave()
Manages an agent session with its associated conversation flow and pipeline.
Initialize an agent session.
Args
agent
- Instance of an Agent class that handles the core logic
pipeline
- Pipeline instance to process the agent's operations
conversation_flow
- ConversationFlow instance to manage conversation state
wake_up
- Time in seconds after which to trigger wake-up callback if no speech detected
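A sketch of wiring a session together, assuming a concrete Agent subclass, pipeline, and conversation flow have already been constructed:
session = AgentSession(
    agent=my_agent,                 # an Agent subclass instance
    pipeline=pipeline,              # e.g. a CascadingPipeline or RealTimePipeline
    conversation_flow=flow,         # optional ConversationFlow
    wake_up=30,                     # seconds of silence before the wake-up callback fires
)
session.on_wake_up = lambda: print("still there?")   # callback must be set for the timer to start
await session.start()
# ... run until done ...
await session.close()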
Methods
async def close(self) ‑> None
-
Expand source code
async def close(self) -> None: """ Close the agent session. """ if self._closed: logger.info("Agent session already closed") return self._closed = True if isinstance(self.pipeline, RealTimePipeline): realtime_metrics_collector.finalize_session() traces_flow_manager = realtime_metrics_collector.traces_flow_manager if traces_flow_manager: start_time = time.perf_counter() await traces_flow_manager.start_agent_session_closed({"start_time": start_time}) traces_flow_manager.end_agent_session_closed() else: traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: start_time = time.perf_counter() await traces_flow_manager.start_agent_session_closed({"start_time": start_time}) traces_flow_manager.end_agent_session_closed() self._cancel_wake_up_timer() await self.agent.on_exit() await self.pipeline.cleanup()
Close the agent session.
async def leave(self) ‑> None
-
Expand source code
async def leave(self) -> None:
    """
    Leave the agent session.
    """
    await self.pipeline.leave()
Leave the agent session.
async def say(self, message: str) ‑> None
-
Expand source code
async def say(self, message: str) -> None:
    """
    Send an initial message to the agent.
    """
    if not isinstance(self.pipeline, RealTimePipeline):
        traces_flow_manager = cascading_metrics_collector.traces_flow_manager
        if traces_flow_manager:
            traces_flow_manager.agent_say_called(message)
    self.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)
    await self.pipeline.send_message(message)
Send an initial message to the agent.
async def start(self, **kwargs: Any) ‑> None
-
Expand source code
async def start(self, **kwargs: Any) -> None: """ Start the agent session. This will: 1. Initialize the agent (including MCP tools if configured) 2. Call the agent's on_enter hook 3. Start the pipeline processing 4. Start wake-up timer if configured (but only if callback is set) Args: **kwargs: Additional arguments to pass to the pipeline start method """ await self.agent.initialize_mcp() if isinstance(self.pipeline, RealTimePipeline): await realtime_metrics_collector.start_session(self.agent, self.pipeline) else: traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: config_attributes = { "system_instructions": self.agent.instructions, "function_tools": [ get_tool_info(tool).name for tool in ( [tool for tool in self.agent.tools if tool not in self.agent.mcp_manager.tools] if self.agent.mcp_manager else self.agent.tools ) ] if self.agent.tools else [], "mcp_tools": [ get_tool_info(tool).name for tool in self.agent.mcp_manager.tools ] if self.agent.mcp_manager else [], "pipeline": self.pipeline.__class__.__name__, **({ "stt_provider": self.pipeline.stt.__class__.__name__ if self.pipeline.stt else None, "tts_provider": self.pipeline.tts.__class__.__name__ if self.pipeline.tts else None, "llm_provider": self.pipeline.llm.__class__.__name__ if self.pipeline.llm else None, "vad_provider": self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad else None, "eou_provider": self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None, "stt_model": self.pipeline.get_component_configs()['stt'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.stt else None, "llm_model": self.pipeline.get_component_configs()['llm'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.llm else None, "tts_model": self.pipeline.get_component_configs()['tts'].get('model') if hasattr(self.pipeline, 'get_component_configs') and self.pipeline.tts else None, "vad_model": self.pipeline.get_component_configs()['vad'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'vad') and self.pipeline.vad else None, "eou_model": self.pipeline.get_component_configs()['eou'].get('model') if hasattr(self.pipeline, 'get_component_configs') and hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else None } if self.pipeline.__class__.__name__ == "CascadingPipeline" else {}), } start_time = time.perf_counter() config_attributes["start_time"] = start_time await traces_flow_manager.start_agent_session_config(config_attributes) await traces_flow_manager.start_agent_session({"start_time": start_time}) if self.pipeline.__class__.__name__ == "CascadingPipeline": configs = self.pipeline.get_component_configs() if hasattr(self.pipeline, 'get_component_configs') else {} cascading_metrics_collector.set_provider_info( llm_provider=self.pipeline.llm.__class__.__name__ if self.pipeline.llm else "", llm_model=configs.get('llm', {}).get('model', "") if self.pipeline.llm else "", stt_provider=self.pipeline.stt.__class__.__name__ if self.pipeline.stt else "", stt_model=configs.get('stt', {}).get('model', "") if self.pipeline.stt else "", tts_provider=self.pipeline.tts.__class__.__name__ if self.pipeline.tts else "", tts_model=configs.get('tts', {}).get('model', "") if self.pipeline.tts else "", vad_provider=self.pipeline.vad.__class__.__name__ if hasattr(self.pipeline, 'vad') and self.pipeline.vad 
else "", vad_model=configs.get('vad', {}).get('model', "") if hasattr(self.pipeline, 'vad') and self.pipeline.vad else "", eou_provider=self.pipeline.turn_detector.__class__.__name__ if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else "", eou_model=configs.get('eou', {}).get('model', "") if hasattr(self.pipeline, 'turn_detector') and self.pipeline.turn_detector else "" ) if hasattr(self.pipeline, 'set_agent'): self.pipeline.set_agent(self.agent) await self.pipeline.start() await self.agent.on_enter() global_event_emitter.emit("AGENT_STARTED", {"session": self}) if self.on_wake_up is not None: self._start_wake_up_timer()
Start the agent session. This will:
1. Initialize the agent (including MCP tools if configured)
2. Call the agent's on_enter hook
3. Start the pipeline processing
4. Start the wake-up timer if configured (but only if the callback is set)
Args
**kwargs
- Additional arguments to pass to the pipeline start method
class CascadingPipeline (stt: STT | None = None,
llm: LLM | None = None,
tts: TTS | None = None,
vad: VAD | None = None,
turn_detector: EOU | None = None,
avatar: Any | None = None,
denoise: Denoise | None = None)
-
Expand source code
class CascadingPipeline(Pipeline, EventEmitter[Literal["error"]]): """ Cascading pipeline implementation that processes data in sequence (STT -> LLM -> TTS). Inherits from Pipeline base class and adds cascade-specific events. """ def __init__( self, stt: STT | None = None, llm: LLM | None = None, tts: TTS | None = None, vad: VAD | None = None, turn_detector: EOU | None = None, avatar: Any | None = None, denoise: Denoise | None = None, ) -> None: """ Initialize the cascading pipeline. Args: stt: Speech-to-Text processor (optional) llm: Language Model processor (optional) tts: Text-to-Speech processor (optional) vad: Voice Activity Detector (optional) turn_detector: Turn Detector (optional) avatar: Avatar (optional) denoise: Denoise (optional) """ self.stt = stt self.llm = llm self.tts = tts self.vad = vad self.turn_detector = turn_detector self.agent = None self.conversation_flow = None self.avatar = avatar if self.stt: self.stt.on( "error", lambda *args: self.on_component_error( "STT", args[0] if args else "Unknown error" ), ) if self.llm: self.llm.on( "error", lambda *args: self.on_component_error( "LLM", args[0] if args else "Unknown error" ), ) if self.tts: self.tts.on( "error", lambda *args: self.on_component_error( "TTS", args[0] if args else "Unknown error" ), ) if self.vad: self.vad.on( "error", lambda *args: self.on_component_error( "VAD", args[0] if args else "Unknown error" ), ) if self.turn_detector: self.turn_detector.on( "error", lambda *args: self.on_component_error( "TURN-D", args[0] if args else "Unknown error" ), ) self.denoise = denoise super().__init__() def set_agent(self, agent: Agent) -> None: self.agent = agent def _configure_components(self) -> None: if self.loop and self.tts: self.tts.loop = self.loop logger.info("TTS loop configured") job_context = get_current_job_context() if self.avatar and job_context and job_context.room: self.tts.audio_track = ( getattr(job_context.room, "agent_audio_track", None) or job_context.room.audio_track ) logger.info( f"TTS audio track configured from room (avatar mode)") elif hasattr(self, "audio_track"): self.tts.audio_track = self.audio_track logger.info(f"TTS audio track configured from pipeline") else: logger.warning( "No audio track available for TTS configuration") if self.tts.audio_track: logger.info( f"TTS audio track successfully configured: {type(self.tts.audio_track).__name__}" ) else: logger.error( "TTS audio track is None - this will prevent audio playback" ) def set_conversation_flow(self, conversation_flow: ConversationFlow) -> None: logger.info("Setting conversation flow in pipeline") self.conversation_flow = conversation_flow self.conversation_flow.stt = self.stt self.conversation_flow.llm = self.llm self.conversation_flow.tts = self.tts self.conversation_flow.agent = self.agent self.conversation_flow.vad = self.vad self.conversation_flow.turn_detector = self.turn_detector self.conversation_flow.denoise = self.denoise self.conversation_flow.user_speech_callback = self.on_user_speech_started if self.conversation_flow.stt: self.conversation_flow.stt.on_stt_transcript( self.conversation_flow.on_stt_transcript ) if self.conversation_flow.vad: self.conversation_flow.vad.on_vad_event( self.conversation_flow.on_vad_event) async def change_component( self, stt: STT | None = None, llm: LLM | None = None, tts: TTS | None = None, ) -> None: """Dynamically change pipeline components. This will close the old components and set the new ones. 
""" if stt and self.stt: async with self.conversation_flow.stt_lock: await self.stt.aclose() self.stt = stt self.conversation_flow.stt = stt if self.conversation_flow.stt: self.conversation_flow.stt.on_stt_transcript( self.conversation_flow.on_stt_transcript ) if llm and self.llm: async with self.conversation_flow.llm_lock: await self.llm.aclose() self.llm = llm self.conversation_flow.llm = llm if tts and self.tts: async with self.conversation_flow.tts_lock: await self.tts.aclose() self.tts = tts self._configure_components() self.conversation_flow.tts = tts async def start(self, **kwargs: Any) -> None: if self.conversation_flow: await self.conversation_flow.start() async def send_message(self, message: str) -> None: if self.conversation_flow: await self.conversation_flow.say(message) else: logger.warning("No conversation flow found in pipeline") async def send_text_message(self, message: str) -> None: """ Send a text message directly to the LLM (for A2A communication). This bypasses STT and directly processes the text through the conversation flow. """ if self.conversation_flow: await self.conversation_flow.process_text_input(message) else: await self.send_message(message) async def on_audio_delta(self, audio_data: bytes) -> None: """ Handle incoming audio data from the user """ await self.conversation_flow.send_audio_delta(audio_data) def on_user_speech_started(self) -> None: """ Handle user speech started event """ self._notify_speech_started() async def cleanup(self) -> None: """Cleanup all pipeline components""" if self.stt: await self.stt.aclose() if self.llm: await self.llm.aclose() if self.tts: await self.tts.aclose() def get_component_configs(self) -> dict[str, dict[str, Any]]: """Return a dictionary of component configurations (STT, LLM, TTS) with their instance attributes. Returns: A nested dictionary with keys 'stt', 'llm', 'tts', each containing a dictionary of public instance attributes and extracted model information. 
""" def extract_model_info(config_dict: Dict[str, Any]) -> Dict[str, Any]: """Helper to extract model-related info from a dictionary with limited nesting.""" model_info = {} model_keys = [ "model", "model_id", "model_name", "voice", "voice_id", "name", ] try: for k, v in config_dict.items(): if k in model_keys and v is not None: model_info[k] = v elif k in ["config", "_config", "voice_config"] and isinstance( v, dict ): for nk, nv in v.items(): if nk in model_keys and nv is not None: model_info[nk] = nv elif k in ["voice_config", "config"] and hasattr(v, "__dict__"): for nk, nv in v.__dict__.items(): if ( nk in model_keys and nv is not None and not nk.startswith("_") ): model_info[nk] = nv except Exception as e: pass return model_info configs: Dict[str, Dict[str, Any]] = {} for comp_name, comp in [ ("stt", self.stt), ("llm", self.llm), ("tts", self.tts), ("vad", self.vad), ("eou", self.turn_detector), ]: if comp: try: configs[comp_name] = { k: v for k, v in comp.__dict__.items() if not k.startswith("_") and not callable(v) } model_info = extract_model_info(comp.__dict__) if model_info: if "model" not in configs[comp_name] and "model" in model_info: configs[comp_name]["model"] = model_info["model"] elif "model" not in configs[comp_name] and "name" in model_info: configs[comp_name]["model"] = model_info["name"] configs[comp_name].update( { k: v for k, v in model_info.items() if k != "model" and k != "name" and k not in configs[comp_name] } ) if comp_name == "vad" and "model" not in configs[comp_name]: if hasattr(comp, "_model_sample_rate"): configs[comp_name][ "model" ] = f"silero_vad_{comp._model_sample_rate}hz" else: configs[comp_name]["model"] = "silero_vad" elif comp_name == "eou" and "model" not in configs[comp_name]: class_name = comp.__class__.__name__ if "VideoSDK" in class_name: configs[comp_name]["model"] = "videosdk_turn_detector" elif "TurnDetector" in class_name: configs[comp_name]["model"] = "turnsense_model" else: configs[comp_name]["model"] = "turn_detector" except Exception as e: configs[comp_name] = configs.get(comp_name, {}) sensitive_keys = ["api_key", "token", "secret", "key", "password", "credential"] for comp in configs.values(): for key in sensitive_keys: comp.pop(key, None) return configs def on_component_error(self, source: str, error_data: Any) -> None: """Handle error events from components (STT, LLM, TTS, VAD, TURN-D)""" from .metrics import cascading_metrics_collector cascading_metrics_collector.add_error(source, str(error_data)) logger.error(f"[{source}] Component error: {error_data}")
Cascading pipeline implementation that processes data in sequence (STT -> LLM -> TTS). Inherits from Pipeline base class and adds cascade-specific events.
Initialize the cascading pipeline.
Args
stt
- Speech-to-Text processor (optional)
llm
- Language Model processor (optional)
tts
- Text-to-Speech processor (optional)
vad
- Voice Activity Detector (optional)
turn_detector
- Turn Detector (optional)
avatar
- Avatar (optional)
denoise
- Denoise (optional)
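A sketch of assembling a cascading pipeline; the component instances named here are placeholders for whichever STT/LLM/TTS/VAD/EOU implementations you use:
pipeline = CascadingPipeline(
    stt=my_stt,                    # any STT implementation
    llm=my_llm,                    # any LLM implementation
    tts=my_tts,                    # any TTS implementation
    vad=my_vad,                    # optional voice activity detector
    turn_detector=my_eou,          # optional end-of-utterance detector
)
session = AgentSession(agent=my_agent, pipeline=pipeline, conversation_flow=flow)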
Ancestors
- Pipeline
- EventEmitter
- typing.Generic
- abc.ABC
Methods
async def change_component(self,
stt: STT | None = None,
llm: LLM | None = None,
tts: TTS | None = None) ‑> None
-
Expand source code
async def change_component( self, stt: STT | None = None, llm: LLM | None = None, tts: TTS | None = None, ) -> None: """Dynamically change pipeline components. This will close the old components and set the new ones. """ if stt and self.stt: async with self.conversation_flow.stt_lock: await self.stt.aclose() self.stt = stt self.conversation_flow.stt = stt if self.conversation_flow.stt: self.conversation_flow.stt.on_stt_transcript( self.conversation_flow.on_stt_transcript ) if llm and self.llm: async with self.conversation_flow.llm_lock: await self.llm.aclose() self.llm = llm self.conversation_flow.llm = llm if tts and self.tts: async with self.conversation_flow.tts_lock: await self.tts.aclose() self.tts = tts self._configure_components() self.conversation_flow.tts = tts
Dynamically change pipeline components. This will close the old components and set the new ones.
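For example, to swap the LLM mid-session while keeping STT and TTS (the replacement instance is hypothetical):
await pipeline.change_component(llm=new_llm)   # the old LLM is closed and the conversation flow is rewired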
async def cleanup(self) ‑> None
-
Expand source code
async def cleanup(self) -> None:
    """Cleanup all pipeline components"""
    if self.stt:
        await self.stt.aclose()
    if self.llm:
        await self.llm.aclose()
    if self.tts:
        await self.tts.aclose()
Cleanup all pipeline components
def get_component_configs(self) ‑> dict[str, dict[str, typing.Any]]
-
Expand source code
def get_component_configs(self) -> dict[str, dict[str, Any]]: """Return a dictionary of component configurations (STT, LLM, TTS) with their instance attributes. Returns: A nested dictionary with keys 'stt', 'llm', 'tts', each containing a dictionary of public instance attributes and extracted model information. """ def extract_model_info(config_dict: Dict[str, Any]) -> Dict[str, Any]: """Helper to extract model-related info from a dictionary with limited nesting.""" model_info = {} model_keys = [ "model", "model_id", "model_name", "voice", "voice_id", "name", ] try: for k, v in config_dict.items(): if k in model_keys and v is not None: model_info[k] = v elif k in ["config", "_config", "voice_config"] and isinstance( v, dict ): for nk, nv in v.items(): if nk in model_keys and nv is not None: model_info[nk] = nv elif k in ["voice_config", "config"] and hasattr(v, "__dict__"): for nk, nv in v.__dict__.items(): if ( nk in model_keys and nv is not None and not nk.startswith("_") ): model_info[nk] = nv except Exception as e: pass return model_info configs: Dict[str, Dict[str, Any]] = {} for comp_name, comp in [ ("stt", self.stt), ("llm", self.llm), ("tts", self.tts), ("vad", self.vad), ("eou", self.turn_detector), ]: if comp: try: configs[comp_name] = { k: v for k, v in comp.__dict__.items() if not k.startswith("_") and not callable(v) } model_info = extract_model_info(comp.__dict__) if model_info: if "model" not in configs[comp_name] and "model" in model_info: configs[comp_name]["model"] = model_info["model"] elif "model" not in configs[comp_name] and "name" in model_info: configs[comp_name]["model"] = model_info["name"] configs[comp_name].update( { k: v for k, v in model_info.items() if k != "model" and k != "name" and k not in configs[comp_name] } ) if comp_name == "vad" and "model" not in configs[comp_name]: if hasattr(comp, "_model_sample_rate"): configs[comp_name][ "model" ] = f"silero_vad_{comp._model_sample_rate}hz" else: configs[comp_name]["model"] = "silero_vad" elif comp_name == "eou" and "model" not in configs[comp_name]: class_name = comp.__class__.__name__ if "VideoSDK" in class_name: configs[comp_name]["model"] = "videosdk_turn_detector" elif "TurnDetector" in class_name: configs[comp_name]["model"] = "turnsense_model" else: configs[comp_name]["model"] = "turn_detector" except Exception as e: configs[comp_name] = configs.get(comp_name, {}) sensitive_keys = ["api_key", "token", "secret", "key", "password", "credential"] for comp in configs.values(): for key in sensitive_keys: comp.pop(key, None) return configs
Return a dictionary of component configurations (STT, LLM, TTS) with their instance attributes.
Returns
A nested dictionary with keys 'stt', 'llm', 'tts', each containing a dictionary of public instance attributes and extracted model information.
def on_component_error(self, source: str, error_data: Any) ‑> None
-
Expand source code
def on_component_error(self, source: str, error_data: Any) -> None:
    """Handle error events from components (STT, LLM, TTS, VAD, TURN-D)"""
    from .metrics import cascading_metrics_collector
    cascading_metrics_collector.add_error(source, str(error_data))
    logger.error(f"[{source}] Component error: {error_data}")
Handle error events from components (STT, LLM, TTS, VAD, TURN-D)
def on_user_speech_started(self) ‑> None
-
Expand source code
def on_user_speech_started(self) -> None:
    """
    Handle user speech started event
    """
    self._notify_speech_started()
Handle user speech started event
async def send_text_message(self, message: str) ‑> None
-
Expand source code
async def send_text_message(self, message: str) -> None:
    """
    Send a text message directly to the LLM (for A2A communication).
    This bypasses STT and directly processes the text through the conversation flow.
    """
    if self.conversation_flow:
        await self.conversation_flow.process_text_input(message)
    else:
        await self.send_message(message)
Send a text message directly to the LLM (for A2A communication). This bypasses STT and directly processes the text through the conversation flow.
def set_agent(self, agent: Agent) ‑> None
-
Expand source code
def set_agent(self, agent: Agent) -> None:
    self.agent = agent
def set_conversation_flow(self,
conversation_flow: ConversationFlow) ‑> None
-
Expand source code
def set_conversation_flow(self, conversation_flow: ConversationFlow) -> None:
    logger.info("Setting conversation flow in pipeline")
    self.conversation_flow = conversation_flow
    self.conversation_flow.stt = self.stt
    self.conversation_flow.llm = self.llm
    self.conversation_flow.tts = self.tts
    self.conversation_flow.agent = self.agent
    self.conversation_flow.vad = self.vad
    self.conversation_flow.turn_detector = self.turn_detector
    self.conversation_flow.denoise = self.denoise
    self.conversation_flow.user_speech_callback = self.on_user_speech_started
    if self.conversation_flow.stt:
        self.conversation_flow.stt.on_stt_transcript(
            self.conversation_flow.on_stt_transcript
        )
    if self.conversation_flow.vad:
        self.conversation_flow.vad.on_vad_event(
            self.conversation_flow.on_vad_event)
Inherited members
class ChatContext (items: Optional[List[ChatItem]] = None)
-
Expand source code
class ChatContext: """ Manages a conversation context for LLM interactions. """ def __init__(self, items: Optional[List[ChatItem]] = None): """ Initialize the chat context. Args: items (Optional[List[ChatItem]]): Initial list of chat items. If None, starts with empty context. """ self._items: List[ChatItem] = items or [] @classmethod def empty(cls) -> ChatContext: """ Create an empty chat context. Returns: ChatContext: A new empty chat context instance. """ return cls([]) @property def items(self) -> List[ChatItem]: """ Get all items in the context. Returns: List[ChatItem]: List of all conversation items (messages, function calls, outputs). """ return self._items def add_message( self, role: ChatRole, content: Union[str, List[ChatContent]], message_id: Optional[str] = None, created_at: Optional[float] = None, ) -> ChatMessage: """ Add a new message to the context. Args: role (ChatRole): The role of the message sender. content (Union[str, List[ChatContent]]): The message content as text or content items. message_id (Optional[str], optional): Custom message ID. Auto-generated if not provided. created_at (Optional[float], optional): Custom creation timestamp. Uses current time if not provided. Returns: ChatMessage: The newly created and added message. """ if isinstance(content, str): content = [content] message = ChatMessage( role=role, content=content, id=message_id or f"msg_{int(time.time())}", created_at=created_at or time.time(), ) self._items.append(message) return message def add_function_call( self, name: str, arguments: str, call_id: Optional[str] = None ) -> FunctionCall: """ Add a function call to the context. Args: name (str): Name of the function to be called. arguments (str): JSON string containing the function arguments. call_id (Optional[str], optional): Custom call ID. Auto-generated if not provided. Returns: FunctionCall: The newly created and added function call. """ call = FunctionCall( name=name, arguments=arguments, call_id=call_id or f"call_{int(time.time())}" ) self._items.append(call) return call def add_function_output( self, name: str, output: str, call_id: str, is_error: bool = False ) -> FunctionCallOutput: """ Add a function output to the context. Args: name (str): Name of the function that was executed. output (str): The result or output from the function execution. call_id (str): ID linking this output to the original function call. is_error (bool, optional): Flag indicating if the function execution failed. Defaults to False. Returns: FunctionCallOutput: The newly created and added function output. """ function_output = FunctionCallOutput( name=name, output=output, call_id=call_id, is_error=is_error ) self._items.append(function_output) return function_output def get_by_id(self, item_id: str) -> Optional[ChatItem]: """ Find an item by its ID. Args: item_id (str): The ID of the item to find. Returns: Optional[ChatItem]: The found item or None if not found. """ return next( (item for item in self._items if item.id == item_id), None ) def copy( self, *, exclude_function_calls: bool = False, exclude_system_messages: bool = False, tools: Optional[List[FunctionTool]] = None, ) -> ChatContext: """ Create a filtered copy of the chat context. Args: exclude_function_calls (bool, optional): Whether to exclude function calls and outputs. Defaults to False. exclude_system_messages (bool, optional): Whether to exclude system messages. Defaults to False. tools (Optional[List[FunctionTool]], optional): List of available tools to filter function calls by. Defaults to None. 
Returns: ChatContext: A new ChatContext instance with the filtered items. """ items = [] valid_tool_names = {get_tool_info(tool).name for tool in ( tools or []) if is_function_tool(tool)} for item in self._items: # Skip function calls if excluded if exclude_function_calls and isinstance(item, (FunctionCall, FunctionCallOutput)): continue # Skip system messages if excluded if exclude_system_messages and isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM: continue # Filter by valid tools if tools are provided if tools and isinstance(item, (FunctionCall, FunctionCallOutput)): if item.name not in valid_tool_names: continue items.append(item) return ChatContext(items) def truncate(self, max_items: int) -> ChatContext: """ Truncate the context to the last N items while preserving system message. Args: max_items (int): Maximum number of items to keep in the context. Returns: ChatContext: The current context instance after truncation. """ system_msg = next( (item for item in self._items if isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM), None ) new_items = self._items[-max_items:] while new_items and isinstance(new_items[0], (FunctionCall, FunctionCallOutput)): new_items.pop(0) if system_msg and system_msg not in new_items: new_items.insert(0, system_msg) self._items = new_items return self def to_dict(self) -> dict: """ Convert the context to a dictionary representation. Returns: dict: Dictionary representation of the chat context. """ return { "items": [ { "type": item.type, "id": item.id, **({"role": item.role.value, "content": item.content} if isinstance(item, ChatMessage) else {}), **({"name": item.name, "arguments": item.arguments, "call_id": item.call_id} if isinstance(item, FunctionCall) else {}), **({"name": item.name, "output": item.output, "call_id": item.call_id, "is_error": item.is_error} if isinstance(item, FunctionCallOutput) else {}) } for item in self._items ] } @classmethod def from_dict(cls, data: dict) -> ChatContext: """ Create a ChatContext from a dictionary representation. Args: data (dict): Dictionary containing the serialized chat context data. Returns: ChatContext: A new ChatContext instance reconstructed from the data. """ items = [] for item_data in data["items"]: if item_data["type"] == "message": items.append(ChatMessage( role=ChatRole(item_data["role"]), content=item_data["content"], id=item_data["id"] )) elif item_data["type"] == "function_call": items.append(FunctionCall( name=item_data["name"], arguments=item_data["arguments"], call_id=item_data["call_id"], id=item_data["id"] )) elif item_data["type"] == "function_call_output": items.append(FunctionCallOutput( name=item_data["name"], output=item_data["output"], call_id=item_data["call_id"], is_error=item_data.get("is_error", False), id=item_data["id"] )) return cls(items)
Manages a conversation context for LLM interactions.
Initialize the chat context.
Args
items
:Optional[List[ChatItem]]
- Initial list of chat items. If None, starts with empty context.
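A short sketch of building and inspecting a context, using the roles and helpers documented below:
ctx = ChatContext.empty()
ctx.add_message(role=ChatRole.SYSTEM, content="You are a helpful assistant.")
ctx.add_message(role=ChatRole.USER, content="What's the weather in Paris?")
call = ctx.add_function_call(name="get_weather", arguments='{"city": "Paris"}')
ctx.add_function_output(name="get_weather", output='{"forecast": "sunny"}', call_id=call.call_id)

snapshot = ctx.to_dict()                      # serializable form
restored = ChatContext.from_dict(snapshot)    # round-trip
trimmed = ctx.copy(exclude_function_calls=True)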
Static methods
def empty() ‑> ChatContext
def from_dict(data: dict) ‑> ChatContext
-
Create a ChatContext from a dictionary representation.
Args
data
:dict
- Dictionary containing the serialized chat context data.
Returns
ChatContext
- A new ChatContext instance reconstructed from the data.
Instance variables
prop items : List[ChatItem]
-
Expand source code
@property
def items(self) -> List[ChatItem]:
    """
    Get all items in the context.

    Returns:
        List[ChatItem]: List of all conversation items (messages, function calls, outputs).
    """
    return self._items
Get all items in the context.
Returns
List[ChatItem]
- List of all conversation items (messages, function calls, outputs).
Methods
def add_function_call(self, name: str, arguments: str, call_id: Optional[str] = None) ‑> FunctionCall
-
Expand source code
def add_function_call( self, name: str, arguments: str, call_id: Optional[str] = None ) -> FunctionCall: """ Add a function call to the context. Args: name (str): Name of the function to be called. arguments (str): JSON string containing the function arguments. call_id (Optional[str], optional): Custom call ID. Auto-generated if not provided. Returns: FunctionCall: The newly created and added function call. """ call = FunctionCall( name=name, arguments=arguments, call_id=call_id or f"call_{int(time.time())}" ) self._items.append(call) return call
Add a function call to the context.
Args
name
:str
- Name of the function to be called.
arguments
:str
- JSON string containing the function arguments.
call_id
:Optional[str]
, optional- Custom call ID. Auto-generated if not provided.
Returns
FunctionCall
- The newly created and added function call.
def add_function_output(self, name: str, output: str, call_id: str, is_error: bool = False) ‑> FunctionCallOutput
-
Expand source code
def add_function_output( self, name: str, output: str, call_id: str, is_error: bool = False ) -> FunctionCallOutput: """ Add a function output to the context. Args: name (str): Name of the function that was executed. output (str): The result or output from the function execution. call_id (str): ID linking this output to the original function call. is_error (bool, optional): Flag indicating if the function execution failed. Defaults to False. Returns: FunctionCallOutput: The newly created and added function output. """ function_output = FunctionCallOutput( name=name, output=output, call_id=call_id, is_error=is_error ) self._items.append(function_output) return function_output
Add a function output to the context.
Args
name
:str
- Name of the function that was executed.
output
:str
- The result or output from the function execution.
call_id
:str
- ID linking this output to the original function call.
is_error
:bool
, optional- Flag indicating if the function execution failed. Defaults to False.
Returns
FunctionCallOutput
- The newly created and added function output.
def add_message(self,
role: ChatRole,
content: Union[str, List[ChatContent]],
message_id: Optional[str] = None,
created_at: Optional[float] = None) ‑> ChatMessage
-
Expand source code
def add_message( self, role: ChatRole, content: Union[str, List[ChatContent]], message_id: Optional[str] = None, created_at: Optional[float] = None, ) -> ChatMessage: """ Add a new message to the context. Args: role (ChatRole): The role of the message sender. content (Union[str, List[ChatContent]]): The message content as text or content items. message_id (Optional[str], optional): Custom message ID. Auto-generated if not provided. created_at (Optional[float], optional): Custom creation timestamp. Uses current time if not provided. Returns: ChatMessage: The newly created and added message. """ if isinstance(content, str): content = [content] message = ChatMessage( role=role, content=content, id=message_id or f"msg_{int(time.time())}", created_at=created_at or time.time(), ) self._items.append(message) return message
Add a new message to the context.
Args
role
:ChatRole
- The role of the message sender.
content
:Union[str, List[ChatContent]]
- The message content as text or content items.
message_id
:Optional[str], optional
- Custom message ID. Auto-generated if not provided.
created_at
:Optional[float], optional
- Custom creation timestamp. Uses current time if not provided.
Returns
ChatMessage
- The newly created and added message.
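A short usage sketch; the import path is an assumption, and a plain string passed as content is wrapped into a one-element list:

from agents.llm import ChatContext, ChatRole  # assumed import path

ctx = ChatContext([])  # assumes the constructor accepts an initial item list
ctx.add_message(role=ChatRole.SYSTEM, content="You are a concise voice assistant.")
ctx.add_message(role=ChatRole.USER, content="What's the weather in Paris today?")

print(len(ctx.items))  # 2: the system prompt and the user turn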
def copy(self,
*,
exclude_function_calls: bool = False,
exclude_system_messages: bool = False,
tools: Optional[List[FunctionTool]] = None) ‑> ChatContext-
Expand source code
def copy( self, *, exclude_function_calls: bool = False, exclude_system_messages: bool = False, tools: Optional[List[FunctionTool]] = None, ) -> ChatContext: """ Create a filtered copy of the chat context. Args: exclude_function_calls (bool, optional): Whether to exclude function calls and outputs. Defaults to False. exclude_system_messages (bool, optional): Whether to exclude system messages. Defaults to False. tools (Optional[List[FunctionTool]], optional): List of available tools to filter function calls by. Defaults to None. Returns: ChatContext: A new ChatContext instance with the filtered items. """ items = [] valid_tool_names = {get_tool_info(tool).name for tool in ( tools or []) if is_function_tool(tool)} for item in self._items: # Skip function calls if excluded if exclude_function_calls and isinstance(item, (FunctionCall, FunctionCallOutput)): continue # Skip system messages if excluded if exclude_system_messages and isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM: continue # Filter by valid tools if tools are provided if tools and isinstance(item, (FunctionCall, FunctionCallOutput)): if item.name not in valid_tool_names: continue items.append(item) return ChatContext(items)
Create a filtered copy of the chat context.
Args
exclude_function_calls
:bool, optional
- Whether to exclude function calls and outputs. Defaults to False.
exclude_system_messages
:bool, optional
- Whether to exclude system messages. Defaults to False.
tools
:Optional[List[FunctionTool]], optional
- List of available tools to filter function calls by. Defaults to None.
Returns
ChatContext
- A new ChatContext instance with the filtered items.
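A hedged sketch of typical copy() calls, assuming ctx is a populated ChatContext and my_tools is the agent's current tool list (hypothetical names):

# Keep only plain conversation turns: drop tool traffic and the system prompt.
history_only = ctx.copy(exclude_function_calls=True, exclude_system_messages=True)

# Keep function calls, but only those whose names match tools that still exist.
scoped = ctx.copy(tools=my_tools)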
def get_by_id(self, item_id: str) ‑> ChatMessage | FunctionCall | FunctionCallOutput | None
-
Expand source code
def get_by_id(self, item_id: str) -> Optional[ChatItem]: """ Find an item by its ID. Args: item_id (str): The ID of the item to find. Returns: Optional[ChatItem]: The found item or None if not found. """ return next( (item for item in self._items if item.id == item_id), None )
Find an item by its ID.
Args
item_id
:str
- The ID of the item to find.
Returns
Optional[ChatItem]
- The found item or None if not found.
def to_dict(self) ‑> dict
-
Expand source code
def to_dict(self) -> dict: """ Convert the context to a dictionary representation. Returns: dict: Dictionary representation of the chat context. """ return { "items": [ { "type": item.type, "id": item.id, **({"role": item.role.value, "content": item.content} if isinstance(item, ChatMessage) else {}), **({"name": item.name, "arguments": item.arguments, "call_id": item.call_id} if isinstance(item, FunctionCall) else {}), **({"name": item.name, "output": item.output, "call_id": item.call_id, "is_error": item.is_error} if isinstance(item, FunctionCallOutput) else {}) } for item in self._items ] }
Convert the context to a dictionary representation.
Returns
dict
- Dictionary representation of the chat context.
def truncate(self, max_items: int) ‑> ChatContext
-
Expand source code
def truncate(self, max_items: int) -> ChatContext: """ Truncate the context to the last N items while preserving system message. Args: max_items (int): Maximum number of items to keep in the context. Returns: ChatContext: The current context instance after truncation. """ system_msg = next( (item for item in self._items if isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM), None ) new_items = self._items[-max_items:] while new_items and isinstance(new_items[0], (FunctionCall, FunctionCallOutput)): new_items.pop(0) if system_msg and system_msg not in new_items: new_items.insert(0, system_msg) self._items = new_items return self
Truncate the context to the last N items while preserving system message.
Args
max_items
:int
- Maximum number of items to keep in the context.
Returns
ChatContext
- The current context instance after truncation.
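For example, to cap the context size between turns (a sketch; ctx is an existing ChatContext):

# Keeps roughly the last 20 items, re-inserts the system message at the front
# if it was cut, and drops any function call/output left dangling at the start.
ctx.truncate(max_items=20)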
class ChatMessage (**data: Any)
-
Expand source code
class ChatMessage(BaseModel): """ Represents a single message in the chat context. Attributes: role (ChatRole): The role of the message sender (system, user, or assistant). content (Union[str, List[ChatContent]]): The message content as text or list of content items. id (str): Unique identifier for the message. Auto-generated if not provided. type (Literal["message"]): Type identifier, always "message". created_at (float): Unix timestamp when the message was created. interrupted (bool): Flag indicating if the message was interrupted during generation. """ role: ChatRole content: Union[str, List[ChatContent]] id: str = Field(default_factory=lambda: f"msg_{int(time.time())}") type: Literal["message"] = "message" created_at: float = Field(default_factory=time.time) interrupted: bool = False
Represents a single message in the chat context.
Attributes
role
:ChatRole
- The role of the message sender (system, user, or assistant).
content
:Union[str, List[ChatContent]]
- The message content as text or list of content items.
id
:str
- Unique identifier for the message. Auto-generated if not provided.
- type (Literal["message"]): Type identifier, always "message".
created_at
:float
- Unix timestamp when the message was created.
interrupted
:bool
- Flag indicating if the message was interrupted during generation.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var content : str | List[str | ImageContent]
var created_at : float
var id : str
var interrupted : bool
var model_config
var role : ChatRole
var type : Literal['message']
class ChatRole (*args, **kwds)
-
Expand source code
class ChatRole(str, Enum): """ Enumeration of chat roles for message classification. Defines the three standard roles used in chat conversations: - SYSTEM: Instructions and context for the AI assistant - USER: Messages from the human user - ASSISTANT: Responses from the AI assistant """ SYSTEM = "system" USER = "user" ASSISTANT = "assistant"
Enumeration of chat roles for message classification.
Defines the three standard roles used in chat conversations:
- SYSTEM: Instructions and context for the AI assistant
- USER: Messages from the human user
- ASSISTANT: Responses from the AI assistant
Ancestors
- builtins.str
- enum.Enum
Class variables
var ASSISTANT
var SYSTEM
var USER
class ConversationFlow (agent: Agent,
stt: STT | None = None,
llm: LLM | None = None,
tts: TTS | None = None,
vad: VAD | None = None,
turn_detector: EOU | None = None,
denoise: Denoise | None = None)-
Expand source code
class ConversationFlow(EventEmitter[Literal["transcription"]], ABC): """ Manages the conversation flow by listening to transcription events. """ def __init__(self, agent: Agent, stt: STT | None = None, llm: LLM | None = None, tts: TTS | None = None, vad: VAD | None = None, turn_detector: EOU | None = None, denoise: Denoise | None = None) -> None: """Initialize conversation flow with event emitter capabilities""" super().__init__() self.transcription_callback: Callable[[ STTResponse], Awaitable[None]] | None = None self.stt = stt self.llm = llm self.tts = tts self.vad = vad self.turn_detector = turn_detector self.agent = agent self.denoise = denoise self._stt_started = False self.stt_lock = asyncio.Lock() self.llm_lock = asyncio.Lock() self.tts_lock = asyncio.Lock() self.user_speech_callback: Callable[[], None] | None = None if self.stt: self.stt.on_stt_transcript(self.on_stt_transcript) if self.vad: self.vad.on_vad_event(self.on_vad_event) self._current_tts_task: asyncio.Task | None = None self._current_llm_task: asyncio.Task | None = None self._partial_response = "" self._is_interrupted = False async def start(self) -> None: global_event_emitter.on("speech_started", self.on_speech_started_stt) global_event_emitter.on("speech_stopped", self.on_speech_stopped_stt) if self.agent and self.agent.instructions: cascading_metrics_collector.set_system_instructions( self.agent.instructions) def on_transcription(self, callback: Callable[[str], None]) -> None: """ Set the callback for transcription events. Args: callback: Function to call when transcription occurs, takes transcribed text as argument """ self.on("transcription_event", lambda data: callback(data["text"])) async def send_audio_delta(self, audio_data: bytes) -> None: """ Send audio delta to the STT """ asyncio.create_task(self._process_audio_delta(audio_data)) async def _process_audio_delta(self, audio_data: bytes) -> None: """Background processing of audio delta""" try: if self.denoise: audio_data = await self.denoise.denoise(audio_data) if self.stt: async with self.stt_lock: await self.stt.process_audio(audio_data) if self.vad: await self.vad.process_audio(audio_data) except Exception as e: self.emit("error", f"Audio processing failed: {str(e)}") async def on_vad_event(self, vad_response: VADResponse) -> None: """Handle VAD events""" if vad_response.event_type == VADEventType.START_OF_SPEECH: await self.on_speech_started() elif vad_response.event_type == VADEventType.END_OF_SPEECH: self.on_speech_stopped() async def on_stt_transcript(self, stt_response: STTResponse) -> None: """Handle STT transcript events""" if stt_response.event_type == SpeechEventType.FINAL: user_text = stt_response.data.text await self._process_final_transcript(user_text) async def _process_final_transcript(self, user_text: str) -> None: """Process final transcript with EOU detection and response generation""" # Fallback: If VAD is missing, this can start the turn. Otherwise, the collector handles it. 
if not cascading_metrics_collector.data.current_turn: cascading_metrics_collector.on_user_speech_start() cascading_metrics_collector.set_user_transcript(user_text) cascading_metrics_collector.on_stt_complete() # Fallback: If VAD is present but hasn't called on_user_speech_end yet, if self.vad and cascading_metrics_collector.data.is_user_speaking: cascading_metrics_collector.on_user_speech_end() elif not self.vad: cascading_metrics_collector.on_user_speech_end() self.agent.chat_context.add_message( role=ChatRole.USER, content=user_text ) await self.on_turn_start(user_text) if self.turn_detector: cascading_metrics_collector.on_eou_start() eou_detected = self.turn_detector.detect_end_of_utterance( self.agent.chat_context) cascading_metrics_collector.on_eou_complete() if eou_detected: asyncio.create_task( self._generate_and_synthesize_response(user_text)) else: cascading_metrics_collector.complete_current_turn() else: asyncio.create_task( self._generate_and_synthesize_response(user_text)) await self.on_turn_end() async def _generate_and_synthesize_response(self, user_text: str) -> None: """Generate agent response""" self._is_interrupted = False full_response = "" self._partial_response = "" try: llm_stream = self.run(user_text) q = asyncio.Queue(maxsize=50) async def collector(): response_parts = [] try: async for chunk in llm_stream: if self._is_interrupted: logger.info("LLM collection interrupted") await q.put(None) return "".join(response_parts) self._partial_response = "".join(response_parts) await q.put(chunk) response_parts.append(chunk) await q.put(None) return "".join(response_parts) except asyncio.CancelledError: logger.info("LLM collection cancelled") await q.put(None) return "".join(response_parts) async def tts_consumer(): async def tts_stream_gen(): while True: if self._is_interrupted: break chunk = await q.get() if chunk is None: break yield chunk if self.tts: try: await self._synthesize_with_tts(tts_stream_gen()) except asyncio.CancelledError: pass collector_task = asyncio.create_task(collector()) tts_task = asyncio.create_task(tts_consumer()) self._current_llm_task = collector_task self._current_tts_task = tts_task await asyncio.gather(collector_task, tts_task, return_exceptions=True) if not collector_task.cancelled() and not self._is_interrupted: full_response = collector_task.result() else: full_response = self._partial_response if full_response and not self._is_interrupted: cascading_metrics_collector.set_agent_response(full_response) self.agent.chat_context.add_message( role=ChatRole.ASSISTANT, content=full_response ) finally: self._current_tts_task = None self._current_llm_task = None cascading_metrics_collector.complete_current_turn() async def process_with_llm(self) -> AsyncIterator[str]: """ Process the current chat context with LLM and yield response chunks. This method can be called by user implementations to get LLM responses. 
""" async with self.llm_lock: if not self.llm: return cascading_metrics_collector.on_llm_start() first_chunk_received = False async for llm_chunk_resp in self.llm.chat( self.agent.chat_context, tools=self.agent._tools ): if self._is_interrupted: logger.info("LLM processing interrupted") break if not first_chunk_received: first_chunk_received = True cascading_metrics_collector.on_llm_complete() if llm_chunk_resp.metadata and "function_call" in llm_chunk_resp.metadata: func_call = llm_chunk_resp.metadata["function_call"] cascading_metrics_collector.add_function_tool_call( func_call["name"]) self.agent.chat_context.add_function_call( name=func_call["name"], arguments=json.dumps(func_call["arguments"]), call_id=func_call.get( "call_id", f"call_{int(time.time())}") ) try: tool = next( (t for t in self.agent.tools if is_function_tool( t) and get_tool_info(t).name == func_call["name"]), None ) except Exception as e: logger.error(f"Error while selecting tool: {e}") continue if tool: try: result = await tool(**func_call["arguments"]) self.agent.chat_context.add_function_output( name=func_call["name"], output=json.dumps(result), call_id=func_call.get( "call_id", f"call_{int(time.time())}") ) async for new_resp in self.llm.chat(self.agent.chat_context): if self._is_interrupted: break if new_resp.content: yield new_resp.content except Exception as e: logger.error( f"Error executing function {func_call['name']}: {e}") continue else: if llm_chunk_resp.content: yield llm_chunk_resp.content async def say(self, message: str) -> None: """ Direct TTS synthesis (used for initial messages) """ if self.tts: cascading_metrics_collector.start_new_interaction("") cascading_metrics_collector.set_agent_response(message) try: await self._synthesize_with_tts(message) finally: cascading_metrics_collector.complete_current_turn() async def process_text_input(self, text: str) -> None: """ Process text input directly (for A2A communication). This bypasses STT and directly processes the text through the LLM. """ cascading_metrics_collector.start_new_interaction(text) self.agent.chat_context.add_message( role=ChatRole.USER, content=text ) full_response = "" async for response_chunk in self.process_with_llm(): full_response += response_chunk if full_response: cascading_metrics_collector.set_agent_response(full_response) cascading_metrics_collector.complete_current_turn() global_event_emitter.emit("text_response", {"text": full_response}) async def run(self, transcript: str) -> AsyncIterator[str]: """ Main conversation loop: handle a user turn. Users should implement this method to preprocess transcripts and yield response chunks. 
""" async for response in self.process_with_llm(): yield response async def on_turn_start(self, transcript: str) -> None: """Called at the start of a user turn.""" pass async def on_turn_end(self) -> None: """Called at the end of a user turn.""" pass def on_speech_started_stt(self, event_data: Any) -> None: if self.user_speech_callback: self.user_speech_callback() def on_speech_stopped_stt(self, event_data: Any) -> None: pass async def on_speech_started(self) -> None: cascading_metrics_collector.on_user_speech_start() if self.user_speech_callback: self.user_speech_callback() if self._stt_started: self._stt_started = False if self.tts: await self._interrupt_tts() async def _interrupt_tts(self) -> None: logger.info("Interrupting TTS and LLM generation") self._is_interrupted = True if self.tts: await self.tts.interrupt() if self.llm: await self._cancel_llm() tasks_to_cancel = [] if self._current_tts_task and not self._current_tts_task.done(): tasks_to_cancel.append(self._current_tts_task) if self._current_llm_task and not self._current_llm_task.done(): tasks_to_cancel.append(self._current_llm_task) if tasks_to_cancel: await graceful_cancel(*tasks_to_cancel) cascading_metrics_collector.on_interrupted() async def _cancel_llm(self) -> None: """Cancel LLM generation""" try: await self.llm.cancel_current_generation() except Exception as e: logger.error(f"LLM cancellation failed: {e}") def on_speech_stopped(self) -> None: if not self._stt_started: cascading_metrics_collector.on_stt_start() self._stt_started = True cascading_metrics_collector.on_user_speech_end() async def _synthesize_with_tts(self, response_gen: AsyncIterator[str] | str) -> None: """ Stream LLM response directly to TTS. """ if not self.tts: return async def on_first_audio_byte(): cascading_metrics_collector.on_tts_first_byte() cascading_metrics_collector.on_agent_speech_start() self.tts.on_first_audio_byte(on_first_audio_byte) self.tts.reset_first_audio_tracking() cascading_metrics_collector.on_tts_start() try: response_iterator: AsyncIterator[str] if isinstance(response_gen, str): async def string_to_iterator(text: str): yield text response_iterator = string_to_iterator(response_gen) else: response_iterator = response_gen await self.tts.synthesize(response_iterator) finally: cascading_metrics_collector.on_agent_speech_end()
Manages the conversation flow by listening to transcription events.
Initialize conversation flow with event emitter capabilities
Ancestors
- EventEmitter
- typing.Generic
- abc.ABC
Methods
async def on_speech_started(self) ‑> None
-
Expand source code
async def on_speech_started(self) -> None: cascading_metrics_collector.on_user_speech_start() if self.user_speech_callback: self.user_speech_callback() if self._stt_started: self._stt_started = False if self.tts: await self._interrupt_tts()
def on_speech_started_stt(self, event_data: Any) ‑> None
-
Expand source code
def on_speech_started_stt(self, event_data: Any) -> None: if self.user_speech_callback: self.user_speech_callback()
def on_speech_stopped(self) ‑> None
-
Expand source code
def on_speech_stopped(self) -> None: if not self._stt_started: cascading_metrics_collector.on_stt_start() self._stt_started = True cascading_metrics_collector.on_user_speech_end()
def on_speech_stopped_stt(self, event_data: Any) ‑> None
-
Expand source code
def on_speech_stopped_stt(self, event_data: Any) -> None: pass
async def on_stt_transcript(self,
stt_response: STTResponse) ‑> None-
Expand source code
async def on_stt_transcript(self, stt_response: STTResponse) -> None: """Handle STT transcript events""" if stt_response.event_type == SpeechEventType.FINAL: user_text = stt_response.data.text await self._process_final_transcript(user_text)
Handle STT transcript events
def on_transcription(self, callback: Callable[[str], None]) ‑> None
-
Expand source code
def on_transcription(self, callback: Callable[[str], None]) -> None: """ Set the callback for transcription events. Args: callback: Function to call when transcription occurs, takes transcribed text as argument """ self.on("transcription_event", lambda data: callback(data["text"]))
Set the callback for transcription events.
Args
callback
- Function to call when transcription occurs, takes transcribed text as argument
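A one-line usage sketch, assuming flow is a ConversationFlow instance:

flow.on_transcription(lambda text: print(f"user said: {text}"))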
async def on_turn_end(self) ‑> None
-
Expand source code
async def on_turn_end(self) -> None: """Called at the end of a user turn.""" pass
Called at the end of a user turn.
async def on_turn_start(self, transcript: str) ‑> None
-
Expand source code
async def on_turn_start(self, transcript: str) -> None: """Called at the start of a user turn.""" pass
Called at the start of a user turn.
async def on_vad_event(self,
vad_response: VADResponse) ‑> None-
Expand source code
async def on_vad_event(self, vad_response: VADResponse) -> None: """Handle VAD events""" if vad_response.event_type == VADEventType.START_OF_SPEECH: await self.on_speech_started() elif vad_response.event_type == VADEventType.END_OF_SPEECH: self.on_speech_stopped()
Handle VAD events
async def process_text_input(self, text: str) ‑> None
-
Expand source code
async def process_text_input(self, text: str) -> None: """ Process text input directly (for A2A communication). This bypasses STT and directly processes the text through the LLM. """ cascading_metrics_collector.start_new_interaction(text) self.agent.chat_context.add_message( role=ChatRole.USER, content=text ) full_response = "" async for response_chunk in self.process_with_llm(): full_response += response_chunk if full_response: cascading_metrics_collector.set_agent_response(full_response) cascading_metrics_collector.complete_current_turn() global_event_emitter.emit("text_response", {"text": full_response})
Process text input directly (for A2A communication). This bypasses STT and directly processes the text through the LLM.
async def process_with_llm(self) ‑> AsyncIterator[str]
-
Expand source code
async def process_with_llm(self) -> AsyncIterator[str]: """ Process the current chat context with LLM and yield response chunks. This method can be called by user implementations to get LLM responses. """ async with self.llm_lock: if not self.llm: return cascading_metrics_collector.on_llm_start() first_chunk_received = False async for llm_chunk_resp in self.llm.chat( self.agent.chat_context, tools=self.agent._tools ): if self._is_interrupted: logger.info("LLM processing interrupted") break if not first_chunk_received: first_chunk_received = True cascading_metrics_collector.on_llm_complete() if llm_chunk_resp.metadata and "function_call" in llm_chunk_resp.metadata: func_call = llm_chunk_resp.metadata["function_call"] cascading_metrics_collector.add_function_tool_call( func_call["name"]) self.agent.chat_context.add_function_call( name=func_call["name"], arguments=json.dumps(func_call["arguments"]), call_id=func_call.get( "call_id", f"call_{int(time.time())}") ) try: tool = next( (t for t in self.agent.tools if is_function_tool( t) and get_tool_info(t).name == func_call["name"]), None ) except Exception as e: logger.error(f"Error while selecting tool: {e}") continue if tool: try: result = await tool(**func_call["arguments"]) self.agent.chat_context.add_function_output( name=func_call["name"], output=json.dumps(result), call_id=func_call.get( "call_id", f"call_{int(time.time())}") ) async for new_resp in self.llm.chat(self.agent.chat_context): if self._is_interrupted: break if new_resp.content: yield new_resp.content except Exception as e: logger.error( f"Error executing function {func_call['name']}: {e}") continue else: if llm_chunk_resp.content: yield llm_chunk_resp.content
Process the current chat context with LLM and yield response chunks. This method can be called by user implementations to get LLM responses.
async def run(self, transcript: str) ‑> AsyncIterator[str]
-
Expand source code
async def run(self, transcript: str) -> AsyncIterator[str]: """ Main conversation loop: handle a user turn. Users should implement this method to preprocess transcripts and yield response chunks. """ async for response in self.process_with_llm(): yield response
Main conversation loop: handle a user turn. Users should implement this method to preprocess transcripts and yield response chunks.
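A minimal sketch of a custom flow that overrides run() and the turn hooks; the import path is an assumption, and instantiation still requires an Agent plus the cascading STT/LLM/TTS components:

from typing import AsyncIterator

from agents.conversation_flow import ConversationFlow  # assumed import path

class MyConversationFlow(ConversationFlow):
    async def run(self, transcript: str) -> AsyncIterator[str]:
        # The transcript has already been added to the chat context, so after
        # any preprocessing we can delegate to the built-in LLM loop.
        async for chunk in self.process_with_llm():
            yield chunk

    async def on_turn_start(self, transcript: str) -> None:
        print(f"turn started: {transcript}")

    async def on_turn_end(self) -> None:
        print("turn ended")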
async def say(self, message: str) ‑> None
-
Expand source code
async def say(self, message: str) -> None: """ Direct TTS synthesis (used for initial messages) """ if self.tts: cascading_metrics_collector.start_new_interaction("") cascading_metrics_collector.set_agent_response(message) try: await self._synthesize_with_tts(message) finally: cascading_metrics_collector.complete_current_turn()
Direct TTS synthesis (used for initial messages)
async def send_audio_delta(self, audio_data: bytes) ‑> None
-
Expand source code
async def send_audio_delta(self, audio_data: bytes) -> None: """ Send audio delta to the STT """ asyncio.create_task(self._process_audio_delta(audio_data))
Send audio delta to the STT
async def start(self) ‑> None
-
Expand source code
async def start(self) -> None: global_event_emitter.on("speech_started", self.on_speech_started_stt) global_event_emitter.on("speech_stopped", self.on_speech_stopped_stt) if self.agent and self.agent.instructions: cascading_metrics_collector.set_system_instructions( self.agent.instructions)
class CustomAudioStreamTrack (loop)
-
Expand source code
class CustomAudioStreamTrack(CustomAudioTrack): def __init__(self, loop): super().__init__() self.loop = loop self._start = None self._timestamp = 0 self.frame_buffer = [] self.audio_data_buffer = bytearray() self.frame_time = 0 self.sample_rate = 24000 self.channels = 1 self.sample_width = 2 self.time_base_fraction = Fraction(1, self.sample_rate) self.samples = int(AUDIO_PTIME * self.sample_rate) self.chunk_size = int(self.samples * self.channels * self.sample_width) def interrupt(self): self.frame_buffer.clear() self.audio_data_buffer.clear() async def add_new_bytes(self, audio_data: bytes): self.audio_data_buffer += audio_data while len(self.audio_data_buffer) >= self.chunk_size: chunk = self.audio_data_buffer[: self.chunk_size] self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :] try: audio_frame = self.buildAudioFrames(chunk) self.frame_buffer.append(audio_frame) logger.debug( f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}" ) except Exception as e: logger.error(f"Error building audio frame: {e}") break def buildAudioFrames(self, chunk: bytes) -> AudioFrame: if len(chunk) != self.chunk_size: logger.warning( f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}" ) data = np.frombuffer(chunk, dtype=np.int16) expected_samples = self.samples * self.channels if len(data) != expected_samples: logger.warning( f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}" ) data = data.reshape(-1, self.channels) layout = "mono" if self.channels == 1 else "stereo" audio_frame = AudioFrame.from_ndarray(data.T, format="s16", layout=layout) return audio_frame def next_timestamp(self): pts = int(self.frame_time) time_base = self.time_base_fraction self.frame_time += self.samples return pts, time_base async def recv(self) -> AudioFrame: try: if self.readyState != "live": raise MediaStreamError if self._start is None: self._start = time() self._timestamp = 0 else: self._timestamp += self.samples wait = self._start + (self._timestamp / self.sample_rate) - time() if wait > 0: await asyncio.sleep(wait) pts, time_base = self.next_timestamp() if len(self.frame_buffer) > 0: frame = self.frame_buffer.pop(0) else: frame = AudioFrame(format="s16", layout="mono", samples=self.samples) for p in frame.planes: p.update(bytes(p.buffer_size)) frame.pts = pts frame.time_base = time_base frame.sample_rate = self.sample_rate return frame except Exception as e: traceback.print_exc() logger.error(f"Error while creating tts->rtc frame: {e}") async def cleanup(self): self.interrupt() self.stop()
An audio track that buffers raw 16-bit mono PCM at 24 kHz, converts it into AudioFrames, and streams it to the room; it emits silence whenever the buffer is empty.
Ancestors
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Subclasses
Methods
async def add_new_bytes(self, audio_data: bytes)
-
Expand source code
async def add_new_bytes(self, audio_data: bytes): self.audio_data_buffer += audio_data while len(self.audio_data_buffer) >= self.chunk_size: chunk = self.audio_data_buffer[: self.chunk_size] self.audio_data_buffer = self.audio_data_buffer[self.chunk_size :] try: audio_frame = self.buildAudioFrames(chunk) self.frame_buffer.append(audio_frame) logger.debug( f"Added audio frame to buffer, total frames: {len(self.frame_buffer)}" ) except Exception as e: logger.error(f"Error building audio frame: {e}") break
def buildAudioFrames(self, chunk: bytes) ‑> av.audio.frame.AudioFrame
-
Expand source code
def buildAudioFrames(self, chunk: bytes) -> AudioFrame: if len(chunk) != self.chunk_size: logger.warning( f"Incorrect chunk size received {len(chunk)}, expected {self.chunk_size}" ) data = np.frombuffer(chunk, dtype=np.int16) expected_samples = self.samples * self.channels if len(data) != expected_samples: logger.warning( f"Incorrect number of samples in chunk {len(data)}, expected {expected_samples}" ) data = data.reshape(-1, self.channels) layout = "mono" if self.channels == 1 else "stereo" audio_frame = AudioFrame.from_ndarray(data.T, format="s16", layout=layout) return audio_frame
async def cleanup(self)
-
Expand source code
async def cleanup(self): self.interrupt() self.stop()
def interrupt(self)
-
Expand source code
def interrupt(self): self.frame_buffer.clear() self.audio_data_buffer.clear()
def next_timestamp(self)
-
Expand source code
def next_timestamp(self): pts = int(self.frame_time) time_base = self.time_base_fraction self.frame_time += self.samples return pts, time_base
async def recv(self) ‑> av.audio.frame.AudioFrame
-
Expand source code
async def recv(self) -> AudioFrame: try: if self.readyState != "live": raise MediaStreamError if self._start is None: self._start = time() self._timestamp = 0 else: self._timestamp += self.samples wait = self._start + (self._timestamp / self.sample_rate) - time() if wait > 0: await asyncio.sleep(wait) pts, time_base = self.next_timestamp() if len(self.frame_buffer) > 0: frame = self.frame_buffer.pop(0) else: frame = AudioFrame(format="s16", layout="mono", samples=self.samples) for p in frame.planes: p.update(bytes(p.buffer_size)) frame.pts = pts frame.time_base = time_base frame.sample_rate = self.sample_rate return frame except Exception as e: traceback.print_exc() logger.error(f"Error while creating tts->rtc frame: {e}")
Receive the next av.audio.frame.AudioFrame.
The base implementation just reads silence; subclass AudioStreamTrack to provide a useful implementation.
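A small sketch of feeding audio into the track, assuming pcm_bytes is 16-bit little-endian mono PCM at the track's 24 kHz sample rate:

async def play_pcm(track, pcm_bytes: bytes) -> None:
    # track: a CustomAudioStreamTrack. Complete fixed-size chunks are converted
    # to AudioFrames and queued for recv(); any remainder stays buffered.
    await track.add_new_bytes(pcm_bytes)

# When the user barges in, drop whatever is still queued:
# track.interrupt()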
class DedicatedInferenceResource (resource_id: str, config: Dict[str, Any])
-
Expand source code
class DedicatedInferenceResource(BaseResource): """ Dedicated inference resource that runs AI models in a separate process. This mimics the old IPC system's single shared inference process that handles all STT, LLM, TTS, and VAD tasks for all agent jobs. """ def __init__(self, resource_id: str, config: Dict[str, Any]): super().__init__(resource_id, config) self.process: Optional[Process] = None self.parent_conn: Optional[Connection] = None self.child_conn: Optional[Connection] = None self._process_ready = False self._models_cache: Dict[str, Any] = {} # Inference-specific configuration self.initialize_timeout = config.get("inference_process_timeout", 30.0) self.memory_warn_mb = config.get("inference_memory_warn_mb", 1000.0) self.ping_interval = config.get("ping_interval", 30.0) @property def resource_type(self) -> ResourceType: return ResourceType.PROCESS async def _initialize_impl(self) -> None: """Initialize the dedicated inference process.""" logger.info(f"Initializing dedicated inference process: {self.resource_id}") # Create pipe for communication self.parent_conn, self.child_conn = Pipe() # Start the inference process self.process = Process( target=self._run_inference_process, args=(self.resource_id, self.child_conn, self.config), daemon=True, ) self.process.start() # Wait for process to be ready start_time = time.time() while ( not self._process_ready and (time.time() - start_time) < self.initialize_timeout ): try: if self.parent_conn.poll(): message = self.parent_conn.recv() if message.get("type") == "ready": self._process_ready = True break elif message.get("type") == "error": raise Exception( f"Inference process error: {message.get('error')}" ) await asyncio.sleep(0.1) except Exception as e: logger.warning(f"Error checking inference process readiness: {e}") if not self._process_ready: raise TimeoutError( f"Inference process {self.resource_id} failed to initialize within {self.initialize_timeout}s" ) logger.info( f"Dedicated inference process initialized: {self.resource_id} (PID: {self.process.pid})" ) async def _execute_task_impl( self, task_id: str, config, entrypoint: Callable, args: tuple, kwargs: dict ) -> Any: """Execute inference task in the dedicated process.""" if not self._process_ready: raise RuntimeError(f"Inference process {self.resource_id} is not ready") # Prepare inference data inference_data = { "task_id": task_id, "task_type": config.task_type.value, "model_config": config.data.get("model_config", {}), "input_data": config.data.get("input_data", {}), "timeout": config.timeout, } # Send inference request to process self.parent_conn.send({"type": "inference", "data": inference_data}) # Wait for result start_time = time.time() while (time.time() - start_time) < config.timeout: try: if self.parent_conn.poll(): message = self.parent_conn.recv() if ( message.get("type") == "result" and message.get("task_id") == task_id ): if message.get("status") == "success": return message.get("result") else: raise RuntimeError( message.get("error", "Unknown inference error") ) elif message.get("type") == "error": raise RuntimeError( message.get("error", "Inference process error") ) await asyncio.sleep(0.1) except Exception as e: logger.warning(f"Error checking inference result: {e}") raise TimeoutError( f"Inference task {task_id} timed out after {config.timeout}s" ) async def _shutdown_impl(self) -> None: """Shutdown the dedicated inference process.""" if self.process and self.process.is_alive(): # Send shutdown signal self.parent_conn.send({"type": "shutdown"}) # Wait for graceful 
shutdown timeout = self.config.get("close_timeout", 60.0) start_time = time.time() while self.process.is_alive() and (time.time() - start_time) < timeout: await asyncio.sleep(0.1) # Force terminate if still alive if self.process.is_alive(): logger.warning( f"Force terminating inference process {self.resource_id}" ) self.process.terminate() self.process.join(timeout=5.0) if self.process.is_alive(): self.process.kill() async def health_check(self) -> bool: """Perform a health check on the dedicated inference process.""" try: if self._shutdown or not self.process or not self.process.is_alive(): return False # Send ping to inference process self.parent_conn.send({"type": "ping"}) # Wait for ping response start_time = time.time() timeout = 5.0 # 5 second timeout for health check while (time.time() - start_time) < timeout: try: if self.parent_conn.poll(): message = self.parent_conn.recv() if message.get("type") == "ping_response": # Update last heartbeat self.last_heartbeat = time.time() return True elif message.get("type") == "error": logger.error( f"Inference process error: {message.get('error')}" ) return False await asyncio.sleep(0.1) except Exception as e: logger.warning(f"Error checking inference process health: {e}") # Timeout - process is unresponsive logger.warning(f"Inference process {self.resource_id} health check timeout") return False except Exception as e: logger.error( f"Health check failed for inference process {self.resource_id}: {e}" ) return False @staticmethod def _run_inference_process( resource_id: str, conn: Connection, config: Dict[str, Any] ): """Run the inference process in a separate process.""" try: # Set up logging logging.basicConfig(level=logging.INFO) logger.info( f"Inference process started: {resource_id} (PID: {os.getpid()})" ) # Set up signal handlers def signal_handler(signum, frame): logger.info("Received shutdown signal") conn.send({"type": "shutdown_ack"}) sys.exit(0) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) # Send ready signal conn.send({"type": "ready"}) # Model cache for reuse models_cache: Dict[str, Any] = {} async def main_loop(): while True: try: if conn.poll(timeout=1.0): message = conn.recv() message_type = message.get("type") if message_type == "inference": await _handle_inference( conn, message.get("data", {}), models_cache ) elif message_type == "ping": await _handle_ping(conn) elif message_type == "shutdown": logger.info("Received shutdown request") conn.send({"type": "shutdown_ack"}) break else: logger.warning(f"Unknown message type: {message_type}") except Exception as e: logger.error(f"Error in inference process main loop: {e}") conn.send({"type": "error", "error": str(e)}) asyncio.run(main_loop()) except Exception as e: logger.error(f"Fatal error in inference process: {e}") conn.send({"type": "error", "error": str(e)}) sys.exit(1) finally: logger.info("Inference process shutting down") conn.close()
Dedicated inference resource that runs AI models in a separate process.
This mimics the old IPC system's single shared inference process that handles all STT, LLM, TTS, and VAD tasks for all agent jobs.
Ancestors
- BaseResource
- abc.ABC
Methods
async def health_check(self) ‑> bool
-
Expand source code
async def health_check(self) -> bool: """Perform a health check on the dedicated inference process.""" try: if self._shutdown or not self.process or not self.process.is_alive(): return False # Send ping to inference process self.parent_conn.send({"type": "ping"}) # Wait for ping response start_time = time.time() timeout = 5.0 # 5 second timeout for health check while (time.time() - start_time) < timeout: try: if self.parent_conn.poll(): message = self.parent_conn.recv() if message.get("type") == "ping_response": # Update last heartbeat self.last_heartbeat = time.time() return True elif message.get("type") == "error": logger.error( f"Inference process error: {message.get('error')}" ) return False await asyncio.sleep(0.1) except Exception as e: logger.warning(f"Error checking inference process health: {e}") # Timeout - process is unresponsive logger.warning(f"Inference process {self.resource_id} health check timeout") return False except Exception as e: logger.error( f"Health check failed for inference process {self.resource_id}: {e}" ) return False
Perform a health check on the dedicated inference process.
Inherited members
class DirectRoomOptions (room_id: str,
participant_identity: str | None = None,
room_name: str | None = None,
auth_token: str | None = None)-
Expand source code
@dataclass class DirectRoomOptions: """Information for direct room joining without backend registration.""" room_id: str participant_identity: Optional[str] = None room_name: Optional[str] = None auth_token: Optional[str] = None
Information for direct room joining without backend registration.
Instance variables
var auth_token : str | None
var participant_identity : str | None
var room_id : str
var room_name : str | None
class EOU (threshold: float = 0.7)
-
Expand source code
class EOU(EventEmitter[Literal["error"]]): """Base class for End of Utterance Detection implementations""" def __init__(self, threshold: float = 0.7) -> None: super().__init__() self._label = f"{type(self).__module__}.{type(self).__name__}" self._threshold = threshold @property def label(self) -> str: """Get the EOU provider label""" return self._label @property def threshold(self) -> float: """Get the EOU detection threshold""" return self._threshold @abstractmethod def get_eou_probability(self, chat_context: ChatContext) -> float: """ Get the probability score for end of utterance detection. Args: chat_context: Chat context to analyze Returns: float: Probability score (0.0 to 1.0) """ raise NotImplementedError def detect_end_of_utterance(self, chat_context: ChatContext, threshold: Optional[float] = None) -> bool: """ Detect if the given chat context represents an end of utterance. Args: chat_context: Chat context to analyze threshold: Optional threshold override Returns: bool: True if end of utterance is detected, False otherwise """ if threshold is None: threshold = self._threshold probability = self.get_eou_probability(chat_context) return probability >= threshold def set_threshold(self, threshold: float) -> None: """Update the EOU detection threshold""" self._threshold = threshold
Base class for End of Utterance Detection implementations
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop label : str
-
Expand source code
@property def label(self) -> str: """Get the EOU provider label""" return self._label
Get the EOU provider label
prop threshold : float
-
Expand source code
@property def threshold(self) -> float: """Get the EOU detection threshold""" return self._threshold
Get the EOU detection threshold
Methods
def detect_end_of_utterance(self,
chat_context: ChatContext,
threshold: Optional[float] = None) ‑> bool-
Expand source code
def detect_end_of_utterance(self, chat_context: ChatContext, threshold: Optional[float] = None) -> bool: """ Detect if the given chat context represents an end of utterance. Args: chat_context: Chat context to analyze threshold: Optional threshold override Returns: bool: True if end of utterance is detected, False otherwise """ if threshold is None: threshold = self._threshold probability = self.get_eou_probability(chat_context) return probability >= threshold
Detect if the given chat context represents an end of utterance.
Args
chat_context
- Chat context to analyze
threshold
- Optional threshold override
Returns
bool
- True if end of utterance is detected, False otherwise
def get_eou_probability(self,
chat_context: ChatContext) ‑> float-
Expand source code
@abstractmethod def get_eou_probability(self, chat_context: ChatContext) -> float: """ Get the probability score for end of utterance detection. Args: chat_context: Chat context to analyze Returns: float: Probability score (0.0 to 1.0) """ raise NotImplementedError
Get the probability score for end of utterance detection.
Args
chat_context
- Chat context to analyze
Returns
float
- Probability score (0.0 to 1.0)
def set_threshold(self, threshold: float) ‑> None
-
Expand source code
def set_threshold(self, threshold: float) -> None: """Update the EOU detection threshold""" self._threshold = threshold
Update the EOU detection threshold
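A toy end-of-utterance detector, sketched to show the subclassing contract; the import paths are assumptions:

from agents.eou import EOU  # assumed import path
from agents.llm import ChatContext, ChatMessage, ChatRole  # assumed import path

class PunctuationEOU(EOU):
    """Treats terminal punctuation in the last user message as end of utterance."""

    def get_eou_probability(self, chat_context: ChatContext) -> float:
        last_user = next(
            (item for item in reversed(chat_context.items)
             if isinstance(item, ChatMessage) and item.role == ChatRole.USER),
            None,
        )
        if last_user is None:
            return 0.0
        content = last_user.content
        text = content if isinstance(content, str) else " ".join(
            part for part in content if isinstance(part, str)
        )
        return 1.0 if text.rstrip().endswith((".", "?", "!")) else 0.2

detector = PunctuationEOU(threshold=0.7)
# detector.detect_end_of_utterance(chat_context) is True when the probability >= 0.7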
class EncodeOptions (format: "Literal['JPEG', 'PNG']" = 'JPEG',
resize_options: ResizeOptions = <factory>,
quality: int = 90)-
Expand source code
@dataclass class EncodeOptions: """Configuration settings for converting av.VideoFrame into standard image formats.""" format: Literal["JPEG", "PNG"] = "JPEG" """The encoding format for the image.""" resize_options: ResizeOptions = field(default_factory=lambda: ResizeOptions( width=320, height=240 )) """Settings for adjusting the image size.""" quality: int = 90 """Compression level for the image, ranging from 0 to 100. Applicable only to JPEG."""
Configuration settings for converting av.VideoFrame into standard image formats.
Instance variables
var format : Literal['JPEG', 'PNG']
-
The encoding format for the image.
var quality : int
-
Compression level for the image, ranging from 0 to 100. Applicable only to JPEG.
var resize_options : ResizeOptions
-
Settings for adjusting the image size.
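A construction sketch, assuming EncodeOptions and ResizeOptions are importable from agents.images (the path is an assumption):

from agents.images import EncodeOptions, ResizeOptions  # assumed import path

options = EncodeOptions(
    format="JPEG",
    quality=80,  # compression level; applies to JPEG only
    resize_options=ResizeOptions(width=640, height=480),
)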
class EventEmitter
-
Expand source code
class EventEmitter(Generic[T]): def __init__(self) -> None: self._handlers: Dict[T, List[Callable[..., Any]]] = {} def on( self, event: T, callback: Callable[..., Any] | None = None ) -> Callable[..., Any]: def register(handler: Callable[..., Any]) -> Callable[..., Any]: if asyncio.iscoroutinefunction(handler): raise ValueError( "Async handlers are not supported. Use a sync wrapper." ) handlers = self._handlers.setdefault(event, []) if handler not in handlers: handlers.append(handler) return handler return register if callback is None else register(callback) def off(self, event: T, callback: Callable[..., Any]) -> None: if event in self._handlers: try: self._handlers[event].remove(callback) except ValueError: pass if not self._handlers[event]: del self._handlers[event] def emit(self, event: T, *args: Any) -> None: callbacks = self._handlers.get(event) if not callbacks: return arguments = args if args else ({},) for cb in callbacks[:]: try: self._invoke(cb, arguments) except Exception as ex: logger.error(f"Handler raised exception on event '{event}': {ex}") def _invoke(self, func: Callable[..., Any], args: tuple[Any, ...]) -> None: code = func.__code__ argcount = code.co_argcount flags = code.co_flags has_varargs = flags & 0x04 != 0 # If the function expects no arguments (only self), don't pass any if argcount == 1 and hasattr(func, "__self__"): # Only self parameter func() elif has_varargs: func(*args) else: func(*args[:argcount])
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Ancestors
- typing.Generic
Subclasses
- Agent
- CascadingPipeline
- ConversationFlow
- Denoise
- EOU
- EventBus
- LLM
- Pipeline
- RealtimeBaseModel
- RealTimePipeline
- STT
- TTS
- VAD
Methods
def emit(self, event: T, *args: Any) ‑> None
-
Expand source code
def emit(self, event: T, *args: Any) -> None: callbacks = self._handlers.get(event) if not callbacks: return arguments = args if args else ({},) for cb in callbacks[:]: try: self._invoke(cb, arguments) except Exception as ex: logger.error(f"Handler raised exception on event '{event}': {ex}")
def off(self, event: T, callback: Callable[..., Any]) ‑> None
-
Expand source code
def off(self, event: T, callback: Callable[..., Any]) -> None: if event in self._handlers: try: self._handlers[event].remove(callback) except ValueError: pass if not self._handlers[event]: del self._handlers[event]
def on(self, event: T, callback: Callable[..., Any] | None = None) ‑> Callable[..., Any]
-
Expand source code
def on( self, event: T, callback: Callable[..., Any] | None = None ) -> Callable[..., Any]: def register(handler: Callable[..., Any]) -> Callable[..., Any]: if asyncio.iscoroutinefunction(handler): raise ValueError( "Async handlers are not supported. Use a sync wrapper." ) handlers = self._handlers.setdefault(event, []) if handler not in handlers: handlers.append(handler) return handler return register if callback is None else register(callback)
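A minimal sketch of the emitter contract: handlers must be synchronous, on() doubles as a decorator when no callback is passed, and emit() trims extra positional arguments to what each handler accepts. The class and event names below are hypothetical.

from typing import Literal

from agents.event_emitter import EventEmitter  # assumed import path

class Downloader(EventEmitter[Literal["progress", "done"]]):
    def download(self) -> None:
        self.emit("progress", {"percent": 50})
        self.emit("done")

dl = Downloader()

@dl.on("progress")  # decorator form: on() returns the handler
def show_progress(data) -> None:
    print(f"{data['percent']}% complete")

dl.on("done", lambda: print("finished"))  # callback form
dl.download()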
class ExecutorType (*args, **kwds)
-
Expand source code
class ExecutorType(Enum): """Type of executor for task processing.""" PROCESS = "process" THREAD = "thread"
Type of executor for task processing.
Ancestors
- enum.Enum
Class variables
var PROCESS
var THREAD
class FunctionCall (**data: Any)
-
Expand source code
class FunctionCall(BaseModel): """ Represents a function call in the chat context. Attributes: id (str): Unique identifier for the function call. Auto-generated if not provided. type (Literal["function_call"]): Type identifier, always "function_call". name (str): Name of the function to be called. arguments (str): JSON string containing the function arguments. call_id (str): Unique identifier linking this call to its output. """ id: str = Field(default_factory=lambda: f"call_{int(time.time())}") type: Literal["function_call"] = "function_call" name: str arguments: str call_id: str
Represents a function call in the chat context.
Attributes
id
:str
- Unique identifier for the function call. Auto-generated if not provided.
- type (Literal["function_call"]): Type identifier, always "function_call".
name
:str
- Name of the function to be called.
arguments
:str
- JSON string containing the function arguments.
call_id
:str
- Unique identifier linking this call to its output.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var arguments : str
var call_id : str
var id : str
var model_config
var name : str
var type : Literal['function_call']
class FunctionCallOutput (**data: Any)
-
Expand source code
class FunctionCallOutput(BaseModel): """ Represents the output of a function call. Attributes: id (str): Unique identifier for the function output. Auto-generated if not provided. type (Literal["function_call_output"]): Type identifier, always "function_call_output". name (str): Name of the function that was called. call_id (str): Identifier linking this output to the original function call. output (str): The result or output from the function execution. is_error (bool): Flag indicating if the function execution failed. """ id: str = Field(default_factory=lambda: f"output_{int(time.time())}") type: Literal["function_call_output"] = "function_call_output" name: str call_id: str output: str is_error: bool = False
Represents the output of a function call.
Attributes
id
:str
- Unique identifier for the function output. Auto-generated if not provided.
- type (Literal["function_call_output"]): Type identifier, always "function_call_output".
name
:str
- Name of the function that was called.
call_id
:str
- Identifier linking this output to the original function call.
output
:str
- The result or output from the function execution.
is_error
:bool
- Flag indicating if the function execution failed.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var call_id : str
var id : str
var is_error : bool
var model_config
var name : str
var output : str
var type : Literal['function_call_output']
class FunctionTool (*args, **kwargs)
-
Expand source code
@runtime_checkable class FunctionTool(Protocol): @property @abstractmethod def _tool_info(self) -> "FunctionToolInfo": ... def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol): def meth(self) -> int: ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::
class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProto[T](Protocol): def meth(self) -> T: ...
Ancestors
- typing.Protocol
- typing.Generic
class FunctionToolInfo (name: str,
description: str | None = None,
parameters_schema: Optional[dict] = None)-
Expand source code
@dataclass class FunctionToolInfo: name: str description: str | None = None parameters_schema: Optional[dict] = None
FunctionToolInfo(name: 'str', description: 'str | None' = None, parameters_schema: 'Optional[dict]' = None)
Instance variables
var description : str | None
var name : str
var parameters_schema : dict | None
class HealthMetrics (resource_id: str,
timestamp: float,
memory_usage_mb: float,
cpu_usage_percent: float,
active_tasks: int,
response_time_ms: float,
error_count: int = 0,
success_count: int = 0)-
Expand source code
@dataclass class HealthMetrics: """Health metrics for resource monitoring.""" resource_id: str timestamp: float memory_usage_mb: float cpu_usage_percent: float active_tasks: int response_time_ms: float error_count: int = 0 success_count: int = 0
Health metrics for resource monitoring.
Instance variables
var active_tasks : int
var cpu_usage_percent : float
var error_count : int
var memory_usage_mb : float
var resource_id : str
var response_time_ms : float
var success_count : int
var timestamp : float
class ImageContent (**data: Any)
-
Expand source code
class ImageContent(BaseModel): """ Represents image content in chat messages. Attributes: id (str): Unique identifier for the image. Auto-generated if not provided. type (Literal["image"]): Type identifier, always "image". image (Union[av.VideoFrame, str]): The image data as VideoFrame or URL string. inference_detail (Literal["auto", "high", "low"]): Detail level for LLM image analysis. encode_options (EncodeOptions): Configuration for image encoding and resizing. """ id: str = Field(default_factory=lambda: f"img_{int(time.time())}") type: Literal["image"] = "image" image: Union[av.VideoFrame, str] inference_detail: Literal["auto", "high", "low"] = "auto" encode_options: EncodeOptions = Field( default_factory=lambda: EncodeOptions( format="JPEG", quality=90, resize_options=ResizeOptions( width=320, height=240 ), ) ) class Config: arbitrary_types_allowed = True def to_data_url(self) -> str: """ Convert the image to a data URL format. Returns: str: A data URL string representing the image. """ if isinstance(self.image, str): return self.image encoded_image = images.encode(self.image, self.encode_options) b64_image = base64.b64encode(encoded_image).decode("utf-8") return f"data:image/{self.encode_options.format.lower()};base64,{b64_image}"
Represents image content in chat messages.
Attributes
id
:str
- Unique identifier for the image. Auto-generated if not provided.
- type (Literal["image"]): Type identifier, always "image".
image
:Union[av.VideoFrame, str]
- The image data as VideoFrame or URL string.
inference_detail
:Literal["auto", "high", "low"]
- Detail level for LLM image analysis.
encode_options
:EncodeOptions
- Configuration for image encoding and resizing.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var Config
var encode_options : EncodeOptions
var id : str
var image : av.video.frame.VideoFrame | str
var inference_detail : Literal['auto', 'high', 'low']
var model_config
var type : Literal['image']
Methods
def to_data_url(self) ‑> str
-
Expand source code
def to_data_url(self) -> str: """ Convert the image to a data URL format. Returns: str: A data URL string representing the image. """ if isinstance(self.image, str): return self.image encoded_image = images.encode(self.image, self.encode_options) b64_image = base64.b64encode(encoded_image).decode("utf-8") return f"data:image/{self.encode_options.format.lower()};base64,{b64_image}"
Convert the image to a data URL format.
Returns
str
- A data URL string representing the image.
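A short sketch of both input forms; the import path is an assumption:

from agents.llm import ImageContent  # assumed import path

# URL input: to_data_url() returns the string unchanged.
remote = ImageContent(image="https://example.com/frame.jpg")
print(remote.to_data_url())

# av.VideoFrame input (e.g. a captured room frame; hypothetical variable):
# local = ImageContent(image=frame, inference_detail="low")
# data_url = local.to_data_url()  # base64 JPEG data URL with the default options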
class JobContext (*,
room_options: RoomOptions,
loop: asyncio.events.AbstractEventLoop | None = None)-
Expand source code
class JobContext: def __init__( self, *, room_options: RoomOptions, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: self.room_options = room_options self._loop = loop or asyncio.get_event_loop() self._pipeline: Optional[Pipeline] = None self.videosdk_auth = self.room_options.auth_token or os.getenv( "VIDEOSDK_AUTH_TOKEN" ) self.room: Optional[VideoSDKHandler] = None self._shutdown_callbacks: list[Callable[[], Coroutine[None, None, None]]] = [] self._is_shutting_down: bool = False self.want_console = (len(sys.argv) > 1 and sys.argv[1].lower() == "console") def _set_pipeline_internal(self, pipeline: Any) -> None: """Internal method called by pipeline constructors""" self._pipeline = pipeline if self.room: self.room.pipeline = pipeline if hasattr(pipeline, "_set_loop_and_audio_track"): pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track) # Ensure our lambda function fix is preserved after pipeline setup # This prevents the pipeline from overriding our event handlers if hasattr(self.room, "meeting") and self.room.meeting: # Re-apply our lambda function fix to ensure it's not overridden self.room.meeting.add_event_listener( self.room._create_meeting_handler() ) async def connect(self) -> None: """Connect to the room""" if self.room_options: if not self.room_options.room_id: self.room_options.room_id = self.get_room_id() custom_camera_video_track = None custom_microphone_audio_track = None sinks = [] avatar = self.room_options.avatar if not avatar and self._pipeline and hasattr(self._pipeline, "avatar"): avatar = self._pipeline.avatar if avatar: await avatar.connect() custom_camera_video_track = avatar.video_track custom_microphone_audio_track = avatar.audio_track sinks.append(avatar) if self.want_console: from .console_mode import setup_console_voice_for_ctx if not self._pipeline: raise RuntimeError("Pipeline must be constructed before ctx.connect() in console mode") cleanup_callback = await setup_console_voice_for_ctx(self) self.add_shutdown_callback(cleanup_callback) else: if self.room_options.join_meeting: self.room = VideoSDKHandler( meeting_id=self.room_options.room_id, auth_token=self.videosdk_auth, name=self.room_options.name, pipeline=self._pipeline, loop=self._loop, vision=self.room_options.vision, recording=self.room_options.recording, custom_camera_video_track=custom_camera_video_track, custom_microphone_audio_track=custom_microphone_audio_track, audio_sinks=sinks, on_room_error= self.room_options.on_room_error, auto_end_session=self.room_options.auto_end_session, session_timeout_seconds=self.room_options.session_timeout_seconds, signaling_base_url=self.room_options.signaling_base_url, ) if self._pipeline and hasattr(self._pipeline, '_set_loop_and_audio_track'): self._pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track) if self.room and self.room_options.join_meeting: self.room.init_meeting() await self.room.join() if self.room_options.playground and self.room_options.join_meeting and not self.want_console: if self.videosdk_auth: playground_url = f"https://playground.videosdk.live?token={self.videosdk_auth}&meetingId={self.room_options.room_id}" print(f"\033[1;36m" + "Agent started in playground mode" + "\033[0m") print("\033[1;75m" + "Interact with agent here at:" + "\033[0m") print("\033[1;4;94m" + playground_url + "\033[0m") else: raise ValueError("VIDEOSDK_AUTH_TOKEN environment variable not found") async def shutdown(self) -> None: """Called by Worker during graceful shutdown""" if self._is_shutting_down: logger.info("JobContext already shutting down") return self._is_shutting_down = True for callback in self._shutdown_callbacks: try: await callback() except Exception as e: logger.error(f"Error in shutdown callback: {e}") if self.room: try: await self.room.leave() except Exception as e: logger.error(f"Error during room leave: {e}") try: if hasattr(self.room, "cleanup"): await self.room.cleanup() except Exception as e: logger.error(f"Error during room cleanup: {e}") self.room = None def add_shutdown_callback( self, callback: Callable[[], Coroutine[None, None, None]] ) -> None: """Add a callback to be called during shutdown""" self._shutdown_callbacks.append(callback) async def wait_for_participant(self, participant_id: str | None = None) -> str: if self.room: return await self.room.wait_for_participant(participant_id) else: raise ValueError("Room not initialized") def get_room_id(self) -> str: """ Creates a new room using the VideoSDK API and returns the room ID. Raises: ValueError: If the VIDEOSDK_AUTH_TOKEN is missing. RuntimeError: If the API request fails or the response is invalid. """ if self.want_console: return None if self.videosdk_auth: url = "https://api.videosdk.live/v2/rooms" headers = {"Authorization": self.videosdk_auth} try: response = requests.post(url, headers=headers) response.raise_for_status() except requests.RequestException as e: raise RuntimeError(f"Failed to create room: {e}") from e data = response.json() room_id = data.get("roomId") if not room_id: raise RuntimeError(f"Unexpected API response, missing roomId: {data}") return room_id else: raise ValueError( "VIDEOSDK_AUTH_TOKEN not found. " "Set it as an environment variable or provide it in room options via auth_token." )
Methods
def add_shutdown_callback(self, callback: Callable[[], Coroutine[None, None, None]]) ‑> None
-
Expand source code
def add_shutdown_callback( self, callback: Callable[[], Coroutine[None, None, None]] ) -> None: """Add a callback to be called during shutdown""" self._shutdown_callbacks.append(callback)
Add a callback to be called during shutdown
async def connect(self) ‑> None
-
Expand source code
async def connect(self) -> None: """Connect to the room""" if self.room_options: if not self.room_options.room_id: self.room_options.room_id = self.get_room_id() custom_camera_video_track = None custom_microphone_audio_track = None sinks = [] avatar = self.room_options.avatar if not avatar and self._pipeline and hasattr(self._pipeline, "avatar"): avatar = self._pipeline.avatar if avatar: await avatar.connect() custom_camera_video_track = avatar.video_track custom_microphone_audio_track = avatar.audio_track sinks.append(avatar) if self.want_console: from .console_mode import setup_console_voice_for_ctx if not self._pipeline: raise RuntimeError("Pipeline must be constructed before ctx.connect() in console mode") cleanup_callback = await setup_console_voice_for_ctx(self) self.add_shutdown_callback(cleanup_callback) else: if self.room_options.join_meeting: self.room = VideoSDKHandler( meeting_id=self.room_options.room_id, auth_token=self.videosdk_auth, name=self.room_options.name, pipeline=self._pipeline, loop=self._loop, vision=self.room_options.vision, recording=self.room_options.recording, custom_camera_video_track=custom_camera_video_track, custom_microphone_audio_track=custom_microphone_audio_track, audio_sinks=sinks, on_room_error= self.room_options.on_room_error, auto_end_session=self.room_options.auto_end_session, session_timeout_seconds=self.room_options.session_timeout_seconds, signaling_base_url=self.room_options.signaling_base_url, ) if self._pipeline and hasattr(self._pipeline, '_set_loop_and_audio_track'): self._pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track) if self.room and self.room_options.join_meeting: self.room.init_meeting() await self.room.join() if self.room_options.playground and self.room_options.join_meeting and not self.want_console: if self.videosdk_auth: playground_url = f"https://playground.videosdk.live?token={self.videosdk_auth}&meetingId={self.room_options.room_id}" print(f"\033[1;36m" + "Agent started in playground mode" + "\033[0m") print("\033[1;75m" + "Interact with agent here at:" + "\033[0m") print("\033[1;4;94m" + playground_url + "\033[0m") else: raise ValueError("VIDEOSDK_AUTH_TOKEN environment variable not found")
Connect to the room
def get_room_id(self) ‑> str
-
Expand source code
def get_room_id(self) -> str: """ Creates a new room using the VideoSDK API and returns the room ID. Raises: ValueError: If the VIDEOSDK_AUTH_TOKEN is missing. RuntimeError: If the API request fails or the response is invalid. """ if self.want_console: return None if self.videosdk_auth: url = "https://api.videosdk.live/v2/rooms" headers = {"Authorization": self.videosdk_auth} try: response = requests.post(url, headers=headers) response.raise_for_status() except requests.RequestException as e: raise RuntimeError(f"Failed to create room: {e}") from e data = response.json() room_id = data.get("roomId") if not room_id: raise RuntimeError(f"Unexpected API response, missing roomId: {data}") return room_id else: raise ValueError( "VIDEOSDK_AUTH_TOKEN not found. " "Set it as an environment variable or provide it in room options via auth_token." )
Creates a new room using the VideoSDK API and returns the room ID.
Raises
ValueError
- If the VIDEOSDK_AUTH_TOKEN is missing.
RuntimeError
- If the API request fails or the response is invalid.
async def shutdown(self) ‑> None
-
Expand source code
async def shutdown(self) -> None: """Called by Worker during graceful shutdown""" if self._is_shutting_down: logger.info("JobContext already shutting down") return self._is_shutting_down = True for callback in self._shutdown_callbacks: try: await callback() except Exception as e: logger.error(f"Error in shutdown callback: {e}") if self.room: try: await self.room.leave() except Exception as e: logger.error(f"Error during room leave: {e}") try: if hasattr(self.room, "cleanup"): await self.room.cleanup() except Exception as e: logger.error(f"Error during room cleanup: {e}") self.room = None
Called by Worker during graceful shutdown
async def wait_for_participant(self, participant_id: str | None = None) ‑> str
-
Expand source code
async def wait_for_participant(self, participant_id: str | None = None) -> str: if self.room: return await self.room.wait_for_participant(participant_id) else: raise ValueError("Room not initialized")
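A minimal usage sketch of JobContext, assuming the package-level import path and that VIDEOSDK_AUTH_TOKEN is set; a real agent would also construct a pipeline before calling connect():

import asyncio

from agents import JobContext, RoomOptions  # import path assumed


async def entrypoint() -> None:
    # With no room_id, connect() calls get_room_id() to create a room via the VideoSDK API.
    ctx = JobContext(room_options=RoomOptions(name="Demo Agent", playground=True))

    async def on_shutdown() -> None:
        print("session closed")

    ctx.add_shutdown_callback(on_shutdown)

    await ctx.connect()
    participant_id = await ctx.wait_for_participant()
    print(f"participant joined: {participant_id}")

    await ctx.shutdown()  # runs shutdown callbacks, then leaves and cleans up the room


if __name__ == "__main__":
    asyncio.run(entrypoint())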
class LLM
-
Expand source code
class LLM(EventEmitter[Literal["error"]]): """ Base class for LLM implementations. """ def __init__(self) -> None: """ Initialize the LLM base class. """ super().__init__() self._label = f"{type(self).__module__}.{type(self).__name__}" @property def label(self) -> str: """ Get the LLM provider label. Returns: str: A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM"). """ return self._label @abstractmethod async def chat( self, messages: ChatContext, tools: list[FunctionTool] | None = None, **kwargs: Any ) -> AsyncIterator[LLMResponse]: """ Main method to interact with the LLM. Args: messages (ChatContext): The conversation context containing message history. tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use. **kwargs (Any): Additional arguments specific to the LLM provider implementation. Returns: AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated. Raises: NotImplementedError: This method must be implemented by subclasses. """ raise NotImplementedError @abstractmethod async def cancel_current_generation(self) -> None: """ Cancel the current LLM generation if active. Raises: NotImplementedError: This method must be implemented by subclasses. """ # override in subclasses pass async def aclose(self) -> None: """ Cleanup resources. """ await self.cancel_current_generation() pass async def __aenter__(self) -> LLM: """ Async context manager entry point. """ return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """ Async context manager exit point. """ await self.aclose()
Base class for LLM implementations.
Initialize the LLM base class.
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop label : str
-
Expand source code
@property def label(self) -> str: """ Get the LLM provider label. Returns: str: A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM"). """ return self._label
Get the LLM provider label.
Returns
str
- A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM").
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None: """ Cleanup resources. """ await self.cancel_current_generation() pass
Cleanup resources.
async def cancel_current_generation(self) ‑> None
-
Expand source code
@abstractmethod async def cancel_current_generation(self) -> None: """ Cancel the current LLM generation if active. Raises: NotImplementedError: This method must be implemented by subclasses. """ # override in subclasses pass
Cancel the current LLM generation if active.
Raises
NotImplementedError
- This method must be implemented by subclasses.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
**kwargs: Any) ‑> AsyncIterator[LLMResponse]
-
Expand source code
@abstractmethod async def chat( self, messages: ChatContext, tools: list[FunctionTool] | None = None, **kwargs: Any ) -> AsyncIterator[LLMResponse]: """ Main method to interact with the LLM. Args: messages (ChatContext): The conversation context containing message history. tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use. **kwargs (Any): Additional arguments specific to the LLM provider implementation. Returns: AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated. Raises: NotImplementedError: This method must be implemented by subclasses. """ raise NotImplementedError
Main method to interact with the LLM.
Args
messages
:ChatContext
- The conversation context containing message history.
tools
:list[FunctionTool] | None, optional
- List of available function tools for the LLM to use.
**kwargs
:Any
- Additional arguments specific to the LLM provider implementation.
Returns
AsyncIterator[LLMResponse]
- An async iterator yielding LLMResponse objects as they're generated.
Raises
NotImplementedError
- This method must be implemented by subclasses.
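A minimal provider sketch; the import path and the ChatRole.ASSISTANT member are assumptions (the latter suggested by the LLMResponse docstring), and a real provider would stream incremental chunks from its API:

from typing import Any, AsyncIterator

from agents import LLM, LLMResponse, ChatContext, ChatRole, FunctionTool  # import paths assumed


class CannedLLM(LLM):
    """Toy provider that yields a single canned chunk to illustrate the contract."""

    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[LLMResponse]:
        # A real provider would forward `messages` (and `tools`) to its API and
        # yield LLMResponse objects as tokens arrive.
        yield LLMResponse(content="Hello from CannedLLM", role=ChatRole.ASSISTANT)

    async def cancel_current_generation(self) -> None:
        # Nothing is in flight for this toy provider.
        return None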
class LLMResponse (**data: Any)
-
Expand source code
class LLMResponse(BaseModel): """ Data model to hold LLM response data. Attributes: content (str): The text content generated by the LLM. role (ChatRole): The role of the response (typically ASSISTANT). metadata (Optional[dict[str, Any]]): Additional response metadata from the LLM provider. """ content: str role: ChatRole metadata: Optional[dict[str, Any]] = None
Data model to hold LLM response data.
Attributes
content
:str
- The text content generated by the LLM.
role
:ChatRole
- The role of the response (typically ASSISTANT).
metadata
:Optional[dict[str, Any]]
- Additional response metadata from the LLM provider.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var content : str
var metadata : dict[str, typing.Any] | None
var model_config
var role : ChatRole
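Constructing a response directly (the import path and the ChatRole member name are assumptions; the fields are the ones listed above):

from agents import LLMResponse, ChatRole  # import path assumed

resp = LLMResponse(
    content="The meeting starts at 3 PM.",
    role=ChatRole.ASSISTANT,
    metadata={"provider": "example", "latency_ms": 120},
)
print(resp.content, resp.metadata)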
class MCPServerHTTP (endpoint_url: str,
request_headers: Dict[str, Any] | None = None,
connection_timeout: float = 10.0,
stream_read_timeout: float = 300.0,
session_timeout: float = 5.0)
-
Expand source code
class MCPServerHTTP(MCPServiceProvider): """ HTTP/Web-based MCP service provider with automatic transport detection. """ def __init__( self, endpoint_url: str, request_headers: Optional[Dict[str, Any]] = None, connection_timeout: float = 10.0, stream_read_timeout: float = 300.0, session_timeout: float = 5.0, ): """ Initialize the HTTP MCP server provider. Args: endpoint_url (str): The HTTP endpoint URL for the MCP server. request_headers (Optional[Dict[str, Any]], optional): Optional HTTP request headers. connection_timeout (float, optional): Connection timeout in seconds. Defaults to 10.0. stream_read_timeout (float, optional): Stream read timeout in seconds. Defaults to 300.0. session_timeout (float, optional): Session timeout in seconds. Defaults to 5.0. """ super().__init__(session_timeout) self.endpoint_url = endpoint_url self.request_headers = request_headers or {} self.connection_timeout = connection_timeout self.stream_read_timeout = stream_read_timeout self.transport_mode = HTTPTransportDetector.detect_transport_mode( endpoint_url) def get_stream_provider(self): """ Get appropriate stream provider based on detected transport. """ timeout_delta = timedelta(seconds=self.connection_timeout) if self.transport_mode == 'streamable_http': return streamablehttp_client( url=self.endpoint_url, headers=self.request_headers, timeout=timeout_delta, ) else: return sse_client( url=self.endpoint_url, headers=self.request_headers, timeout=self.connection_timeout, ) def __repr__(self) -> str: """ String representation of the HTTP MCP server provider. """ return f"MCPServerHTTP(url={self.endpoint_url}, transport={self.transport_mode})"
HTTP/Web-based MCP service provider with automatic transport detection.
Initialize the HTTP MCP server provider.
Args
endpoint_url
:str
- The HTTP endpoint URL for the MCP server.
request_headers
:Optional[Dict[str, Any]], optional
- Optional HTTP request headers.
connection_timeout
:float, optional
- Connection timeout in seconds. Defaults to 10.0.
stream_read_timeout
:float, optional
- Stream read timeout in seconds. Defaults to 300.0.
session_timeout
:float, optional
- Session timeout in seconds. Defaults to 5.0.
Ancestors
- MCPServiceProvider
- abc.ABC
Methods
def get_stream_provider(self)
-
Expand source code
def get_stream_provider(self): """ Get appropriate stream provider based on detected transport. """ timeout_delta = timedelta(seconds=self.connection_timeout) if self.transport_mode == 'streamable_http': return streamablehttp_client( url=self.endpoint_url, headers=self.request_headers, timeout=timeout_delta, ) else: return sse_client( url=self.endpoint_url, headers=self.request_headers, timeout=self.connection_timeout, )
Get appropriate stream provider based on detected transport.
Inherited members
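A construction sketch, assuming the package-level import path; the endpoint URL and header values are placeholders, and attaching the provider to an agent is not shown here:

from agents import MCPServerHTTP  # import path assumed

# Transport (SSE vs. streamable HTTP) is auto-detected from the endpoint URL.
remote_tools = MCPServerHTTP(
    endpoint_url="https://example.com/mcp",
    request_headers={"Authorization": "Bearer <token>"},
    connection_timeout=10.0,
    stream_read_timeout=300.0,
)
print(remote_tools)  # MCPServerHTTP(url=..., transport=...)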
class MCPServerStdio (executable_path: str,
process_arguments: List[str],
environment_vars: Dict[str, str] | None = None,
working_directory: str | pathlib.Path | None = None,
session_timeout: float = 5.0)
-
Expand source code
class MCPServerStdio(MCPServiceProvider): """ Process-based MCP service provider for local applications. """ def __init__( self, executable_path: str, process_arguments: List[str], environment_vars: Optional[Dict[str, str]] = None, working_directory: Optional[str | Path] = None, session_timeout: float = 5.0, ): """ Initialize the stdio MCP server provider. Args: executable_path (str): Path to the executable MCP server. process_arguments (List[str]): Command line arguments to pass to the executable. environment_vars (Optional[Dict[str, str]], optional): Optional environment variables. working_directory (Optional[str | Path], optional): Working directory for the process. session_timeout (float, optional): Session timeout in seconds. Defaults to 5.0. """ super().__init__(session_timeout) self.executable_path = executable_path self.process_arguments = process_arguments self.environment_vars = environment_vars self.working_directory = Path(working_directory) if working_directory and not isinstance( working_directory, Path) else working_directory def get_stream_provider(self): """ Get stdio stream provider for process communication. """ server_params = StdioServerParameters( command=self.executable_path, args=self.process_arguments, env=self.environment_vars, cwd=self.working_directory ) return stdio_client(server_params) def __repr__(self) -> str: """ String representation of the stdio MCP server provider. """ return (f"MCPServerStdio(executable={self.executable_path}, " f"args={self.process_arguments}, cwd={self.working_directory})")
Process-based MCP service provider for local applications.
Initialize the stdio MCP server provider.
Args
executable_path
:str
- Path to the executable MCP server.
process_arguments
:List[str]
- Command line arguments to pass to the executable.
environment_vars
:Optional[Dict[str, str]], optional
- Optional environment variables.
working_directory
:Optional[str | Path], optional
- Working directory for the process.
session_timeout
:float, optional
- Session timeout in seconds. Defaults to 5.0.
Ancestors
- MCPServiceProvider
- abc.ABC
Methods
def get_stream_provider(self)
-
Expand source code
def get_stream_provider(self): """ Get stdio stream provider for process communication. """ server_params = StdioServerParameters( command=self.executable_path, args=self.process_arguments, env=self.environment_vars, cwd=self.working_directory ) return stdio_client(server_params)
Get stdio stream provider for process communication.
Inherited members
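A construction sketch for a local stdio server, assuming the package-level import path; the server script name is hypothetical:

import sys

from agents import MCPServerStdio  # import path assumed

local_tools = MCPServerStdio(
    executable_path=sys.executable,
    process_arguments=["my_mcp_server.py"],  # hypothetical stdio MCP server script
    environment_vars={"LOG_LEVEL": "debug"},
    working_directory=".",
)
print(local_tools)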
class Options (executor_type: Any = None,
num_idle_processes: int = 1,
initialize_timeout: float = 10.0,
close_timeout: float = 60.0,
memory_warn_mb: float = 500.0,
memory_limit_mb: float = 0.0,
ping_interval: float = 30.0,
max_processes: int = 1,
agent_id: str = 'VideoSDKAgent',
auth_token: str | None = None,
permissions: Any = None,
max_retry: int = 16,
load_threshold: float = 0.75,
register: bool = False,
signaling_base_url: str = 'api.videosdk.live',
host: str = '0.0.0.0',
port: int = 8081,
log_level: str = 'INFO')
-
Expand source code
@dataclass class Options: """Configuration options for WorkerJob execution.""" executor_type: Any = None # Will be set in __post_init__ """Which executor to use to run jobs. Automatically selected based on platform.""" num_idle_processes: int = 1 """Number of idle processes/threads to keep warm.""" initialize_timeout: float = 10.0 """Maximum amount of time to wait for a process/thread to initialize/prewarm""" close_timeout: float = 60.0 """Maximum amount of time to wait for a job to shut down gracefully""" memory_warn_mb: float = 500.0 """Memory warning threshold in MB.""" memory_limit_mb: float = 0.0 """Maximum memory usage for a job in MB. Defaults to 0 (disabled).""" ping_interval: float = 30.0 """Interval between health check pings.""" max_processes: int = 1 """Maximum number of processes/threads.""" agent_id: str = "VideoSDKAgent" """ID of the agent.""" auth_token: Optional[str] = None """VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided.""" permissions: Any = None # Will be set in __post_init__ """Permissions for the agent participant.""" max_retry: int = 16 """Maximum number of times to retry connecting to VideoSDK.""" load_threshold: float = 0.75 """Load threshold above which worker is marked as unavailable.""" register: bool = False """Whether to register with the backend. Defaults to False for local development.""" signaling_base_url: str = "api.videosdk.live" """Signaling base URL for VideoSDK services. Defaults to api.videosdk.live.""" host: str = "0.0.0.0" """Host for the debug HTTP server.""" port: int = 8081 """Port for the debug HTTP server.""" log_level: str = "INFO" """Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO.""" def __post_init__(self): """Post-initialization setup.""" # Import here to avoid circular imports from .worker import ExecutorType, WorkerPermissions, _default_executor_type if self.executor_type is None: self.executor_type = _default_executor_type if self.permissions is None: self.permissions = WorkerPermissions() if not self.auth_token: self.auth_token = os.getenv("VIDEOSDK_AUTH_TOKEN")
Configuration options for WorkerJob execution.
Instance variables
var agent_id : str
-
ID of the agent.
var auth_token : str | None
-
VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided.
var close_timeout : float
-
Maximum amount of time to wait for a job to shut down gracefully
var executor_type : Any
-
Which executor to use to run jobs. Automatically selected based on platform.
var host : str
-
Host for the debug HTTP server.
var initialize_timeout : float
-
Maximum amount of time to wait for a process/thread to initialize/prewarm
var load_threshold : float
-
Load threshold above which worker is marked as unavailable.
var log_level : str
-
Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO.
var max_processes : int
-
Maximum number of processes/threads.
var max_retry : int
-
Maximum number of times to retry connecting to VideoSDK.
var memory_limit_mb : float
-
Maximum memory usage for a job in MB. Defaults to 0 (disabled).
var memory_warn_mb : float
-
Memory warning threshold in MB.
var num_idle_processes : int
-
Number of idle processes/threads to keep warm.
var permissions : Any
-
Permissions for the agent participant.
var ping_interval : float
-
Interval between health check pings.
var port : int
-
Port for the debug HTTP server.
var register : bool
-
Whether to register with the backend. Defaults to False for local development.
var signaling_base_url : str
-
Signaling base URL for VideoSDK services. Defaults to api.videosdk.live.
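A configuration sketch, assuming the package-level import path; the resulting Options object is normally handed to a worker, which is outside the scope of this snippet:

from agents import Options  # import path assumed

options = Options(
    agent_id="SupportAgent",
    num_idle_processes=2,
    max_processes=4,
    memory_warn_mb=750.0,
    register=False,       # local development: skip backend registration
    log_level="DEBUG",
)
# auth_token falls back to the VIDEOSDK_AUTH_TOKEN environment variable in __post_init__.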
class ProcessResource (resource_id: str, config: Dict[str, Any])
-
Expand source code
class ProcessResource(BaseResource): """ Process-based resource for task execution. Uses multiprocessing to create isolated processes for task execution. """ def __init__(self, resource_id: str, config: Dict[str, Any]): super().__init__(resource_id, config) self.process: Optional[Process] = None self.task_queue: Optional[Queue] = None self.result_queue: Optional[Queue] = None self.control_queue: Optional[Queue] = None self._process_ready = False @property def resource_type(self) -> ResourceType: return ResourceType.PROCESS async def _initialize_impl(self) -> None: """Initialize the process resource.""" # Create queues for communication self.task_queue = Queue() self.result_queue = Queue() self.control_queue = Queue() # Start the process self.process = Process( target=self._process_worker, args=( self.resource_id, self.task_queue, self.result_queue, self.control_queue, self.config, ), daemon=True, ) self.process.start() # Wait for process to be ready timeout = self.config.get("initialize_timeout", 10.0) start_time = time.time() while not self._process_ready and (time.time() - start_time) < timeout: try: # Check for ready signal if not self.control_queue.empty(): message = self.control_queue.get_nowait() if message.get("type") == "ready": self._process_ready = True break await asyncio.sleep(0.1) except Exception as e: logger.warning(f"Error checking process readiness: {e}") if not self._process_ready: raise TimeoutError( f"Process {self.resource_id} failed to initialize within {timeout}s" ) async def _execute_task_impl( self, task_id: str, config, entrypoint: Callable, args: tuple, kwargs: dict ) -> Any: """Execute task in the process.""" if not self._process_ready: raise RuntimeError(f"Process {self.resource_id} is not ready") # Send task to process task_data = { "task_id": task_id, "config": config, "entrypoint_name": entrypoint.__name__, "args": args, "kwargs": kwargs, } self.task_queue.put(task_data) # Wait for result timeout = config.timeout start_time = time.time() while (time.time() - start_time) < timeout: try: if not self.result_queue.empty(): result_data = self.result_queue.get_nowait() if result_data.get("task_id") == task_id: if result_data.get("status") == "success": return result_data.get("result") else: raise RuntimeError( result_data.get("error", "Unknown error") ) await asyncio.sleep(0.1) except Exception as e: logger.warning(f"Error checking task result: {e}") raise TimeoutError(f"Task {task_id} timed out after {timeout}s") async def _shutdown_impl(self) -> None: """Shutdown the process resource.""" if self.process and self.process.is_alive(): # Send shutdown signal self.control_queue.put({"type": "shutdown"}) # Wait for graceful shutdown timeout = self.config.get("close_timeout", 60.0) start_time = time.time() while self.process.is_alive() and (time.time() - start_time) < timeout: await asyncio.sleep(0.1) # Force terminate if still alive if self.process.is_alive(): logger.warning(f"Force terminating process {self.resource_id}") self.process.terminate() self.process.join(timeout=5.0) if self.process.is_alive(): self.process.kill() @staticmethod def _process_worker( resource_id: str, task_queue: Queue, result_queue: Queue, control_queue: Queue, config: Dict[str, Any], ): """Worker function that runs in the process.""" try: logger.info(f"Process worker {resource_id} started") # Signal ready control_queue.put({"type": "ready"}) # Main task processing loop while True: try: # Check for shutdown signal if not control_queue.empty(): message = control_queue.get_nowait() if message.get("type") == "shutdown": break # Check for tasks if not task_queue.empty(): task_data = task_queue.get_nowait() task_id = task_data["task_id"] try: # Execute the task # Note: In a real implementation, you'd need to serialize/deserialize the entrypoint # For now, we'll use a simple approach result = {"status": "completed", "task_id": task_id} result_queue.put( { "task_id": task_id, "status": "success", "result": result, } ) except Exception as e: result_queue.put( {"task_id": task_id, "status": "error", "error": str(e)} ) else: time.sleep(0.1) except Exception as e: logger.error(f"Error in process worker {resource_id}: {e}") time.sleep(1.0) logger.info(f"Process worker {resource_id} shutting down") except Exception as e: logger.error(f"Fatal error in process worker {resource_id}: {e}")
Process-based resource for task execution.
Uses multiprocessing to create isolated processes for task execution.
Ancestors
- BaseResource
- abc.ABC
Inherited members
class RealTimePipeline (model: RealtimeBaseModel,
avatar: Any | None = None,
denoise: Denoise | None = None)
-
Expand source code
class RealTimePipeline(Pipeline, EventEmitter[Literal["realtime_start", "realtime_end","user_audio_input_data", "user_speech_started", "realtime_model_transcription"]]): """ RealTime pipeline implementation that processes data in real-time. Inherits from Pipeline base class and adds realtime-specific events. """ def __init__( self, model: RealtimeBaseModel, avatar: Any | None = None, denoise: Denoise | None = None, ) -> None: """ Initialize the realtime pipeline. Args: model: Instance of RealtimeBaseModel to process data config: Configuration dictionary with settings like: - response_modalities: List of enabled modalities - silence_threshold_ms: Silence threshold in milliseconds """ self.model = model self.model.audio_track = None self.agent = None self.avatar = avatar self.vision = False self.denoise = denoise super().__init__() self.model.on("error", self.on_model_error) self.model.on("realtime_model_transcription", self.on_realtime_model_transcription) def set_agent(self, agent: Agent) -> None: self.agent = agent if hasattr(self.model, 'set_agent'): self.model.set_agent(agent) def _configure_components(self) -> None: """Configure pipeline components with the loop""" if self.loop: self.model.loop = self.loop job_context = get_current_job_context() if job_context and job_context.room: requested_vision = getattr(job_context.room, 'vision', False) self.vision = requested_vision model_name = self.model.__class__.__name__ if requested_vision and model_name != 'GeminiRealtime': logger.warning(f"Vision mode requested but {model_name} doesn't support video input. Only GeminiRealtime supports vision. Disabling vision.") self.vision = False if self.avatar: self.model.audio_track = getattr(job_context.room, 'agent_audio_track', None) or job_context.room.audio_track elif self.audio_track: self.model.audio_track = self.audio_track async def start(self, **kwargs: Any) -> None: """ Start the realtime pipeline processing. Overrides the abstract start method from Pipeline base class. Args: **kwargs: Additional arguments for pipeline configuration """ await self.model.connect() self.model.on("user_speech_started", self.on_user_speech_started) async def send_message(self, message: str) -> None: """ Send a message through the realtime model. Delegates to the model's send_message implementation. """ await self.model.send_message(message) async def send_text_message(self, message: str) -> None: """ Send a text message through the realtime model. This method specifically handles text-only input when modalities is ["text"]. """ if hasattr(self.model, 'send_text_message'): await self.model.send_text_message(message) else: await self.model.send_message(message) async def on_audio_delta(self, audio_data: bytes): """ Handle incoming audio data from the user """ if self.denoise: audio_data = await self.denoise.denoise(audio_data) await self.model.handle_audio_input(audio_data) async def on_video_delta(self, video_data: av.VideoFrame): """ Handle incoming video data from the user The model's handle_video_input is now expected to handle the av.VideoFrame. """ if self.vision and hasattr(self.model, 'handle_video_input'): await self.model.handle_video_input(video_data) def on_user_speech_started(self, data: dict) -> None: """ Handle user speech started event """ self._notify_speech_started() async def leave(self) -> None: """ Leave the realtime pipeline. """ if self.room is not None: await self.room.leave() def on_model_error(self, error: Exception): """ Handle errors emitted from the model and send to realtime metrics cascading_metrics_collector. """ error_data = {"message": str(error), "timestamp": time.time()} realtime_metrics_collector.set_realtime_model_error(error_data) logger.error(f"Realtime model error: {error_data}") def on_realtime_model_transcription(self, data: dict) -> None: """ Handle realtime model transcription event """ try: self.emit("realtime_model_transcription", data) except Exception: logger.error(f"Realtime model transcription: {data}") async def cleanup(self): """Cleanup resources""" if hasattr(self, 'room') and self.room is not None: try: await self.room.leave() except Exception as e: logger.error(f"Error while leaving room during cleanup: {e}") try: if hasattr(self.room, 'cleanup'): await self.room.cleanup() except Exception as e: logger.error(f"Error while cleaning up room: {e}") if hasattr(self, 'model'): await self.model.aclose()
RealTime pipeline implementation that processes data in real-time. Inherits from Pipeline base class and adds realtime-specific events.
Initialize the realtime pipeline.
Args
model
- Instance of RealtimeBaseModel to process data
config
- Configuration dictionary with settings like:
  - response_modalities: List of enabled modalities
  - silence_threshold_ms: Silence threshold in milliseconds
Ancestors
- Pipeline
- EventEmitter
- typing.Generic
- abc.ABC
Methods
async def cleanup(self)
-
Expand source code
async def cleanup(self): """Cleanup resources""" if hasattr(self, 'room') and self.room is not None: try: await self.room.leave() except Exception as e: logger.error(f"Error while leaving room during cleanup: {e}") try: if hasattr(self.room, 'cleanup'): await self.room.cleanup() except Exception as e: logger.error(f"Error while cleaning up room: {e}") if hasattr(self, 'model'): await self.model.aclose()
Cleanup resources
async def leave(self) ‑> None
-
Expand source code
async def leave(self) -> None: """ Leave the realtime pipeline. """ if self.room is not None: await self.room.leave()
Leave the realtime pipeline.
def on_model_error(self, error: Exception)
-
Expand source code
def on_model_error(self, error: Exception): """ Handle errors emitted from the model and send to realtime metrics cascading_metrics_collector. """ error_data = {"message": str(error), "timestamp": time.time()} realtime_metrics_collector.set_realtime_model_error(error_data) logger.error(f"Realtime model error: {error_data}")
Handle errors emitted from the model and forward them to the realtime metrics collector.
def on_realtime_model_transcription(self, data: dict) ‑> None
-
Expand source code
def on_realtime_model_transcription(self, data: dict) -> None: """ Handle realtime model transcription event """ try: self.emit("realtime_model_transcription", data) except Exception: logger.error(f"Realtime model transcription: {data}")
Handle realtime model transcription event
def on_user_speech_started(self, data: dict) ‑> None
-
Expand source code
def on_user_speech_started(self, data: dict) -> None: """ Handle user speech started event """ self._notify_speech_started()
Handle user speech started event
async def on_video_delta(self, video_data: av.VideoFrame)
-
Expand source code
async def on_video_delta(self, video_data: av.VideoFrame): """ Handle incoming video data from the user The model's handle_video_input is now expected to handle the av.VideoFrame. """ if self.vision and hasattr(self.model, 'handle_video_input'): await self.model.handle_video_input(video_data)
Handle incoming video data from the user. The model's handle_video_input is now expected to handle the av.VideoFrame.
async def send_message(self, message: str) ‑> None
-
Expand source code
async def send_message(self, message: str) -> None: """ Send a message through the realtime model. Delegates to the model's send_message implementation. """ await self.model.send_message(message)
Send a message through the realtime model. Delegates to the model's send_message implementation.
async def send_text_message(self, message: str) ‑> None
-
Expand source code
async def send_text_message(self, message: str) -> None: """ Send a text message through the realtime model. This method specifically handles text-only input when modalities is ["text"]. """ if hasattr(self.model, 'send_text_message'): await self.model.send_text_message(message) else: await self.model.send_message(message)
Send a text message through the realtime model. This method specifically handles text-only input when modalities is ["text"].
def set_agent(self, agent: Agent) ‑> None
-
Expand source code
def set_agent(self, agent: Agent) -> None: self.agent = agent if hasattr(self.model, 'set_agent'): self.model.set_agent(agent)
async def start(self, **kwargs: Any) ‑> None
-
Expand source code
async def start(self, **kwargs: Any) -> None: """ Start the realtime pipeline processing. Overrides the abstract start method from Pipeline base class. Args: **kwargs: Additional arguments for pipeline configuration """ await self.model.connect() self.model.on("user_speech_started", self.on_user_speech_started)
Start the realtime pipeline processing. Overrides the abstract start method from Pipeline base class.
Args
**kwargs
- Additional arguments for pipeline configuration
Inherited members
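A minimal sketch using a stand-in model, assuming the package-level import path; real deployments pass a provider plugin model (such as GeminiRealtime) and construct the pipeline inside the agent entrypoint so it can pick up the current job context:

from agents import RealTimePipeline, RealtimeBaseModel  # import paths assumed


class ToyRealtimeModel(RealtimeBaseModel):
    """Stand-in exposing just the hooks RealTimePipeline calls in this sketch."""

    async def connect(self) -> None:        # invoked by RealTimePipeline.start()
        print("model connected")

    async def send_message(self, message: str) -> None:
        print(f"model got: {message}")

    async def aclose(self) -> None:         # invoked by RealTimePipeline.cleanup()
        print("model closed")


# Inside an agent entrypoint, after the job context has connected:
#   pipeline = RealTimePipeline(model=ToyRealtimeModel())
#   await pipeline.start()
#   await pipeline.send_message("Greet the user.")
#   await pipeline.cleanup()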
class RealtimeBaseModel
-
Expand source code
class RealtimeBaseModel(EventEmitter[Union[BaseEventTypes, TEvent]], Generic[TEvent], ABC): """ Base class for realtime models with event emission capabilities. Allows for extension with additional event types through TEvent. """ def __init__(self) -> None: """Initialize the realtime model""" super().__init__() @abstractmethod async def aclose(self) -> None: """Cleanup resources""" pass
Base class for realtime models with event emission capabilities. Allows for extension with additional event types through TEvent.
Initialize the realtime model
Ancestors
- EventEmitter
- typing.Generic
- abc.ABC
Methods
async def aclose(self) ‑> None
-
Expand source code
@abstractmethod async def aclose(self) -> None: """Cleanup resources""" pass
Cleanup resources
class ResizeOptions (width: int, height: int)
-
Expand source code
@dataclass class ResizeOptions: """Configuration for resizing av.VideoFrame during the process of encoding to a standard image format.""" width: int """The target width for resizing""" height: int """The target height for resizing the image."""
Configuration for resizing av.VideoFrame during the process of encoding to a standard image format.
Instance variables
var height : int
-
The target height for resizing the image.
var width : int
-
The target width for resizing the image.
class ResourceConfig (resource_type: ResourceType = ResourceType.PROCESS,
num_idle_resources: int = 2,
max_resources: int = 10,
initialize_timeout: float = 10.0,
close_timeout: float = 60.0,
memory_warn_mb: float = 500.0,
memory_limit_mb: float = 0.0,
ping_interval: float = 30.0,
health_check_interval: float = 5.0,
load_threshold: float = 0.75,
max_concurrent_tasks: int = 1,
executor_type: ExecutorType = ExecutorType.PROCESS,
use_dedicated_inference_process: bool = True,
inference_process_timeout: float = 30.0,
inference_memory_warn_mb: float = 1000.0)
-
Expand source code
@dataclass class ResourceConfig: """Configuration for resource management.""" # Resource type and count resource_type: ResourceType = ResourceType.PROCESS num_idle_resources: int = 2 max_resources: int = 10 # Timeouts initialize_timeout: float = 10.0 close_timeout: float = 60.0 # Memory management memory_warn_mb: float = 500.0 memory_limit_mb: float = 0.0 # Health monitoring ping_interval: float = 30.0 health_check_interval: float = 5.0 # Load balancing load_threshold: float = 0.75 max_concurrent_tasks: int = 1 # Platform-specific executor_type: ExecutorType = _default_executor_type # Legacy IPC compatibility use_dedicated_inference_process: bool = ( True # New: Enable dedicated inference process ) inference_process_timeout: float = 30.0 # Longer timeout for AI model loading inference_memory_warn_mb: float = 1000.0 # Higher threshold for AI models
Configuration for resource management.
Instance variables
var close_timeout : float
var executor_type : ExecutorType
var health_check_interval : float
var inference_memory_warn_mb : float
var inference_process_timeout : float
var initialize_timeout : float
var load_threshold : float
var max_concurrent_tasks : int
var max_resources : int
var memory_limit_mb : float
var memory_warn_mb : float
var num_idle_resources : int
var ping_interval : float
var resource_type : ResourceType
var use_dedicated_inference_process : bool
class ResourceInfo (resource_id: str,
resource_type: ResourceType,
status: ResourceStatus,
current_load: float = 0.0,
memory_usage_mb: float = 0.0,
cpu_usage_percent: float = 0.0,
active_tasks: int = 0,
total_tasks_processed: int = 0,
last_heartbeat: float = 0.0,
metadata: Dict[str, Any] = <factory>)
-
Expand source code
@dataclass class ResourceInfo: """Information about a resource.""" resource_id: str resource_type: ResourceType status: ResourceStatus current_load: float = 0.0 memory_usage_mb: float = 0.0 cpu_usage_percent: float = 0.0 active_tasks: int = 0 total_tasks_processed: int = 0 last_heartbeat: float = 0.0 # Resource-specific metadata metadata: Dict[str, Any] = field(default_factory=dict)
Information about a resource.
Instance variables
var active_tasks : int
var cpu_usage_percent : float
var current_load : float
var last_heartbeat : float
var memory_usage_mb : float
var metadata : Dict[str, Any]
var resource_id : str
var resource_type : ResourceType
var status : ResourceStatus
var total_tasks_processed : int
class ResourceManager (config: ResourceConfig)
-
Expand source code
class ResourceManager: """ Manages resources for task execution. This class handles: - Resource creation and lifecycle management - Load balancing across resources - Health monitoring and recovery - Resource allocation for tasks - Dedicated inference process management (legacy IPC compatibility) """ def __init__(self, config: ResourceConfig): self.config = config self.resources: List[BaseResource] = [] self._shutdown = False self._health_check_task: Optional[asyncio.Task] = None self._resource_creation_task: Optional[asyncio.Task] = None # Dedicated inference resource (legacy IPC compatibility) self.dedicated_inference_resource: Optional[DedicatedInferenceResource] = None async def start(self): """Start the resource manager.""" logger.info("Starting resource manager") # Create dedicated inference resource if enabled if self.config.use_dedicated_inference_process: await self._create_dedicated_inference_resource() # Start health monitoring self._health_check_task = asyncio.create_task(self._health_check_loop()) # Start resource creation self._resource_creation_task = asyncio.create_task( self._resource_creation_loop() ) # Initialize initial resources await self._create_initial_resources() logger.info("Resource manager started") async def stop(self): """Stop the resource manager.""" logger.info("Stopping resource manager") self._shutdown = True # Cancel background tasks if self._health_check_task: self._health_check_task.cancel() if self._resource_creation_task: self._resource_creation_task.cancel() # Shutdown all resources shutdown_tasks = [] for resource in self.resources: shutdown_tasks.append(resource.shutdown()) # Shutdown dedicated inference resource if self.dedicated_inference_resource: shutdown_tasks.append(self.dedicated_inference_resource.shutdown()) if shutdown_tasks: await asyncio.gather(*shutdown_tasks, return_exceptions=True) logger.info("Resource manager stopped") async def _create_dedicated_inference_resource(self): """Create the dedicated inference resource (legacy IPC compatibility).""" logger.info("Creating dedicated inference resource") inference_config = { "inference_process_timeout": self.config.inference_process_timeout, "inference_memory_warn_mb": self.config.inference_memory_warn_mb, "ping_interval": self.config.ping_interval, "close_timeout": self.config.close_timeout, } self.dedicated_inference_resource = DedicatedInferenceResource( resource_id="dedicated-inference", config=inference_config ) await self.dedicated_inference_resource.initialize() logger.info("Dedicated inference resource created") async def _create_initial_resources(self): """Create initial resources based on configuration.""" initial_count = self.config.num_idle_resources logger.info( f"Creating {initial_count} initial {self.config.resource_type.value} resources" ) for i in range(initial_count): await self._create_resource(self.config.resource_type) async def _create_resource(self, resource_type: ResourceType) -> BaseResource: """Create a new resource of the specified type.""" resource_id = f"{resource_type.value}-{uuid.uuid4().hex[:8]}" config = { "max_concurrent_tasks": self.config.max_concurrent_tasks, "initialize_timeout": self.config.initialize_timeout, "close_timeout": self.config.close_timeout, "health_check_interval": self.config.health_check_interval, } if resource_type == ResourceType.PROCESS: resource = ProcessResource(resource_id, config) elif resource_type == ResourceType.THREAD: resource = ThreadResource(resource_id, config) else: raise ValueError(f"Unsupported resource type: {resource_type}") # Initialize the resource await resource.initialize() # Add to resources list self.resources.append(resource) logger.info(f"Created {resource_type.value} resource: {resource_id}") return resource async def _resource_creation_loop(self): """Background loop for creating resources as needed.""" # Wait a bit longer before starting the loop to allow initial resources to stabilize await asyncio.sleep(10.0) while not self._shutdown: try: # Check if we need more resources available_count = len([r for r in self.resources if r.is_available]) total_count = len(self.resources) # Create more resources if needed if ( available_count < self.config.num_idle_resources and total_count < self.config.max_resources ): logger.info( f"Creating additional {self.config.resource_type.value} resource" ) await self._create_resource(self.config.resource_type) await asyncio.sleep(10.0) # Check every 10 seconds instead of 5 except Exception as e: logger.error(f"Error in resource creation loop: {e}") await asyncio.sleep(5.0) async def _health_check_loop(self): """Background loop for health monitoring.""" while not self._shutdown: try: # Check job resources for resource in self.resources[ : ]: # Copy list to avoid modification during iteration try: is_healthy = await resource.health_check() if not is_healthy: logger.warning( f"Unhealthy resource detected: {resource.resource_id}" ) # Remove unhealthy resource self.resources.remove(resource) await resource.shutdown() # Create replacement if needed if len(self.resources) < self.config.num_idle_resources: await self._create_resource(self.config.resource_type) except Exception as e: logger.error( f"Health check failed for {resource.resource_id}: {e}" ) # Check dedicated inference resource if self.dedicated_inference_resource: try: is_healthy = ( await self.dedicated_inference_resource.health_check() ) if not is_healthy: logger.warning( "Unhealthy dedicated inference resource detected" ) # Recreate inference resource await self.dedicated_inference_resource.shutdown() await self._create_dedicated_inference_resource() except Exception as e: logger.error( f"Health check failed for dedicated inference resource: {e}" ) await asyncio.sleep(self.config.health_check_interval) except Exception as e: logger.error(f"Error in health check loop: {e}") await asyncio.sleep(5.0) async def execute_task( self, task_config: TaskConfig, entrypoint: Callable, *args, **kwargs ) -> TaskResult: """Execute a task using an available resource.""" task_id = str(uuid.uuid4()) # Route inference tasks to dedicated inference resource if ( task_config.task_type == TaskType.INFERENCE and self.dedicated_inference_resource ): logger.info( f"Routing inference task {task_id} to dedicated inference resource" ) return await self.dedicated_inference_resource.execute_task( task_id, task_config, entrypoint, args, kwargs ) # Route other tasks to job resources resource = await self._get_available_resource(task_config.task_type) if not resource: raise RuntimeError("No available resources for task execution") # Execute the task return await resource.execute_task( task_id, task_config, entrypoint, args, kwargs ) async def _get_available_resource( self, task_type: TaskType ) -> Optional[BaseResource]: """Get an available resource for task execution.""" # For now, use simple round-robin selection # In the future, this could be enhanced with load balancing, priority, etc. available_resources = [r for r in self.resources if r.is_available] if available_resources: # Simple round-robin selection # In a real implementation, you might want more sophisticated load balancing return available_resources[0] return None def get_stats(self) -> Dict[str, Any]: """Get resource manager statistics.""" available_resources = [r for r in self.resources if r.is_available] active_resources = [ r for r in self.resources if r.status != ResourceStatus.IDLE ] total_resources = len(self.resources) average_load = ( len(active_resources) / total_resources if total_resources > 0 else 0.0 ) stats = { "total_resources": total_resources, "available_resources": len(available_resources), "active_resources": len(active_resources), "average_load": average_load, "resources": [ { "resource_id": r.get_info().resource_id, "resource_type": r.get_info().resource_type.value, "status": r.get_info().status.value, "current_load": r.get_info().current_load, "memory_usage_mb": r.get_info().memory_usage_mb, "cpu_usage_percent": r.get_info().cpu_usage_percent, "active_tasks": r.get_info().active_tasks, "total_tasks_processed": r.get_info().total_tasks_processed, "last_heartbeat": r.get_info().last_heartbeat, "metadata": r.get_info().metadata, } for r in self.resources ], "dedicated_inference": None, } # Dedicated inference resource stats if self.dedicated_inference_resource: info = self.dedicated_inference_resource.get_info() stats["dedicated_inference"] = { "resource_id": info.resource_id, "resource_type": info.resource_type.value, "status": info.status.value, "current_load": info.current_load, "memory_usage_mb": info.memory_usage_mb, "cpu_usage_percent": info.cpu_usage_percent, "active_tasks": info.active_tasks, "total_tasks_processed": info.total_tasks_processed, "last_heartbeat": info.last_heartbeat, "metadata": info.metadata, } return stats def get_resource_info(self) -> List[ResourceInfo]: """Get information about all resources.""" resource_info = [] # Job resources for resource in self.resources: resource_info.append(resource.get_info()) # Dedicated inference resource if self.dedicated_inference_resource: resource_info.append(self.dedicated_inference_resource.get_info()) return resource_info
Manages resources for task execution.
This class handles:
- Resource creation and lifecycle management
- Load balancing across resources
- Health monitoring and recovery
- Resource allocation for tasks
- Dedicated inference process management (legacy IPC compatibility)
Methods
async def execute_task(self,
task_config: TaskConfig,
entrypoint: Callable,
*args,
**kwargs) ‑> TaskResult
-
Expand source code
async def execute_task( self, task_config: TaskConfig, entrypoint: Callable, *args, **kwargs ) -> TaskResult: """Execute a task using an available resource.""" task_id = str(uuid.uuid4()) # Route inference tasks to dedicated inference resource if ( task_config.task_type == TaskType.INFERENCE and self.dedicated_inference_resource ): logger.info( f"Routing inference task {task_id} to dedicated inference resource" ) return await self.dedicated_inference_resource.execute_task( task_id, task_config, entrypoint, args, kwargs ) # Route other tasks to job resources resource = await self._get_available_resource(task_config.task_type) if not resource: raise RuntimeError("No available resources for task execution") # Execute the task return await resource.execute_task( task_id, task_config, entrypoint, args, kwargs )
Execute a task using an available resource.
def get_resource_info(self) ‑> List[ResourceInfo]
-
Expand source code
def get_resource_info(self) -> List[ResourceInfo]: """Get information about all resources.""" resource_info = [] # Job resources for resource in self.resources: resource_info.append(resource.get_info()) # Dedicated inference resource if self.dedicated_inference_resource: resource_info.append(self.dedicated_inference_resource.get_info()) return resource_info
Get information about all resources.
def get_stats(self) ‑> Dict[str, Any]
-
Expand source code
def get_stats(self) -> Dict[str, Any]: """Get resource manager statistics.""" available_resources = [r for r in self.resources if r.is_available] active_resources = [ r for r in self.resources if r.status != ResourceStatus.IDLE ] total_resources = len(self.resources) average_load = ( len(active_resources) / total_resources if total_resources > 0 else 0.0 ) stats = { "total_resources": total_resources, "available_resources": len(available_resources), "active_resources": len(active_resources), "average_load": average_load, "resources": [ { "resource_id": r.get_info().resource_id, "resource_type": r.get_info().resource_type.value, "status": r.get_info().status.value, "current_load": r.get_info().current_load, "memory_usage_mb": r.get_info().memory_usage_mb, "cpu_usage_percent": r.get_info().cpu_usage_percent, "active_tasks": r.get_info().active_tasks, "total_tasks_processed": r.get_info().total_tasks_processed, "last_heartbeat": r.get_info().last_heartbeat, "metadata": r.get_info().metadata, } for r in self.resources ], "dedicated_inference": None, } # Dedicated inference resource stats if self.dedicated_inference_resource: info = self.dedicated_inference_resource.get_info() stats["dedicated_inference"] = { "resource_id": info.resource_id, "resource_type": info.resource_type.value, "status": info.status.value, "current_load": info.current_load, "memory_usage_mb": info.memory_usage_mb, "cpu_usage_percent": info.cpu_usage_percent, "active_tasks": info.active_tasks, "total_tasks_processed": info.total_tasks_processed, "last_heartbeat": info.last_heartbeat, "metadata": info.metadata, } return stats
Get resource manager statistics.
async def start(self)
-
Expand source code
async def start(self): """Start the resource manager.""" logger.info("Starting resource manager") # Create dedicated inference resource if enabled if self.config.use_dedicated_inference_process: await self._create_dedicated_inference_resource() # Start health monitoring self._health_check_task = asyncio.create_task(self._health_check_loop()) # Start resource creation self._resource_creation_task = asyncio.create_task( self._resource_creation_loop() ) # Initialize initial resources await self._create_initial_resources() logger.info("Resource manager started")
Start the resource manager.
async def stop(self)
-
Expand source code
async def stop(self): """Stop the resource manager.""" logger.info("Stopping resource manager") self._shutdown = True # Cancel background tasks if self._health_check_task: self._health_check_task.cancel() if self._resource_creation_task: self._resource_creation_task.cancel() # Shutdown all resources shutdown_tasks = [] for resource in self.resources: shutdown_tasks.append(resource.shutdown()) # Shutdown dedicated inference resource if self.dedicated_inference_resource: shutdown_tasks.append(self.dedicated_inference_resource.shutdown()) if shutdown_tasks: await asyncio.gather(*shutdown_tasks, return_exceptions=True) logger.info("Resource manager stopped")
Stop the resource manager.
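A minimal lifecycle sketch, assuming the package-level import path; thread resources sidestep the pickling constraints of process workers, and the dedicated inference process is disabled to keep the example self-contained:

import asyncio

from agents import ResourceConfig, ResourceManager, ResourceType  # import path assumed


async def main() -> None:
    config = ResourceConfig(
        resource_type=ResourceType.THREAD,
        num_idle_resources=1,
        max_resources=2,
        use_dedicated_inference_process=False,
    )
    manager = ResourceManager(config)
    await manager.start()
    print(manager.get_stats()["total_resources"])  # initial idle resources
    await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())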
class ResourceStatus (*args, **kwds)
-
Expand source code
class ResourceStatus(Enum): """Status of a resource.""" IDLE = "idle" BUSY = "busy" INITIALIZING = "initializing" SHUTTING_DOWN = "shutting_down" ERROR = "error"
Status of a resource.
Ancestors
- enum.Enum
Class variables
var BUSY
var ERROR
var IDLE
var INITIALIZING
var SHUTTING_DOWN
class ResourceType (*args, **kwds)
-
Expand source code
class ResourceType(Enum): """Type of resource for task execution.""" PROCESS = "process" THREAD = "thread"
Type of resource for task execution.
Ancestors
- enum.Enum
Class variables
var PROCESS
var THREAD
class RoomOptions (room_id: str | None = None,
auth_token: str | None = None,
name: str | None = 'Agent',
playground: bool = True,
vision: bool = False,
recording: bool = False,
avatar: Any | None = None,
join_meeting: bool | None = True,
on_room_error: Callable[[Any], None] | None = None,
auto_end_session: bool = True,
session_timeout_seconds: int | None = 30,
signaling_base_url: str | None = None)
-
Expand source code
@dataclass class RoomOptions: room_id: Optional[str] = None auth_token: Optional[str] = None name: Optional[str] = "Agent" playground: bool = True vision: bool = False recording: bool = False avatar: Optional[Any] = None join_meeting: Optional[bool] = True on_room_error: Optional[Callable[[Any], None]] = None # Session management options auto_end_session: bool = True session_timeout_seconds: Optional[int] = 30 # VideoSDK connection options signaling_base_url: Optional[str] = None
RoomOptions(room_id: Optional[str] = None, auth_token: Optional[str] = None, name: Optional[str] = 'Agent', playground: bool = True, vision: bool = False, recording: bool = False, avatar: Optional[Any] = None, join_meeting: Optional[bool] = True, on_room_error: Optional[Callable[[Any], NoneType]] = None, auto_end_session: bool = True, session_timeout_seconds: Optional[int] = 30, signaling_base_url: Optional[str] = None)
Instance variables
var auth_token : str | None
var auto_end_session : bool
var avatar : Any | None
var join_meeting : bool | None
var name : str | None
var on_room_error : Callable[[Any], None] | None
var playground : bool
var recording : bool
var room_id : str | None
var session_timeout_seconds : int | None
var signaling_base_url : str | None
var vision : bool
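A sketch for joining an existing room rather than creating one; the room ID is a placeholder and the error callback is illustrative:

from agents import RoomOptions  # import path assumed


def on_room_error(error) -> None:
    print(f"room error: {error}")


room_options = RoomOptions(
    room_id="abcd-efgh-ijkl",        # placeholder: omit to have JobContext create a room
    name="Recruiter Agent",
    recording=True,
    playground=False,
    on_room_error=on_room_error,
    session_timeout_seconds=60,
)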
class STT
-
Expand source code
class STT(EventEmitter[Literal["error"]]): """Base class for Speech-to-Text implementations""" def __init__( self, ) -> None: super().__init__() self._label = f"{type(self).__module__}.{type(self).__name__}" self._transcript_callback: Optional[Callable[[STTResponse], Awaitable[None]]] = None @property def label(self) -> str: """Get the STT provider label""" return self._label def on_stt_transcript(self, callback: Callable[[STTResponse], Awaitable[None]]) -> None: """Set callback for receiving STT transcripts""" self._transcript_callback = callback @abstractmethod async def process_audio( self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any ) -> None: """ Process audio frames and convert to text Args: audio_frames: Iterator of bytes to process language: Optional language code for recognition **kwargs: Additional provider-specific arguments Returns: AsyncIterator yielding STTResponse objects """ raise NotImplementedError async def aclose(self) -> None: """Cleanup resources""" pass async def __aenter__(self) -> STT: return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: await self.aclose()
Base class for Speech-to-Text implementations
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop label : str
-
Expand source code
@property def label(self) -> str: """Get the STT provider label""" return self._label
Get the STT provider label
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None: """Cleanup resources""" pass
Cleanup resources
def on_stt_transcript(self,
callback: Callable[[STTResponse], Awaitable[None]]) ‑> None-
Expand source code
def on_stt_transcript(self, callback: Callable[[STTResponse], Awaitable[None]]) -> None: """Set callback for receiving STT transcripts""" self._transcript_callback = callback
Set callback for receiving STT transcripts
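A hedged sketch of registering a transcript handler; `MySTT` stands in for any concrete STT subclass, since this base class is abstract.

```python
from videosdk.agents import STTResponse, SpeechEventType


async def handle_transcript(response: STTResponse) -> None:
    # Only act on final transcripts; interim results may still change.
    if response.event_type == SpeechEventType.FINAL:
        print("User said:", response.data.text)


stt = MySTT()  # hypothetical concrete implementation
stt.on_stt_transcript(handle_transcript)
```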
async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None
-
Expand source code
@abstractmethod async def process_audio( self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any ) -> None: """ Process audio frames and convert to text Args: audio_frames: Iterator of bytes to process language: Optional language code for recognition **kwargs: Additional provider-specific arguments Returns: AsyncIterator yielding STTResponse objects """ raise NotImplementedError
Process audio frames and convert to text
Args
audio_frames
- Iterator of bytes to process
language
- Optional language code for recognition
**kwargs
- Additional provider-specific arguments
Returns
AsyncIterator yielding STTResponse objects
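The sketch below shows the minimal shape of a custom provider. `EchoSTT` is hypothetical and simply emits one FINAL transcript per audio chunk to illustrate the callback contract; a real provider would stream the audio to its recognition backend.

```python
from typing import Any, Optional

from videosdk.agents import STT, STTResponse, SpeechData, SpeechEventType


class EchoSTT(STT):
    async def process_audio(
        self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any
    ) -> None:
        # A real implementation would send audio_frames to a recognition service
        # and emit INTERIM/FINAL responses as they arrive.
        if self._transcript_callback:
            await self._transcript_callback(
                STTResponse(
                    event_type=SpeechEventType.FINAL,
                    data=SpeechData(text="<transcript>", language=language),
                )
            )
```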
class STTResponse (**data: Any)
-
Expand source code
class STTResponse(BaseModel): """Response from STT processing Attributes: event_type: The type of speech event. data: The data from the speech event. metadata: Additional metadata from the speech event. """ event_type: SpeechEventType data: SpeechData metadata: Optional[dict[str, Any]] = None
Response from STT processing
Attributes
event_type
- The type of speech event.
data
- The data from the speech event.
metadata
- Additional metadata from the speech event.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var data : SpeechData
var event_type : SpeechEventType
var metadata : dict[str, typing.Any] | None
var model_config
class SpeechData (text: str,
confidence: float = 0.0,
language: Optional[str] = None,
start_time: float = 0.0,
end_time: float = 0.0)-
Expand source code
@dataclass class SpeechData: """Data structure for speech recognition results Attributes: text: The recognized text. confidence: The confidence level of the recognition. language: The language of the recognized text. start_time: The start time of the speech. end_time: The end time of the speech. """ text: str confidence: float = 0.0 language: Optional[str] = None start_time: float = 0.0 end_time: float = 0.0
Data structure for speech recognition results
Attributes
text
- The recognized text.
confidence
- The confidence level of the recognition.
language
- The language of the recognized text.
start_time
- The start time of the speech.
end_time
- The end time of the speech.
Instance variables
var confidence : float
var end_time : float
var language : str | None
var start_time : float
var text : str
class SpeechEventType (*args, **kwds)
-
Expand source code
class SpeechEventType(str, Enum): """Type of speech event""" START = "start_of_speech" INTERIM = "interim_transcript" FINAL = "final_transcript" END = "end_of_speech"
Type of speech event
Ancestors
- builtins.str
- enum.Enum
Class variables
var END
var FINAL
var INTERIM
var START
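For reference, a response built by hand (e.g. inside a custom provider or a test) looks like this, assuming these models are importable from `videosdk.agents`:

```python
from videosdk.agents import STTResponse, SpeechData, SpeechEventType

response = STTResponse(
    event_type=SpeechEventType.FINAL,
    data=SpeechData(
        text="hello world",
        confidence=0.92,
        language="en",
        start_time=0.0,
        end_time=1.4,
    ),
    metadata={"provider": "example"},  # optional, provider-specific
)
```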
class TTS (sample_rate: int = 16000, num_channels: int = 1)
-
Expand source code
class TTS(EventEmitter[Literal["error"]]): """Base class for Text-to-Speech implementations""" def __init__( self, sample_rate: int = 16000, num_channels: int = 1 ) -> None: super().__init__() self._label = f"{type(self).__module__}.{type(self).__name__}" self._sample_rate = sample_rate self._num_channels = num_channels self._first_audio_callback: Optional[Callable[[], Awaitable[None]]] = None @property def label(self) -> str: """Get the TTS provider label""" return self._label @property def sample_rate(self) -> int: """Get audio sample rate""" return self._sample_rate @property def num_channels(self) -> int: """Get number of audio channels""" return self._num_channels def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None: """Set callback for when first audio byte is produced""" self._first_audio_callback = callback def reset_first_audio_tracking(self) -> None: """Reset the first audio tracking state for next TTS task""" # To be overridden by implementations for TTFB metrics pass @abstractmethod async def synthesize( self, text: AsyncIterator[str] | str, voice_id: Optional[str] = None, **kwargs: Any ) -> None: """ Convert text to speech Args: text: Text to convert to speech (either string or async iterator of strings) voice_id: Optional voice identifier **kwargs: Additional provider-specific arguments Returns: None """ raise NotImplementedError @abstractmethod async def interrupt(self) -> None: """Interrupt the TTS process""" raise NotImplementedError async def aclose(self) -> None: """Cleanup resources""" pass async def __aenter__(self) -> TTS: return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: await self.aclose()
Base class for Text-to-Speech implementations
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop label : str
-
Expand source code
@property def label(self) -> str: """Get the TTS provider label""" return self._label
Get the TTS provider label
prop num_channels : int
-
Expand source code
@property def num_channels(self) -> int: """Get number of audio channels""" return self._num_channels
Get number of audio channels
prop sample_rate : int
-
Expand source code
@property def sample_rate(self) -> int: """Get audio sample rate""" return self._sample_rate
Get audio sample rate
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None: """Cleanup resources""" pass
Cleanup resources
async def interrupt(self) ‑> None
-
Expand source code
@abstractmethod async def interrupt(self) -> None: """Interrupt the TTS process""" raise NotImplementedError
Interrupt the TTS process
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None
-
Expand source code
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None: """Set callback for when first audio byte is produced""" self._first_audio_callback = callback
Set callback for when first audio byte is produced
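A hedged sketch of using this callback to measure time to first byte (TTFB); `tts` stands in for any concrete TTS implementation.

```python
import time

request_started = time.monotonic()


async def on_first_byte() -> None:
    # Implementations invoke this when the first synthesized audio byte arrives.
    print(f"TTFB: {time.monotonic() - request_started:.3f}s")


tts.on_first_audio_byte(on_first_byte)
tts.reset_first_audio_tracking()  # clear tracking before the next synthesis task
```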
def reset_first_audio_tracking(self) ‑> None
-
Expand source code
def reset_first_audio_tracking(self) -> None: """Reset the first audio tracking state for next TTS task""" # To be overridden by implementations for TTFB metrics pass
Reset the first audio tracking state for next TTS task
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
@abstractmethod async def synthesize( self, text: AsyncIterator[str] | str, voice_id: Optional[str] = None, **kwargs: Any ) -> None: """ Convert text to speech Args: text: Text to convert to speech (either string or async iterator of strings) voice_id: Optional voice identifier **kwargs: Additional provider-specific arguments Returns: None """ raise NotImplementedError
Convert text to speech
Args
text
- Text to convert to speech (either string or async iterator of strings)
voice_id
- Optional voice identifier
**kwargs
- Additional provider-specific arguments
Returns
None
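The sketch below shows the minimal shape of a custom TTS provider. `SilentTTS` is hypothetical: it normalizes the str / async-iterator input as the abstract signature requires but produces no audio; a real provider would push PCM frames to the agent's audio track.

```python
from __future__ import annotations

from typing import Any, AsyncIterator, Optional

from videosdk.agents import TTS


class SilentTTS(TTS):
    def __init__(self) -> None:
        super().__init__(sample_rate=16000, num_channels=1)

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # Accept both plain strings and streamed text.
        chunks = [text] if isinstance(text, str) else [c async for c in text]
        full_text = "".join(chunks)
        # ... send full_text (or stream the chunks) to the synthesis backend ...

    async def interrupt(self) -> None:
        # Cancel any in-flight synthesis here.
        pass
```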
class TaskConfig (task_type: TaskType,
timeout: float = 300.0,
retry_count: int = 3,
priority: int = 0,
required_memory_mb: float = 100.0,
required_cpu_cores: int = 1,
data: Dict[str, Any] = <factory>)-
Expand source code
@dataclass class TaskConfig: """Configuration for task execution.""" task_type: TaskType timeout: float = 300.0 # 5 minutes default retry_count: int = 3 priority: int = 0 # Higher number = higher priority # Resource requirements required_memory_mb: float = 100.0 required_cpu_cores: int = 1 # Task-specific data data: Dict[str, Any] = field(default_factory=dict)
Configuration for task execution.
Instance variables
var data : Dict[str, Any]
var priority : int
var required_cpu_cores : int
var required_memory_mb : float
var retry_count : int
var task_type : TaskType
var timeout : float
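An illustrative configuration for a long-running meeting task (the values are examples only, and the room id is a placeholder):

```python
from videosdk.agents import TaskConfig, TaskType

config = TaskConfig(
    task_type=TaskType.MEETING,
    timeout=600.0,             # allow up to 10 minutes
    retry_count=1,             # retry once on failure
    priority=5,                # higher number = higher priority
    required_memory_mb=250.0,
    data={"room_id": "YOUR_ROOM_ID"},  # task-specific payload (placeholder)
)
```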
class TaskExecutor (config: ResourceConfig)
-
Expand source code
class TaskExecutor: """ High-level task executor that manages task execution using resources. This class provides a simple interface for executing tasks while handling resource management, retries, and monitoring internally. """ def __init__(self, config: ResourceConfig): self.config = config self.resource_manager = ResourceManager(config) self._shutdown = False self._total_tasks = 0 self._completed_tasks = 0 self._failed_tasks = 0 self._total_execution_time = 0.0 async def start(self): """Start the task executor.""" logger.info("Starting task executor") await self.resource_manager.start() logger.info("Task executor started") async def stop(self): """Stop the task executor.""" logger.info("Stopping task executor") self._shutdown = True # Stop resource manager await self.resource_manager.stop() logger.info("Task executor stopped") async def execute( self, entrypoint: Callable, task_type: TaskType = TaskType.JOB, timeout: float = 300.0, retry_count: int = 3, priority: int = 0, *args, **kwargs, ) -> TaskResult: """ Execute a task using the resource manager. Args: entrypoint: Function to execute task_type: Type of task (inference, meeting, job) timeout: Task timeout in seconds retry_count: Number of retries on failure priority: Task priority (higher = higher priority) *args, **kwargs: Arguments to pass to entrypoint Returns: TaskResult with execution results """ task_config = TaskConfig( task_type=task_type, timeout=timeout, retry_count=retry_count, priority=priority, ) # Execute with retries last_error = None for attempt in range(retry_count + 1): try: result = await self.resource_manager.execute_task( task_config, entrypoint, *args, **kwargs ) # Update stats self._update_stats(result) if result.status == TaskStatus.COMPLETED: return result else: last_error = result.error except Exception as e: last_error = str(e) logger.warning(f"Task execution attempt {attempt + 1} failed: {e}") if attempt < retry_count: await asyncio.sleep(1.0 * (attempt + 1)) # Exponential backoff # All retries failed failed_result = TaskResult( task_id=task_config.task_type.value, status=TaskStatus.FAILED, error=f"All {retry_count + 1} attempts failed. Last error: {last_error}", execution_time=0.0, ) self._update_stats(failed_result) return failed_result def _update_stats(self, result: TaskResult): """Update execution statistics.""" self._total_tasks += 1 if result.status == TaskStatus.COMPLETED: self._completed_tasks += 1 self._total_execution_time += result.execution_time elif result.status == TaskStatus.FAILED: self._failed_tasks += 1 def get_stats(self) -> Dict[str, Any]: """Get executor statistics.""" resource_stats = self.resource_manager.get_stats() average_execution_time = ( self._total_execution_time / self._completed_tasks if self._completed_tasks > 0 else 0.0 ) return { "executor_stats": { "total_tasks": self._total_tasks, "completed_tasks": self._completed_tasks, "failed_tasks": self._failed_tasks, "pending_tasks": 0, "average_execution_time": average_execution_time, "total_execution_time": self._total_execution_time, }, "resource_stats": resource_stats, } def get_resource_info(self) -> List[ResourceInfo]: """Get information about all resources.""" return self.resource_manager.get_resource_info()
High-level task executor that manages task execution using resources.
This class provides a simple interface for executing tasks while handling resource management, retries, and monitoring internally.
Methods
async def execute(self,
entrypoint: Callable,
task_type: TaskType = TaskType.JOB,
timeout: float = 300.0,
retry_count: int = 3,
priority: int = 0,
*args,
**kwargs) ‑> TaskResult-
Expand source code
async def execute( self, entrypoint: Callable, task_type: TaskType = TaskType.JOB, timeout: float = 300.0, retry_count: int = 3, priority: int = 0, *args, **kwargs, ) -> TaskResult: """ Execute a task using the resource manager. Args: entrypoint: Function to execute task_type: Type of task (inference, meeting, job) timeout: Task timeout in seconds retry_count: Number of retries on failure priority: Task priority (higher = higher priority) *args, **kwargs: Arguments to pass to entrypoint Returns: TaskResult with execution results """ task_config = TaskConfig( task_type=task_type, timeout=timeout, retry_count=retry_count, priority=priority, ) # Execute with retries last_error = None for attempt in range(retry_count + 1): try: result = await self.resource_manager.execute_task( task_config, entrypoint, *args, **kwargs ) # Update stats self._update_stats(result) if result.status == TaskStatus.COMPLETED: return result else: last_error = result.error except Exception as e: last_error = str(e) logger.warning(f"Task execution attempt {attempt + 1} failed: {e}") if attempt < retry_count: await asyncio.sleep(1.0 * (attempt + 1)) # Exponential backoff # All retries failed failed_result = TaskResult( task_id=task_config.task_type.value, status=TaskStatus.FAILED, error=f"All {retry_count + 1} attempts failed. Last error: {last_error}", execution_time=0.0, ) self._update_stats(failed_result) return failed_result
Execute a task using the resource manager.
Args
entrypoint
- Function to execute
task_type
- Type of task (inference, meeting, job)
timeout
- Task timeout in seconds
retry_count
- Number of retries on failure
priority
- Task priority (higher = higher priority)
*args, **kwargs
- Arguments to pass to entrypoint
Returns
TaskResult with execution results
def get_resource_info(self) ‑> List[ResourceInfo]
-
Expand source code
def get_resource_info(self) -> List[ResourceInfo]: """Get information about all resources.""" return self.resource_manager.get_resource_info()
Get information about all resources.
def get_stats(self) ‑> Dict[str, Any]
-
Expand source code
def get_stats(self) -> Dict[str, Any]: """Get executor statistics.""" resource_stats = self.resource_manager.get_stats() average_execution_time = ( self._total_execution_time / self._completed_tasks if self._completed_tasks > 0 else 0.0 ) return { "executor_stats": { "total_tasks": self._total_tasks, "completed_tasks": self._completed_tasks, "failed_tasks": self._failed_tasks, "pending_tasks": 0, "average_execution_time": average_execution_time, "total_execution_time": self._total_execution_time, }, "resource_stats": resource_stats, }
Get executor statistics.
async def start(self)
-
Expand source code
async def start(self): """Start the task executor.""" logger.info("Starting task executor") await self.resource_manager.start() logger.info("Task executor started")
Start the task executor.
async def stop(self)
-
Expand source code
async def stop(self): """Stop the task executor.""" logger.info("Stopping task executor") self._shutdown = True # Stop resource manager await self.resource_manager.stop() logger.info("Task executor stopped")
Stop the task executor.
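An end-to-end sketch of running a coroutine through the executor and inspecting the resulting TaskResult. It assumes `ResourceConfig` can be built with just `resource_type` (other fields defaulted) and that these names are importable from `videosdk.agents`; treat it as a shape, not a fixed recipe.

```python
import asyncio

from videosdk.agents import (
    ResourceConfig,
    ResourceType,
    TaskExecutor,
    TaskStatus,
    TaskType,
)


async def my_job(greeting: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for real work
    return f"{greeting}, world"


async def main() -> None:
    executor = TaskExecutor(ResourceConfig(resource_type=ResourceType.THREAD))
    await executor.start()
    try:
        result = await executor.execute(
            my_job,
            task_type=TaskType.JOB,
            timeout=30.0,
            retry_count=2,
            greeting="hello",  # extra kwargs are forwarded to the entrypoint
        )
        if result.status == TaskStatus.COMPLETED:
            print("result:", result.result, "in", result.execution_time, "s")
        else:
            print("failed:", result.error)
    finally:
        await executor.stop()


asyncio.run(main())
```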
class TaskResult (task_id: str,
status: TaskStatus,
result: Any | None = None,
error: str | None = None,
execution_time: float = 0.0,
memory_used_mb: float = 0.0)-
Expand source code
@dataclass class TaskResult: """Result of a task execution.""" task_id: str status: TaskStatus result: Optional[Any] = None error: Optional[str] = None execution_time: float = 0.0 memory_used_mb: float = 0.0
Result of a task execution.
Instance variables
var error : str | None
var execution_time : float
var memory_used_mb : float
var result : Any | None
var status : TaskStatus
var task_id : str
class TaskStatus (*args, **kwds)
-
Expand source code
class TaskStatus(Enum): """Status of a task.""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled"
Status of a task.
Ancestors
- enum.Enum
Class variables
var CANCELLED
var COMPLETED
var FAILED
var PENDING
var RUNNING
class TaskType (*args, **kwds)
-
Expand source code
class TaskType(Enum): """Type of task to be executed.""" INFERENCE = "inference" # For AI model inference MEETING = "meeting" # For video meeting tasks JOB = "job" # For general job execution
Type of task to be executed.
Ancestors
- enum.Enum
Class variables
var INFERENCE
var JOB
var MEETING
class TeeCustomAudioStreamTrack (loop, sinks=None, pipeline=None)
-
Expand source code
class TeeCustomAudioStreamTrack(CustomAudioStreamTrack): def __init__(self, loop, sinks=None, pipeline=None): super().__init__(loop) self.sinks = sinks if sinks is not None else [] self.pipeline = pipeline async def add_new_bytes(self, audio_data: bytes): await super().add_new_bytes(audio_data) # Route audio to sinks (avatars, etc.) for sink in self.sinks: if hasattr(sink, "handle_audio_input"): await sink.handle_audio_input(audio_data) # DO NOT route agent's own TTS audio back to pipeline # The pipeline should only receive audio from other participants # This prevents the agent from hearing itself speak
Audio track that plays the agent's synthesized audio while also fanning it out to registered sinks (such as avatars); the agent's own TTS audio is never routed back into the pipeline.
Ancestors
- CustomAudioStreamTrack
- videosdk.custom_audio_track.CustomAudioTrack
- vsaiortc.mediastreams.MediaStreamTrack
- pyee.asyncio.AsyncIOEventEmitter
- pyee.base.EventEmitter
Methods
async def add_new_bytes(self, audio_data: bytes)
-
Expand source code
async def add_new_bytes(self, audio_data: bytes): await super().add_new_bytes(audio_data) # Route audio to sinks (avatars, etc.) for sink in self.sinks: if hasattr(sink, "handle_audio_input"): await sink.handle_audio_input(audio_data) # DO NOT route agent's own TTS audio back to pipeline # The pipeline should only receive audio from other participants # This prevents the agent from hearing itself speak
Inherited members
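A hedged sketch of fanning agent audio out to an avatar-style sink. `AvatarSink` is hypothetical; any object with an async `handle_audio_input(bytes)` method receives the same bytes, while nothing is fed back into the agent's own pipeline.

```python
import asyncio

from videosdk.agents import TeeCustomAudioStreamTrack


class AvatarSink:
    async def handle_audio_input(self, audio_data: bytes) -> None:
        # Forward the agent's TTS audio to an avatar renderer here.
        print(f"sink received {len(audio_data)} bytes")


async def main() -> None:
    loop = asyncio.get_running_loop()
    track = TeeCustomAudioStreamTrack(loop, sinks=[AvatarSink()])
    # 320 bytes ~ 10 ms of 16-bit mono audio at 16 kHz (silence here).
    await track.add_new_bytes(b"\x00" * 320)


asyncio.run(main())
```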
class ThreadResource (resource_id: str, config: Dict[str, Any])
-
Expand source code
class ThreadResource(BaseResource): """ Thread-based resource for task execution. Uses threading for concurrent task execution within the same process. """ def __init__(self, resource_id: str, config: Dict[str, Any]): super().__init__(resource_id, config) self.thread: Optional[threading.Thread] = None self.task_queue: asyncio.Queue = asyncio.Queue() self.result_queue: asyncio.Queue = asyncio.Queue() self.control_queue: asyncio.Queue = asyncio.Queue() self._thread_ready = False self._loop: Optional[asyncio.AbstractEventLoop] = None @property def resource_type(self) -> ResourceType: return ResourceType.THREAD async def _initialize_impl(self) -> None: """Initialize the thread resource.""" # Start the thread self.thread = threading.Thread( target=self._thread_worker, args=( self.resource_id, self.task_queue, self.result_queue, self.control_queue, self.config, ), daemon=True, ) self.thread.start() # Simple readiness check - just wait a bit for thread to start await asyncio.sleep(0.5) # Mark as ready if thread is alive if self.thread.is_alive(): self._thread_ready = True else: raise RuntimeError(f"Thread {self.resource_id} failed to start") async def _execute_task_impl( self, task_id: str, config, entrypoint: Callable, args: tuple, kwargs: dict ) -> Any: """Execute task in the thread.""" if not self._thread_ready: raise RuntimeError(f"Thread {self.resource_id} is not ready") # Send task to thread task_data = { "task_id": task_id, "config": config, "entrypoint": entrypoint, "args": args, "kwargs": kwargs, } await self.task_queue.put(task_data) # Wait for result timeout = config.timeout start_time = time.time() while (time.time() - start_time) < timeout: try: if not self.result_queue.empty(): result_data = await self.result_queue.get() if result_data.get("task_id") == task_id: if result_data.get("status") == "success": return result_data.get("result") else: raise RuntimeError( result_data.get("error", "Unknown error") ) await asyncio.sleep(0.1) except Exception as e: logger.warning(f"Error checking task result: {e}") raise TimeoutError(f"Task {task_id} timed out after {timeout}s") async def _shutdown_impl(self) -> None: """Shutdown the thread resource.""" if self.thread and self.thread.is_alive(): # Send shutdown signal await self.control_queue.put({"type": "shutdown"}) # Wait for graceful shutdown timeout = self.config.get("close_timeout", 60.0) start_time = time.time() while self.thread.is_alive() and (time.time() - start_time) < timeout: await asyncio.sleep(0.1) @staticmethod def _thread_worker( resource_id: str, task_queue: asyncio.Queue, result_queue: asyncio.Queue, control_queue: asyncio.Queue, config: Dict[str, Any], ): """Worker function that runs in the thread.""" try: # Set up event loop for this thread loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) logger.info(f"Thread worker {resource_id} started") async def worker_main(): # Main task processing loop while True: try: # Check for shutdown signal if not control_queue.empty(): message = await control_queue.get() if message.get("type") == "shutdown": break # Check for tasks if not task_queue.empty(): task_data = await task_queue.get() task_id = task_data["task_id"] entrypoint = task_data["entrypoint"] args = task_data.get("args", ()) kwargs = task_data.get("kwargs", {}) try: # Execute the task if asyncio.iscoroutinefunction(entrypoint): result = await entrypoint(*args, **kwargs) else: result = entrypoint(*args, **kwargs) await result_queue.put( { "task_id": task_id, "status": "success", "result": result, } ) except Exception 
as e: await result_queue.put( { "task_id": task_id, "status": "error", "error": str(e), } ) else: await asyncio.sleep(0.1) except Exception as e: logger.error(f"Error in thread worker {resource_id}: {e}") await asyncio.sleep(1.0) logger.info(f"Thread worker {resource_id} shutting down") # Run the worker loop.run_until_complete(worker_main()) except Exception as e: logger.error(f"Fatal error in thread worker {resource_id}: {e}") finally: if loop and not loop.is_closed(): loop.close()
Thread-based resource for task execution.
Uses threading for concurrent task execution within the same process.
Ancestors
- BaseResource
- abc.ABC
Inherited members
class VAD (sample_rate: int = 16000,
threshold: float = 0.5,
min_speech_duration: float = 0.5,
min_silence_duration: float = 0.5)-
Expand source code
class VAD(EventEmitter[Literal["error", "info"]]): """Base class for Voice Activity Detection implementations""" def __init__( self, sample_rate: int = 16000, threshold: float = 0.5, min_speech_duration: float = 0.5, min_silence_duration: float = 0.5 ) -> None: super().__init__() self._label = f"{type(self).__module__}.{type(self).__name__}" self._sample_rate = sample_rate self._threshold = threshold self._min_speech_duration = min_speech_duration self._min_silence_duration = min_silence_duration self._vad_callback: Optional[Callable[[VADResponse], Awaitable[None]]] = None @property def label(self) -> str: """Get the VAD provider label""" return self._label @property def sample_rate(self) -> int: """Get audio sample rate""" return self._sample_rate @abstractmethod async def process_audio( self, audio_frames: bytes, **kwargs: Any ) -> None: """ Process audio frames and detect voice activity Args: audio_frames: Iterator of audio frames to process **kwargs: Additional provider-specific arguments Returns: AsyncIterator yielding VADResponse objects """ raise NotImplementedError async def aclose(self) -> None: """Cleanup resources""" pass async def __aenter__(self) -> VAD: return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: await self.aclose() def on_vad_event(self, callback: Callable[[VADResponse], Awaitable[None]]) -> None: """Set callback for receiving VAD events""" self._vad_callback = callback
Base class for Voice Activity Detection implementations
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop label : str
-
Expand source code
@property def label(self) -> str: """Get the VAD provider label""" return self._label
Get the VAD provider label
prop sample_rate : int
-
Expand source code
@property def sample_rate(self) -> int: """Get audio sample rate""" return self._sample_rate
Get audio sample rate
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None: """Cleanup resources""" pass
Cleanup resources
def on_vad_event(self,
callback: Callable[[VADResponse], Awaitable[None]]) ‑> None-
Expand source code
def on_vad_event(self, callback: Callable[[VADResponse], Awaitable[None]]) -> None: """Set callback for receiving VAD events""" self._vad_callback = callback
Set callback for receiving VAD events
async def process_audio(self, audio_frames: bytes, **kwargs: Any) ‑> None
-
Expand source code
@abstractmethod async def process_audio( self, audio_frames: bytes, **kwargs: Any ) -> None: """ Process audio frames and detect voice activity Args: audio_frames: Iterator of audio frames to process **kwargs: Additional provider-specific arguments Returns: AsyncIterator yielding VADResponse objects """ raise NotImplementedError
Process audio frames and detect voice activity
Args
audio_frames
- Iterator of audio frames to process
**kwargs
- Additional provider-specific arguments
Returns
AsyncIterator yielding VADResponse objects
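A hedged sketch of reacting to VAD events; `vad` stands in for any concrete VAD implementation, since this base class is abstract.

```python
from videosdk.agents import VADResponse, VADEventType


async def on_vad(event: VADResponse) -> None:
    if event.event_type == VADEventType.START_OF_SPEECH:
        print("user started speaking")
    elif event.event_type == VADEventType.END_OF_SPEECH:
        print("user stopped speaking")


vad.on_vad_event(on_vad)
```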
class VADEventType (*args, **kwds)
-
Expand source code
class VADEventType(str, Enum): START_OF_SPEECH = "start_of_speech" END_OF_SPEECH = "end_of_speech"
Type of voice activity detection event.
Ancestors
- builtins.str
- enum.Enum
Class variables
var END_OF_SPEECH
var START_OF_SPEECH
class VADResponse (**data: Any)
-
Expand source code
class VADResponse(BaseModel): """Response from VAD processing""" event_type: VADEventType data: VADData metadata: Optional[dict[str, Any]] = None
Response from VAD processing
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var data : VADData
var event_type : VADEventType
var metadata : dict[str, typing.Any] | None
var model_config
class Worker (options: WorkerOptions)
-
Expand source code
class Worker: """ VideoSDK worker that manages job execution and backend registration. def run(self): job_context = functools.partial(self.job.jobctx) entrypoint = functools.partial(self.job.entrypoint) p = multiprocessing.Process( target=_job_runner, args=(entrypoint, job_context) Automatically selects the appropriate executor type based on platform. """ def __init__(self, options: WorkerOptions): """Initialize the worker.""" self.options = options self._shutdown = False self._draining = False self._worker_load = 0.0 self._current_jobs: Dict[str, RunningJobInfo] = {} self._tasks: Set[asyncio.Task] = set() self.backend_connection: Optional[BackendConnection] = None self.process_manager: Optional[TaskExecutor] = ( None # Changed from ProcessManager ) self._http_server: Optional[HttpServer] = None # Add debounce mechanism for status updates self._last_status_update = 0.0 self._status_update_debounce_seconds = ( 2.0 # Minimum 2 seconds between status updates ) # Initialize tracing self._tracing = Tracing.with_handle("worker") self._worker_load_graph = Tracing.add_graph( title="worker_load", x_label="time", y_label="load", x_type="time", y_range=(0, 1), max_data_points=1000, ) # Validate configuration if not self.options.auth_token: raise ValueError( "auth_token is required, or add VIDEOSDK_AUTH_TOKEN in your environment" ) @staticmethod def run_worker( options: WorkerOptions, simulate_job: Optional[DirectRoomOptions | str] = None ): """ Run a VideoSDK worker with the given options. This is the main entry point for running a VideoSDK worker, providing a high-level interface for worker initialization, job management, and lifecycle control. Args: options: Worker configuration options simulate_job: Optional job simulation for direct room joining Example: ```python from videosdk.agents import Worker, WorkerOptions def my_agent(job_ctx): # Your agent code here pass # Configure worker with custom log level - logging is automatically configured! options = WorkerOptions( entrypoint_fnc=my_agent, log_level="DEBUG" # Options: DEBUG, INFO, WARNING, ERROR ) # Run the worker - no manual logging setup needed! Worker.run_worker(options) ``` """ worker = Worker(options) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def main_task(): try: await worker.initialize() if simulate_job: # Direct room joining mode - force register=False for simulation logger.info("Simulation mode: forcing register=False") worker.options.register = False await worker.simulate_job(simulate_job) elif options.register: # Backend registration mode await worker._run_backend_mode() else: # Default mode - just keep alive while not worker._shutdown: await asyncio.sleep(1) except asyncio.CancelledError: logger.info("Main task cancelled") except Exception as e: logger.error(f"Worker error: {e}") raise finally: await worker.shutdown() main_future = loop.create_task(main_task()) shutting_down = False def signal_handler(signum, frame): nonlocal shutting_down if shutting_down: # If already shutting down, cancel all tasks more aggressively for task in asyncio.all_tasks(loop): task.cancel() return shutting_down = True logger.info(f"Received signal {signum}. 
Initiating graceful shutdown...") # Cancel the main task loop.call_soon_threadsafe(main_future.cancel) # Set a timeout for graceful shutdown loop.call_later(3.0, lambda: [task.cancel() for task in asyncio.all_tasks(loop)]) try: signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) loop.run_until_complete(main_future) except KeyboardInterrupt: logger.info("Keyboard interrupt received") if not shutting_down: shutting_down = True if not main_future.done(): main_future.cancel() loop.run_until_complete(worker.shutdown()) finally: try: loop.close() except Exception as e: logger.error(f"Error closing event loop: {e}") if loop.is_closed(): logger.info("Event loop closed successfully") async def initialize(self): """Initialize the worker.""" logger.info("Initializing VideoSDK worker") # Initialize task executor with new execution architecture # Convert ExecutorType to ResourceType resource_type = ( ResourceType.THREAD if self.options.executor_type == ExecutorType.THREAD else ResourceType.PROCESS ) config = ResourceConfig( resource_type=resource_type, num_idle_resources=self.options.num_idle_processes, max_resources=self.options.max_processes, initialize_timeout=self.options.initialize_timeout, close_timeout=self.options.close_timeout, memory_warn_mb=self.options.memory_warn_mb, memory_limit_mb=self.options.memory_limit_mb, ping_interval=self.options.ping_interval, load_threshold=self.options.load_threshold, max_concurrent_tasks=1, # Each resource handles one task at a time executor_type=self.options.executor_type, # Legacy IPC compatibility - dedicated inference process use_dedicated_inference_process=False, # Disable dedicated inference process for now inference_process_timeout=30.0, # Longer timeout for AI model loading inference_memory_warn_mb=1000.0, # Higher threshold for AI models ) self.process_manager = TaskExecutor(config) await self.process_manager.start() # Initialize backend connection if registering if self.options.register: await self._initialize_backend_connection() # Initialize and start debug HTTP server self._http_server = HttpServer( host=self.options.host, port=self.options.port, ) self._http_server.set_worker(self) await self._http_server.start() logger.info("VideoSDK worker initialized successfully") async def _initialize_backend_connection(self): """Initialize connection to the backend registry.""" if not self.options.register: return # Fetch agent init config to get registry URL try: logger.info("Fetching agent init config...") registry_url = await fetch_agent_init_config( auth_token=self.options.auth_token, api_base_url=f"https://{self.options.signaling_base_url}", ) logger.info(f"Using registry URL: {registry_url}") except Exception as e: logger.error(f"Failed to fetch agent init config: {e}") raise RuntimeError(f"Agent init config is mandatory. 
Error: {e}") self.backend_connection = BackendConnection( auth_token=self.options.auth_token, agent_id=self.options.agent_id, worker_type=self.options.worker_type.value, version="1.0.0", max_retry=self.options.max_retry, backend_url=registry_url, load_threshold=self.options.load_threshold, max_processes=self.options.max_processes, ) # Set up message handlers self.backend_connection.on_register(self._handle_register) self.backend_connection.on_availability(self._handle_availability) self.backend_connection.on_assignment(self._handle_assignment) self.backend_connection.on_termination(self._handle_termination) # Connect to backend await self.backend_connection.connect() async def _run_backend_mode(self): """Run the worker in backend registration mode.""" logger.info("Running in backend registration mode") # Start status update loop status_task = asyncio.create_task(self._status_update_loop()) self._tasks.add(status_task) try: # Keep the worker running while not self._shutdown: await asyncio.sleep(1) finally: status_task.cancel() self._tasks.discard(status_task) def _handle_register(self, worker_id: str, server_info: Dict[str, Any]): """Handle registration response from backend.""" logger.info(f"Registered with backend: {worker_id}") logger.info(f"Server info: {server_info}") def _handle_availability(self, request: AvailabilityRequest): """Handle availability request from backend.""" logger.info(f"Received availability request for job {request.job_id}") asyncio.create_task(self._answer_availability(request)) async def _answer_availability(self, request: AvailabilityRequest): """Answer availability request.""" try: # Check if we can accept the job can_accept = ( not self._draining and self._worker_load < self.options.load_threshold and len(self._current_jobs) < self.options.max_processes ) if can_accept: # Accept the job and provide our auth token response = AvailabilityResponse( job_id=request.job_id, available=True, token=self.options.auth_token, # Provide worker's auth token ) logger.info(f"Accepting job {request.job_id}") else: # Reject the job response = AvailabilityResponse( job_id=request.job_id, available=False, error="Worker at capacity or draining", ) logger.info(f"Rejecting job {request.job_id}") # Send response await self.backend_connection.send_message(response) except Exception as e: logger.error(f"Error handling availability request: {e}") # Send rejection on error response = AvailabilityResponse( job_id=request.job_id, available=False, error=str(e), ) await self.backend_connection.send_message(response) def _handle_assignment(self, assignment: JobAssignment): """Handle job assignment from backend.""" logger.info(f"Received job assignment: {assignment.job_id}") asyncio.create_task(self._handle_job_assignment(assignment)) async def _handle_job_assignment(self, assignment: JobAssignment): """Handle job assignment.""" try: # Create job accept arguments args = JobAcceptArguments( identity=f"agent_{assignment.job_id}", name=self.options.agent_id, metadata="", ) # Launch the job await self._launch_job_from_assignment(assignment, args) except Exception as e: logger.error(f"Error handling job assignment: {e}") # Send job update with error job_update = JobUpdate( job_id=assignment.job_id, status="failed", error=str(e), ) await self.backend_connection.send_message(job_update) async def _handle_termination(self, termination: JobTermination): """Handle job termination request.""" logger.info(f"Received job termination: {termination.job_id}") if termination.job_id in self._current_jobs: 
job_info = self._current_jobs[termination.job_id] try: await job_info.job.shutdown() logger.info(f"Successfully terminated job {termination.job_id}") except Exception as e: logger.error(f"Error terminating job {termination.job_id}: {e}") # Remove job from current jobs del self._current_jobs[termination.job_id] logger.info( f"Removed job {termination.job_id} from current jobs. Remaining jobs: {len(self._current_jobs)}" ) # Notify registry about job completion if self.backend_connection and self.backend_connection.is_connected: try: job_update = JobUpdate( job_id=termination.job_id, status="completed", error="Job terminated by registry", ) await self.backend_connection.send_message(job_update) logger.info( f"Sent job completion update for terminated job {termination.job_id}" ) except Exception as e: logger.error( f"Failed to send job completion update for terminated job {termination.job_id}: {e}" ) # IMMEDIATELY send status update to reflect reduced job count # This bypasses the debounce mechanism to ensure registry gets correct info await self._send_immediate_status_update() else: logger.warning( f"Job {termination.job_id} not found in current jobs for termination" ) async def _handle_meeting_end(self, job_id: str, reason: str = "meeting_ended"): """Handle meeting end/leave events and inform registry.""" logger.info(f"Meeting ended for job {job_id}, reason: {reason}") logger.info( f"Checking if job {job_id} is in current_jobs: {job_id in self._current_jobs}" ) logger.info(f"Current jobs: {list(self._current_jobs.keys())}") if job_id in self._current_jobs: # Remove job from worker's current jobs job_info = self._current_jobs.pop(job_id, None) if job_info: logger.info( f"Removed job {job_id} from worker's current jobs. Remaining jobs: {len(self._current_jobs)}" ) # Inform registry about job completion if self.backend_connection and self.backend_connection.is_connected: try: job_update = JobUpdate( job_id=job_id, status="completed", error=f"Meeting ended: {reason}", ) await self.backend_connection.send_message(job_update) logger.info( f"Sent job completion update to registry for job {job_id}" ) except Exception as e: logger.error( f"Failed to send job completion update to registry: {e}" ) # IMMEDIATELY send status update to reflect reduced job count # This bypasses the debounce mechanism to ensure registry gets correct info await self._send_immediate_status_update() else: logger.warning(f"Job {job_id} not found in current jobs when meeting ended") async def _send_immediate_status_update(self): """Send an immediate status update, bypassing debounce mechanism.""" if not self.backend_connection or not self.backend_connection.is_connected: return try: # Calculate current load job_count = len(self._current_jobs) load = min(job_count / self.options.max_processes, 1.0) self._worker_load = load logger.info( f"Sending immediate status update - job_count: {job_count}, load: {load}, max_processes: {self.options.max_processes}" ) # Log the actual job IDs for debugging if job_count > 0: job_ids = list(self._current_jobs.keys()) logger.info(f"Active job IDs: {job_ids}") else: logger.info("No active jobs") # Send status update status_msg = WorkerMessage( type="status_update", worker_id=self.backend_connection.worker_id, agent_name=self.options.agent_id, status="available" if not self._draining else "draining", load=load, job_count=job_count, ) await self.backend_connection.send_message(status_msg) logger.info("Immediate status update sent successfully") except Exception as e: logger.error(f"Error sending 
immediate status update: {e}") def setup_meeting_event_handlers(self, job_context, job_id: str): """Set up meeting event handlers for a specific job.""" if not job_context.room: logger.warning( f"Cannot set up meeting handlers for job {job_id}: room not available" ) # Set up a delayed handler setup that will be called when room becomes available original_connect = job_context.connect def delayed_handler_setup(): if job_context.room: self._setup_meeting_event_handlers_impl(job_context, job_id) else: logger.warning( f"Room still not available for job {job_id} after connect" ) # Override connect method to set up handlers after room is created async def connect_with_handlers(): result = await original_connect() delayed_handler_setup() return result job_context.connect = connect_with_handlers logger.info(f"Set up delayed meeting event handlers for job {job_id}") return # Room is available, set up handlers immediately self._setup_meeting_event_handlers_impl(job_context, job_id) def _setup_meeting_event_handlers_impl(self, job_context, job_id: str): """Internal method to set up the actual meeting event handlers.""" if not job_context.room: logger.warning(f"Room not available for job {job_id} in handler setup") return # Store original event handlers original_on_meeting_left = job_context.room.on_meeting_left # Override the on_meeting_left handler def on_meeting_left_wrapper(data=None): # Call original handler with proper parameter handling if original_on_meeting_left: try: # Check if original handler expects data parameter import inspect sig = inspect.signature(original_on_meeting_left) if len(sig.parameters) > 0: original_on_meeting_left(data) else: original_on_meeting_left() except Exception as e: logger.warning( f"Error calling original on_meeting_left handler: {e}" ) # Handle meeting end for this specific job asyncio.create_task(self._handle_meeting_end(job_id, "meeting_left")) # Set the new handler job_context.room.on_meeting_left = on_meeting_left_wrapper logger.info(f"Set up meeting end handler for job {job_id}") async def _launch_job_from_assignment( self, assignment: JobAssignment, args: JobAcceptArguments ): """Launch a job from backend assignment.""" try: # Use assignment token if available, otherwise fall back to worker's auth token auth_token = ( assignment.token if assignment.token else self.options.auth_token ) # Create room options from assignment (this was already done in _handle_job_assignment) room_options = RoomOptions( room_id=assignment.room_id, name=assignment.room_name, # Use 'name' instead of 'room_name' auth_token=auth_token, signaling_base_url=self.options.signaling_base_url, ) # Apply RoomOptions from assignment if provided if assignment.room_options: logger.info( f"Received room_options from assignment: {assignment.room_options}" ) if "auto_end_session" in assignment.room_options: room_options.auto_end_session = assignment.room_options[ "auto_end_session" ] logger.info( f"Set auto_end_session: {room_options.auto_end_session}" ) if "session_timeout_seconds" in assignment.room_options: room_options.session_timeout_seconds = assignment.room_options[ "session_timeout_seconds" ] logger.info( f"Set session_timeout_seconds: {room_options.session_timeout_seconds}" ) if "playground" in assignment.room_options: room_options.playground = assignment.room_options["playground"] logger.info(f"Set playground: {room_options.playground}") if "vision" in assignment.room_options: room_options.vision = assignment.room_options["vision"] logger.info(f"Set vision: {room_options.vision}") if 
"join_meeting" in assignment.room_options: room_options.join_meeting = assignment.room_options["join_meeting"] logger.info(f"Set join_meeting: {room_options.join_meeting}") else: logger.warning("No room_options received from assignment") # Create job context job_context = JobContext( room_options=room_options, ) # Create running job info with correct parameters job_info = RunningJobInfo( accept_arguments=args, job=job_context, url=assignment.url, token=auth_token, worker_id=self.backend_connection.worker_id, ) # Store job info BEFORE executing entrypoint self._current_jobs[assignment.job_id] = job_info logger.info( f"Added job {assignment.job_id} to worker's current jobs. Total jobs: {len(self._current_jobs)}" ) # Send job update to registry job_update = JobUpdate( job_id=assignment.job_id, status="running", ) await self.backend_connection.send_message(job_update) # Set up session end callback BEFORE executing entrypoint # This ensures the callback is set up even if entrypoint fails self.setup_session_end_callback(job_context, assignment.job_id) logger.info(f"Session end callback set up for job {assignment.job_id}") # Set up meeting event handlers to ensure proper event handling self.setup_meeting_event_handlers(job_context, assignment.job_id) logger.info(f"Meeting event handlers set up for job {assignment.job_id}") # Execute the job using the worker's entrypoint function logger.info(f"Executing job {assignment.job_id} with entrypoint function") try: # Set the current job context so pipeline auto-registration works from .job import _set_current_job_context, _reset_current_job_context token = _set_current_job_context(job_context) try: # Execute the entrypoint function await self.options.entrypoint_fnc(job_context) logger.info( f"Entrypoint function completed for job {assignment.job_id}" ) finally: pass except Exception as entrypoint_error: logger.error( f"Entrypoint function failed for job {assignment.job_id}: {entrypoint_error}" ) # Don't remove the job from _current_jobs here - let the session end callback handle it # The job should remain active until the session actually ends # Send error update but keep job active error_update = JobUpdate( job_id=assignment.job_id, status="error", error=f"Entrypoint failed: {entrypoint_error}", ) await self.backend_connection.send_message(error_update) # The job should remain in _current_jobs until the session ends # This ensures the registry sees the correct load and job count logger.info( f"Job {assignment.job_id} remains active in worker's current jobs: {len(self._current_jobs)} total jobs" ) except Exception as e: logger.error(f"Error launching job {assignment.job_id}: {e}") # Send error update job_update = JobUpdate( job_id=assignment.job_id, status="failed", error=str(e), ) await self.backend_connection.send_message(job_update) # Remove job from current jobs since it failed to launch self._current_jobs.pop(assignment.job_id, None) logger.info(f"Removed failed job {assignment.job_id} from current jobs") # Send immediate status update to reflect reduced job count await self._send_immediate_status_update() def setup_session_end_callback(self, job_context, job_id: str): """Set up session end callback for automatic session ending.""" if not job_context.room: logger.warning( f"Cannot set up session end callback for job {job_id}: room not available" ) # Set up a delayed callback setup that will be called when room becomes available original_connect = job_context.connect def delayed_callback_setup(): if job_context.room: 
self._setup_session_end_callback_impl(job_context, job_id) else: logger.warning( f"Room still not available for job {job_id} after connect" ) # Override connect method to set up callback after room is created async def connect_with_callback(): result = await original_connect() delayed_callback_setup() return result job_context.connect = connect_with_callback logger.info(f"Set up delayed session end callback for job {job_id}") return # Room is available, set up callback immediately self._setup_session_end_callback_impl(job_context, job_id) def _setup_session_end_callback_impl(self, job_context, job_id: str): """Internal method to set up the actual session end callback.""" if not job_context.room: logger.warning(f"Room not available for job {job_id} in callback setup") return def on_session_end(reason: str): logger.info(f"Session ended for job {job_id}, reason: {reason}") logger.info(f"Calling _handle_meeting_end for job {job_id}") # Handle session end asynchronously asyncio.create_task( self._handle_meeting_end(job_id, f"session_ended: {reason}") ) # Set the session end callback job_context.room.on_session_end = on_session_end logger.info(f"Session end callback set up for job {job_id}") async def _status_update_loop(self): """Periodic status update loop.""" while not self._shutdown: try: await self._update_worker_status() await asyncio.sleep(self.options.ping_interval) except Exception as e: logger.error(f"Error in status update loop: {e}") await asyncio.sleep(5) # Wait before retrying async def _update_worker_status(self): """Update worker status with backend.""" if not self.backend_connection or not self.backend_connection.is_connected: return # Check debounce - don't send status updates too frequently current_time = time.time() if ( current_time - self._last_status_update < self._status_update_debounce_seconds ): logger.debug("Skipping status update due to debounce") return try: # Calculate current load job_count = len(self._current_jobs) load = min(job_count / self.options.max_processes, 1.0) self._worker_load = load # Add detailed logging to track job count changes logger.info( f"Updating worker status - job_count: {job_count}, load: {load}, max_processes: {self.options.max_processes}" ) # Log the actual job IDs for debugging if job_count > 0: job_ids = list(self._current_jobs.keys()) logger.info(f"Active job IDs: {job_ids}") else: logger.info("No active jobs") # Send status update status_msg = WorkerMessage( type="status_update", worker_id=self.backend_connection.worker_id, agent_name=self.options.agent_id, # Include agent_id status="available" if not self._draining else "draining", load=load, job_count=job_count, ) await self.backend_connection.send_message(status_msg) # Update last status update time self._last_status_update = current_time # Update tracing self._worker_load_graph.add_point(load) except Exception as e: logger.error(f"Error updating worker status: {e}") async def execute_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]: """Execute a job using the task executor.""" if not self.process_manager: raise RuntimeError("Task executor not initialized") # Extract entrypoint function from job data entrypoint = job_data.get("entrypoint", self.options.entrypoint_fnc) # Execute using new task executor result = await self.process_manager.execute( entrypoint=entrypoint, task_type=TaskType.JOB, timeout=job_data.get("timeout", 300.0), retry_count=job_data.get("retry_count", 3), priority=job_data.get("priority", 0), *job_data.get("args", ()), **job_data.get("kwargs", {}), ) # 
Convert TaskResult to expected format return { "status": result.status.value, "result": result.result, "error": result.error, "execution_time": result.execution_time, "task_id": result.task_id, } async def execute_inference(self, inference_data: Dict[str, Any]) -> Dict[str, Any]: """Execute an inference using the task executor.""" if not self.process_manager: raise RuntimeError("Task executor not initialized") # Extract entrypoint function from inference data entrypoint = inference_data.get("entrypoint", self.options.entrypoint_fnc) # Execute using new task executor result = await self.process_manager.execute( entrypoint=entrypoint, task_type=TaskType.INFERENCE, timeout=inference_data.get("timeout", 300.0), retry_count=inference_data.get("retry_count", 3), priority=inference_data.get("priority", 0), *inference_data.get("args", ()), **inference_data.get("kwargs", {}), ) # Convert TaskResult to expected format return { "status": result.status.value, "result": result.result, "error": result.error, "execution_time": result.execution_time, "task_id": result.task_id, } async def launch_direct_job(self, room_options: "RoomOptions") -> None: """Launch a job directly without backend registration.""" logger.info("Launching direct job") # Create job context job_context = JobContext( room_options=room_options, ) # Set up session end callback if room is created if job_context.room: def on_session_end(reason: str): logger.info(f"Direct job session ended, reason: {reason}") # For direct jobs, we can just log the session end logger.info(f"Session ended: {reason}") job_context.room.on_session_end = on_session_end logger.info("Set up session end callback for direct job") # Execute the job using the worker's entrypoint function await self.options.entrypoint_fnc(job_context) async def simulate_job(self, info: DirectRoomOptions | str) -> None: """Simulate a job for testing purposes.""" if isinstance(info, str): # Simple string format: "room_id" room_id = info info = DirectRoomOptions(room_id=room_id) logger.info(f"Simulating job for room: {info.room_id}") # Create room options room_options = RoomOptions( room_id=info.room_id, name=info.room_name or f"test_room_{info.room_id}", auth_token=info.auth_token or self.options.auth_token, signaling_base_url=self.options.signaling_base_url, auto_end_session=True, ) # Launch the job await self.launch_direct_job(room_options) def get_stats(self) -> Dict[str, Any]: """Get worker statistics.""" # Calculate current load dynamically job_count = len(self._current_jobs) current_load = min(job_count / self.options.max_processes, 1.0) stats = { "worker_load": current_load, "draining": self._draining, "current_jobs": job_count, "max_processes": self.options.max_processes, "agent_id": self.options.agent_id, "register": self.options.register, } if self.backend_connection: stats.update( { "backend_connected": self.backend_connection.is_connected, "worker_id": self.backend_connection.worker_id, } ) if self.process_manager: try: process_stats = self.process_manager.get_stats() logger.debug(f"Process manager stats: {process_stats}") # Get current resource stats and dedicated inference status if "resource_stats" in process_stats: stats["resource_stats"] = process_stats["resource_stats"] logger.debug(f"Resource stats: {process_stats['resource_stats']}") if "dedicated_inference" in process_stats: stats["dedicated_inference"] = process_stats["dedicated_inference"] # Also get current resource info for more detailed stats try: resource_info = self.process_manager.get_resource_info() 
logger.debug( f"Resource info count: {len(resource_info) if resource_info else 0}" ) if resource_info: stats["resource_info"] = [ { "resource_id": info.resource_id, "resource_type": info.resource_type.value, "status": info.status.value, "current_load": info.current_load, "memory_usage_mb": info.memory_usage_mb, "cpu_usage_percent": info.cpu_usage_percent, "active_tasks": info.active_tasks, "total_tasks_processed": info.total_tasks_processed, "last_heartbeat": info.last_heartbeat, "metadata": info.metadata, } for info in resource_info ] # Add summary of resource status resource_summary = { "total_resources": len(resource_info), "available_resources": len( [r for r in resource_info if r.status == "IDLE"] ), "active_resources": len( [r for r in resource_info if r.status != "IDLE"] ), "dedicated_inference_active": any( r.resource_type == "DEDICATED_INFERENCE" and r.status != "IDLE" for r in resource_info ), } stats["resource_summary"] = resource_summary logger.debug(f"Resource summary: {resource_summary}") except Exception as e: logger.debug(f"Could not get detailed resource info: {e}") except Exception as e: logger.error(f"Error getting process manager stats: {e}") stats["resource_stats"] = {"error": str(e)} stats["dedicated_inference"] = None return stats async def drain(self, timeout: Optional[float] = None) -> None: """Drain the worker - wait for current jobs to finish before shutting down.""" if self._draining: return logger.info("Draining VideoSDK worker") self._draining = True await self._update_worker_status() # Wait for current jobs to complete if self._current_jobs: logger.info( f"Waiting for {len(self._current_jobs)} active jobs to complete" ) if timeout: try: await asyncio.wait_for(self._wait_for_jobs(), timeout) except asyncio.TimeoutError: logger.warning( f"Timeout waiting for jobs to complete after {timeout}s" ) else: await self._wait_for_jobs() async def _wait_for_jobs(self) -> None: """Wait for all current jobs to complete.""" while self._current_jobs: # Wait a bit and check again await asyncio.sleep(1) logger.info(f"Still waiting for {len(self._current_jobs)} jobs to complete") async def _cleanup_all_jobs(self): """Clean up all current jobs and notify registry.""" if not self._current_jobs: return logger.info(f"Cleaning up {len(self._current_jobs)} jobs during shutdown") # Create a copy of jobs to iterate over, as they will be modified jobs_to_clean = list(self._current_jobs.items()) for job_id, job_info in jobs_to_clean: try: logger.info(f"Terminating job {job_id}...") await job_info.job.shutdown() # This calls job.shutdown() logger.info(f"Job {job_id} terminated successfully.") except Exception as e: logger.error(f"Error terminating job {job_id}: {e}") try: if self.backend_connection and self.backend_connection.is_connected: job_update = JobUpdate( job_id=job_id, status="completed", error="Worker shutdown", ) await self.backend_connection.send_message(job_update) logger.info( f"Sent job completion update for job {job_id} during shutdown" ) except Exception as e: logger.error( f"Failed to send job completion update for {job_id}: {e}" ) # Clear all jobs from the worker's state self._current_jobs.clear() logger.info("All jobs cleared from worker") # Send a final status update reflecting zero jobs if self.backend_connection and self.backend_connection.is_connected: await self._send_immediate_status_update() async def shutdown(self): """Shutdown the worker.""" logger.info("Shutting down VideoSDK worker") self._shutdown = True self._draining = True try: # Clean up all jobs first 
to ensure proper room cleanup await self._cleanup_all_jobs() except Exception as e: logger.error(f"Error during job cleanup: {e}") try: # Send final status update to registry if self.backend_connection and self.backend_connection.is_connected: try: await self._update_worker_status() logger.info("Sent final status update to registry") except Exception as e: logger.warning(f"Failed to send final status update: {e}") # Disconnect from backend if self.backend_connection: logger.info("Disconnecting from backend") await self.backend_connection.disconnect() except Exception as e: logger.error(f"Error during backend cleanup: {e}") try: # Cancel all tasks for task in self._tasks: if not task.done(): task.cancel() # Wait briefly for tasks to complete if self._tasks: done, pending = await asyncio.wait(self._tasks, timeout=2.0) for task in pending: task.cancel() except Exception as e: logger.error(f"Error during task cleanup: {e}") try: # Shutdown task executor if self.process_manager: await self.process_manager.stop() except Exception as e: logger.error(f"Error stopping process manager: {e}") try: # Stop debug HTTP server if self._http_server: await self._http_server.aclose() except Exception as e: logger.error(f"Error stopping HTTP server: {e}") logger.info("VideoSDK worker shutdown complete") async def __aenter__(self): """Async context manager entry.""" await self.initialize() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.shutdown()
VideoSDK worker that manages job execution and backend registration.
Automatically selects the appropriate executor type based on platform.
Initialize the worker.
Static methods
def run_worker(options: WorkerOptions,
simulate_job: DirectRoomOptions | str | None = None)
-
Expand source code
@staticmethod def run_worker( options: WorkerOptions, simulate_job: Optional[DirectRoomOptions | str] = None ): """ Run a VideoSDK worker with the given options. This is the main entry point for running a VideoSDK worker, providing a high-level interface for worker initialization, job management, and lifecycle control. Args: options: Worker configuration options simulate_job: Optional job simulation for direct room joining Example: ```python from videosdk.agents import Worker, WorkerOptions def my_agent(job_ctx): # Your agent code here pass # Configure worker with custom log level - logging is automatically configured! options = WorkerOptions( entrypoint_fnc=my_agent, log_level="DEBUG" # Options: DEBUG, INFO, WARNING, ERROR ) # Run the worker - no manual logging setup needed! Worker.run_worker(options) ``` """ worker = Worker(options) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def main_task(): try: await worker.initialize() if simulate_job: # Direct room joining mode - force register=False for simulation logger.info("Simulation mode: forcing register=False") worker.options.register = False await worker.simulate_job(simulate_job) elif options.register: # Backend registration mode await worker._run_backend_mode() else: # Default mode - just keep alive while not worker._shutdown: await asyncio.sleep(1) except asyncio.CancelledError: logger.info("Main task cancelled") except Exception as e: logger.error(f"Worker error: {e}") raise finally: await worker.shutdown() main_future = loop.create_task(main_task()) shutting_down = False def signal_handler(signum, frame): nonlocal shutting_down if shutting_down: # If already shutting down, cancel all tasks more aggressively for task in asyncio.all_tasks(loop): task.cancel() return shutting_down = True logger.info(f"Received signal {signum}. Initiating graceful shutdown...") # Cancel the main task loop.call_soon_threadsafe(main_future.cancel) # Set a timeout for graceful shutdown loop.call_later(3.0, lambda: [task.cancel() for task in asyncio.all_tasks(loop)]) try: signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) loop.run_until_complete(main_future) except KeyboardInterrupt: logger.info("Keyboard interrupt received") if not shutting_down: shutting_down = True if not main_future.done(): main_future.cancel() loop.run_until_complete(worker.shutdown()) finally: try: loop.close() except Exception as e: logger.error(f"Error closing event loop: {e}") if loop.is_closed(): logger.info("Event loop closed successfully")
Run a VideoSDK worker with the given options.
This is the main entry point for running a VideoSDK worker, providing a high-level interface for worker initialization, job management, and lifecycle control.
Args
options
- Worker configuration options
simulate_job
- Optional job simulation for direct room joining
Example
```python
from videosdk.agents import Worker, WorkerOptions

async def my_agent(job_ctx):
    # Your agent code here
    pass

# Configure worker with custom log level - logging is automatically configured!
options = WorkerOptions(
    entrypoint_fnc=my_agent,
    log_level="DEBUG",  # Options: DEBUG, INFO, WARNING, ERROR
)

# Run the worker - no manual logging setup needed!
Worker.run_worker(options)
```
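To join a specific room directly without backend registration, pass `simulate_job`; a plain string is treated as the room id and `register` is forced to `False`. A minimal sketch, assuming the room id is a placeholder and the auth token comes from the `VIDEOSDK_AUTH_TOKEN` environment variable:

```python
from videosdk.agents import Worker, WorkerOptions

async def my_agent(job_ctx):
    await job_ctx.connect()  # join the room, then run your agent logic

options = WorkerOptions(entrypoint_fnc=my_agent)

# "abcd-efgh-ijkl" is a placeholder room id.
Worker.run_worker(options, simulate_job="abcd-efgh-ijkl")
```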
Methods
async def drain(self, timeout: float | None = None) ‑> None
-
Expand source code
async def drain(self, timeout: Optional[float] = None) -> None: """Drain the worker - wait for current jobs to finish before shutting down.""" if self._draining: return logger.info("Draining VideoSDK worker") self._draining = True await self._update_worker_status() # Wait for current jobs to complete if self._current_jobs: logger.info( f"Waiting for {len(self._current_jobs)} active jobs to complete" ) if timeout: try: await asyncio.wait_for(self._wait_for_jobs(), timeout) except asyncio.TimeoutError: logger.warning( f"Timeout waiting for jobs to complete after {timeout}s" ) else: await self._wait_for_jobs()
Drain the worker - wait for current jobs to finish before shutting down.
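For example, during a rolling deploy you can stop accepting work and give in-flight jobs a bounded window to finish. A sketch, assuming `worker` is an initialized `Worker` and this runs inside an async function; the 30-second window is an arbitrary choice:

```python
# Stop accepting new work and wait up to 30 seconds for active jobs to finish.
await worker.drain(timeout=30.0)
```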
async def execute_inference(self, inference_data: Dict[str, Any]) ‑> Dict[str, Any]
-
Expand source code
async def execute_inference(self, inference_data: Dict[str, Any]) -> Dict[str, Any]: """Execute an inference using the task executor.""" if not self.process_manager: raise RuntimeError("Task executor not initialized") # Extract entrypoint function from inference data entrypoint = inference_data.get("entrypoint", self.options.entrypoint_fnc) # Execute using new task executor result = await self.process_manager.execute( entrypoint=entrypoint, task_type=TaskType.INFERENCE, timeout=inference_data.get("timeout", 300.0), retry_count=inference_data.get("retry_count", 3), priority=inference_data.get("priority", 0), *inference_data.get("args", ()), **inference_data.get("kwargs", {}), ) # Convert TaskResult to expected format return { "status": result.status.value, "result": result.result, "error": result.error, "execution_time": result.execution_time, "task_id": result.task_id, }
Execute an inference using the task executor.
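The `inference_data` mapping mirrors the keyword handling shown above; omitted fields fall back to the worker's entrypoint and the defaults (300 s timeout, 3 retries, priority 0). A hedged sketch, where `run_model` is a hypothetical async callable and `worker` is an initialized `Worker`:

```python
result = await worker.execute_inference({
    "entrypoint": run_model,         # hypothetical async callable
    "args": ("prompt text",),        # positional args forwarded to the task executor
    "kwargs": {"temperature": 0.2},  # keyword args forwarded to the task executor
    "timeout": 60.0,
    "retry_count": 1,
})
if result["error"] is None:
    print(result["status"], result["execution_time"], result["result"])
```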
async def execute_job(self, job_data: Dict[str, Any]) ‑> Dict[str, Any]
-
Expand source code
async def execute_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]: """Execute a job using the task executor.""" if not self.process_manager: raise RuntimeError("Task executor not initialized") # Extract entrypoint function from job data entrypoint = job_data.get("entrypoint", self.options.entrypoint_fnc) # Execute using new task executor result = await self.process_manager.execute( entrypoint=entrypoint, task_type=TaskType.JOB, timeout=job_data.get("timeout", 300.0), retry_count=job_data.get("retry_count", 3), priority=job_data.get("priority", 0), *job_data.get("args", ()), **job_data.get("kwargs", {}), ) # Convert TaskResult to expected format return { "status": result.status.value, "result": result.result, "error": result.error, "execution_time": result.execution_time, "task_id": result.task_id, }
Execute a job using the task executor.
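`execute_job` accepts the same payload shape but schedules the work as `TaskType.JOB` rather than `TaskType.INFERENCE`. A minimal sketch that reuses the worker's own entrypoint:

```python
# Omitting "entrypoint" falls back to options.entrypoint_fnc.
result = await worker.execute_job({"timeout": 120.0, "retry_count": 0})
print(result["task_id"], result["status"])
```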
def get_stats(self) ‑> Dict[str, Any]
-
Expand source code
def get_stats(self) -> Dict[str, Any]: """Get worker statistics.""" # Calculate current load dynamically job_count = len(self._current_jobs) current_load = min(job_count / self.options.max_processes, 1.0) stats = { "worker_load": current_load, "draining": self._draining, "current_jobs": job_count, "max_processes": self.options.max_processes, "agent_id": self.options.agent_id, "register": self.options.register, } if self.backend_connection: stats.update( { "backend_connected": self.backend_connection.is_connected, "worker_id": self.backend_connection.worker_id, } ) if self.process_manager: try: process_stats = self.process_manager.get_stats() logger.debug(f"Process manager stats: {process_stats}") # Get current resource stats and dedicated inference status if "resource_stats" in process_stats: stats["resource_stats"] = process_stats["resource_stats"] logger.debug(f"Resource stats: {process_stats['resource_stats']}") if "dedicated_inference" in process_stats: stats["dedicated_inference"] = process_stats["dedicated_inference"] # Also get current resource info for more detailed stats try: resource_info = self.process_manager.get_resource_info() logger.debug( f"Resource info count: {len(resource_info) if resource_info else 0}" ) if resource_info: stats["resource_info"] = [ { "resource_id": info.resource_id, "resource_type": info.resource_type.value, "status": info.status.value, "current_load": info.current_load, "memory_usage_mb": info.memory_usage_mb, "cpu_usage_percent": info.cpu_usage_percent, "active_tasks": info.active_tasks, "total_tasks_processed": info.total_tasks_processed, "last_heartbeat": info.last_heartbeat, "metadata": info.metadata, } for info in resource_info ] # Add summary of resource status resource_summary = { "total_resources": len(resource_info), "available_resources": len( [r for r in resource_info if r.status == "IDLE"] ), "active_resources": len( [r for r in resource_info if r.status != "IDLE"] ), "dedicated_inference_active": any( r.resource_type == "DEDICATED_INFERENCE" and r.status != "IDLE" for r in resource_info ), } stats["resource_summary"] = resource_summary logger.debug(f"Resource summary: {resource_summary}") except Exception as e: logger.debug(f"Could not get detailed resource info: {e}") except Exception as e: logger.error(f"Error getting process manager stats: {e}") stats["resource_stats"] = {"error": str(e)} stats["dedicated_inference"] = None return stats
Get worker statistics.
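The returned dictionary always carries the load and job counters; backend and resource details are included only when a backend connection or task executor is available. A sketch of reading it:

```python
stats = worker.get_stats()
print(f"load={stats['worker_load']:.2f} jobs={stats['current_jobs']}/{stats['max_processes']}")

# Present only when the worker registered with the backend.
if stats.get("backend_connected"):
    print("worker_id:", stats["worker_id"])

# Present only when detailed resource info could be collected.
summary = stats.get("resource_summary")
if summary:
    print("idle resources:", summary["available_resources"])
```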
async def initialize(self)
-
Expand source code
async def initialize(self): """Initialize the worker.""" logger.info("Initializing VideoSDK worker") # Initialize task executor with new execution architecture # Convert ExecutorType to ResourceType resource_type = ( ResourceType.THREAD if self.options.executor_type == ExecutorType.THREAD else ResourceType.PROCESS ) config = ResourceConfig( resource_type=resource_type, num_idle_resources=self.options.num_idle_processes, max_resources=self.options.max_processes, initialize_timeout=self.options.initialize_timeout, close_timeout=self.options.close_timeout, memory_warn_mb=self.options.memory_warn_mb, memory_limit_mb=self.options.memory_limit_mb, ping_interval=self.options.ping_interval, load_threshold=self.options.load_threshold, max_concurrent_tasks=1, # Each resource handles one task at a time executor_type=self.options.executor_type, # Legacy IPC compatibility - dedicated inference process use_dedicated_inference_process=False, # Disable dedicated inference process for now inference_process_timeout=30.0, # Longer timeout for AI model loading inference_memory_warn_mb=1000.0, # Higher threshold for AI models ) self.process_manager = TaskExecutor(config) await self.process_manager.start() # Initialize backend connection if registering if self.options.register: await self._initialize_backend_connection() # Initialize and start debug HTTP server self._http_server = HttpServer( host=self.options.host, port=self.options.port, ) self._http_server.set_worker(self) await self._http_server.start() logger.info("VideoSDK worker initialized successfully")
Initialize the worker.
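Because the class also implements `__aenter__`/`__aexit__` (see the class source), initialization and shutdown can be paired with an async context manager. A sketch:

```python
import asyncio
from videosdk.agents import Worker, WorkerOptions

async def my_agent(job_ctx):
    ...  # agent logic

async def main():
    async with Worker(WorkerOptions(entrypoint_fnc=my_agent)) as worker:
        # initialize() has run: the task executor, the optional backend
        # connection, and the debug HTTP server are up.
        print(worker.get_stats())
    # shutdown() has run on exiting the block.

asyncio.run(main())
```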
async def launch_direct_job(self,
room_options: RoomOptions) ‑> None
-
Expand source code
async def launch_direct_job(self, room_options: "RoomOptions") -> None: """Launch a job directly without backend registration.""" logger.info("Launching direct job") # Create job context job_context = JobContext( room_options=room_options, ) # Set up session end callback if room is created if job_context.room: def on_session_end(reason: str): logger.info(f"Direct job session ended, reason: {reason}") # For direct jobs, we can just log the session end logger.info(f"Session ended: {reason}") job_context.room.on_session_end = on_session_end logger.info("Set up session end callback for direct job") # Execute the job using the worker's entrypoint function await self.options.entrypoint_fnc(job_context)
Launch a job directly without backend registration.
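A sketch of a direct launch, assuming `RoomOptions` is importable from the package (the same fields are used by `simulate_job` below) and the room id is a placeholder:

```python
from videosdk.agents import RoomOptions  # assumed export; adjust the import to your layout

room_options = RoomOptions(
    room_id="abcd-efgh-ijkl",            # placeholder room id
    name="support-agent-room",
    auth_token=worker.options.auth_token,
    auto_end_session=True,
)
await worker.launch_direct_job(room_options)
```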
def setup_meeting_event_handlers(self, job_context, job_id: str)
-
Expand source code
def setup_meeting_event_handlers(self, job_context, job_id: str): """Set up meeting event handlers for a specific job.""" if not job_context.room: logger.warning( f"Cannot set up meeting handlers for job {job_id}: room not available" ) # Set up a delayed handler setup that will be called when room becomes available original_connect = job_context.connect def delayed_handler_setup(): if job_context.room: self._setup_meeting_event_handlers_impl(job_context, job_id) else: logger.warning( f"Room still not available for job {job_id} after connect" ) # Override connect method to set up handlers after room is created async def connect_with_handlers(): result = await original_connect() delayed_handler_setup() return result job_context.connect = connect_with_handlers logger.info(f"Set up delayed meeting event handlers for job {job_id}") return # Room is available, set up handlers immediately self._setup_meeting_event_handlers_impl(job_context, job_id)
Set up meeting event handlers for a specific job.
def setup_session_end_callback(self, job_context, job_id: str)
-
Expand source code
def setup_session_end_callback(self, job_context, job_id: str): """Set up session end callback for automatic session ending.""" if not job_context.room: logger.warning( f"Cannot set up session end callback for job {job_id}: room not available" ) # Set up a delayed callback setup that will be called when room becomes available original_connect = job_context.connect def delayed_callback_setup(): if job_context.room: self._setup_session_end_callback_impl(job_context, job_id) else: logger.warning( f"Room still not available for job {job_id} after connect" ) # Override connect method to set up callback after room is created async def connect_with_callback(): result = await original_connect() delayed_callback_setup() return result job_context.connect = connect_with_callback logger.info(f"Set up delayed session end callback for job {job_id}") return # Room is available, set up callback immediately self._setup_session_end_callback_impl(job_context, job_id)
Set up session end callback for automatic session ending.
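Both setup helpers follow the same pattern: if the room does not exist yet, they wrap `job_context.connect` so the handlers or callback are attached as soon as the room is created. A sketch of wiring them up before connecting (the job id is a placeholder):

```python
job_id = "job-123"  # placeholder identifier
worker.setup_meeting_event_handlers(job_context, job_id)
worker.setup_session_end_callback(job_context, job_id)
await job_context.connect()  # handlers are attached once the room is available
```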
async def shutdown(self)
-
Expand source code
async def shutdown(self): """Shutdown the worker.""" logger.info("Shutting down VideoSDK worker") self._shutdown = True self._draining = True try: # Clean up all jobs first to ensure proper room cleanup await self._cleanup_all_jobs() except Exception as e: logger.error(f"Error during job cleanup: {e}") try: # Send final status update to registry if self.backend_connection and self.backend_connection.is_connected: try: await self._update_worker_status() logger.info("Sent final status update to registry") except Exception as e: logger.warning(f"Failed to send final status update: {e}") # Disconnect from backend if self.backend_connection: logger.info("Disconnecting from backend") await self.backend_connection.disconnect() except Exception as e: logger.error(f"Error during backend cleanup: {e}") try: # Cancel all tasks for task in self._tasks: if not task.done(): task.cancel() # Wait briefly for tasks to complete if self._tasks: done, pending = await asyncio.wait(self._tasks, timeout=2.0) for task in pending: task.cancel() except Exception as e: logger.error(f"Error during task cleanup: {e}") try: # Shutdown task executor if self.process_manager: await self.process_manager.stop() except Exception as e: logger.error(f"Error stopping process manager: {e}") try: # Stop debug HTTP server if self._http_server: await self._http_server.aclose() except Exception as e: logger.error(f"Error stopping HTTP server: {e}") logger.info("VideoSDK worker shutdown complete")
Shutdown the worker.
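Shutdown is ordered: jobs are cleaned up, a final status update is sent and the backend disconnected, pending tasks are cancelled, then the task executor and debug HTTP server are stopped. A sketch of the usual drain-then-shutdown sequence in an application's teardown path:

```python
try:
    await worker.drain(timeout=60.0)   # let in-flight jobs finish (arbitrary timeout)
finally:
    await worker.shutdown()            # always release executor, backend, HTTP server
```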
async def simulate_job(self,
info: DirectRoomOptions | str) ‑> None
-
Expand source code
async def simulate_job(self, info: DirectRoomOptions | str) -> None: """Simulate a job for testing purposes.""" if isinstance(info, str): # Simple string format: "room_id" room_id = info info = DirectRoomOptions(room_id=room_id) logger.info(f"Simulating job for room: {info.room_id}") # Create room options room_options = RoomOptions( room_id=info.room_id, name=info.room_name or f"test_room_{info.room_id}", auth_token=info.auth_token or self.options.auth_token, signaling_base_url=self.options.signaling_base_url, auto_end_session=True, ) # Launch the job await self.launch_direct_job(room_options)
Simulate a job for testing purposes.
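Either a bare room id or a `DirectRoomOptions` can be passed. A sketch with both forms (room ids are placeholders, and the `DirectRoomOptions` import is assumed):

```python
# Shorthand: a string is treated as the room id.
await worker.simulate_job("abcd-efgh-ijkl")

# Explicit form, e.g. to override the room name or token.
from videosdk.agents import DirectRoomOptions  # assumed export
await worker.simulate_job(
    DirectRoomOptions(room_id="abcd-efgh-ijkl", room_name="local-test-room")
)
```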
class WorkerJob (entrypoint,
jobctx=None,
options: Options | None = None)
-
Expand source code
class WorkerJob: def __init__(self, entrypoint, jobctx=None, options: Optional[Options] = None): """ :param entrypoint: An async function accepting one argument: jobctx :param jobctx: A static object or a callable that returns a context per job :param options: Configuration options for job execution """ if not asyncio.iscoroutinefunction(entrypoint): raise TypeError("entrypoint must be a coroutine function") self.entrypoint = entrypoint self.jobctx = jobctx self.options = options or Options() def start(self): from .worker import Worker, WorkerOptions # Convert JobOptions to WorkerOptions for compatibility worker_options = WorkerOptions( entrypoint_fnc=self.entrypoint, agent_id=self.options.agent_id, auth_token=self.options.auth_token, executor_type=self.options.executor_type, num_idle_processes=self.options.num_idle_processes, initialize_timeout=self.options.initialize_timeout, close_timeout=self.options.close_timeout, memory_warn_mb=self.options.memory_warn_mb, memory_limit_mb=self.options.memory_limit_mb, ping_interval=self.options.ping_interval, max_processes=self.options.max_processes, permissions=self.options.permissions, max_retry=self.options.max_retry, load_threshold=self.options.load_threshold, register=self.options.register, signaling_base_url=self.options.signaling_base_url, host=self.options.host, port=self.options.port, log_level=self.options.log_level, ) # Create worker and run with job context worker = Worker(worker_options) # If register=True, run the worker in backend mode (don't execute entrypoint immediately) if self.options.register: # Run the worker normally (for backend registration mode) Worker.run_worker(worker_options) else: # Direct mode - run entrypoint immediately if we have a job context if self.jobctx: if callable(self.jobctx): job_context = self.jobctx() else: job_context = self.jobctx # Set the current job context and run the entrypoint token = _set_current_job_context(job_context) try: asyncio.run(self.entrypoint(job_context)) finally: _reset_current_job_context(token) else: # No job context provided, run worker normally Worker.run_worker(worker_options)
:param entrypoint: An async function accepting one argument: jobctx
:param jobctx: A static object or a callable that returns a context per job
:param options: Configuration options for job execution
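A minimal sketch of a local (non-registered) run, assuming `WorkerJob`, `Options`, `JobContext`, and `RoomOptions` are exposed by the package; the entrypoint must be a coroutine function:

```python
from videosdk.agents import WorkerJob, Options, JobContext, RoomOptions  # assumed exports

async def my_agent(ctx: JobContext):
    await ctx.connect()
    ...  # agent logic

def make_ctx() -> JobContext:
    # Called once per job; the room id is a placeholder.
    return JobContext(room_options=RoomOptions(room_id="abcd-efgh-ijkl"))

job = WorkerJob(entrypoint=my_agent, jobctx=make_ctx, options=Options(register=False))
job.start()  # direct mode: builds the context and runs the entrypoint with asyncio.run()
```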
Methods
def start(self)
-
Expand source code
def start(self): from .worker import Worker, WorkerOptions # Convert JobOptions to WorkerOptions for compatibility worker_options = WorkerOptions( entrypoint_fnc=self.entrypoint, agent_id=self.options.agent_id, auth_token=self.options.auth_token, executor_type=self.options.executor_type, num_idle_processes=self.options.num_idle_processes, initialize_timeout=self.options.initialize_timeout, close_timeout=self.options.close_timeout, memory_warn_mb=self.options.memory_warn_mb, memory_limit_mb=self.options.memory_limit_mb, ping_interval=self.options.ping_interval, max_processes=self.options.max_processes, permissions=self.options.permissions, max_retry=self.options.max_retry, load_threshold=self.options.load_threshold, register=self.options.register, signaling_base_url=self.options.signaling_base_url, host=self.options.host, port=self.options.port, log_level=self.options.log_level, ) # Create worker and run with job context worker = Worker(worker_options) # If register=True, run the worker in backend mode (don't execute entrypoint immediately) if self.options.register: # Run the worker normally (for backend registration mode) Worker.run_worker(worker_options) else: # Direct mode - run entrypoint immediately if we have a job context if self.jobctx: if callable(self.jobctx): job_context = self.jobctx() else: job_context = self.jobctx # Set the current job context and run the entrypoint token = _set_current_job_context(job_context) try: asyncio.run(self.entrypoint(job_context)) finally: _reset_current_job_context(token) else: # No job context provided, run worker normally Worker.run_worker(worker_options)
class WorkerOptions (entrypoint_fnc: Callable[[JobContext], Any],
request_fnc: Callable[[ForwardRef('JobRequest')], Any] | None = None,
initialize_process_fnc: Callable[[Any], Any] | None = None,
executor_type: ExecutorType = ExecutorType.PROCESS,
num_idle_processes: int = 2,
initialize_timeout: float = 10.0,
close_timeout: float = 60.0,
memory_warn_mb: float = 500.0,
memory_limit_mb: float = 0.0,
ping_interval: float = 30.0,
max_processes: int = 10,
agent_id: str = 'VideoSDKAgent',
auth_token: str | None = None,
worker_type: WorkerType = WorkerType.ROOM,
permissions: WorkerPermissions = <factory>,
max_retry: int = 16,
load_threshold: float = 0.75,
register: bool = False,
signaling_base_url: str = 'api.videosdk.live',
host: str = '0.0.0.0',
port: int = 8081,
log_level: str = 'INFO')
-
Expand source code
@dataclass class WorkerOptions: """Configuration options for the VideoSDK worker.""" entrypoint_fnc: Callable[[JobContext], Any] """Entrypoint function that will be called when a job is assigned to this worker.""" request_fnc: Optional[Callable[["JobRequest"], Any]] = None """Function to handle job requests and decide whether to accept them.""" initialize_process_fnc: Optional[Callable[[Any], Any]] = None """A function to perform any necessary initialization before the job starts.""" executor_type: ExecutorType = _default_executor_type """Which executor to use to run jobs. Automatically selected based on platform.""" num_idle_processes: int = 2 """Number of idle processes/threads to keep warm.""" initialize_timeout: float = 10.0 """Maximum amount of time to wait for a process/thread to initialize/prewarm""" close_timeout: float = 60.0 """Maximum amount of time to wait for a job to shut down gracefully""" memory_warn_mb: float = 500.0 """Memory warning threshold in MB.""" memory_limit_mb: float = 0.0 """Maximum memory usage for a job in MB. Defaults to 0 (disabled).""" ping_interval: float = 30.0 """Interval between health check pings.""" max_processes: int = 10 """Maximum number of processes/threads.""" agent_id: str = "VideoSDKAgent" """ID of the agent.""" auth_token: Optional[str] = None """VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided. This token is used for both VideoSDK services and registry authentication.""" worker_type: WorkerType = WorkerType.ROOM """Type of worker (room or publisher).""" permissions: WorkerPermissions = field(default_factory=WorkerPermissions) """Permissions for the agent participant.""" max_retry: int = 16 """Maximum number of times to retry connecting to VideoSDK.""" load_threshold: float = 0.75 """Load threshold above which worker is marked as unavailable.""" register: bool = False """Whether to register with the backend. Defaults to False for local development.""" signaling_base_url: str = "api.videosdk.live" """Signaling base URL for VideoSDK services. Defaults to api.videosdk.live.""" host: str = "0.0.0.0" """Host for the debug HTTP server.""" port: int = 8081 """Port for the debug HTTP server.""" log_level: str = "INFO" """Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO.""" def __post_init__(self): """Post-initialization setup.""" if not self.auth_token: self.auth_token = os.getenv("VIDEOSDK_AUTH_TOKEN") # Log the selected executor type logger.info(f"Worker configured with {self.executor_type.value} executor")
Configuration options for the VideoSDK worker.
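A sketch of a backend-registered configuration; only `entrypoint_fnc` is required, `auth_token` falls back to the `VIDEOSDK_AUTH_TOKEN` environment variable, and the remaining values here are illustrative (`my_agent` is the entrypoint from the earlier examples):

```python
options = WorkerOptions(
    entrypoint_fnc=my_agent,   # required: coroutine called with a JobContext
    register=True,             # register with the backend instead of local mode
    agent_id="SupportAgent",   # placeholder id
    max_processes=4,
    num_idle_processes=1,
    load_threshold=0.8,
    log_level="INFO",
)
```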
Instance variables
var agent_id : str
-
ID of the agent.
var auth_token : str | None
-
VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided. This token is used for both VideoSDK services and registry authentication.
var close_timeout : float
-
Maximum amount of time to wait for a job to shut down gracefully
var entrypoint_fnc : Callable[[JobContext], Any]
-
Entrypoint function that will be called when a job is assigned to this worker.
var executor_type : ExecutorType
-
Which executor to use to run jobs. Automatically selected based on platform.
var host : str
-
Host for the debug HTTP server.
var initialize_process_fnc : Callable[[Any], Any] | None
-
A function to perform any necessary initialization before the job starts.
var initialize_timeout : float
-
Maximum amount of time to wait for a process/thread to initialize/prewarm
var load_threshold : float
-
Load threshold above which worker is marked as unavailable.
var log_level : str
-
Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO.
var max_processes : int
-
Maximum number of processes/threads.
var max_retry : int
-
Maximum number of times to retry connecting to VideoSDK.
var memory_limit_mb : float
-
Maximum memory usage for a job in MB. Defaults to 0 (disabled).
var memory_warn_mb : float
-
Memory warning threshold in MB.
var num_idle_processes : int
-
Number of idle processes/threads to keep warm.
var permissions : WorkerPermissions
-
Permissions for the agent participant.
var ping_interval : float
-
Interval between health check pings.
var port : int
-
Port for the debug HTTP server.
var register : bool
-
Whether to register with the backend. Defaults to False for local development.
var request_fnc : Callable[[JobRequest], Any] | None
-
Function to handle job requests and decide whether to accept them.
var signaling_base_url : str
-
Signaling base URL for VideoSDK services. Defaults to api.videosdk.live.
var worker_type : WorkerType
-
Type of worker (room or publisher).
class WorkerType (*args, **kwds)
-
Expand source code
class WorkerType(Enum): ROOM = "room"
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3
Access them by:
- attribute access:
Color.RED
- value lookup:
Color(1)
- name lookup:
Color['RED']
Enumerations can be iterated over, and know how many members they have:
>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
Ancestors
- enum.Enum
Class variables
var ROOM
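`WorkerType` currently defines a single member, `ROOM`. A sketch of selecting it explicitly (imports are assumed exports, and `my_agent` is the entrypoint from the earlier examples):

```python
from videosdk.agents import WorkerType, WorkerOptions  # assumed exports

options = WorkerOptions(entrypoint_fnc=my_agent, worker_type=WorkerType.ROOM)
assert WorkerType("room") is WorkerType.ROOM  # value lookup works as for any Enum
```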