Module agents.execution
Execution module for VideoSDK Agents.
This module provides a modern resource management architecture for executing agent tasks using processes and threads with optimized resource allocation.
Sub-modules
agents.execution.base_resource-
Base resource class for task execution.
agents.execution.inference_resource-
Dedicated inference resource for AI model processing …
agents.execution.resource_manager-
Resource manager for task execution.
agents.execution.resources-
Concrete resource implementations for process and thread execution.
agents.execution.task_executor-
Task executor for high-level task execution.
agents.execution.types-
Type definitions for the execution module.
Classes
class DedicatedInferenceResource (resource_id: str, config: Dict[str, Any])-
Expand source code
class DedicatedInferenceResource(BaseResource):
    """
    Dedicated inference resource that runs AI models in a separate process.

    This mimics the old IPC system's single shared inference process that
    handles all STT, LLM, TTS, and VAD tasks for all agent jobs.

    Parent and child talk over one duplex ``Pipe``. Every message is a dict
    with a ``"type"`` key (``ready``, ``inference``, ``result``, ``ping``,
    ``ping_response``, ``shutdown``, ``shutdown_ack``, ``error``).

    NOTE(review): ``_execute_task_impl`` and ``health_check`` both read from
    the same pipe. If they ever run concurrently, one can consume (and drop) a
    message meant for the other. Confirm that callers serialize them, or route
    all reads through a single dispatcher.
    """

    def __init__(self, resource_id: str, config: Dict[str, Any]):
        super().__init__(resource_id, config)
        self.process: Optional[Process] = None
        self.parent_conn: Optional[Connection] = None
        self.child_conn: Optional[Connection] = None
        self._process_ready = False
        self._models_cache: Dict[str, Any] = {}

        # Inference-specific configuration
        self.initialize_timeout = config.get("inference_process_timeout", 30.0)
        self.memory_warn_mb = config.get("inference_memory_warn_mb", 1000.0)
        self.ping_interval = config.get("ping_interval", 30.0)

    @property
    def resource_type(self) -> ResourceType:
        return ResourceType.PROCESS

    async def _initialize_impl(self) -> None:
        """Start the inference process and wait for its ``ready`` message.

        Raises:
            RuntimeError: if the child sends an ``error`` message, closes the
                pipe, or exits before signalling readiness.
            TimeoutError: if no ``ready`` message arrives within
                ``self.initialize_timeout`` seconds.
        """
        logger.info(f"Initializing dedicated inference process: {self.resource_id}")

        # Create pipe for communication
        self.parent_conn, self.child_conn = Pipe()

        # Start the inference process
        self.process = Process(
            target=self._run_inference_process,
            args=(self.resource_id, self.child_conn, self.config),
            daemon=True,
        )
        self.process.start()

        # Wait for process to be ready. Only the pipe read is guarded, so an
        # explicit error from the child propagates. Previously it was caught
        # by the same handler, logged, and retried until the timeout.
        deadline = time.monotonic() + self.initialize_timeout
        while time.monotonic() < deadline:
            try:
                message = self.parent_conn.recv() if self.parent_conn.poll() else None
            except (EOFError, OSError) as e:
                raise RuntimeError(
                    f"Inference process {self.resource_id} closed its pipe during startup: {e}"
                ) from e

            if message is not None:
                message_type = message.get("type")
                if message_type == "ready":
                    self._process_ready = True
                    break
                if message_type == "error":
                    raise RuntimeError(
                        f"Inference process error: {message.get('error')}"
                    )
            elif not self.process.is_alive():
                # Fail fast instead of waiting out the whole timeout.
                raise RuntimeError(
                    f"Inference process {self.resource_id} exited during startup "
                    f"(exit code {self.process.exitcode})"
                )

            await asyncio.sleep(0.1)

        if not self._process_ready:
            raise TimeoutError(
                f"Inference process {self.resource_id} failed to initialize within {self.initialize_timeout}s"
            )

        logger.info(
            f"Dedicated inference process initialized: {self.resource_id} (PID: {self.process.pid})"
        )

    async def _execute_task_impl(
        self, task_id: str, config, entrypoint: Callable, args: tuple, kwargs: dict
    ) -> Any:
        """Send one inference request to the child and wait for its result.

        ``entrypoint``, ``args`` and ``kwargs`` are not forwarded. The child
        receives only the task type, ``config.data["model_config"]``,
        ``config.data["input_data"]`` and ``config.timeout``.

        Raises:
            RuntimeError: if the process is not ready, reports a failure for
                this task or a process-level error, or the pipe is closed.
            TimeoutError: if no result arrives within ``config.timeout`` seconds.
        """
        if not self._process_ready:
            raise RuntimeError(f"Inference process {self.resource_id} is not ready")

        # Prepare inference data
        inference_data = {
            "task_id": task_id,
            "task_type": config.task_type.value,
            "model_config": config.data.get("model_config", {}),
            "input_data": config.data.get("input_data", {}),
            "timeout": config.timeout,
        }

        # Send inference request to process
        self.parent_conn.send({"type": "inference", "data": inference_data})

        # Wait for result. Only the pipe read sits inside ``try``, so the
        # RuntimeErrors below reach the caller. Previously they were caught,
        # logged, and turned into a TimeoutError after the full timeout.
        deadline = time.monotonic() + config.timeout
        while time.monotonic() < deadline:
            try:
                message = self.parent_conn.recv() if self.parent_conn.poll() else None
            except (EOFError, OSError) as e:
                raise RuntimeError(
                    f"Inference process {self.resource_id} pipe closed: {e}"
                ) from e

            if message is not None:
                message_type = message.get("type")
                if message_type == "result" and message.get("task_id") == task_id:
                    if message.get("status") == "success":
                        return message.get("result")
                    raise RuntimeError(
                        message.get("error", "Unknown inference error")
                    )
                if message_type == "error":
                    raise RuntimeError(
                        message.get("error", "Inference process error")
                    )
                # Any other message (e.g. a result for another task id) is dropped.

            await asyncio.sleep(0.1)

        raise TimeoutError(
            f"Inference task {task_id} timed out after {config.timeout}s"
        )

    async def _shutdown_impl(self) -> None:
        """Stop the inference process and release both pipe ends.

        Sends ``shutdown`` and waits up to ``config["close_timeout"]`` seconds
        (default 60). It then escalates to ``terminate()`` and, if the process
        is still alive, to ``kill()``.
        """
        if self.process and self.process.is_alive():
            # Send shutdown signal; the child may already be going away.
            try:
                self.parent_conn.send({"type": "shutdown"})
            except (BrokenPipeError, OSError) as e:
                logger.warning(
                    f"Could not send shutdown to inference process {self.resource_id}: {e}"
                )

            # Wait for graceful shutdown
            timeout = self.config.get("close_timeout", 60.0)
            deadline = time.monotonic() + timeout
            while self.process.is_alive() and time.monotonic() < deadline:
                await asyncio.sleep(0.1)

            # Force terminate if still alive
            if self.process.is_alive():
                logger.warning(
                    f"Force terminating inference process {self.resource_id}"
                )
                self.process.terminate()
                self.process.join(timeout=5.0)
                if self.process.is_alive():
                    self.process.kill()
                    self.process.join(timeout=5.0)

        # Close both pipe ends so repeated recreate cycles (see the health loop)
        # do not leak file descriptors.
        self._process_ready = False
        for conn in (self.parent_conn, self.child_conn):
            if conn is not None:
                conn.close()

    async def health_check(self) -> bool:
        """Ping the inference process and report whether it answered in time.

        Sends ``{"type": "ping"}`` and polls the pipe for up to 5 seconds for
        a ``ping_response``. On success, ``self.last_heartbeat`` is set to the
        current wall-clock time.

        Returns:
            True if a ``ping_response`` arrived within 5 seconds. False if the
            resource is shut down, the process is missing or dead, the child
            reported an error, the pipe is closed, the wait timed out, or any
            other exception occurred.

        NOTE(review): messages other than ``ping_response``/``error`` read here
        (e.g. a late inference ``result``) are discarded.
        """
        try:
            if self._shutdown or not self.process or not self.process.is_alive():
                return False

            # Send ping to inference process
            self.parent_conn.send({"type": "ping"})

            # Wait for ping response. Only pipe I/O is guarded. Previously any
            # error here was logged and retried with no sleep, spinning until
            # the timeout.
            timeout = 5.0  # 5 second timeout for health check
            deadline = time.monotonic() + timeout
            while time.monotonic() < deadline:
                try:
                    message = (
                        self.parent_conn.recv() if self.parent_conn.poll() else None
                    )
                except (EOFError, OSError) as e:
                    logger.warning(f"Error checking inference process health: {e}")
                    return False

                if message is not None:
                    message_type = message.get("type")
                    if message_type == "ping_response":
                        # Update last heartbeat
                        self.last_heartbeat = time.time()
                        return True
                    if message_type == "error":
                        logger.error(
                            f"Inference process error: {message.get('error')}"
                        )
                        return False

                await asyncio.sleep(0.1)

            # Timeout - process is unresponsive
            logger.warning(f"Inference process {self.resource_id} health check timeout")
            return False
        except Exception as e:
            logger.error(
                f"Health check failed for inference process {self.resource_id}: {e}"
            )
            return False

    @staticmethod
    def _run_inference_process(
        resource_id: str, conn: Connection, config: Dict[str, Any]
    ):
        """Entry point of the child process: serve pipe messages until shutdown.

        Handles ``inference`` (delegated to ``_handle_inference``), ``ping``
        (delegated to ``_handle_ping``) and ``shutdown`` messages. Per-message
        errors are reported back as ``{"type": "error"}`` and the loop
        continues. The loop exits on ``shutdown`` or when the pipe is closed.
        """

        def _safe_send(message: Dict[str, Any]) -> None:
            # Best-effort notification: the parent may already have closed its
            # end, and a failed send must not mask the error being reported.
            try:
                conn.send(message)
            except (BrokenPipeError, EOFError, OSError) as send_error:
                logger.debug(f"Could not notify parent: {send_error}")

        try:
            # Set up logging
            logging.basicConfig(level=logging.INFO)
            logger.info(
                f"Inference process started: {resource_id} (PID: {os.getpid()})"
            )

            # Set up signal handlers
            def signal_handler(signum, frame):
                logger.info("Received shutdown signal")
                _safe_send({"type": "shutdown_ack"})
                sys.exit(0)

            signal.signal(signal.SIGTERM, signal_handler)
            signal.signal(signal.SIGINT, signal_handler)

            # Send ready signal
            conn.send({"type": "ready"})

            # Model cache for reuse
            models_cache: Dict[str, Any] = {}

            async def main_loop():
                while True:
                    try:
                        if not conn.poll(timeout=1.0):
                            continue
                        message = conn.recv()
                    except (EOFError, OSError) as e:
                        # The pipe is unusable; there is nobody left to serve.
                        logger.info(f"Inference process pipe closed, exiting: {e}")
                        break

                    try:
                        message_type = message.get("type")
                        if message_type == "inference":
                            await _handle_inference(
                                conn, message.get("data", {}), models_cache
                            )
                        elif message_type == "ping":
                            await _handle_ping(conn)
                        elif message_type == "shutdown":
                            logger.info("Received shutdown request")
                            _safe_send({"type": "shutdown_ack"})
                            break
                        else:
                            logger.warning(f"Unknown message type: {message_type}")
                    except Exception as e:
                        logger.error(f"Error in inference process main loop: {e}")
                        _safe_send({"type": "error", "error": str(e)})

            asyncio.run(main_loop())
        except Exception as e:
            logger.error(f"Fatal error in inference process: {e}")
            _safe_send({"type": "error", "error": str(e)})
            sys.exit(1)
        finally:
            logger.info("Inference process shutting down")
            conn.close()
