Module agents.execution

Execution module for VideoSDK Agents.

This module provides a resource management architecture for executing agent tasks in processes and threads, with pooled idle resources, health monitoring, and load balancing.
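
A minimal quick-start sketch, assuming the names below are importable from this module (the installed package path may differ):

import asyncio

# Assumed import path; adjust to your installation's package layout.
from agents.execution import (
    ExecutorType,
    ResourceConfig,
    ResourceType,
    TaskExecutor,
    TaskType,
)

async def greet(name: str) -> str:
    return f"hello {name}"

async def main() -> None:
    # Two warm thread resources; skip the dedicated inference process.
    config = ResourceConfig(
        resource_type=ResourceType.THREAD,
        executor_type=ExecutorType.THREAD,
        num_idle_resources=2,
        use_dedicated_inference_process=False,
    )
    executor = TaskExecutor(config)
    await executor.start()
    try:
        result = await executor.execute(
            greet, task_type=TaskType.JOB, timeout=30.0, name="world"
        )
        print(result.status, result.result)
    finally:
        await executor.stop()

asyncio.run(main())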

Sub-modules

agents.execution.base_resource

Base resource class for task execution.

agents.execution.inference_resource

Dedicated inference resource for AI model processing …

agents.execution.resource_manager

Resource manager for task execution.

agents.execution.resources

Concrete resource implementations for process and thread execution.

agents.execution.task_executor

Task executor for high-level task execution.

agents.execution.types

Type definitions for the execution module.

Classes

class DedicatedInferenceResource (resource_id: str, config: Dict[str, Any])
class DedicatedInferenceResource(BaseResource):
    """
    Dedicated inference resource that runs AI models in a separate process.

    This mimics the old IPC system's single shared inference process that
    handles all STT, LLM, TTS, and VAD tasks for all agent jobs.
    """

    def __init__(self, resource_id: str, config: Dict[str, Any]):
        super().__init__(resource_id, config)
        self.process: Optional[Process] = None
        self.parent_conn: Optional[Connection] = None
        self.child_conn: Optional[Connection] = None
        self._process_ready = False
        self._models_cache: Dict[str, Any] = {}

        # Inference-specific configuration
        self.initialize_timeout = config.get("inference_process_timeout", 30.0)
        self.memory_warn_mb = config.get("inference_memory_warn_mb", 1000.0)
        self.ping_interval = config.get("ping_interval", 30.0)

    @property
    def resource_type(self) -> ResourceType:
        return ResourceType.PROCESS

    async def _initialize_impl(self) -> None:
        """Initialize the dedicated inference process."""
        logger.info(f"Initializing dedicated inference process: {self.resource_id}")

        # Create pipe for communication
        self.parent_conn, self.child_conn = Pipe()

        # Start the inference process
        self.process = Process(
            target=self._run_inference_process,
            args=(self.resource_id, self.child_conn, self.config),
            daemon=True,
        )
        self.process.start()

        # Wait for process to be ready
        start_time = time.time()
        while (
            not self._process_ready
            and (time.time() - start_time) < self.initialize_timeout
        ):
            message = None
            try:
                if self.parent_conn.poll():
                    message = self.parent_conn.recv()
            except (EOFError, OSError) as e:
                # Swallow pipe errors only; deliberate raises below must propagate.
                logger.warning(f"Error checking inference process readiness: {e}")

            if message:
                if message.get("type") == "ready":
                    self._process_ready = True
                    break
                elif message.get("type") == "error":
                    raise RuntimeError(
                        f"Inference process error: {message.get('error')}"
                    )

            await asyncio.sleep(0.1)

        if not self._process_ready:
            raise TimeoutError(
                f"Inference process {self.resource_id} failed to initialize within {self.initialize_timeout}s"
            )

        logger.info(
            f"Dedicated inference process initialized: {self.resource_id} (PID: {self.process.pid})"
        )

    async def _execute_task_impl(
        self, task_id: str, config, entrypoint: Callable, args: tuple, kwargs: dict
    ) -> Any:
        """Execute inference task in the dedicated process."""
        if not self._process_ready:
            raise RuntimeError(f"Inference process {self.resource_id} is not ready")

        # Prepare inference data
        inference_data = {
            "task_id": task_id,
            "task_type": config.task_type.value,
            "model_config": config.data.get("model_config", {}),
            "input_data": config.data.get("input_data", {}),
            "timeout": config.timeout,
        }

        # Send inference request to process
        self.parent_conn.send({"type": "inference", "data": inference_data})

        # Wait for result
        start_time = time.time()
        while (time.time() - start_time) < config.timeout:
            message = None
            try:
                if self.parent_conn.poll():
                    message = self.parent_conn.recv()
            except (EOFError, OSError) as e:
                # Swallow pipe errors only; task failures below must propagate.
                logger.warning(f"Error checking inference result: {e}")

            if message:
                if (
                    message.get("type") == "result"
                    and message.get("task_id") == task_id
                ):
                    if message.get("status") == "success":
                        return message.get("result")
                    raise RuntimeError(
                        message.get("error", "Unknown inference error")
                    )
                elif message.get("type") == "error":
                    raise RuntimeError(
                        message.get("error", "Inference process error")
                    )

            await asyncio.sleep(0.1)

        raise TimeoutError(
            f"Inference task {task_id} timed out after {config.timeout}s"
        )

    async def _shutdown_impl(self) -> None:
        """Shutdown the dedicated inference process."""
        if self.process and self.process.is_alive():
            # Send shutdown signal
            self.parent_conn.send({"type": "shutdown"})

            # Wait for graceful shutdown
            timeout = self.config.get("close_timeout", 60.0)
            start_time = time.time()

            while self.process.is_alive() and (time.time() - start_time) < timeout:
                await asyncio.sleep(0.1)

            # Force terminate if still alive
            if self.process.is_alive():
                logger.warning(
                    f"Force terminating inference process {self.resource_id}"
                )
                self.process.terminate()
                self.process.join(timeout=5.0)

                if self.process.is_alive():
                    self.process.kill()

    async def health_check(self) -> bool:
        """Perform a health check on the dedicated inference process."""
        try:
            if self._shutdown or not self.process or not self.process.is_alive():
                return False

            # Send ping to inference process
            self.parent_conn.send({"type": "ping"})

            # Wait for ping response
            start_time = time.time()
            timeout = 5.0  # 5 second timeout for health check

            while (time.time() - start_time) < timeout:
                try:
                    if self.parent_conn.poll():
                        message = self.parent_conn.recv()
                        if message.get("type") == "ping_response":
                            # Update last heartbeat
                            self.last_heartbeat = time.time()
                            return True
                        elif message.get("type") == "error":
                            logger.error(
                                f"Inference process error: {message.get('error')}"
                            )
                            return False

                    await asyncio.sleep(0.1)
                except Exception as e:
                    logger.warning(f"Error checking inference process health: {e}")

            # Timeout - process is unresponsive
            logger.warning(f"Inference process {self.resource_id} health check timeout")
            return False

        except Exception as e:
            logger.error(
                f"Health check failed for inference process {self.resource_id}: {e}"
            )
            return False

    @staticmethod
    def _run_inference_process(
        resource_id: str, conn: Connection, config: Dict[str, Any]
    ):
        """Run the inference process in a separate process."""
        try:
            # Set up logging
            logging.basicConfig(level=logging.INFO)
            logger.info(
                f"Inference process started: {resource_id} (PID: {os.getpid()})"
            )

            # Set up signal handlers
            def signal_handler(signum, frame):
                logger.info("Received shutdown signal")
                conn.send({"type": "shutdown_ack"})
                sys.exit(0)

            signal.signal(signal.SIGTERM, signal_handler)
            signal.signal(signal.SIGINT, signal_handler)

            # Send ready signal
            conn.send({"type": "ready"})

            # Model cache for reuse
            models_cache: Dict[str, Any] = {}

            async def main_loop():
                while True:
                    try:
                        if conn.poll(timeout=1.0):
                            message = conn.recv()
                            message_type = message.get("type")

                            if message_type == "inference":
                                await _handle_inference(
                                    conn, message.get("data", {}), models_cache
                                )
                            elif message_type == "ping":
                                await _handle_ping(conn)
                            elif message_type == "shutdown":
                                logger.info("Received shutdown request")
                                conn.send({"type": "shutdown_ack"})
                                break
                            else:
                                logger.warning(f"Unknown message type: {message_type}")
                    except Exception as e:
                        logger.error(f"Error in inference process main loop: {e}")
                        conn.send({"type": "error", "error": str(e)})

            asyncio.run(main_loop())

        except Exception as e:
            logger.error(f"Fatal error in inference process: {e}")
            conn.send({"type": "error", "error": str(e)})
            sys.exit(1)
        finally:
            logger.info("Inference process shutting down")
            conn.close()

Dedicated inference resource that runs AI models in a separate process.

This mimics the old IPC system's single shared inference process that handles all STT, LLM, TTS, and VAD tasks for all agent jobs.
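
Below is a hedged sketch of driving the resource directly; in practice ResourceManager owns this object. The entrypoint argument is unused by this resource's _execute_task_impl, and the model_config/input_data keys are the ones that method reads; the payload values themselves are placeholders.

import asyncio

from agents.execution import DedicatedInferenceResource, TaskConfig, TaskType  # assumed path

async def main() -> None:
    resource = DedicatedInferenceResource(
        resource_id="dedicated-inference",
        config={"inference_process_timeout": 30.0, "ping_interval": 30.0},
    )
    await resource.initialize()  # spawns the child process, waits for its "ready" message
    try:
        task = TaskConfig(
            task_type=TaskType.INFERENCE,
            timeout=60.0,
            # _execute_task_impl forwards these two dicts to the child process.
            data={
                "model_config": {"model": "example-stt"},  # placeholder payload
                "input_data": {"text": "hello"},           # placeholder payload
            },
        )
        # execute_task/initialize/shutdown are assumed to come from BaseResource;
        # the entrypoint is ignored by this resource, so None is passed.
        result = await resource.execute_task("task-1", task, None, (), {})
        print(result)
    finally:
        await resource.shutdown()

asyncio.run(main())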

Ancestors

  • agents.execution.base_resource.BaseResource

Methods

async def health_check(self) ‑> bool

Perform a health check on the dedicated inference process.

Inherited members

class ExecutorType (*args, **kwds)
class ExecutorType(Enum):
    """Type of executor for task processing."""

    PROCESS = "process"
    THREAD = "thread"

Type of executor for task processing.

Ancestors

  • enum.Enum

Class variables

var PROCESS
var THREAD
class HealthMetrics (resource_id: str,
timestamp: float,
memory_usage_mb: float,
cpu_usage_percent: float,
active_tasks: int,
response_time_ms: float,
error_count: int = 0,
success_count: int = 0)
@dataclass
class HealthMetrics:
    """Health metrics for resource monitoring."""

    resource_id: str
    timestamp: float
    memory_usage_mb: float
    cpu_usage_percent: float
    active_tasks: int
    response_time_ms: float
    error_count: int = 0
    success_count: int = 0

Health metrics for resource monitoring.

Instance variables

var active_tasks : int
var cpu_usage_percent : float
var error_count : int
var memory_usage_mb : float
var resource_id : str
var response_time_ms : float
var success_count : int
var timestamp : float
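
The dataclass is a plain record; for illustration, a monitor might derive a success rate from the counters (the derivation below is ours, not part of the API):

import time

from agents.execution import HealthMetrics  # assumed import path

metrics = HealthMetrics(
    resource_id="process-a1b2c3d4",
    timestamp=time.time(),
    memory_usage_mb=412.5,
    cpu_usage_percent=18.0,
    active_tasks=1,
    response_time_ms=42.0,
    error_count=1,
    success_count=99,
)

total = metrics.error_count + metrics.success_count
success_rate = metrics.success_count / total if total else 1.0
print(f"{metrics.resource_id}: {success_rate:.1%} success, {metrics.memory_usage_mb:.0f} MB")
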
class ProcessResource (resource_id: str, config: Dict[str, Any])
class ProcessResource(BaseResource):
    """
    Process-based resource for task execution.

    Uses multiprocessing to create isolated processes for task execution.
    """

    def __init__(self, resource_id: str, config: Dict[str, Any]):
        super().__init__(resource_id, config)
        self.process: Optional[Process] = None
        self.task_queue: Optional[Queue] = None
        self.result_queue: Optional[Queue] = None
        self.control_queue: Optional[Queue] = None
        self._process_ready = False

    @property
    def resource_type(self) -> ResourceType:
        return ResourceType.PROCESS

    async def _initialize_impl(self) -> None:
        """Initialize the process resource."""
        # Create queues for communication
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.control_queue = Queue()

        # Start the process
        self.process = Process(
            target=self._process_worker,
            args=(
                self.resource_id,
                self.task_queue,
                self.result_queue,
                self.control_queue,
                self.config,
            ),
            daemon=True,
        )
        self.process.start()

        # Wait for process to be ready
        timeout = self.config.get("initialize_timeout", 10.0)
        start_time = time.time()

        while not self._process_ready and (time.time() - start_time) < timeout:
            try:
                # Check for ready signal
                if not self.control_queue.empty():
                    message = self.control_queue.get_nowait()
                    if message.get("type") == "ready":
                        self._process_ready = True
                        break

                await asyncio.sleep(0.1)
            except Exception as e:
                logger.warning(f"Error checking process readiness: {e}")

        if not self._process_ready:
            raise TimeoutError(
                f"Process {self.resource_id} failed to initialize within {timeout}s"
            )

    async def _execute_task_impl(
        self, task_id: str, config, entrypoint: Callable, args: tuple, kwargs: dict
    ) -> Any:
        """Execute task in the process."""
        if not self._process_ready:
            raise RuntimeError(f"Process {self.resource_id} is not ready")

        # Send task to process
        task_data = {
            "task_id": task_id,
            "config": config,
            "entrypoint_name": entrypoint.__name__,
            "args": args,
            "kwargs": kwargs,
        }

        self.task_queue.put(task_data)

        # Wait for result
        timeout = config.timeout
        start_time = time.time()

        while (time.time() - start_time) < timeout:
            result_data = None
            try:
                if not self.result_queue.empty():
                    result_data = self.result_queue.get_nowait()
            except Exception as e:
                # Swallow queue races only; task failures below must propagate.
                logger.warning(f"Error checking task result: {e}")

            if result_data and result_data.get("task_id") == task_id:
                if result_data.get("status") == "success":
                    return result_data.get("result")
                raise RuntimeError(result_data.get("error", "Unknown error"))

            await asyncio.sleep(0.1)

        raise TimeoutError(f"Task {task_id} timed out after {timeout}s")

    async def _shutdown_impl(self) -> None:
        """Shutdown the process resource."""
        if self.process and self.process.is_alive():
            # Send shutdown signal
            self.control_queue.put({"type": "shutdown"})

            # Wait for graceful shutdown
            timeout = self.config.get("close_timeout", 60.0)
            start_time = time.time()

            while self.process.is_alive() and (time.time() - start_time) < timeout:
                await asyncio.sleep(0.1)

            # Force terminate if still alive
            if self.process.is_alive():
                logger.warning(f"Force terminating process {self.resource_id}")
                self.process.terminate()
                self.process.join(timeout=5.0)

                if self.process.is_alive():
                    self.process.kill()

    @staticmethod
    def _process_worker(
        resource_id: str,
        task_queue: Queue,
        result_queue: Queue,
        control_queue: Queue,
        config: Dict[str, Any],
    ):
        """Worker function that runs in the process."""
        try:
            logger.info(f"Process worker {resource_id} started")

            # Signal ready
            control_queue.put({"type": "ready"})

            # Main task processing loop
            while True:
                try:
                    # Check for shutdown signal
                    if not control_queue.empty():
                        message = control_queue.get_nowait()
                        if message.get("type") == "shutdown":
                            break

                    # Check for tasks
                    if not task_queue.empty():
                        task_data = task_queue.get_nowait()
                        task_id = task_data["task_id"]

                        try:
                            # Execute the task
                            # Note: In a real implementation, you'd need to serialize/deserialize the entrypoint
                            # For now, we'll use a simple approach
                            result = {"status": "completed", "task_id": task_id}
                            result_queue.put(
                                {
                                    "task_id": task_id,
                                    "status": "success",
                                    "result": result,
                                }
                            )
                        except Exception as e:
                            result_queue.put(
                                {"task_id": task_id, "status": "error", "error": str(e)}
                            )
                    else:
                        time.sleep(0.1)

                except Exception as e:
                    logger.error(f"Error in process worker {resource_id}: {e}")
                    time.sleep(1.0)

            logger.info(f"Process worker {resource_id} shutting down")

        except Exception as e:
            logger.error(f"Fatal error in process worker {resource_id}: {e}")

Process-based resource for task execution.

Uses multiprocessing to create isolated processes for task execution.
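
A hedged lifecycle sketch, assuming initialize() and shutdown() are provided by BaseResource:

import asyncio

from agents.execution import ProcessResource  # assumed import path

async def main() -> None:
    resource = ProcessResource(
        resource_id="process-demo",
        config={"initialize_timeout": 10.0, "close_timeout": 60.0},
    )
    await resource.initialize()  # starts the worker process, waits for "ready"
    try:
        print(resource.resource_type)  # ResourceType.PROCESS
    finally:
        await resource.shutdown()

asyncio.run(main())

Note that the shipped _process_worker only returns a completion stub for each task (see the Note comment in the source); executing real entrypoints would require serializing the callable to the child process.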

Ancestors

  • agents.execution.base_resource.BaseResource

Inherited members

class ResourceConfig (resource_type: ResourceType = ResourceType.PROCESS,
num_idle_resources: int = 2,
max_resources: int = 10,
initialize_timeout: float = 10.0,
close_timeout: float = 60.0,
memory_warn_mb: float = 500.0,
memory_limit_mb: float = 0.0,
ping_interval: float = 30.0,
health_check_interval: float = 5.0,
load_threshold: float = 0.75,
max_concurrent_tasks: int = 1,
executor_type: ExecutorType = ExecutorType.PROCESS,
use_dedicated_inference_process: bool = True,
inference_process_timeout: float = 30.0,
inference_memory_warn_mb: float = 1000.0)
@dataclass
class ResourceConfig:
    """Configuration for resource management."""

    # Resource type and count
    resource_type: ResourceType = ResourceType.PROCESS
    num_idle_resources: int = 2
    max_resources: int = 10

    # Timeouts
    initialize_timeout: float = 10.0
    close_timeout: float = 60.0

    # Memory management
    memory_warn_mb: float = 500.0
    memory_limit_mb: float = 0.0

    # Health monitoring
    ping_interval: float = 30.0
    health_check_interval: float = 5.0

    # Load balancing
    load_threshold: float = 0.75
    max_concurrent_tasks: int = 1

    # Platform-specific
    executor_type: ExecutorType = _default_executor_type

    # Legacy IPC compatibility
    use_dedicated_inference_process: bool = True  # Enable dedicated inference process
    inference_process_timeout: float = 30.0  # Longer timeout for AI model loading
    inference_memory_warn_mb: float = 1000.0  # Higher threshold for AI models

Configuration for resource management.
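
All fields have defaults, so a configuration can be tuned piecemeal; for example (field names taken from the dataclass above):

from agents.execution import ExecutorType, ResourceConfig, ResourceType  # assumed path

# Keep four warm processes, cap the pool at 16, and give the inference
# process a longer startup window for AI model loading.
config = ResourceConfig(
    resource_type=ResourceType.PROCESS,
    executor_type=ExecutorType.PROCESS,
    num_idle_resources=4,
    max_resources=16,
    use_dedicated_inference_process=True,
    inference_process_timeout=60.0,
    inference_memory_warn_mb=2048.0,
)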

Instance variables

var close_timeout : float
var executor_type : ExecutorType
var health_check_interval : float
var inference_memory_warn_mb : float
var inference_process_timeout : float
var initialize_timeout : float
var load_threshold : float
var max_concurrent_tasks : int
var max_resources : int
var memory_limit_mb : float
var memory_warn_mb : float
var num_idle_resources : int
var ping_interval : float
var resource_type : ResourceType
var use_dedicated_inference_process : bool
class ResourceInfo (resource_id: str,
resource_type: ResourceType,
status: ResourceStatus,
current_load: float = 0.0,
memory_usage_mb: float = 0.0,
cpu_usage_percent: float = 0.0,
active_tasks: int = 0,
total_tasks_processed: int = 0,
last_heartbeat: float = 0.0,
metadata: Dict[str, Any] = <factory>)
@dataclass
class ResourceInfo:
    """Information about a resource."""

    resource_id: str
    resource_type: ResourceType
    status: ResourceStatus
    current_load: float = 0.0
    memory_usage_mb: float = 0.0
    cpu_usage_percent: float = 0.0
    active_tasks: int = 0
    total_tasks_processed: int = 0
    last_heartbeat: float = 0.0

    # Resource-specific metadata
    metadata: Dict[str, Any] = field(default_factory=dict)

Information about a resource.

Instance variables

var active_tasks : int
var cpu_usage_percent : float
var current_load : float
var last_heartbeat : float
var memory_usage_mb : float
var metadata : Dict[str, Any]
var resource_id : str
var resource_type : ResourceType
var status : ResourceStatus
var total_tasks_processed : int
class ResourceManager (config: ResourceConfig)
class ResourceManager:
    """
    Manages resources for task execution.

    This class handles:
    - Resource creation and lifecycle management
    - Load balancing across resources
    - Health monitoring and recovery
    - Resource allocation for tasks
    - Dedicated inference process management (legacy IPC compatibility)
    """

    def __init__(self, config: ResourceConfig):
        self.config = config
        self.resources: List[BaseResource] = []
        self._shutdown = False
        self._health_check_task: Optional[asyncio.Task] = None
        self._resource_creation_task: Optional[asyncio.Task] = None

        # Dedicated inference resource (legacy IPC compatibility)
        self.dedicated_inference_resource: Optional[DedicatedInferenceResource] = None

    async def start(self):
        """Start the resource manager."""
        logger.info("Starting resource manager")

        # Create dedicated inference resource if enabled
        if self.config.use_dedicated_inference_process:
            await self._create_dedicated_inference_resource()

        # Start health monitoring
        self._health_check_task = asyncio.create_task(self._health_check_loop())

        # Start resource creation
        self._resource_creation_task = asyncio.create_task(
            self._resource_creation_loop()
        )

        # Initialize initial resources
        await self._create_initial_resources()

        logger.info("Resource manager started")

    async def stop(self):
        """Stop the resource manager."""
        logger.info("Stopping resource manager")
        self._shutdown = True

        # Cancel background tasks
        if self._health_check_task:
            self._health_check_task.cancel()
        if self._resource_creation_task:
            self._resource_creation_task.cancel()

        # Shutdown all resources
        shutdown_tasks = []
        for resource in self.resources:
            shutdown_tasks.append(resource.shutdown())

        # Shutdown dedicated inference resource
        if self.dedicated_inference_resource:
            shutdown_tasks.append(self.dedicated_inference_resource.shutdown())

        if shutdown_tasks:
            await asyncio.gather(*shutdown_tasks, return_exceptions=True)

        logger.info("Resource manager stopped")

    async def _create_dedicated_inference_resource(self):
        """Create the dedicated inference resource (legacy IPC compatibility)."""
        logger.info("Creating dedicated inference resource")

        inference_config = {
            "inference_process_timeout": self.config.inference_process_timeout,
            "inference_memory_warn_mb": self.config.inference_memory_warn_mb,
            "ping_interval": self.config.ping_interval,
            "close_timeout": self.config.close_timeout,
        }

        self.dedicated_inference_resource = DedicatedInferenceResource(
            resource_id="dedicated-inference", config=inference_config
        )

        await self.dedicated_inference_resource.initialize()
        logger.info("Dedicated inference resource created")

    async def _create_initial_resources(self):
        """Create initial resources based on configuration."""
        initial_count = self.config.num_idle_resources
        logger.info(
            f"Creating {initial_count} initial {self.config.resource_type.value} resources"
        )

        for i in range(initial_count):
            await self._create_resource(self.config.resource_type)

    async def _create_resource(self, resource_type: ResourceType) -> BaseResource:
        """Create a new resource of the specified type."""
        resource_id = f"{resource_type.value}-{uuid.uuid4().hex[:8]}"

        config = {
            "max_concurrent_tasks": self.config.max_concurrent_tasks,
            "initialize_timeout": self.config.initialize_timeout,
            "close_timeout": self.config.close_timeout,
            "health_check_interval": self.config.health_check_interval,
        }

        if resource_type == ResourceType.PROCESS:
            resource = ProcessResource(resource_id, config)
        elif resource_type == ResourceType.THREAD:
            resource = ThreadResource(resource_id, config)
        else:
            raise ValueError(f"Unsupported resource type: {resource_type}")

        # Initialize the resource
        await resource.initialize()

        # Add to resources list
        self.resources.append(resource)

        logger.info(f"Created {resource_type.value} resource: {resource_id}")
        return resource

    async def _resource_creation_loop(self):
        """Background loop for creating resources as needed."""
        # Wait a bit longer before starting the loop to allow initial resources to stabilize
        await asyncio.sleep(10.0)

        while not self._shutdown:
            try:
                # Check if we need more resources
                available_count = len([r for r in self.resources if r.is_available])
                total_count = len(self.resources)

                # Create more resources if needed
                if (
                    available_count < self.config.num_idle_resources
                    and total_count < self.config.max_resources
                ):
                    logger.info(
                        f"Creating additional {self.config.resource_type.value} resource"
                    )
                    await self._create_resource(self.config.resource_type)

                await asyncio.sleep(10.0)  # Check every 10 seconds

            except Exception as e:
                logger.error(f"Error in resource creation loop: {e}")
                await asyncio.sleep(5.0)

    async def _health_check_loop(self):
        """Background loop for health monitoring."""
        while not self._shutdown:
            try:
                # Check job resources
                for resource in self.resources[
                    :
                ]:  # Copy list to avoid modification during iteration
                    try:
                        is_healthy = await resource.health_check()
                        if not is_healthy:
                            logger.warning(
                                f"Unhealthy resource detected: {resource.resource_id}"
                            )
                            # Remove unhealthy resource
                            self.resources.remove(resource)
                            await resource.shutdown()

                            # Create replacement if needed
                            if len(self.resources) < self.config.num_idle_resources:
                                await self._create_resource(self.config.resource_type)

                    except Exception as e:
                        logger.error(
                            f"Health check failed for {resource.resource_id}: {e}"
                        )

                # Check dedicated inference resource
                if self.dedicated_inference_resource:
                    try:
                        is_healthy = (
                            await self.dedicated_inference_resource.health_check()
                        )
                        if not is_healthy:
                            logger.warning(
                                "Unhealthy dedicated inference resource detected"
                            )
                            # Recreate inference resource
                            await self.dedicated_inference_resource.shutdown()
                            await self._create_dedicated_inference_resource()
                    except Exception as e:
                        logger.error(
                            f"Health check failed for dedicated inference resource: {e}"
                        )

                await asyncio.sleep(self.config.health_check_interval)

            except Exception as e:
                logger.error(f"Error in health check loop: {e}")
                await asyncio.sleep(5.0)

    async def execute_task(
        self, task_config: TaskConfig, entrypoint: Callable, *args, **kwargs
    ) -> TaskResult:
        """Execute a task using an available resource."""
        task_id = str(uuid.uuid4())

        # Route inference tasks to dedicated inference resource
        if (
            task_config.task_type == TaskType.INFERENCE
            and self.dedicated_inference_resource
        ):
            logger.info(
                f"Routing inference task {task_id} to dedicated inference resource"
            )
            return await self.dedicated_inference_resource.execute_task(
                task_id, task_config, entrypoint, args, kwargs
            )

        # Route other tasks to job resources
        resource = await self._get_available_resource(task_config.task_type)
        if not resource:
            raise RuntimeError("No available resources for task execution")

        # Execute the task
        return await resource.execute_task(
            task_id, task_config, entrypoint, args, kwargs
        )

    async def _get_available_resource(
        self, task_type: TaskType
    ) -> Optional[BaseResource]:
        """Get an available resource for task execution."""
        # For now, pick the first available resource
        # In the future, this could be enhanced with load balancing, priority, etc.

        available_resources = [r for r in self.resources if r.is_available]

        if available_resources:
            # Simple first-available selection
            # In a real implementation, you might want more sophisticated load balancing
            return available_resources[0]

        return None

    def get_stats(self) -> Dict[str, Any]:
        """Get resource manager statistics."""
        available_resources = [r for r in self.resources if r.is_available]
        active_resources = [
            r for r in self.resources if r.status != ResourceStatus.IDLE
        ]
        total_resources = len(self.resources)

        average_load = (
            len(active_resources) / total_resources if total_resources > 0 else 0.0
        )

        stats = {
            "total_resources": total_resources,
            "available_resources": len(available_resources),
            "active_resources": len(active_resources),
            "average_load": average_load,
            "resources": [
                {
                    "resource_id": r.get_info().resource_id,
                    "resource_type": r.get_info().resource_type.value,
                    "status": r.get_info().status.value,
                    "current_load": r.get_info().current_load,
                    "memory_usage_mb": r.get_info().memory_usage_mb,
                    "cpu_usage_percent": r.get_info().cpu_usage_percent,
                    "active_tasks": r.get_info().active_tasks,
                    "total_tasks_processed": r.get_info().total_tasks_processed,
                    "last_heartbeat": r.get_info().last_heartbeat,
                    "metadata": r.get_info().metadata,
                }
                for r in self.resources
            ],
            "dedicated_inference": None,
        }

        # Dedicated inference resource stats
        if self.dedicated_inference_resource:
            info = self.dedicated_inference_resource.get_info()
            stats["dedicated_inference"] = {
                "resource_id": info.resource_id,
                "resource_type": info.resource_type.value,
                "status": info.status.value,
                "current_load": info.current_load,
                "memory_usage_mb": info.memory_usage_mb,
                "cpu_usage_percent": info.cpu_usage_percent,
                "active_tasks": info.active_tasks,
                "total_tasks_processed": info.total_tasks_processed,
                "last_heartbeat": info.last_heartbeat,
                "metadata": info.metadata,
            }

        return stats

    def get_resource_info(self) -> List[ResourceInfo]:
        """Get information about all resources."""
        resource_info = []

        # Job resources
        for resource in self.resources:
            resource_info.append(resource.get_info())

        # Dedicated inference resource
        if self.dedicated_inference_resource:
            resource_info.append(self.dedicated_inference_resource.get_info())

        return resource_info

Manages resources for task execution.

This class handles:

  • Resource creation and lifecycle management
  • Load balancing across resources
  • Health monitoring and recovery
  • Resource allocation for tasks
  • Dedicated inference process management (legacy IPC compatibility)
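
A minimal lifecycle sketch, assuming the names below are importable from this module:

import asyncio

from agents.execution import ResourceConfig, ResourceManager, TaskConfig, TaskType  # assumed

def ping() -> str:
    return "pong"

async def main() -> None:
    manager = ResourceManager(
        ResourceConfig(num_idle_resources=1, use_dedicated_inference_process=False)
    )
    await manager.start()
    try:
        result = await manager.execute_task(
            TaskConfig(task_type=TaskType.JOB, timeout=30.0), ping
        )
        print(result)
        print(manager.get_stats()["total_resources"])
    finally:
        await manager.stop()

asyncio.run(main())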

Methods

async def execute_task(self,
task_config: TaskConfig,
entrypoint: Callable,
*args,
**kwargs) ‑> TaskResult

Execute a task using an available resource.

def get_resource_info(self) ‑> List[ResourceInfo]

Get information about all resources.

def get_stats(self) ‑> Dict[str, Any]

Get resource manager statistics.

async def start(self)

Start the resource manager.

async def stop(self)

Stop the resource manager.

class ResourceStatus (*args, **kwds)
class ResourceStatus(Enum):
    """Status of a resource."""

    IDLE = "idle"
    BUSY = "busy"
    INITIALIZING = "initializing"
    SHUTTING_DOWN = "shutting_down"
    ERROR = "error"

Status of a resource.

Ancestors

  • enum.Enum

Class variables

var BUSY
var ERROR
var IDLE
var INITIALIZING
var SHUTTING_DOWN
class ResourceType (*args, **kwds)
class ResourceType(Enum):
    """Type of resource for task execution."""

    PROCESS = "process"
    THREAD = "thread"

Type of resource for task execution.

Ancestors

  • enum.Enum

Class variables

var PROCESS
var THREAD
class TaskConfig (task_type: TaskType,
timeout: float = 300.0,
retry_count: int = 3,
priority: int = 0,
required_memory_mb: float = 100.0,
required_cpu_cores: int = 1,
data: Dict[str, Any] = <factory>)
@dataclass
class TaskConfig:
    """Configuration for task execution."""

    task_type: TaskType
    timeout: float = 300.0  # 5 minutes default
    retry_count: int = 3
    priority: int = 0  # Higher number = higher priority

    # Resource requirements
    required_memory_mb: float = 100.0
    required_cpu_cores: int = 1

    # Task-specific data
    data: Dict[str, Any] = field(default_factory=dict)

Configuration for task execution.
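
Only task_type is required. The data dict is free-form; for inference tasks it is where DedicatedInferenceResource._execute_task_impl looks for model_config and input_data. A sketch with placeholder payloads:

from agents.execution import TaskConfig, TaskType  # assumed import path

config = TaskConfig(
    task_type=TaskType.INFERENCE,
    timeout=60.0,
    retry_count=1,
    priority=5,  # higher number = higher priority
    data={
        # Keys read by DedicatedInferenceResource._execute_task_impl;
        # the payload values themselves are placeholders.
        "model_config": {"model": "example-stt"},
        "input_data": {"text": "hello"},
    },
)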

Instance variables

var data : Dict[str, Any]
var priority : int
var required_cpu_cores : int
var required_memory_mb : float
var retry_count : int
var task_type : TaskType
var timeout : float
class TaskExecutor (config: ResourceConfig)
class TaskExecutor:
    """
    High-level task executor that manages task execution using resources.

    This class provides a simple interface for executing tasks while
    handling resource management, retries, and monitoring internally.
    """

    def __init__(self, config: ResourceConfig):
        self.config = config
        self.resource_manager = ResourceManager(config)
        self._shutdown = False
        self._total_tasks = 0
        self._completed_tasks = 0
        self._failed_tasks = 0
        self._total_execution_time = 0.0

    async def start(self):
        """Start the task executor."""
        logger.info("Starting task executor")
        await self.resource_manager.start()
        logger.info("Task executor started")

    async def stop(self):
        """Stop the task executor."""
        logger.info("Stopping task executor")
        self._shutdown = True

        # Stop resource manager
        await self.resource_manager.stop()
        logger.info("Task executor stopped")

    async def execute(
        self,
        entrypoint: Callable,
        task_type: TaskType = TaskType.JOB,
        timeout: float = 300.0,
        retry_count: int = 3,
        priority: int = 0,
        *args,
        **kwargs,
    ) -> TaskResult:
        """
        Execute a task using the resource manager.

        Args:
            entrypoint: Function to execute
            task_type: Type of task (inference, meeting, job)
            timeout: Task timeout in seconds
            retry_count: Number of retries on failure
            priority: Task priority (higher = higher priority)
            *args, **kwargs: Arguments to pass to entrypoint

        Returns:
            TaskResult with execution results
        """
        task_config = TaskConfig(
            task_type=task_type,
            timeout=timeout,
            retry_count=retry_count,
            priority=priority,
        )

        # Execute with retries
        last_error = None
        for attempt in range(retry_count + 1):
            try:
                result = await self.resource_manager.execute_task(
                    task_config, entrypoint, *args, **kwargs
                )

                # Update stats
                self._update_stats(result)

                if result.status == TaskStatus.COMPLETED:
                    return result
                else:
                    last_error = result.error

            except Exception as e:
                last_error = str(e)
                logger.warning(f"Task execution attempt {attempt + 1} failed: {e}")

                if attempt < retry_count:
                    await asyncio.sleep(1.0 * (attempt + 1))  # Linear backoff

        # All retries failed
        failed_result = TaskResult(
            task_id=task_config.task_type.value,
            status=TaskStatus.FAILED,
            error=f"All {retry_count + 1} attempts failed. Last error: {last_error}",
            execution_time=0.0,
        )

        self._update_stats(failed_result)
        return failed_result

    def _update_stats(self, result: TaskResult):
        """Update execution statistics."""
        self._total_tasks += 1

        if result.status == TaskStatus.COMPLETED:
            self._completed_tasks += 1
            self._total_execution_time += result.execution_time
        elif result.status == TaskStatus.FAILED:
            self._failed_tasks += 1

    def get_stats(self) -> Dict[str, Any]:
        """Get executor statistics."""
        resource_stats = self.resource_manager.get_stats()

        average_execution_time = (
            self._total_execution_time / self._completed_tasks
            if self._completed_tasks > 0
            else 0.0
        )

        return {
            "executor_stats": {
                "total_tasks": self._total_tasks,
                "completed_tasks": self._completed_tasks,
                "failed_tasks": self._failed_tasks,
                "pending_tasks": 0,
                "average_execution_time": average_execution_time,
                "total_execution_time": self._total_execution_time,
            },
            "resource_stats": resource_stats,
        }

    def get_resource_info(self) -> List[ResourceInfo]:
        """Get information about all resources."""
        return self.resource_manager.get_resource_info()

High-level task executor that manages task execution using resources.

This class provides a simple interface for executing tasks while handling resource management, retries, and monitoring internally.
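
A hedged usage sketch, assuming the names below are importable from this module:

import asyncio

from agents.execution import ResourceConfig, TaskExecutor, TaskStatus, TaskType  # assumed

async def transcribe(clip_id: str) -> str:
    return f"transcript for {clip_id}"

async def main() -> None:
    executor = TaskExecutor(ResourceConfig(use_dedicated_inference_process=False))
    await executor.start()
    try:
        result = await executor.execute(
            transcribe, task_type=TaskType.JOB, timeout=120.0, retry_count=2, clip_id="clip-42"
        )
        if result.status == TaskStatus.COMPLETED:
            print(result.result)
        else:
            print("failed:", result.error)
    finally:
        await executor.stop()

asyncio.run(main())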

Methods

async def execute(self,
entrypoint: Callable,
task_type: TaskType = TaskType.JOB,
timeout: float = 300.0,
retry_count: int = 3,
priority: int = 0,
*args,
**kwargs) ‑> TaskResult

Execute a task using the resource manager.

Args

entrypoint
Function to execute
task_type
Type of task (inference, meeting, job)
timeout
Task timeout in seconds
retry_count
Number of retries on failure
priority
Task priority (higher = higher priority)

*args, **kwargs: Arguments to pass to entrypoint

Returns

TaskResult with execution results

def get_resource_info(self) ‑> List[ResourceInfo]

Get information about all resources.

def get_stats(self) ‑> Dict[str, Any]

Get executor statistics.

async def start(self)

Start the task executor.

async def stop(self)

Stop the task executor.

class TaskResult (task_id: str,
status: TaskStatus,
result: Any | None = None,
error: str | None = None,
execution_time: float = 0.0,
memory_used_mb: float = 0.0)
@dataclass
class TaskResult:
    """Result of a task execution."""

    task_id: str
    status: TaskStatus
    result: Optional[Any] = None
    error: Optional[str] = None
    execution_time: float = 0.0
    memory_used_mb: float = 0.0

Result of a task execution.
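
Callers typically branch on status; a small illustrative helper:

from agents.execution import TaskResult, TaskStatus  # assumed import path

def report(result: TaskResult) -> None:
    if result.status == TaskStatus.COMPLETED:
        print(f"{result.task_id}: ok in {result.execution_time:.2f}s -> {result.result}")
    elif result.status == TaskStatus.FAILED:
        print(f"{result.task_id}: failed -> {result.error}")
    else:
        print(f"{result.task_id}: {result.status.value}")

report(TaskResult(task_id="demo", status=TaskStatus.COMPLETED, result=42, execution_time=0.5))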

Instance variables

var error : str | None
var execution_time : float
var memory_used_mb : float
var result : Any | None
var status : TaskStatus
var task_id : str
class TaskStatus (*args, **kwds)
class TaskStatus(Enum):
    """Status of a task."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

Status of a task.

Ancestors

  • enum.Enum

Class variables

var CANCELLED
var COMPLETED
var FAILED
var PENDING
var RUNNING
class TaskType (*args, **kwds)
class TaskType(Enum):
    """Type of task to be executed."""

    INFERENCE = "inference"  # For AI model inference
    MEETING = "meeting"  # For video meeting tasks
    JOB = "job"  # For general job execution

Type of task to be executed.

Ancestors

  • enum.Enum

Class variables

var INFERENCE
var JOB
var MEETING
class ThreadResource (resource_id: str, config: Dict[str, Any])
class ThreadResource(BaseResource):
    """
    Thread-based resource for task execution.

    Uses threading for concurrent task execution within the same process.
    """

    def __init__(self, resource_id: str, config: Dict[str, Any]):
        super().__init__(resource_id, config)
        self.thread: Optional[threading.Thread] = None
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.result_queue: asyncio.Queue = asyncio.Queue()
        self.control_queue: asyncio.Queue = asyncio.Queue()
        self._thread_ready = False
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    @property
    def resource_type(self) -> ResourceType:
        return ResourceType.THREAD

    async def _initialize_impl(self) -> None:
        """Initialize the thread resource."""
        # Start the thread
        self.thread = threading.Thread(
            target=self._thread_worker,
            args=(
                self.resource_id,
                self.task_queue,
                self.result_queue,
                self.control_queue,
                self.config,
            ),
            daemon=True,
        )
        self.thread.start()

        # Simple readiness check - just wait a bit for thread to start
        await asyncio.sleep(0.5)

        # Mark as ready if thread is alive
        if self.thread.is_alive():
            self._thread_ready = True
        else:
            raise RuntimeError(f"Thread {self.resource_id} failed to start")

    async def _execute_task_impl(
        self, task_id: str, config, entrypoint: Callable, args: tuple, kwargs: dict
    ) -> Any:
        """Execute task in the thread."""
        if not self._thread_ready:
            raise RuntimeError(f"Thread {self.resource_id} is not ready")

        # Send task to thread
        task_data = {
            "task_id": task_id,
            "config": config,
            "entrypoint": entrypoint,
            "args": args,
            "kwargs": kwargs,
        }

        await self.task_queue.put(task_data)

        # Wait for result
        timeout = config.timeout
        start_time = time.time()

        while (time.time() - start_time) < timeout:
            result_data = None
            try:
                if not self.result_queue.empty():
                    result_data = await self.result_queue.get()
            except Exception as e:
                # Swallow queue errors only; task failures below must propagate.
                logger.warning(f"Error checking task result: {e}")

            if result_data and result_data.get("task_id") == task_id:
                if result_data.get("status") == "success":
                    return result_data.get("result")
                raise RuntimeError(result_data.get("error", "Unknown error"))

            await asyncio.sleep(0.1)

        raise TimeoutError(f"Task {task_id} timed out after {timeout}s")

    async def _shutdown_impl(self) -> None:
        """Shutdown the thread resource."""
        if self.thread and self.thread.is_alive():
            # Send shutdown signal
            await self.control_queue.put({"type": "shutdown"})

            # Wait for graceful shutdown
            timeout = self.config.get("close_timeout", 60.0)
            start_time = time.time()

            while self.thread.is_alive() and (time.time() - start_time) < timeout:
                await asyncio.sleep(0.1)

    @staticmethod
    def _thread_worker(
        resource_id: str,
        task_queue: asyncio.Queue,
        result_queue: asyncio.Queue,
        control_queue: asyncio.Queue,
        config: Dict[str, Any],
    ):
        """Worker function that runs in the thread."""
        loop: Optional[asyncio.AbstractEventLoop] = None
        try:
            # Set up the event loop for this thread
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            logger.info(f"Thread worker {resource_id} started")

            async def worker_main():
                # Main task processing loop
                while True:
                    try:
                        # Check for shutdown signal
                        if not control_queue.empty():
                            message = await control_queue.get()
                            if message.get("type") == "shutdown":
                                break

                        # Check for tasks
                        if not task_queue.empty():
                            task_data = await task_queue.get()
                            task_id = task_data["task_id"]
                            entrypoint = task_data["entrypoint"]
                            args = task_data.get("args", ())
                            kwargs = task_data.get("kwargs", {})

                            try:
                                # Execute the task
                                if asyncio.iscoroutinefunction(entrypoint):
                                    result = await entrypoint(*args, **kwargs)
                                else:
                                    result = entrypoint(*args, **kwargs)

                                await result_queue.put(
                                    {
                                        "task_id": task_id,
                                        "status": "success",
                                        "result": result,
                                    }
                                )
                            except Exception as e:
                                await result_queue.put(
                                    {
                                        "task_id": task_id,
                                        "status": "error",
                                        "error": str(e),
                                    }
                                )
                        else:
                            await asyncio.sleep(0.1)

                    except Exception as e:
                        logger.error(f"Error in thread worker {resource_id}: {e}")
                        await asyncio.sleep(1.0)

                logger.info(f"Thread worker {resource_id} shutting down")

            # Run the worker
            loop.run_until_complete(worker_main())

        except Exception as e:
            logger.error(f"Fatal error in thread worker {resource_id}: {e}")
        finally:
            if loop and not loop.is_closed():
                loop.close()

Thread-based resource for task execution.

Uses threading for concurrent task execution within the same process.
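
A hedged lifecycle sketch, assuming initialize() and shutdown() come from BaseResource:

import asyncio

from agents.execution import ThreadResource  # assumed import path

async def main() -> None:
    resource = ThreadResource(resource_id="thread-demo", config={"close_timeout": 10.0})
    await resource.initialize()  # starts the worker thread and its private event loop
    try:
        print(resource.resource_type)  # ResourceType.THREAD
    finally:
        await resource.shutdown()

asyncio.run(main())

Note that the worker thread runs its own event loop while the asyncio queues are created on the caller's loop; asyncio queues are not thread-safe across loops, so treat direct use as a sketch of the interface rather than a concurrency guarantee.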

Ancestors

Inherited members