Ancestors
- BaseResource
- abc.ABC
Methods
async def health_check(self) ‑> bool-
Expand source code
async def health_check(self) -> bool:
    """Ping the inference process and report whether it answered in time.

    Sends ``{"type": "ping"}`` and polls the pipe for up to 5 seconds for a
    ``ping_response``. On success, ``self.last_heartbeat`` is set to the
    current wall-clock time.

    Returns:
        True if a ``ping_response`` arrived within 5 seconds. False if the
        resource is shut down, the process is missing or dead, the child
        reported an error, the pipe is closed, the wait timed out, or any
        other exception occurred.

    NOTE(review): messages other than ``ping_response``/``error`` read here
    (e.g. a late inference ``result``) are discarded.
    """
    try:
        if self._shutdown or not self.process or not self.process.is_alive():
            return False

        # Send ping to inference process
        self.parent_conn.send({"type": "ping"})

        # Wait for ping response. Only pipe I/O is guarded. Previously any
        # error here was logged and retried with no sleep, spinning until the
        # timeout.
        timeout = 5.0  # 5 second timeout for health check
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                message = self.parent_conn.recv() if self.parent_conn.poll() else None
            except (EOFError, OSError) as e:
                logger.warning(f"Error checking inference process health: {e}")
                return False

            if message is not None:
                message_type = message.get("type")
                if message_type == "ping_response":
                    # Update last heartbeat
                    self.last_heartbeat = time.time()
                    return True
                if message_type == "error":
                    logger.error(f"Inference process error: {message.get('error')}")
                    return False

            await asyncio.sleep(0.1)

        # Timeout - process is unresponsive
        logger.warning(f"Inference process {self.resource_id} health check timeout")
        return False
    except Exception as e:
        logger.error(
            f"Health check failed for inference process {self.resource_id}: {e}"
        )
        return False
Inherited members
class ExecutorType (*args, **kwds)-
Expand source code
class ExecutorType(Enum):
    """Kind of executor that runs tasks: a separate process or a thread."""

    # Member values are the lowercase member names.
    PROCESS = "process"
    THREAD = "thread"
Ancestors
- enum.Enum
Class variables
var PROCESS
var THREAD
class HealthMetrics (resource_id: str,
timestamp: float,
memory_usage_mb: float,
cpu_usage_percent: float,
active_tasks: int,
response_time_ms: float,
error_count: int = 0,
success_count: int = 0)-
Expand source code
@dataclass
class HealthMetrics:
    """One health sample for a resource, used for resource monitoring."""

    # Which resource was sampled, and when.
    resource_id: str
    timestamp: float

    # Resource usage at sample time.
    memory_usage_mb: float
    cpu_usage_percent: float

    # Workload and responsiveness.
    active_tasks: int
    response_time_ms: float

    # Outcome counters; both start at zero.
    error_count: int = 0
    success_count: int = 0
Instance variables
var active_tasks : int
var cpu_usage_percent : float
var error_count : int
var memory_usage_mb : float
var resource_id : str
var response_time_ms : float
var success_count : int
var timestamp : float
class ProcessResource (resource_id: str, config: Dict[str, Any])-
Expand source code
class ProcessResource(BaseResource):
    """
    Process-based resource for task execution.

    Uses multiprocessing to create isolated processes for task execution.

    Queues:
      * ``task_queue``: parent -> worker, one dict per task.
      * ``result_queue``: worker -> parent, first the startup
        ``{"type": "ready"}`` signal, then one result dict per task, tagged
        with ``task_id``.
      * ``control_queue``: parent -> worker, ``{"type": "shutdown"}``.

    NOTE(review): the worker does not run ``entrypoint`` yet. It replies with
    a placeholder ``{"status": "completed", ...}`` result for every task.
    """

    def __init__(self, resource_id: str, config: Dict[str, Any]):
        super().__init__(resource_id, config)
        self.process: Optional[Process] = None
        self.task_queue: Optional[Queue] = None
        self.result_queue: Optional[Queue] = None
        self.control_queue: Optional[Queue] = None
        self._process_ready = False
        # Task ids that have a coroutine waiting in _execute_task_impl, plus
        # results dequeued by one waiter on behalf of another waiter.
        self._awaiting_task_ids: set = set()
        self._pending_results: Dict[str, Dict[str, Any]] = {}

    @property
    def resource_type(self) -> ResourceType:
        return ResourceType.PROCESS

    async def _initialize_impl(self) -> None:
        """Start the worker process and wait for its ``ready`` signal.

        Raises:
            RuntimeError: if the worker exits before signalling readiness.
            TimeoutError: if no ``ready`` signal arrives within
                ``config["initialize_timeout"]`` seconds (default 10).
        """
        import queue  # queue.Empty is what (multiprocessing) queues raise

        # Create queues for communication
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.control_queue = Queue()

        # Start the process
        self.process = Process(
            target=self._process_worker,
            args=(
                self.resource_id,
                self.task_queue,
                self.result_queue,
                self.control_queue,
                self.config,
            ),
            daemon=True,
        )
        self.process.start()

        # Wait for process to be ready. The worker reports readiness on
        # result_queue, not control_queue: the worker also consumes
        # control_queue, so it could dequeue and drop its own "ready" message
        # and the parent would time out.
        timeout = self.config.get("initialize_timeout", 10.0)
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                message = self.result_queue.get_nowait()
            except queue.Empty:
                message = None

            if message is not None:
                if message.get("type") == "ready":
                    self._process_ready = True
                    break
            elif not self.process.is_alive():
                raise RuntimeError(
                    f"Process {self.resource_id} exited during startup "
                    f"(exit code {self.process.exitcode})"
                )

            await asyncio.sleep(0.1)

        if not self._process_ready:
            raise TimeoutError(
                f"Process {self.resource_id} failed to initialize within {timeout}s"
            )

    def _take_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the result dict for ``task_id`` if one is available now, else None.

        Drains ``result_queue`` without blocking. A result for another task
        that still has a waiter is parked in ``_pending_results`` for that
        waiter. A result nobody is waiting for (e.g. its task already timed
        out) is dropped.
        """
        import queue

        parked = self._pending_results.pop(task_id, None)
        if parked is not None:
            return parked

        while True:
            try:
                result_data = self.result_queue.get_nowait()
            except queue.Empty:
                return None
            owner = result_data.get("task_id")
            if owner == task_id:
                return result_data
            if owner in self._awaiting_task_ids:
                self._pending_results[owner] = result_data
            else:
                logger.debug(
                    f"Discarding result for unknown task {owner} on {self.resource_id}"
                )

    async def _execute_task_impl(
        self, task_id: str, config, entrypoint: Callable, args: tuple, kwargs: dict
    ) -> Any:
        """Queue a task for the worker process and wait for its result.

        Only ``entrypoint.__name__`` is sent, not the callable itself.

        Raises:
            RuntimeError: if the process is not ready or the worker reports an
                error for this task.
            TimeoutError: if no result arrives within ``config.timeout`` seconds.
        """
        if not self._process_ready:
            raise RuntimeError(f"Process {self.resource_id} is not ready")

        # Send task to process
        task_data = {
            "task_id": task_id,
            "config": config,
            "entrypoint_name": entrypoint.__name__,
            "args": args,
            "kwargs": kwargs,
        }

        self._awaiting_task_ids.add(task_id)
        try:
            self.task_queue.put(task_data)

            # Wait for result. The RuntimeError below is outside any broad
            # ``except``. Previously it was caught, logged, and reported as a
            # timeout instead.
            timeout = config.timeout
            deadline = time.monotonic() + timeout
            while time.monotonic() < deadline:
                result_data = self._take_result(task_id)
                if result_data is not None:
                    if result_data.get("status") == "success":
                        return result_data.get("result")
                    raise RuntimeError(result_data.get("error", "Unknown error"))
                await asyncio.sleep(0.1)

            raise TimeoutError(f"Task {task_id} timed out after {timeout}s")
        finally:
            self._awaiting_task_ids.discard(task_id)
            self._pending_results.pop(task_id, None)

    async def _shutdown_impl(self) -> None:
        """Stop the worker process.

        Sends ``shutdown`` on ``control_queue`` and waits up to
        ``config["close_timeout"]`` seconds (default 60). It then escalates to
        ``terminate()`` and, if the process is still alive, to ``kill()``.
        """
        if self.process and self.process.is_alive():
            # Send shutdown signal
            self.control_queue.put({"type": "shutdown"})

            # Wait for graceful shutdown
            timeout = self.config.get("close_timeout", 60.0)
            deadline = time.monotonic() + timeout
            while self.process.is_alive() and time.monotonic() < deadline:
                await asyncio.sleep(0.1)

            # Force terminate if still alive
            if self.process.is_alive():
                logger.warning(f"Force terminating process {self.resource_id}")
                self.process.terminate()
                self.process.join(timeout=5.0)
                if self.process.is_alive():
                    self.process.kill()
                    self.process.join(timeout=5.0)

        self._process_ready = False

    @staticmethod
    def _process_worker(
        resource_id: str,
        task_queue: Queue,
        result_queue: Queue,
        control_queue: Queue,
        config: Dict[str, Any],
    ):
        """Worker function that runs in the child process.

        Signals readiness on ``result_queue``, then loops: exit on a
        ``shutdown`` control message, otherwise wait up to 0.1 s for a task
        and post a result for it.
        """
        import queue

        try:
            logger.info(f"Process worker {resource_id} started")

            # Signal ready on result_queue, which only the parent reads
            # (see _initialize_impl for why not control_queue).
            result_queue.put({"type": "ready"})

            # Main task processing loop
            while True:
                try:
                    # Check for shutdown signal
                    try:
                        message = control_queue.get_nowait()
                    except queue.Empty:
                        message = None
                    if message is not None and message.get("type") == "shutdown":
                        break

                    # Check for tasks; the timeout doubles as the idle sleep.
                    try:
                        task_data = task_queue.get(timeout=0.1)
                    except queue.Empty:
                        continue

                    task_id = task_data["task_id"]
                    try:
                        # Execute the task
                        # Note: In a real implementation, you'd need to serialize/deserialize the entrypoint
                        # For now, we'll use a simple approach
                        result = {"status": "completed", "task_id": task_id}
                        result_queue.put(
                            {
                                "task_id": task_id,
                                "status": "success",
                                "result": result,
                            }
                        )
                    except Exception as e:
                        result_queue.put(
                            {"task_id": task_id, "status": "error", "error": str(e)}
                        )
                except Exception as e:
                    logger.error(f"Error in process worker {resource_id}: {e}")
                    time.sleep(1.0)

            logger.info(f"Process worker {resource_id} shutting down")
        except Exception as e:
            logger.error(f"Fatal error in process worker {resource_id}: {e}")
Ancestors
- BaseResource
- abc.ABC
Inherited members
class ResourceConfig (resource_type: ResourceType = ResourceType.PROCESS,
num_idle_resources: int = 2,
max_resources: int = 10,
initialize_timeout: float = 10.0,
close_timeout: float = 60.0,
memory_warn_mb: float = 500.0,
memory_limit_mb: float = 0.0,
ping_interval: float = 30.0,
health_check_interval: float = 5.0,
load_threshold: float = 0.75,
max_concurrent_tasks: int = 1,
executor_type: ExecutorType = ExecutorType.PROCESS,
use_dedicated_inference_process: bool = True,
inference_process_timeout: float = 30.0,
inference_memory_warn_mb: float = 1000.0)-
Expand source code
@dataclass
class ResourceConfig:
    """Settings for the resource pool: sizing, timeouts, monitoring, inference."""

    # Pool shape: which worker kind to create and how many.
    resource_type: ResourceType = ResourceType.PROCESS
    num_idle_resources: int = 2
    max_resources: int = 10

    # Startup / shutdown timeouts, in seconds.
    initialize_timeout: float = 10.0
    close_timeout: float = 60.0

    # Memory thresholds, in MB.
    # NOTE(review): memory_limit_mb = 0.0 presumably means "no limit"; confirm.
    memory_warn_mb: float = 500.0
    memory_limit_mb: float = 0.0

    # Health monitoring. health_check_interval is the pause between
    # health-check passes, in seconds.
    ping_interval: float = 30.0
    health_check_interval: float = 5.0

    # Load balancing.
    load_threshold: float = 0.75
    max_concurrent_tasks: int = 1

    # Platform-specific default, taken from the module-level _default_executor_type.
    executor_type: ExecutorType = _default_executor_type

    # Legacy IPC compatibility: one shared process for inference tasks.
    use_dedicated_inference_process: bool = True
    # Startup timeout for that process; longer to allow AI model loading.
    inference_process_timeout: float = 30.0
    # Higher memory warning threshold (MB) for AI models.
    inference_memory_warn_mb: float = 1000.0
Instance variables
var close_timeout : float
var executor_type : ExecutorType
var health_check_interval : float
var inference_memory_warn_mb : float
var inference_process_timeout : float
var initialize_timeout : float
var load_threshold : float
var max_concurrent_tasks : int
var max_resources : int
var memory_limit_mb : float
var memory_warn_mb : float
var num_idle_resources : int
var ping_interval : float
var resource_type : ResourceType
var use_dedicated_inference_process : bool
class ResourceInfo (resource_id: str,
resource_type: ResourceType,
status: ResourceStatus,
current_load: float = 0.0,
memory_usage_mb: float = 0.0,
cpu_usage_percent: float = 0.0,
active_tasks: int = 0,
total_tasks_processed: int = 0,
last_heartbeat: float = 0.0,
metadata: Dict[str, Any] = <factory>)-
Expand source code
@dataclass
class ResourceInfo:
    """Snapshot of one resource: identity, state, usage and task counters."""

    # Identity and lifecycle state.
    resource_id: str
    resource_type: ResourceType
    status: ResourceStatus

    # Load and usage figures.
    current_load: float = 0.0
    memory_usage_mb: float = 0.0
    cpu_usage_percent: float = 0.0

    # Task counters.
    active_tasks: int = 0
    total_tasks_processed: int = 0

    # Last heartbeat timestamp.
    last_heartbeat: float = 0.0

    # Free-form, resource-specific extras; every instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
Instance variables
var active_tasks : int
var cpu_usage_percent : float
var current_load : float
var last_heartbeat : float
var memory_usage_mb : float
var metadata : Dict[str, Any]
var resource_id : str
var resource_type : ResourceType
var status : ResourceStatus
var total_tasks_processed : int
class ResourceManager (config: ResourceConfig)-
Expand source code
class ResourceManager:
    """
    Manages resources for task execution.

    This class handles:
    - Resource creation and lifecycle management
    - Load balancing across resources
    - Health monitoring and recovery
    - Resource allocation for tasks
    - Dedicated inference process management (legacy IPC compatibility)
    """

    def __init__(self, config: ResourceConfig):
        self.config = config
        self.resources: List[BaseResource] = []
        self._shutdown = False
        self._health_check_task: Optional[asyncio.Task] = None
        self._resource_creation_task: Optional[asyncio.Task] = None
        # Rotating cursor for round-robin selection in _get_available_resource.
        self._next_resource_index = 0

        # Dedicated inference resource (legacy IPC compatibility)
        self.dedicated_inference_resource: Optional[DedicatedInferenceResource] = None

    async def start(self):
        """Bring the manager online.

        Steps, in order:
        1. Create the dedicated inference resource, if the config enables it.
        2. Schedule the health-check and resource-creation background loops.
        3. Create the initial pool of idle resources.
        """
        logger.info("Starting resource manager")

        if self.config.use_dedicated_inference_process:
            await self._create_dedicated_inference_resource()

        spawn = asyncio.create_task
        self._health_check_task = spawn(self._health_check_loop())
        self._resource_creation_task = spawn(self._resource_creation_loop())

        await self._create_initial_resources()
        logger.info("Resource manager started")

    async def stop(self):
        """Stop the resource manager and shut down every resource it owns.

        Background loops are cancelled and awaited before resources are shut
        down, so a loop cannot create or replace a resource while shutdown is
        in progress. Errors from individual resource shutdowns are collected
        and not raised.
        """
        logger.info("Stopping resource manager")
        self._shutdown = True

        # Cancel background tasks and wait for them to unwind. Previously they
        # were only cancelled and could still be running during shutdown.
        background_tasks = [
            task
            for task in (self._health_check_task, self._resource_creation_task)
            if task is not None
        ]
        for task in background_tasks:
            task.cancel()
        if background_tasks:
            await asyncio.gather(*background_tasks, return_exceptions=True)

        # Shutdown all resources
        shutdown_tasks = [resource.shutdown() for resource in self.resources]

        # Shutdown dedicated inference resource
        if self.dedicated_inference_resource:
            shutdown_tasks.append(self.dedicated_inference_resource.shutdown())

        if shutdown_tasks:
            await asyncio.gather(*shutdown_tasks, return_exceptions=True)

        logger.info("Resource manager stopped")

    async def _create_dedicated_inference_resource(self):
        """Create and initialize the dedicated inference resource (legacy IPC compatibility)."""
        logger.info("Creating dedicated inference resource")

        inference_config = {
            "inference_process_timeout": self.config.inference_process_timeout,
            "inference_memory_warn_mb": self.config.inference_memory_warn_mb,
            "ping_interval": self.config.ping_interval,
            "close_timeout": self.config.close_timeout,
        }

        self.dedicated_inference_resource = DedicatedInferenceResource(
            resource_id="dedicated-inference", config=inference_config
        )

        await self.dedicated_inference_resource.initialize()
        logger.info("Dedicated inference resource created")

    async def _create_initial_resources(self):
        """Create ``config.num_idle_resources`` resources, one after another."""
        initial_count = self.config.num_idle_resources
        logger.info(
            f"Creating {initial_count} initial {self.config.resource_type.value} resources"
        )

        for _ in range(initial_count):
            await self._create_resource(self.config.resource_type)

    async def _create_resource(self, resource_type: ResourceType) -> BaseResource:
        """Create, initialize and register a new resource of the given type.

        Raises:
            ValueError: for an unsupported ``resource_type``.
            Exception: whatever ``resource.initialize()`` raises. The resource
                is then shut down (best effort) and not registered.
        """
        resource_id = f"{resource_type.value}-{uuid.uuid4().hex[:8]}"

        config = {
            "max_concurrent_tasks": self.config.max_concurrent_tasks,
            "initialize_timeout": self.config.initialize_timeout,
            "close_timeout": self.config.close_timeout,
            "health_check_interval": self.config.health_check_interval,
        }

        if resource_type == ResourceType.PROCESS:
            resource = ProcessResource(resource_id, config)
        elif resource_type == ResourceType.THREAD:
            resource = ThreadResource(resource_id, config)
        else:
            raise ValueError(f"Unsupported resource type: {resource_type}")

        # Initialize the resource. If that fails (e.g. startup timeout), the
        # worker may already be running: shut it down rather than leak it.
        # NOTE(review): assumes BaseResource.shutdown() tolerates a resource
        # whose initialize() failed; errors from it are ignored here.
        try:
            await resource.initialize()
        except Exception:
            await asyncio.gather(resource.shutdown(), return_exceptions=True)
            raise

        # Add to resources list
        self.resources.append(resource)

        logger.info(f"Created {resource_type.value} resource: {resource_id}")
        return resource

    async def _resource_creation_loop(self):
        """Background loop for creating resources as needed."""
        # Wait a bit longer before starting the loop to allow initial resources to stabilize
        await asyncio.sleep(10.0)

        while not self._shutdown:
            try:
                # Check if we need more resources
                available_count = len([r for r in self.resources if r.is_available])
                total_count = len(self.resources)

                # Create more resources if needed
                if (
                    available_count < self.config.num_idle_resources
                    and total_count < self.config.max_resources
                ):
                    logger.info(
                        f"Creating additional {self.config.resource_type.value} resource"
                    )
                    await self._create_resource(self.config.resource_type)

                await asyncio.sleep(10.0)  # Check every 10 seconds instead of 5

            except Exception as e:
                logger.error(f"Error in resource creation loop: {e}")
                await asyncio.sleep(5.0)

    async def _health_check_loop(self):
        """Background loop for health monitoring.

        Unhealthy job resources are removed, shut down and (if the pool fell
        below ``num_idle_resources``) replaced. An unhealthy dedicated
        inference resource is shut down and recreated.
        """
        while not self._shutdown:
            try:
                # Check job resources
                for resource in self.resources[:]:  # Copy list to avoid modification during iteration
                    try:
                        is_healthy = await resource.health_check()
                        if not is_healthy:
                            logger.warning(
                                f"Unhealthy resource detected: {resource.resource_id}"
                            )
                            # Remove unhealthy resource
                            self.resources.remove(resource)
                            await resource.shutdown()

                            # Create replacement if needed
                            if len(self.resources) < self.config.num_idle_resources:
                                await self._create_resource(self.config.resource_type)
                    except Exception as e:
                        logger.error(
                            f"Health check failed for {resource.resource_id}: {e}"
                        )

                # Check dedicated inference resource
                if self.dedicated_inference_resource:
                    try:
                        is_healthy = (
                            await self.dedicated_inference_resource.health_check()
                        )
                        if not is_healthy:
                            logger.warning(
                                "Unhealthy dedicated inference resource detected"
                            )
                            # Recreate inference resource
                            await self.dedicated_inference_resource.shutdown()
                            await self._create_dedicated_inference_resource()
                    except Exception as e:
                        logger.error(
                            f"Health check failed for dedicated inference resource: {e}"
                        )

                await asyncio.sleep(self.config.health_check_interval)

            except Exception as e:
                logger.error(f"Error in health check loop: {e}")
                await asyncio.sleep(5.0)

    async def execute_task(
        self, task_config: TaskConfig, entrypoint: Callable, *args, **kwargs
    ) -> TaskResult:
        """Run a task on a suitable resource and return its result.

        Inference tasks go to the dedicated inference resource when one
        exists. Every other task goes to a job resource picked by
        ``_get_available_resource``; so does an inference task when no
        dedicated resource is configured.

        Raises:
            RuntimeError: if a job resource is needed but none is available.
        """
        task_id = str(uuid.uuid4())

        if (
            task_config.task_type == TaskType.INFERENCE
            and self.dedicated_inference_resource
        ):
            logger.info(
                f"Routing inference task {task_id} to dedicated inference resource"
            )
            target = self.dedicated_inference_resource
        else:
            target = await self._get_available_resource(task_config.task_type)
            if not target:
                raise RuntimeError("No available resources for task execution")

        return await target.execute_task(
            task_id, task_config, entrypoint, args, kwargs
        )

    async def _get_available_resource(
        self, task_type: TaskType
    ) -> Optional[BaseResource]:
        """Pick an available job resource, rotating among the candidates.

        ``task_type`` is currently unused. Returns None when no resource
        reports ``is_available``.
        """
        available_resources = [r for r in self.resources if r.is_available]
        if not available_resources:
            return None

        # Round-robin, as the original comments described. The old code always
        # returned available_resources[0], so the first available resource was
        # always preferred.
        index = self._next_resource_index % len(available_resources)
        self._next_resource_index = index + 1
        return available_resources[index]

    def get_stats(self) -> Dict[str, Any]:
        """Return a JSON-friendly summary of the pool and every resource.

        ``average_load`` is the fraction of job resources whose status is not
        ``ResourceStatus.IDLE`` (0.0 for an empty pool). ``dedicated_inference``
        is None when no dedicated inference resource exists.
        """

        def _info_to_dict(info) -> Dict[str, Any]:
            # Flatten one ResourceInfo snapshot; enums are reported by .value.
            return {
                "resource_id": info.resource_id,
                "resource_type": info.resource_type.value,
                "status": info.status.value,
                "current_load": info.current_load,
                "memory_usage_mb": info.memory_usage_mb,
                "cpu_usage_percent": info.cpu_usage_percent,
                "active_tasks": info.active_tasks,
                "total_tasks_processed": info.total_tasks_processed,
                "last_heartbeat": info.last_heartbeat,
                "metadata": info.metadata,
            }

        available_resources = [r for r in self.resources if r.is_available]
        active_resources = [
            r for r in self.resources if r.status != ResourceStatus.IDLE
        ]
        total_resources = len(self.resources)
        average_load = (
            len(active_resources) / total_resources if total_resources > 0 else 0.0
        )

        # get_info() is called once per resource, so each entry is one
        # consistent snapshot (previously it was called once per field).
        inference = self.dedicated_inference_resource
        return {
            "total_resources": total_resources,
            "available_resources": len(available_resources),
            "active_resources": len(active_resources),
            "average_load": average_load,
            "resources": [_info_to_dict(r.get_info()) for r in self.resources],
            "dedicated_inference": (
                _info_to_dict(inference.get_info()) if inference else None
            ),
        }

    def get_resource_info(self) -> List[ResourceInfo]:
        """Return a ResourceInfo snapshot for every managed resource.

        Job resources come first, in pool order. The dedicated inference
        resource, when present, is appended last.
        """
        infos = [res.get_info() for res in self.resources]
        if self.dedicated_inference_resource:
            infos.append(self.dedicated_inference_resource.get_info())
        return infos
Methods
async def execute_task(self,
task_config: TaskConfig,
entrypoint: Callable,
*args,
**kwargs) ‑> TaskResult-
Expand source code
async def execute_task(
    self, task_config: TaskConfig, entrypoint: Callable, *args, **kwargs
) -> TaskResult:
    """Run a task on a suitable resource and return its result.

    Inference tasks go to the dedicated inference resource when one exists.
    Every other task goes to a job resource picked by
    ``_get_available_resource``; so does an inference task when no dedicated
    resource is configured.

    Raises:
        RuntimeError: if a job resource is needed but none is available.
    """
    task_id = str(uuid.uuid4())

    if (
        task_config.task_type == TaskType.INFERENCE
        and self.dedicated_inference_resource
    ):
        logger.info(
            f"Routing inference task {task_id} to dedicated inference resource"
        )
        target = self.dedicated_inference_resource
    else:
        target = await self._get_available_resource(task_config.task_type)
        if not target:
            raise RuntimeError("No available resources for task execution")

    return await target.execute_task(
        task_id, task_config, entrypoint, args, kwargs
    )
def get_resource_info(self) ‑> List[ResourceInfo]-
Expand source code
def get_resource_info(self) -> List[ResourceInfo]:
    """Return a ResourceInfo snapshot for every managed resource.

    Job resources come first, in pool order. The dedicated inference
    resource, when present, is appended last.
    """
    infos = [res.get_info() for res in self.resources]
    if self.dedicated_inference_resource:
        infos.append(self.dedicated_inference_resource.get_info())
    return infos
def get_stats(self) ‑> Dict[str, Any]-
Expand source code
def get_stats(self) -> Dict[str, Any]:
    """Return a JSON-friendly summary of the pool and every resource.

    ``average_load`` is the fraction of job resources whose status is not
    ``ResourceStatus.IDLE`` (0.0 for an empty pool). ``dedicated_inference``
    is None when no dedicated inference resource exists.
    """

    def _info_to_dict(info) -> Dict[str, Any]:
        # Flatten one ResourceInfo snapshot; enums are reported by .value.
        return {
            "resource_id": info.resource_id,
            "resource_type": info.resource_type.value,
            "status": info.status.value,
            "current_load": info.current_load,
            "memory_usage_mb": info.memory_usage_mb,
            "cpu_usage_percent": info.cpu_usage_percent,
            "active_tasks": info.active_tasks,
            "total_tasks_processed": info.total_tasks_processed,
            "last_heartbeat": info.last_heartbeat,
            "metadata": info.metadata,
        }

    available_resources = [r for r in self.resources if r.is_available]
    active_resources = [
        r for r in self.resources if r.status != ResourceStatus.IDLE
    ]
    total_resources = len(self.resources)
    average_load = (
        len(active_resources) / total_resources if total_resources > 0 else 0.0
    )

    # get_info() is called once per resource, so each entry is one consistent
    # snapshot (previously it was called once per field, ten times per resource).
    inference = self.dedicated_inference_resource
    return {
        "total_resources": total_resources,
        "available_resources": len(available_resources),
        "active_resources": len(active_resources),
        "average_load": average_load,
        "resources": [_info_to_dict(r.get_info()) for r in self.resources],
        "dedicated_inference": (
            _info_to_dict(inference.get_info()) if inference else None
        ),
    }
async def start(self)-
Expand source code
async def start(self):
    """Bring the manager online.

    Steps, in order:
    1. Create the dedicated inference resource, if the config enables it.
    2. Schedule the health-check and resource-creation background loops.
    3. Create the initial pool of idle resources.
    """
    logger.info("Starting resource manager")

    if self.config.use_dedicated_inference_process:
        await self._create_dedicated_inference_resource()

    spawn = asyncio.create_task
    self._health_check_task = spawn(self._health_check_loop())
    self._resource_creation_task = spawn(self._resource_creation_loop())

    await self._create_initial_resources()
    logger.info("Resource manager started")
async def stop(self)-
Expand source code
async def stop(self):
    """Stop the resource manager and shut down every resource it owns.

    Background loops are cancelled and awaited before resources are shut
    down, so a loop cannot create or replace a resource while shutdown is in
    progress. Errors from individual resource shutdowns are collected and not
    raised.
    """
    logger.info("Stopping resource manager")
    self._shutdown = True

    # Cancel background tasks and wait for them to unwind. Previously they were
    # only cancelled and could still be running during shutdown.
    background_tasks = [
        task
        for task in (self._health_check_task, self._resource_creation_task)
        if task is not None
    ]
    for task in background_tasks:
        task.cancel()
    if background_tasks:
        await asyncio.gather(*background_tasks, return_exceptions=True)

    # Shutdown all resources
    shutdown_tasks = [resource.shutdown() for resource in self.resources]

    # Shutdown dedicated inference resource
    if self.dedicated_inference_resource:
        shutdown_tasks.append(self.dedicated_inference_resource.shutdown())

    if shutdown_tasks:
        await asyncio.gather(*shutdown_tasks, return_exceptions=True)

    logger.info("Resource manager stopped")
class ResourceStatus (*args, **kwds)-
Expand source code
class ResourceStatus(Enum):
    """Lifecycle state reported by a resource."""

    # Steady states.
    IDLE = "idle"
    BUSY = "busy"
    # Transitional states.
    INITIALIZING = "initializing"
    SHUTTING_DOWN = "shutting_down"
    # Failure state.
    ERROR = "error"
Ancestors
- enum.Enum
Class variables
var BUSY
var ERROR
var IDLE
var INITIALIZING
var SHUTTING_DOWN
class ResourceType (*args, **kwds)-
Expand source code
class ResourceType(Enum):
    """Kind of worker a resource uses to execute tasks."""

    # Member values are the lowercase member names.
    PROCESS = "process"
    THREAD = "thread"
Ancestors
- enum.Enum
Class variables
var PROCESS
var THREAD
class TaskConfig (task_type: TaskType,
timeout: float = 300.0,
retry_count: int = 3,
priority: int = 0,
required_memory_mb: float = 100.0,
required_cpu_cores: int = 1,
data: Dict[str, Any] = <factory>)-
Expand source code
@dataclass
class TaskConfig:
    """Configuration for task execution.

    Fields, in constructor order: ``task_type``, ``timeout`` (seconds),
    ``retry_count``, ``priority``, the resource requirements
    ``required_memory_mb`` / ``required_cpu_cores``, and free-form ``data``.
    """

    task_type: TaskType
    # 300 seconds, i.e. five minutes.
    timeout: float = 300.0
    retry_count: int = 3
    # A larger number means a higher priority.
    priority: int = 0

    # --- resource requirements ---
    required_memory_mb: float = 100.0
    required_cpu_cores: int = 1

    # --- task-specific payload (a new dict for every instance) ---
    data: Dict[str, Any] = field(default_factory=dict)
Instance variables
var data : Dict[str, Any]
var priority : int
var required_cpu_cores : int
var required_memory_mb : float
var retry_count : int
var task_type : TaskType
var timeout : float
class TaskExecutor (config: ResourceConfig)-
Expand source code
class TaskExecutor:
    """
    High-level task executor that manages task execution using resources.

    This class provides a simple interface for executing tasks while handling
    resource management, retries, and monitoring internally.

    Statistics are tracked per call to ``execute`` (one logical task), not
    per retry attempt.
    """

    def __init__(self, config: ResourceConfig):
        self.config = config
        self.resource_manager = ResourceManager(config)
        self._shutdown = False
        self._total_tasks = 0
        self._completed_tasks = 0
        self._failed_tasks = 0
        # Sum of execution_time over completed tasks only (see _update_stats).
        self._total_execution_time = 0.0

    async def start(self):
        """Start the task executor."""
        logger.info("Starting task executor")
        await self.resource_manager.start()
        logger.info("Task executor started")

    async def stop(self):
        """Stop the task executor."""
        logger.info("Stopping task executor")
        self._shutdown = True

        # Stop resource manager
        await self.resource_manager.stop()

        logger.info("Task executor stopped")

    async def execute(
        self,
        entrypoint: Callable,
        task_type: TaskType = TaskType.JOB,
        timeout: float = 300.0,
        retry_count: int = 3,
        priority: int = 0,
        *args,
        **kwargs,
    ) -> TaskResult:
        """
        Execute a task using the resource manager.

        The task is attempted up to ``retry_count + 1`` times (at least once).
        Before attempt ``k + 1`` the executor sleeps ``k`` seconds (linear
        backoff). Executor statistics are updated exactly once, with the final
        outcome.

        Args:
            entrypoint: Function to execute
            task_type: Type of task (inference, meeting, job)
            timeout: Task timeout in seconds
            retry_count: Number of retries on failure
            priority: Task priority (higher = higher priority)
            *args, **kwargs: Arguments to pass to entrypoint

        Returns:
            TaskResult with execution results: the first COMPLETED result, or
            a FAILED result carrying the last error if every attempt failed.
        """
        task_config = TaskConfig(
            task_type=task_type,
            timeout=timeout,
            retry_count=retry_count,
            priority=priority,
        )

        # A negative retry_count would otherwise mean zero attempts.
        max_attempts = max(retry_count, 0) + 1
        last_error = None
        for attempt in range(max_attempts):
            try:
                result = await self.resource_manager.execute_task(
                    task_config, entrypoint, *args, **kwargs
                )
            except Exception as e:
                last_error = str(e)
                logger.warning(f"Task execution attempt {attempt + 1} failed: {e}")
            else:
                if result.status == TaskStatus.COMPLETED:
                    # Record only the final outcome so retries don't inflate
                    # the total/failed counters.
                    self._update_stats(result)
                    return result
                last_error = result.error
                logger.warning(
                    f"Task execution attempt {attempt + 1} failed: {last_error}"
                )

            if attempt < max_attempts - 1:
                # Linear backoff: 1s, 2s, 3s, ... before the next attempt.
                await asyncio.sleep(1.0 * (attempt + 1))

        # All attempts failed
        failed_result = TaskResult(
            # NOTE(review): this is the task type, not a unique task id.
            task_id=task_config.task_type.value,
            status=TaskStatus.FAILED,
            error=f"All {max_attempts} attempts failed. Last error: {last_error}",
            execution_time=0.0,
        )
        self._update_stats(failed_result)
        return failed_result

    def _update_stats(self, result: TaskResult):
        """Update execution statistics with one final task outcome."""
        self._total_tasks += 1
        if result.status == TaskStatus.COMPLETED:
            self._completed_tasks += 1
            self._total_execution_time += result.execution_time
        elif result.status == TaskStatus.FAILED:
            self._failed_tasks += 1

    def get_stats(self) -> Dict[str, Any]:
        """Get executor statistics."""
        resource_stats = self.resource_manager.get_stats()

        average_execution_time = (
            self._total_execution_time / self._completed_tasks
            if self._completed_tasks > 0
            else 0.0
        )

        return {
            "executor_stats": {
                "total_tasks": self._total_tasks,
                "completed_tasks": self._completed_tasks,
                "failed_tasks": self._failed_tasks,
                "pending_tasks": 0,
                "average_execution_time": average_execution_time,
                "total_execution_time": self._total_execution_time,
            },
            "resource_stats": resource_stats,
        }

    def get_resource_info(self) -> List[ResourceInfo]:
        """Get information about all resources."""
        return self.resource_manager.get_resource_info()
This class provides a simple interface for executing tasks while handling resource management, retries, and monitoring internally.
Methods
async def execute(self,
entrypoint: Callable,
task_type: TaskType = TaskType.JOB,
timeout: float = 300.0,
retry_count: int = 3,
priority: int = 0,
*args,
**kwargs) ‑> TaskResult-
Expand source code
async def execute(
    self,
    entrypoint: Callable,
    task_type: TaskType = TaskType.JOB,
    timeout: float = 300.0,
    retry_count: int = 3,
    priority: int = 0,
    *args,
    **kwargs,
) -> TaskResult:
    """
    Execute a task using the resource manager.

    The task is attempted up to ``retry_count + 1`` times (at least once).
    Before attempt ``k + 1`` the executor sleeps ``k`` seconds (linear
    backoff). Executor statistics are updated exactly once, with the final
    outcome.

    Args:
        entrypoint: Function to execute
        task_type: Type of task (inference, meeting, job)
        timeout: Task timeout in seconds
        retry_count: Number of retries on failure
        priority: Task priority (higher = higher priority)
        *args, **kwargs: Arguments to pass to entrypoint

    Returns:
        TaskResult with execution results: the first COMPLETED result, or a
        FAILED result carrying the last error if every attempt failed.
    """
    task_config = TaskConfig(
        task_type=task_type,
        timeout=timeout,
        retry_count=retry_count,
        priority=priority,
    )

    # A negative retry_count would otherwise mean zero attempts.
    max_attempts = max(retry_count, 0) + 1
    last_error = None
    for attempt in range(max_attempts):
        try:
            result = await self.resource_manager.execute_task(
                task_config, entrypoint, *args, **kwargs
            )
        except Exception as e:
            last_error = str(e)
            logger.warning(f"Task execution attempt {attempt + 1} failed: {e}")
        else:
            if result.status == TaskStatus.COMPLETED:
                # Record only the final outcome so retries don't inflate the
                # total/failed counters.
                self._update_stats(result)
                return result
            last_error = result.error
            logger.warning(f"Task execution attempt {attempt + 1} failed: {last_error}")

        if attempt < max_attempts - 1:
            # Linear backoff: 1s, 2s, 3s, ... before the next attempt.
            await asyncio.sleep(1.0 * (attempt + 1))

    # All attempts failed
    failed_result = TaskResult(
        # NOTE(review): this is the task type, not a unique task id.
        task_id=task_config.task_type.value,
        status=TaskStatus.FAILED,
        error=f"All {max_attempts} attempts failed. Last error: {last_error}",
        execution_time=0.0,
    )
    self._update_stats(failed_result)
    return failed_result
Args
entrypoint- Function to execute
task_type- Type of task (inference, meeting, job)
timeout- Task timeout in seconds
retry_count- Number of retries on failure
priority- Task priority (higher = higher priority)
*args, **kwargs: Arguments to pass to entrypoint
Returns
TaskResult with execution results
def get_resource_info(self) ‑> List[ResourceInfo]-
Expand source code
def get_resource_info(self) -> List[ResourceInfo]:
    """Return the per-resource info list reported by the resource manager."""
    manager = self.resource_manager
    info_list = manager.get_resource_info()
    return info_list
def get_stats(self) ‑> Dict[str, Any]-
Expand source code
def get_stats(self) -> Dict[str, Any]:
    """Return executor counters together with the resource manager's stats.

    ``average_execution_time`` is the accumulated execution time divided by
    the number of completed tasks, or 0.0 when nothing has completed.
    ``pending_tasks`` is always reported as 0.
    """
    resource_stats = self.resource_manager.get_stats()

    completed = self._completed_tasks
    total_time = self._total_execution_time
    if completed > 0:
        average_execution_time = total_time / completed
    else:
        average_execution_time = 0.0

    executor_stats = {
        "total_tasks": self._total_tasks,
        "completed_tasks": completed,
        "failed_tasks": self._failed_tasks,
        "pending_tasks": 0,
        "average_execution_time": average_execution_time,
        "total_execution_time": total_time,
    }
    return {"executor_stats": executor_stats, "resource_stats": resource_stats}
async def start(self)-
Expand source code
async def start(self):
    """Start the task executor by starting its resource manager."""
    logger.info("Starting task executor")
    manager = self.resource_manager
    await manager.start()
    logger.info("Task executor started")
async def stop(self)-
Expand source code
async def stop(self):
    """Stop the task executor.

    Sets the ``_shutdown`` flag, then stops the underlying resource manager.
    """
    logger.info("Stopping task executor")
    self._shutdown = True
    manager = self.resource_manager
    await manager.stop()
    logger.info("Task executor stopped")
class TaskResult (task_id: str,
status: TaskStatus,
result: Any | None = None,
error: str | None = None,
execution_time: float = 0.0,
memory_used_mb: float = 0.0)-
Expand source code
@dataclass
class TaskResult:
    """Result of a task execution.

    Only ``task_id`` and ``status`` are required; every other field has a
    neutral default.
    """

    task_id: str
    status: TaskStatus
    # Value produced by the task, if any.
    result: Optional[Any] = None
    # Error description, if any.
    error: Optional[str] = None
    execution_time: float = 0.0
    memory_used_mb: float = 0.0
Instance variables
var error : str | None
var execution_time : float
var memory_used_mb : float
var result : Any | None
var status : TaskStatus
var task_id : str
class TaskStatus (*args, **kwds)-
Expand source code
class TaskStatus(Enum):
    """Status of a task.

    Values are lower-case member names, declared in the order PENDING,
    RUNNING, COMPLETED, FAILED, CANCELLED.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
Ancestors
- enum.Enum
Class variables
var CANCELLED
var COMPLETED
var FAILED
var PENDING
var RUNNING
class TaskType (*args, **kwds)-
Expand source code
class TaskType(Enum):
    """Type of task to be executed."""

    # AI model inference work.
    INFERENCE = "inference"
    # Video meeting work.
    MEETING = "meeting"
    # General-purpose job execution.
    JOB = "job"
Ancestors
- enum.Enum
Class variables
var INFERENCE
var JOB
var MEETING
class ThreadResource (resource_id: str, config: Dict[str, Any])-
Expand source code
class ThreadResource(BaseResource):
    """
    Thread-based resource for task execution.

    Uses threading for concurrent task execution within the same process.
    A daemon worker thread runs its own event loop, polling ``control_queue``
    and ``task_queue``; results are posted to ``result_queue`` keyed by
    ``task_id``.

    NOTE(review): the queues are ``asyncio.Queue`` objects shared between the
    caller's event loop and the worker thread's loop. ``asyncio.Queue`` is
    not thread-safe; this works only because both sides poll (``empty()`` /
    non-blocking reads) rather than awaiting on an empty queue. A
    ``queue.Queue`` would be the robust choice; kept to preserve the public
    attribute types.
    """

    def __init__(self, resource_id: str, config: Dict[str, Any]):
        super().__init__(resource_id, config)
        self.thread: Optional[threading.Thread] = None
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.result_queue: asyncio.Queue = asyncio.Queue()
        self.control_queue: asyncio.Queue = asyncio.Queue()
        self._thread_ready = False
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        # Results dequeued while waiting for a different task; kept so that
        # concurrent callers never lose each other's results.
        self._pending_results: Dict[str, Dict[str, Any]] = {}
        # Task ids whose caller stopped waiting; their late results are dropped.
        self._abandoned_tasks: set = set()

    @property
    def resource_type(self) -> ResourceType:
        return ResourceType.THREAD

    async def _initialize_impl(self) -> None:
        """Initialize the thread resource."""
        # Start the thread
        self.thread = threading.Thread(
            target=self._thread_worker,
            args=(
                self.resource_id,
                self.task_queue,
                self.result_queue,
                self.control_queue,
                self.config,
            ),
            daemon=True,
        )
        self.thread.start()

        # Simple readiness check - just wait a bit for thread to start
        await asyncio.sleep(0.5)

        # Mark as ready if thread is alive
        if self.thread.is_alive():
            self._thread_ready = True
        else:
            raise RuntimeError(f"Thread {self.resource_id} failed to start")

    async def _execute_task_impl(
        self, task_id: str, config, entrypoint: Callable, args: tuple, kwargs: dict
    ) -> Any:
        """Execute task in the thread.

        Queues the task for the worker thread, then polls (every 0.1s) for
        the result until ``config.timeout`` seconds have elapsed.

        Raises:
            RuntimeError: the thread is not ready, the worker thread exited
                before reporting a result, or the task itself raised (its
                error message is propagated).
            TimeoutError: no result arrived within ``config.timeout``.
        """
        if not self._thread_ready:
            raise RuntimeError(f"Thread {self.resource_id} is not ready")

        # Send task to thread
        task_data = {
            "task_id": task_id,
            "config": config,
            "entrypoint": entrypoint,
            "args": args,
            "kwargs": kwargs,
        }
        await self.task_queue.put(task_data)

        # Wait for result (monotonic clock: immune to wall-clock jumps)
        timeout = config.timeout
        deadline = time.monotonic() + timeout
        delivered = False
        try:
            while time.monotonic() < deadline:
                self._collect_results()
                result_data = self._pending_results.pop(task_id, None)
                if result_data is not None:
                    delivered = True
                    # Raised outside any broad ``except`` so a task failure
                    # reaches the caller instead of being logged and later
                    # misreported as a timeout.
                    if result_data.get("status") == "success":
                        return result_data.get("result")
                    raise RuntimeError(result_data.get("error", "Unknown error"))
                if self.thread is not None and not self.thread.is_alive():
                    raise RuntimeError(
                        f"Thread {self.resource_id} exited before task {task_id} completed"
                    )
                await asyncio.sleep(0.1)
            raise TimeoutError(f"Task {task_id} timed out after {timeout}s")
        finally:
            if not delivered:
                # The caller gave up (timeout, dead thread, cancellation):
                # discard this task's result if it shows up later.
                self._abandoned_tasks.add(task_id)

    def _collect_results(self) -> None:
        """Move every result currently on ``result_queue`` into ``_pending_results``."""
        while True:
            try:
                result_data = self.result_queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            result_id = result_data.get("task_id")
            if result_id in self._abandoned_tasks:
                self._abandoned_tasks.discard(result_id)
                continue
            self._pending_results[result_id] = result_data

    async def _shutdown_impl(self) -> None:
        """Shutdown the thread resource."""
        if self.thread and self.thread.is_alive():
            # Send shutdown signal
            await self.control_queue.put({"type": "shutdown"})

            # Wait for graceful shutdown
            timeout = self.config.get("close_timeout", 60.0)
            deadline = time.monotonic() + timeout
            while self.thread.is_alive() and time.monotonic() < deadline:
                await asyncio.sleep(0.1)

    @staticmethod
    def _thread_worker(
        resource_id: str,
        task_queue: asyncio.Queue,
        result_queue: asyncio.Queue,
        control_queue: asyncio.Queue,
        config: Dict[str, Any],
    ):
        """Worker function that runs in the thread.

        Creates a private event loop, then loops: a ``{"type": "shutdown"}``
        control message ends the loop; otherwise the next queued task is run
        (awaited when the entrypoint is a coroutine function) and a
        ``success`` or ``error`` result keyed by ``task_id`` is posted.
        """
        # Bound before ``try`` so ``finally`` cannot hit UnboundLocalError
        # when creating the event loop itself fails.
        loop: Optional[asyncio.AbstractEventLoop] = None
        try:
            # Set up event loop for this thread
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            logger.info(f"Thread worker {resource_id} started")

            async def run_task(task_data: Dict[str, Any]) -> None:
                # Execute one task and report its outcome on result_queue.
                task_id = task_data["task_id"]
                entrypoint = task_data["entrypoint"]
                args = task_data.get("args", ())
                kwargs = task_data.get("kwargs", {})
                try:
                    if asyncio.iscoroutinefunction(entrypoint):
                        result = await entrypoint(*args, **kwargs)
                    else:
                        result = entrypoint(*args, **kwargs)
                except Exception as e:
                    await result_queue.put(
                        {"task_id": task_id, "status": "error", "error": str(e)}
                    )
                else:
                    await result_queue.put(
                        {"task_id": task_id, "status": "success", "result": result}
                    )

            async def worker_main():
                # Main task processing loop
                while True:
                    try:
                        # Shutdown requests are checked before queued tasks.
                        if not control_queue.empty():
                            message = await control_queue.get()
                            if message.get("type") == "shutdown":
                                break

                        if not task_queue.empty():
                            await run_task(await task_queue.get())
                        else:
                            await asyncio.sleep(0.1)
                    except Exception as e:
                        logger.error(f"Error in thread worker {resource_id}: {e}")
                        await asyncio.sleep(1.0)

                logger.info(f"Thread worker {resource_id} shutting down")

            # Run the worker
            loop.run_until_complete(worker_main())

        except Exception as e:
            logger.error(f"Fatal error in thread worker {resource_id}: {e}")
        finally:
            if loop is not None and not loop.is_closed():
                loop.close()
Uses threading for concurrent task execution within the same process.
Ancestors
- BaseResource
- abc.ABC
Inherited members