Module agents.worker

Classes

class JobRequest (job: Any,
on_reject: Callable[[], Any],
on_accept: Callable[[JobAcceptArguments], Any])
Expand source code
@dataclass
class JobRequest:
    """Job request from the backend."""

    job: Any
    on_reject: Callable[[], Any]
    on_accept: Callable[[JobAcceptArguments], Any]

Job request from the backend.

Instance variables

var job : Any
var on_accept : Callable[[JobAcceptArguments], Any]
var on_reject : Callable[[], Any]
class Worker (options: WorkerOptions,
default_room_options: RoomOptions | None = None)
Expand source code
class Worker:
    """
    VideoSDK worker that manages job execution and backend registration.

    def run(self):
        job_context = functools.partial(self.job.jobctx)
        entrypoint = functools.partial(self.job.entrypoint)
        p = multiprocessing.Process(
            target=_job_runner, args=(entrypoint, job_context)
    Automatically selects the appropriate executor type based on platform.
    """

    def __init__(
        self, options: WorkerOptions, default_room_options: Optional[RoomOptions] = None
    ):
        """Initialize the worker."""
        self.options = options
        self.default_room_options = default_room_options
        self._shutdown = False
        self._draining = False
        self._worker_load = 0.0
        self._current_jobs: Dict[str, RunningJobInfo] = {}
        self._tasks: Set[asyncio.Task] = set()
        self.backend_connection: Optional[BackendConnection] = None
        self.process_manager: Optional[TaskExecutor] = (
            None  # Changed from ProcessManager
        )
        self._http_server: Optional[HttpServer] = None

        # Add debounce mechanism for status updates
        self._last_status_update = 0.0
        self._status_update_debounce_seconds = (
            2.0  # Minimum 2 seconds between status updates
        )
        # Initialize tracing
        self._tracing = Tracing.with_handle("worker")
        self._worker_load_graph = Tracing.add_graph(
            title="worker_load",
            x_label="time",
            y_label="load",
            x_type="time",
            y_range=(0, 1),
            max_data_points=1000,
        )

        # Validate configuration
        if not self.options.auth_token:
            raise ValueError(
                "auth_token is required, or add VIDEOSDK_AUTH_TOKEN in your environment"
            )

    @staticmethod
    def run_worker(
        options: WorkerOptions, default_room_options: Optional[RoomOptions] = None
    ):
        """
        Run a VideoSDK worker with the given options.

        This is the main entry point for running a VideoSDK worker,
        providing a high-level interface for worker initialization, job management, and lifecycle control.

        Args:
            options: Worker configuration options
            default_room_options: Optional default room options

        Example:
            ```python
            from videosdk.agents import Worker, WorkerOptions

            def my_agent(job_ctx):
                # Your agent code here
                pass

            # Configure worker with custom log level - logging is automatically configured!
            options = WorkerOptions(
                entrypoint_fnc=my_agent,
                log_level="DEBUG"  # Options: DEBUG, INFO, WARNING, ERROR
            )

            # Run the worker - no manual logging setup needed!
            Worker.run_worker(options)
            ```
        """
        worker = Worker(options, default_room_options=default_room_options)
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        async def main_task():
            try:
                await worker.initialize()
                if options.register:
                    # Backend registration mode
                    await worker._run_backend_mode()
                else:
                    # Default mode - just keep alive
                    while not worker._shutdown:
                        await asyncio.sleep(1)

            except asyncio.CancelledError:
                logger.info("Main task cancelled")
            except Exception as e:
                logger.error(f"Worker error: {e}")
                raise
            finally:
                await worker.shutdown()

        main_future = loop.create_task(main_task())
        shutting_down = False

        def signal_handler(signum, frame):
            nonlocal shutting_down
            if shutting_down:
                # If already shutting down, cancel all tasks more aggressively
                for task in asyncio.all_tasks(loop):
                    task.cancel()
                return
            shutting_down = True
            logger.info(f"Received signal {signum}. Initiating graceful shutdown...")
            # Cancel the main task
            loop.call_soon_threadsafe(main_future.cancel)
            # Set a timeout for graceful shutdown
            loop.call_later(
                3.0, lambda: [task.cancel() for task in asyncio.all_tasks(loop)]
            )

        try:
            signal.signal(signal.SIGINT, signal_handler)
            signal.signal(signal.SIGTERM, signal_handler)

            loop.run_until_complete(main_future)
        except KeyboardInterrupt:
            logger.info("Keyboard interrupt received")
            if not shutting_down:
                shutting_down = True
                if not main_future.done():
                    main_future.cancel()
                loop.run_until_complete(worker.shutdown())
        finally:
            try:
                loop.close()
            except Exception as e:
                logger.error(f"Error closing event loop: {e}")

        if loop.is_closed():
            logger.info("Event loop closed successfully")

    async def initialize(self, default_room_options: Optional[RoomOptions] = None):
        """Initialize the worker."""
        logger.info("Initializing VideoSDK worker")

        # Initialize task executor with new execution architecture
        # Convert ExecutorType to ResourceType
        resource_type = (
            ResourceType.THREAD
            if self.options.executor_type == ExecutorType.THREAD
            else ResourceType.PROCESS
        )

        config = ResourceConfig(
            resource_type=resource_type,
            num_idle_resources=self.options.num_idle_processes,
            max_resources=self.options.max_processes,
            initialize_timeout=self.options.initialize_timeout,
            close_timeout=self.options.close_timeout,
            memory_warn_mb=self.options.memory_warn_mb,
            memory_limit_mb=self.options.memory_limit_mb,
            ping_interval=self.options.ping_interval,
            load_threshold=self.options.load_threshold,
            max_concurrent_tasks=1,  # Each resource handles one task at a time
            executor_type=self.options.executor_type,
            # Legacy IPC compatibility - dedicated inference process
            use_dedicated_inference_process=False,  # Disable dedicated inference process for now
            inference_process_timeout=30.0,  # Longer timeout for AI model loading
            inference_memory_warn_mb=1000.0,  # Higher threshold for AI models
        )

        self.process_manager = TaskExecutor(config)
        await self.process_manager.start()

        # Initialize backend connection if registering
        if self.options.register:
            await self._initialize_backend_connection()

        # Initialize and start debug HTTP server
        self._http_server = HttpServer(
            host=self.options.host,
            port=self.options.port,
        )
        self._http_server.set_worker(self)
        await self._http_server.start()

        logger.info("VideoSDK worker initialized successfully")

    async def _initialize_backend_connection(self):
        """Initialize connection to the backend registry."""
        if not self.options.register:
            return

        # Fetch agent init config to get registry URL
        try:
            logger.info("Fetching agent init config...")
            registry_url = await fetch_agent_init_config(
                auth_token=self.options.auth_token,
                api_base_url=f"https://{self.options.signaling_base_url}",
            )
            logger.info(f"Using registry URL: {registry_url}")
        except Exception as e:
            logger.error(f"Failed to fetch agent init config: {e}")
            raise RuntimeError(f"Agent init config is mandatory. Error: {e}")

        self.backend_connection = BackendConnection(
            auth_token=self.options.auth_token,
            agent_id=self.options.agent_id,
            worker_type=self.options.worker_type.value,
            version="1.0.0",
            max_retry=self.options.max_retry,
            backend_url=registry_url,
            load_threshold=self.options.load_threshold,
            max_processes=self.options.max_processes,
        )

        # Set up message handlers
        self.backend_connection.on_register(self._handle_register)
        self.backend_connection.on_availability(self._handle_availability)
        self.backend_connection.on_assignment(self._handle_assignment)
        self.backend_connection.on_termination(self._handle_termination)

        # Connect to backend
        await self.backend_connection.connect()

    async def _run_backend_mode(self):
        """Run the worker in backend registration mode."""
        logger.info("Running in backend registration mode")

        # Start status update loop
        status_task = asyncio.create_task(self._status_update_loop())
        self._tasks.add(status_task)

        try:
            # Keep the worker running
            while not self._shutdown:
                await asyncio.sleep(1)
        finally:
            status_task.cancel()
            self._tasks.discard(status_task)

    def _handle_register(self, worker_id: str, server_info: Dict[str, Any]):
        """Handle registration response from backend."""
        logger.info(f"Registered with backend: {worker_id}")
        logger.info(f"Server info: {server_info}")

    def _handle_availability(self, request: AvailabilityRequest):
        """Handle availability request from backend."""
        logger.info(f"Received availability request for job {request.job_id}")
        asyncio.create_task(self._answer_availability(request))

    async def _answer_availability(self, request: AvailabilityRequest):
        """Answer availability request."""
        try:
            # Check if we can accept the job
            can_accept = (
                not self._draining
                and self._worker_load < self.options.load_threshold
                and len(self._current_jobs) < self.options.max_processes
            )

            if can_accept:
                # Accept the job and provide our auth token
                response = AvailabilityResponse(
                    job_id=request.job_id,
                    available=True,
                    token=self.options.auth_token,  # Provide worker's auth token
                )
                logger.info(f"Accepting job {request.job_id}")
            else:
                # Reject the job
                response = AvailabilityResponse(
                    job_id=request.job_id,
                    available=False,
                    error="Worker at capacity or draining",
                )
                logger.info(f"Rejecting job {request.job_id}")

            # Send response
            await self.backend_connection.send_message(response)

        except Exception as e:
            logger.error(f"Error handling availability request: {e}")
            # Send rejection on error
            response = AvailabilityResponse(
                job_id=request.job_id,
                available=False,
                error=str(e),
            )
            await self.backend_connection.send_message(response)

    def _handle_assignment(self, assignment: JobAssignment):
        """Handle job assignment from backend."""
        logger.info(f"Received job assignment: {assignment.job_id}")
        asyncio.create_task(self._handle_job_assignment(assignment))

    async def _handle_job_assignment(self, assignment: JobAssignment):
        """Handle job assignment."""
        try:
            # Create job accept arguments
            args = JobAcceptArguments(
                identity=f"agent_{assignment.job_id}",
                name=self.options.agent_id,
                metadata="",
            )

            # Launch the job
            await self._launch_job_from_assignment(assignment, args)

        except Exception as e:
            logger.error(f"Error handling job assignment: {e}")
            # Send job update with error
            job_update = JobUpdate(
                job_id=assignment.job_id,
                status="failed",
                error=str(e),
            )
            await self.backend_connection.send_message(job_update)

    async def _handle_termination(self, termination: JobTermination):
        """Handle job termination request."""
        logger.info(f"Received job termination: {termination.job_id}")

        if termination.job_id in self._current_jobs:
            job_info = self._current_jobs[termination.job_id]

            try:
                await job_info.job.shutdown()
                logger.info(f"Successfully terminated job {termination.job_id}")
            except Exception as e:
                logger.error(f"Error terminating job {termination.job_id}: {e}")

            # Remove job from current jobs
            del self._current_jobs[termination.job_id]
            logger.info(
                f"Removed job {termination.job_id} from current jobs. Remaining jobs: {len(self._current_jobs)}"
            )

            # Notify registry about job completion
            if self.backend_connection and self.backend_connection.is_connected:
                try:
                    job_update = JobUpdate(
                        job_id=termination.job_id,
                        status="completed",
                        error="Job terminated by registry",
                    )
                    await self.backend_connection.send_message(job_update)
                    logger.info(
                        f"Sent job completion update for terminated job {termination.job_id}"
                    )
                except Exception as e:
                    logger.error(
                        f"Failed to send job completion update for terminated job {termination.job_id}: {e}"
                    )

            # IMMEDIATELY send status update to reflect reduced job count
            # This bypasses the debounce mechanism to ensure registry gets correct info
            await self._send_immediate_status_update()
        else:
            logger.warning(
                f"Job {termination.job_id} not found in current jobs for termination"
            )

    async def _handle_meeting_end(self, job_id: str, reason: str = "meeting_ended"):
        """Handle meeting end/leave events and inform registry."""
        logger.info(f"Meeting ended for job {job_id}, reason: {reason}")
        logger.info(
            f"Checking if job {job_id} is in current_jobs: {job_id in self._current_jobs}"
        )
        logger.info(f"Current jobs: {list(self._current_jobs.keys())}")

        if job_id in self._current_jobs:
            # Remove job from worker's current jobs
            job_info = self._current_jobs.pop(job_id, None)
            if job_info:
                logger.info(
                    f"Removed job {job_id} from worker's current jobs. Remaining jobs: {len(self._current_jobs)}"
                )

            # Inform registry about job completion
            if self.backend_connection and self.backend_connection.is_connected:
                try:
                    job_update = JobUpdate(
                        job_id=job_id,
                        status="completed",
                        error=f"Meeting ended: {reason}",
                    )
                    await self.backend_connection.send_message(job_update)
                    logger.info(
                        f"Sent job completion update to registry for job {job_id}"
                    )
                except Exception as e:
                    logger.error(
                        f"Failed to send job completion update to registry: {e}"
                    )

            # IMMEDIATELY send status update to reflect reduced job count
            # This bypasses the debounce mechanism to ensure registry gets correct info
            await self._send_immediate_status_update()
        else:
            logger.warning(f"Job {job_id} not found in current jobs when meeting ended")

    async def _send_immediate_status_update(self):
        """Send an immediate status update, bypassing debounce mechanism."""
        if not self.backend_connection or not self.backend_connection.is_connected:
            return

        try:
            # Calculate current load
            job_count = len(self._current_jobs)
            load = min(job_count / self.options.max_processes, 1.0)
            self._worker_load = load

            logger.info(
                f"Sending immediate status update - job_count: {job_count}, load: {load}, max_processes: {self.options.max_processes}"
            )

            # Log the actual job IDs for debugging
            if job_count > 0:
                job_ids = list(self._current_jobs.keys())
                logger.info(f"Active job IDs: {job_ids}")
            else:
                logger.info("No active jobs")

            # Send status update
            status_msg = WorkerMessage(
                type="status_update",
                worker_id=self.backend_connection.worker_id,
                agent_name=self.options.agent_id,
                status="available" if not self._draining else "draining",
                load=load,
                job_count=job_count,
            )

            await self.backend_connection.send_message(status_msg)
            logger.info("Immediate status update sent successfully")

        except Exception as e:
            logger.error(f"Error sending immediate status update: {e}")

    def setup_meeting_event_handlers(self, job_context, job_id: str):
        """Set up meeting event handlers for a specific job."""
        if not job_context.room:
            logger.warning(
                f"Cannot set up meeting handlers for job {job_id}: room not available"
            )
            # Set up a delayed handler setup that will be called when room becomes available
            original_connect = job_context.connect

            def delayed_handler_setup():
                if job_context.room:
                    self._setup_meeting_event_handlers_impl(job_context, job_id)
                else:
                    logger.warning(
                        f"Room still not available for job {job_id} after connect"
                    )

            # Override connect method to set up handlers after room is created
            async def connect_with_handlers():
                result = await original_connect()
                delayed_handler_setup()
                return result

            job_context.connect = connect_with_handlers
            logger.info(f"Set up delayed meeting event handlers for job {job_id}")
            return

        # Room is available, set up handlers immediately
        self._setup_meeting_event_handlers_impl(job_context, job_id)

    def _setup_meeting_event_handlers_impl(self, job_context, job_id: str):
        """Internal method to set up the actual meeting event handlers."""
        if not job_context.room:
            logger.warning(f"Room not available for job {job_id} in handler setup")
            return

        # Store original event handler
        original_on_meeting_left = job_context.room.on_meeting_left

        # Create wrapper that calls original and then handles cleanup
        def on_meeting_left_wrapper(data=None):
            # Call original handler first
            if original_on_meeting_left and callable(original_on_meeting_left):
                try:
                    # Call as a method with self bound
                    import inspect

                    sig = inspect.signature(original_on_meeting_left)
                    # Check if it's a bound method or function
                    if hasattr(original_on_meeting_left, "__self__"):
                        # It's a bound method
                        if len(sig.parameters) > 1:  # self + data
                            original_on_meeting_left(data)
                        else:  # just self
                            original_on_meeting_left()
                    else:
                        # It's a function
                        if len(sig.parameters) > 0:
                            original_on_meeting_left(data)
                        else:
                            original_on_meeting_left()
                except Exception as e:
                    logger.warning(f"Error calling original on_meeting_left: {e}")

            # Handle meeting end for this specific job
            logger.info(f"Meeting left event - triggering job cleanup for {job_id}")
            asyncio.create_task(self._handle_meeting_end(job_id, "meeting_left"))

        # Replace the handler with our wrapper
        job_context.room.on_meeting_left = on_meeting_left_wrapper
        logger.info(f"Set up meeting end handler for job {job_id}")

    async def _launch_job_from_assignment(
        self, assignment: JobAssignment, args: JobAcceptArguments
    ):
        """Launch a job from backend assignment."""
        try:
            # Use assignment token if available, otherwise fall back to worker's auth token
            auth_token = (
                assignment.token if assignment.token else self.options.auth_token
            )

            # Create room options from assignment (this was already done in _handle_job_assignment)
            room_options = RoomOptions(
                room_id=assignment.room_id,
                name=assignment.room_name,  # Use 'name' instead of 'room_name'
                auth_token=auth_token,
                signaling_base_url=self.options.signaling_base_url,
                recording=self.default_room_options.recording,
                background_audio=self.default_room_options.background_audio,
                agent_participant_id=self.default_room_options.agent_participant_id,
                join_meeting=self.default_room_options.join_meeting,
                auto_end_session=self.default_room_options.auto_end_session,
                session_timeout_seconds=self.default_room_options.session_timeout_seconds,
            )

            # Apply RoomOptions from assignment if provided
            if assignment.room_options:
                logger.info(
                    f"Received room_options from assignment: {assignment.room_options}"
                )
                if "auto_end_session" in assignment.room_options:
                    room_options.auto_end_session = assignment.room_options[
                        "auto_end_session"
                    ]
                    logger.info(
                        f"Set auto_end_session: {room_options.auto_end_session}"
                    )
                if "session_timeout_seconds" in assignment.room_options:
                    room_options.session_timeout_seconds = assignment.room_options[
                        "session_timeout_seconds"
                    ]
                    logger.info(
                        f"Set session_timeout_seconds: {room_options.session_timeout_seconds}"
                    )
                if "playground" in assignment.room_options:
                    room_options.playground = assignment.room_options["playground"]
                    logger.info(f"Set playground: {room_options.playground}")
                if "vision" in assignment.room_options:
                    room_options.vision = assignment.room_options["vision"]
                    logger.info(f"Set vision: {room_options.vision}")
                if "join_meeting" in assignment.room_options:
                    room_options.join_meeting = assignment.room_options["join_meeting"]
                    logger.info(f"Set join_meeting: {room_options.join_meeting}")
                if "recording" in assignment.room_options:
                    room_options.recording = assignment.room_options["recording"]
                    logger.info(f"Set recording: {room_options.recording}")
                if "background_audio" in assignment.room_options:
                    room_options.background_audio = assignment.room_options["background_audio"]
                    logger.info(f"Set background_audio: {room_options.background_audio}")
                if "agent_participant_id" in assignment.room_options:
                    room_options.agent_participant_id = assignment.room_options[
                        "agent_participant_id"
                    ]
                    logger.info(
                        f"Set agent_participant_id: {room_options.agent_participant_id}"
                    )
            else:
                logger.warning("No room_options received from assignment")

            # Create job context
            job_context = JobContext(
                room_options=room_options,
                metadata=assignment.metadata,
            )

            # Create running job info with correct parameters
            job_info = RunningJobInfo(
                accept_arguments=args,
                job=job_context,
                url=assignment.url,
                token=auth_token,
                worker_id=self.backend_connection.worker_id,
            )

            # Store job info BEFORE executing entrypoint
            self._current_jobs[assignment.job_id] = job_info
            logger.info(
                f"Added job {assignment.job_id} to worker's current jobs. Total jobs: {len(self._current_jobs)}"
            )

            # Send job update to registry
            job_update = JobUpdate(
                job_id=assignment.job_id,
                status="running",
            )
            await self.backend_connection.send_message(job_update)

            # Set up session end callback BEFORE executing entrypoint
            # This ensures the callback is set up even if entrypoint fails
            self.setup_session_end_callback(job_context, assignment.job_id)
            logger.info(f"Session end callback set up for job {assignment.job_id}")

            # Set up meeting event handlers to ensure proper event handling
            self.setup_meeting_event_handlers(job_context, assignment.job_id)
            logger.info(f"Meeting event handlers set up for job {assignment.job_id}")

            # Execute the job using the worker's entrypoint function
            logger.info(f"Executing job {assignment.job_id} with entrypoint function")

            try:
                # Set the current job context so pipeline auto-registration works
                from .job import _set_current_job_context, _reset_current_job_context

                token = _set_current_job_context(job_context)
                try:
                    # Execute the entrypoint function
                    await self.options.entrypoint_fnc(job_context)
                    logger.info(
                        f"Entrypoint function completed for job {assignment.job_id}"
                    )
                finally:
                    pass
            except Exception as entrypoint_error:
                logger.error(
                    f"Entrypoint function failed for job {assignment.job_id}: {entrypoint_error}"
                )
                # Don't remove the job from _current_jobs here - let the session end callback handle it
                # The job should remain active until the session actually ends

                # Send error update but keep job active
                error_update = JobUpdate(
                    job_id=assignment.job_id,
                    status="error",
                    error=f"Entrypoint failed: {entrypoint_error}",
                )
                await self.backend_connection.send_message(error_update)

            # The job should remain in _current_jobs until the session ends
            # This ensures the registry sees the correct load and job count
            logger.info(
                f"Job {assignment.job_id} remains active in worker's current jobs: {len(self._current_jobs)} total jobs"
            )

        except Exception as e:
            logger.error(f"Error launching job {assignment.job_id}: {e}")
            # Send error update
            job_update = JobUpdate(
                job_id=assignment.job_id,
                status="failed",
                error=str(e),
            )
            await self.backend_connection.send_message(job_update)
            # Remove job from current jobs since it failed to launch
            self._current_jobs.pop(assignment.job_id, None)
            logger.info(f"Removed failed job {assignment.job_id} from current jobs")

            # Send immediate status update to reflect reduced job count
            await self._send_immediate_status_update()

    def setup_session_end_callback(self, job_context, job_id: str):
        """Set up session end callback for automatic session ending."""
        if not job_context.room:
            logger.warning(
                f"Cannot set up session end callback for job {job_id}: room not available"
            )
            # Set up a delayed callback setup that will be called when room becomes available
            original_connect = job_context.connect

            def delayed_callback_setup():
                if job_context.room:
                    self._setup_session_end_callback_impl(job_context, job_id)
                else:
                    logger.warning(
                        f"Room still not available for job {job_id} after connect"
                    )

            # Override connect method to set up callback after room is created
            async def connect_with_callback():
                result = await original_connect()
                delayed_callback_setup()
                return result

            job_context.connect = connect_with_callback
            logger.info(f"Set up delayed session end callback for job {job_id}")
            return

        # Room is available, set up callback immediately
        self._setup_session_end_callback_impl(job_context, job_id)

    def _setup_session_end_callback_impl(self, job_context, job_id: str):
        """Internal method to set up the actual session end callback."""
        if not job_context.room:
            logger.warning(f"Room not available for job {job_id} in callback setup")
            return

        # Store original callback if it exists
        original_on_session_end = job_context.room.on_session_end

        def on_session_end_wrapper(reason: str):
            logger.info(f"Session ended for job {job_id}, reason: {reason}")

            # Call original callback if it exists
            if original_on_session_end:
                try:
                    original_on_session_end(reason)
                except Exception as e:
                    logger.error(f"Error in original session end callback: {e}")

            logger.info(f"Calling _handle_meeting_end for job {job_id}")
            # Handle meeting end asynchronously
            asyncio.create_task(
                self._handle_meeting_end(job_id, f"session_ended: {reason}")
            )

        # Set the wrapped session end callback
        job_context.room.on_session_end = on_session_end_wrapper
        logger.info(f"Session end callback set up for job {job_id}")

    async def _status_update_loop(self):
        """Periodic status update loop."""
        while not self._shutdown:
            try:
                await self._update_worker_status()
                await asyncio.sleep(self.options.ping_interval)
            except Exception as e:
                logger.error(f"Error in status update loop: {e}")
                await asyncio.sleep(5)  # Wait before retrying

    async def _update_worker_status(self):
        """Update worker status with backend."""
        if not self.backend_connection or not self.backend_connection.is_connected:
            return

        # Check debounce - don't send status updates too frequently
        current_time = time.time()
        if (
            current_time - self._last_status_update
            < self._status_update_debounce_seconds
        ):
            logger.debug("Skipping status update due to debounce")
            return

        try:
            # Calculate current load
            job_count = len(self._current_jobs)
            load = min(job_count / self.options.max_processes, 1.0)
            self._worker_load = load

            # Add detailed logging to track job count changes
            logger.info(
                f"Updating worker status - job_count: {job_count}, load: {load}, max_processes: {self.options.max_processes}"
            )

            # Log the actual job IDs for debugging
            if job_count > 0:
                job_ids = list(self._current_jobs.keys())
                logger.info(f"Active job IDs: {job_ids}")
            else:
                logger.info("No active jobs")

            # Send status update
            status_msg = WorkerMessage(
                type="status_update",
                worker_id=self.backend_connection.worker_id,
                agent_name=self.options.agent_id,  # Include agent_id
                status="available" if not self._draining else "draining",
                load=load,
                job_count=job_count,
            )

            await self.backend_connection.send_message(status_msg)

            # Update last status update time
            self._last_status_update = current_time

            # Update tracing
            self._worker_load_graph.add_point(load)

        except Exception as e:
            logger.error(f"Error updating worker status: {e}")

    async def execute_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a job using the task executor."""
        if not self.process_manager:
            raise RuntimeError("Task executor not initialized")

        # Extract entrypoint function from job data
        entrypoint = job_data.get("entrypoint", self.options.entrypoint_fnc)

        # Execute using new task executor
        result = await self.process_manager.execute(
            entrypoint=entrypoint,
            task_type=TaskType.JOB,
            timeout=job_data.get("timeout", 300.0),
            retry_count=job_data.get("retry_count", 3),
            priority=job_data.get("priority", 0),
            *job_data.get("args", ()),
            **job_data.get("kwargs", {}),
        )

        # Convert TaskResult to expected format
        return {
            "status": result.status.value,
            "result": result.result,
            "error": result.error,
            "execution_time": result.execution_time,
            "task_id": result.task_id,
        }

    async def execute_inference(self, inference_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute an inference using the task executor."""
        if not self.process_manager:
            raise RuntimeError("Task executor not initialized")

        # Extract entrypoint function from inference data
        entrypoint = inference_data.get("entrypoint", self.options.entrypoint_fnc)

        # Execute using new task executor
        result = await self.process_manager.execute(
            entrypoint=entrypoint,
            task_type=TaskType.INFERENCE,
            timeout=inference_data.get("timeout", 300.0),
            retry_count=inference_data.get("retry_count", 3),
            priority=inference_data.get("priority", 0),
            *inference_data.get("args", ()),
            **inference_data.get("kwargs", {}),
        )

        # Convert TaskResult to expected format
        return {
            "status": result.status.value,
            "result": result.result,
            "error": result.error,
            "execution_time": result.execution_time,
            "task_id": result.task_id,
        }

    def get_stats(self) -> Dict[str, Any]:
        """Get worker statistics."""
        # Calculate current load dynamically
        job_count = len(self._current_jobs)
        current_load = min(job_count / self.options.max_processes, 1.0)

        stats = {
            "worker_load": current_load,
            "draining": self._draining,
            "current_jobs": job_count,
            "max_processes": self.options.max_processes,
            "agent_id": self.options.agent_id,
            "register": self.options.register,
        }

        if self.backend_connection:
            stats.update(
                {
                    "backend_connected": self.backend_connection.is_connected,
                    "worker_id": self.backend_connection.worker_id,
                }
            )

        if self.process_manager:
            try:
                process_stats = self.process_manager.get_stats()
                logger.debug(f"Process manager stats: {process_stats}")

                # Get current resource stats and dedicated inference status
                if "resource_stats" in process_stats:
                    stats["resource_stats"] = process_stats["resource_stats"]
                    logger.debug(f"Resource stats: {process_stats['resource_stats']}")
                if "dedicated_inference" in process_stats:
                    stats["dedicated_inference"] = process_stats["dedicated_inference"]

                # Also get current resource info for more detailed stats
                try:
                    resource_info = self.process_manager.get_resource_info()
                    logger.debug(
                        f"Resource info count: {len(resource_info) if resource_info else 0}"
                    )

                    if resource_info:
                        stats["resource_info"] = [
                            {
                                "resource_id": info.resource_id,
                                "resource_type": info.resource_type.value,
                                "status": info.status.value,
                                "current_load": info.current_load,
                                "memory_usage_mb": info.memory_usage_mb,
                                "cpu_usage_percent": info.cpu_usage_percent,
                                "active_tasks": info.active_tasks,
                                "total_tasks_processed": info.total_tasks_processed,
                                "last_heartbeat": info.last_heartbeat,
                                "metadata": info.metadata,
                            }
                            for info in resource_info
                        ]

                        # Add summary of resource status
                        resource_summary = {
                            "total_resources": len(resource_info),
                            "available_resources": len(
                                [r for r in resource_info if r.status == "IDLE"]
                            ),
                            "active_resources": len(
                                [r for r in resource_info if r.status != "IDLE"]
                            ),
                            "dedicated_inference_active": any(
                                r.resource_type == "DEDICATED_INFERENCE"
                                and r.status != "IDLE"
                                for r in resource_info
                            ),
                        }
                        stats["resource_summary"] = resource_summary
                        logger.debug(f"Resource summary: {resource_summary}")
                except Exception as e:
                    logger.debug(f"Could not get detailed resource info: {e}")
            except Exception as e:
                logger.error(f"Error getting process manager stats: {e}")
                stats["resource_stats"] = {"error": str(e)}
                stats["dedicated_inference"] = None

        return stats

    async def drain(self, timeout: Optional[float] = None) -> None:
        """Drain the worker - wait for current jobs to finish before shutting down."""
        if self._draining:
            return

        logger.info("Draining VideoSDK worker")
        self._draining = True
        await self._update_worker_status()

        # Wait for current jobs to complete
        if self._current_jobs:
            logger.info(
                f"Waiting for {len(self._current_jobs)} active jobs to complete"
            )

            if timeout:
                try:
                    await asyncio.wait_for(self._wait_for_jobs(), timeout)
                except asyncio.TimeoutError:
                    logger.warning(
                        f"Timeout waiting for jobs to complete after {timeout}s"
                    )
            else:
                await self._wait_for_jobs()

    async def _wait_for_jobs(self) -> None:
        """Wait for all current jobs to complete."""
        while self._current_jobs:
            # Wait a bit and check again
            await asyncio.sleep(1)
            logger.info(f"Still waiting for {len(self._current_jobs)} jobs to complete")

    async def _cleanup_all_jobs(self):
        """Clean up all current jobs and notify registry."""
        if not self._current_jobs:
            return

        logger.info(f"Cleaning up {len(self._current_jobs)} jobs during shutdown")

        # Create a copy of jobs to iterate over, as they will be modified
        jobs_to_clean = list(self._current_jobs.items())

        for job_id, job_info in jobs_to_clean:
            try:
                logger.info(f"Terminating job {job_id}...")
                await job_info.job.shutdown()  # This calls job.shutdown()
                logger.info(f"Job {job_id} terminated successfully.")
            except Exception as e:
                logger.error(f"Error terminating job {job_id}: {e}")

            try:
                if self.backend_connection and self.backend_connection.is_connected:
                    job_update = JobUpdate(
                        job_id=job_id,
                        status="completed",
                        error="Worker shutdown",
                    )
                    await self.backend_connection.send_message(job_update)
                    logger.info(
                        f"Sent job completion update for job {job_id} during shutdown"
                    )
            except Exception as e:
                logger.error(f"Failed to send job completion update for {job_id}: {e}")

        # Clear all jobs from the worker's state
        self._current_jobs.clear()
        logger.info("All jobs cleared from worker")

        # Send a final status update reflecting zero jobs
        if self.backend_connection and self.backend_connection.is_connected:
            await self._send_immediate_status_update()

    async def shutdown(self):
        """Shutdown the worker."""
        logger.info("Shutting down VideoSDK worker")
        self._shutdown = True
        self._draining = True

        try:
            # Clean up all jobs first to ensure proper room cleanup
            await self._cleanup_all_jobs()
        except Exception as e:
            logger.error(f"Error during job cleanup: {e}")

        try:
            # Send final status update to registry
            if self.backend_connection and self.backend_connection.is_connected:
                try:
                    await self._update_worker_status()
                    logger.info("Sent final status update to registry")
                except Exception as e:
                    logger.warning(f"Failed to send final status update: {e}")

            # Disconnect from backend
            if self.backend_connection:
                logger.info("Disconnecting from backend")
                await self.backend_connection.disconnect()
        except Exception as e:
            logger.error(f"Error during backend cleanup: {e}")

        try:
            # Cancel all tasks
            for task in self._tasks:
                if not task.done():
                    task.cancel()

            # Wait briefly for tasks to complete
            if self._tasks:
                done, pending = await asyncio.wait(self._tasks, timeout=2.0)
                for task in pending:
                    task.cancel()
        except Exception as e:
            logger.error(f"Error during task cleanup: {e}")

        try:
            # Shutdown task executor
            if self.process_manager:
                await self.process_manager.stop()
        except Exception as e:
            logger.error(f"Error stopping process manager: {e}")

        try:
            # Stop debug HTTP server
            if self._http_server:
                await self._http_server.aclose()
        except Exception as e:
            logger.error(f"Error stopping HTTP server: {e}")

        logger.info("VideoSDK worker shutdown complete")

    async def __aenter__(self):
        """Async context manager entry."""
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.shutdown()

VideoSDK worker that manages job execution and backend registration.

def run(self): job_context = functools.partial(self.job.jobctx) entrypoint = functools.partial(self.job.entrypoint) p = multiprocessing.Process( target=_job_runner, args=(entrypoint, job_context) Automatically selects the appropriate executor type based on platform.

Initialize the worker.

Static methods

def run_worker(options: WorkerOptions,
default_room_options: RoomOptions | None = None)
Expand source code
@staticmethod
def run_worker(
    options: WorkerOptions, default_room_options: Optional[RoomOptions] = None
):
    """
    Run a VideoSDK worker with the given options.

    This is the main entry point for running a VideoSDK worker,
    providing a high-level interface for worker initialization, job management, and lifecycle control.

    Args:
        options: Worker configuration options
        default_room_options: Optional default room options

    Example:
        ```python
        from videosdk.agents import Worker, WorkerOptions

        def my_agent(job_ctx):
            # Your agent code here
            pass

        # Configure worker with custom log level - logging is automatically configured!
        options = WorkerOptions(
            entrypoint_fnc=my_agent,
            log_level="DEBUG"  # Options: DEBUG, INFO, WARNING, ERROR
        )

        # Run the worker - no manual logging setup needed!
        Worker.run_worker(options)
        ```
    """
    worker = Worker(options, default_room_options=default_room_options)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def main_task():
        try:
            await worker.initialize()
            if options.register:
                # Backend registration mode
                await worker._run_backend_mode()
            else:
                # Default mode - just keep alive
                while not worker._shutdown:
                    await asyncio.sleep(1)

        except asyncio.CancelledError:
            logger.info("Main task cancelled")
        except Exception as e:
            logger.error(f"Worker error: {e}")
            raise
        finally:
            await worker.shutdown()

    main_future = loop.create_task(main_task())
    shutting_down = False

    def signal_handler(signum, frame):
        nonlocal shutting_down
        if shutting_down:
            # If already shutting down, cancel all tasks more aggressively
            for task in asyncio.all_tasks(loop):
                task.cancel()
            return
        shutting_down = True
        logger.info(f"Received signal {signum}. Initiating graceful shutdown...")
        # Cancel the main task
        loop.call_soon_threadsafe(main_future.cancel)
        # Set a timeout for graceful shutdown
        loop.call_later(
            3.0, lambda: [task.cancel() for task in asyncio.all_tasks(loop)]
        )

    try:
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        loop.run_until_complete(main_future)
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
        if not shutting_down:
            shutting_down = True
            if not main_future.done():
                main_future.cancel()
            loop.run_until_complete(worker.shutdown())
    finally:
        try:
            loop.close()
        except Exception as e:
            logger.error(f"Error closing event loop: {e}")

    if loop.is_closed():
        logger.info("Event loop closed successfully")

Run a VideoSDK worker with the given options.

This is the main entry point for running a VideoSDK worker, providing a high-level interface for worker initialization, job management, and lifecycle control.

Args

options
Worker configuration options
default_room_options
Optional default room options

Example

from videosdk.agents import Worker, WorkerOptions

def my_agent(job_ctx):
    # Your agent code here
    pass

# Configure worker with custom log level - logging is automatically configured!
options = WorkerOptions(
    entrypoint_fnc=my_agent,
    log_level="DEBUG"  # Options: DEBUG, INFO, WARNING, ERROR
)

# Run the worker - no manual logging setup needed!
Worker.run_worker(options)

Methods

async def drain(self, timeout: float | None = None) ‑> None
Expand source code
async def drain(self, timeout: Optional[float] = None) -> None:
    """Drain the worker - wait for current jobs to finish before shutting down."""
    if self._draining:
        return

    logger.info("Draining VideoSDK worker")
    self._draining = True
    await self._update_worker_status()

    # Wait for current jobs to complete
    if self._current_jobs:
        logger.info(
            f"Waiting for {len(self._current_jobs)} active jobs to complete"
        )

        if timeout:
            try:
                await asyncio.wait_for(self._wait_for_jobs(), timeout)
            except asyncio.TimeoutError:
                logger.warning(
                    f"Timeout waiting for jobs to complete after {timeout}s"
                )
        else:
            await self._wait_for_jobs()

Drain the worker - wait for current jobs to finish before shutting down.

async def execute_inference(self, inference_data: Dict[str, Any]) ‑> Dict[str, Any]
Expand source code
async def execute_inference(self, inference_data: Dict[str, Any]) -> Dict[str, Any]:
    """Execute an inference using the task executor."""
    if not self.process_manager:
        raise RuntimeError("Task executor not initialized")

    # Extract entrypoint function from inference data
    entrypoint = inference_data.get("entrypoint", self.options.entrypoint_fnc)

    # Execute using new task executor
    result = await self.process_manager.execute(
        entrypoint=entrypoint,
        task_type=TaskType.INFERENCE,
        timeout=inference_data.get("timeout", 300.0),
        retry_count=inference_data.get("retry_count", 3),
        priority=inference_data.get("priority", 0),
        *inference_data.get("args", ()),
        **inference_data.get("kwargs", {}),
    )

    # Convert TaskResult to expected format
    return {
        "status": result.status.value,
        "result": result.result,
        "error": result.error,
        "execution_time": result.execution_time,
        "task_id": result.task_id,
    }

Execute an inference using the task executor.

async def execute_job(self, job_data: Dict[str, Any]) ‑> Dict[str, Any]
Expand source code
async def execute_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a job using the task executor."""
    if not self.process_manager:
        raise RuntimeError("Task executor not initialized")

    # Extract entrypoint function from job data
    entrypoint = job_data.get("entrypoint", self.options.entrypoint_fnc)

    # Execute using new task executor
    result = await self.process_manager.execute(
        entrypoint=entrypoint,
        task_type=TaskType.JOB,
        timeout=job_data.get("timeout", 300.0),
        retry_count=job_data.get("retry_count", 3),
        priority=job_data.get("priority", 0),
        *job_data.get("args", ()),
        **job_data.get("kwargs", {}),
    )

    # Convert TaskResult to expected format
    return {
        "status": result.status.value,
        "result": result.result,
        "error": result.error,
        "execution_time": result.execution_time,
        "task_id": result.task_id,
    }

Execute a job using the task executor.

def get_stats(self) ‑> Dict[str, Any]
Expand source code
def get_stats(self) -> Dict[str, Any]:
    """Get worker statistics."""
    # Calculate current load dynamically
    job_count = len(self._current_jobs)
    current_load = min(job_count / self.options.max_processes, 1.0)

    stats = {
        "worker_load": current_load,
        "draining": self._draining,
        "current_jobs": job_count,
        "max_processes": self.options.max_processes,
        "agent_id": self.options.agent_id,
        "register": self.options.register,
    }

    if self.backend_connection:
        stats.update(
            {
                "backend_connected": self.backend_connection.is_connected,
                "worker_id": self.backend_connection.worker_id,
            }
        )

    if self.process_manager:
        try:
            process_stats = self.process_manager.get_stats()
            logger.debug(f"Process manager stats: {process_stats}")

            # Get current resource stats and dedicated inference status
            if "resource_stats" in process_stats:
                stats["resource_stats"] = process_stats["resource_stats"]
                logger.debug(f"Resource stats: {process_stats['resource_stats']}")
            if "dedicated_inference" in process_stats:
                stats["dedicated_inference"] = process_stats["dedicated_inference"]

            # Also get current resource info for more detailed stats
            try:
                resource_info = self.process_manager.get_resource_info()
                logger.debug(
                    f"Resource info count: {len(resource_info) if resource_info else 0}"
                )

                if resource_info:
                    stats["resource_info"] = [
                        {
                            "resource_id": info.resource_id,
                            "resource_type": info.resource_type.value,
                            "status": info.status.value,
                            "current_load": info.current_load,
                            "memory_usage_mb": info.memory_usage_mb,
                            "cpu_usage_percent": info.cpu_usage_percent,
                            "active_tasks": info.active_tasks,
                            "total_tasks_processed": info.total_tasks_processed,
                            "last_heartbeat": info.last_heartbeat,
                            "metadata": info.metadata,
                        }
                        for info in resource_info
                    ]

                    # Add summary of resource status
                    resource_summary = {
                        "total_resources": len(resource_info),
                        "available_resources": len(
                            [r for r in resource_info if r.status == "IDLE"]
                        ),
                        "active_resources": len(
                            [r for r in resource_info if r.status != "IDLE"]
                        ),
                        "dedicated_inference_active": any(
                            r.resource_type == "DEDICATED_INFERENCE"
                            and r.status != "IDLE"
                            for r in resource_info
                        ),
                    }
                    stats["resource_summary"] = resource_summary
                    logger.debug(f"Resource summary: {resource_summary}")
            except Exception as e:
                logger.debug(f"Could not get detailed resource info: {e}")
        except Exception as e:
            logger.error(f"Error getting process manager stats: {e}")
            stats["resource_stats"] = {"error": str(e)}
            stats["dedicated_inference"] = None

    return stats

Get worker statistics.

async def initialize(self,
default_room_options: RoomOptions | None = None)
Expand source code
async def initialize(self, default_room_options: Optional[RoomOptions] = None):
    """Initialize the worker."""
    logger.info("Initializing VideoSDK worker")

    # Initialize task executor with new execution architecture
    # Convert ExecutorType to ResourceType
    resource_type = (
        ResourceType.THREAD
        if self.options.executor_type == ExecutorType.THREAD
        else ResourceType.PROCESS
    )

    config = ResourceConfig(
        resource_type=resource_type,
        num_idle_resources=self.options.num_idle_processes,
        max_resources=self.options.max_processes,
        initialize_timeout=self.options.initialize_timeout,
        close_timeout=self.options.close_timeout,
        memory_warn_mb=self.options.memory_warn_mb,
        memory_limit_mb=self.options.memory_limit_mb,
        ping_interval=self.options.ping_interval,
        load_threshold=self.options.load_threshold,
        max_concurrent_tasks=1,  # Each resource handles one task at a time
        executor_type=self.options.executor_type,
        # Legacy IPC compatibility - dedicated inference process
        use_dedicated_inference_process=False,  # Disable dedicated inference process for now
        inference_process_timeout=30.0,  # Longer timeout for AI model loading
        inference_memory_warn_mb=1000.0,  # Higher threshold for AI models
    )

    self.process_manager = TaskExecutor(config)
    await self.process_manager.start()

    # Initialize backend connection if registering
    if self.options.register:
        await self._initialize_backend_connection()

    # Initialize and start debug HTTP server
    self._http_server = HttpServer(
        host=self.options.host,
        port=self.options.port,
    )
    self._http_server.set_worker(self)
    await self._http_server.start()

    logger.info("VideoSDK worker initialized successfully")

Initialize the worker.

def setup_meeting_event_handlers(self, job_context, job_id: str)
Expand source code
def setup_meeting_event_handlers(self, job_context, job_id: str):
    """Set up meeting event handlers for a specific job."""
    if not job_context.room:
        logger.warning(
            f"Cannot set up meeting handlers for job {job_id}: room not available"
        )
        # Set up a delayed handler setup that will be called when room becomes available
        original_connect = job_context.connect

        def delayed_handler_setup():
            if job_context.room:
                self._setup_meeting_event_handlers_impl(job_context, job_id)
            else:
                logger.warning(
                    f"Room still not available for job {job_id} after connect"
                )

        # Override connect method to set up handlers after room is created
        async def connect_with_handlers():
            result = await original_connect()
            delayed_handler_setup()
            return result

        job_context.connect = connect_with_handlers
        logger.info(f"Set up delayed meeting event handlers for job {job_id}")
        return

    # Room is available, set up handlers immediately
    self._setup_meeting_event_handlers_impl(job_context, job_id)

Set up meeting event handlers for a specific job.

def setup_session_end_callback(self, job_context, job_id: str)
Expand source code
def setup_session_end_callback(self, job_context, job_id: str):
    """Set up session end callback for automatic session ending."""
    if not job_context.room:
        logger.warning(
            f"Cannot set up session end callback for job {job_id}: room not available"
        )
        # Set up a delayed callback setup that will be called when room becomes available
        original_connect = job_context.connect

        def delayed_callback_setup():
            if job_context.room:
                self._setup_session_end_callback_impl(job_context, job_id)
            else:
                logger.warning(
                    f"Room still not available for job {job_id} after connect"
                )

        # Override connect method to set up callback after room is created
        async def connect_with_callback():
            result = await original_connect()
            delayed_callback_setup()
            return result

        job_context.connect = connect_with_callback
        logger.info(f"Set up delayed session end callback for job {job_id}")
        return

    # Room is available, set up callback immediately
    self._setup_session_end_callback_impl(job_context, job_id)

Set up session end callback for automatic session ending.

async def shutdown(self)
Expand source code
async def shutdown(self):
    """Shutdown the worker."""
    logger.info("Shutting down VideoSDK worker")
    self._shutdown = True
    self._draining = True

    try:
        # Clean up all jobs first to ensure proper room cleanup
        await self._cleanup_all_jobs()
    except Exception as e:
        logger.error(f"Error during job cleanup: {e}")

    try:
        # Send final status update to registry
        if self.backend_connection and self.backend_connection.is_connected:
            try:
                await self._update_worker_status()
                logger.info("Sent final status update to registry")
            except Exception as e:
                logger.warning(f"Failed to send final status update: {e}")

        # Disconnect from backend
        if self.backend_connection:
            logger.info("Disconnecting from backend")
            await self.backend_connection.disconnect()
    except Exception as e:
        logger.error(f"Error during backend cleanup: {e}")

    try:
        # Cancel all tasks
        for task in self._tasks:
            if not task.done():
                task.cancel()

        # Wait briefly for tasks to complete
        if self._tasks:
            done, pending = await asyncio.wait(self._tasks, timeout=2.0)
            for task in pending:
                task.cancel()
    except Exception as e:
        logger.error(f"Error during task cleanup: {e}")

    try:
        # Shutdown task executor
        if self.process_manager:
            await self.process_manager.stop()
    except Exception as e:
        logger.error(f"Error stopping process manager: {e}")

    try:
        # Stop debug HTTP server
        if self._http_server:
            await self._http_server.aclose()
    except Exception as e:
        logger.error(f"Error stopping HTTP server: {e}")

    logger.info("VideoSDK worker shutdown complete")

Shutdown the worker.

class WorkerOptions (entrypoint_fnc: Callable[[JobContext], Any],
request_fnc: Callable[[ForwardRef('JobRequest')], Any] | None = None,
initialize_process_fnc: Callable[[Any], Any] | None = None,
executor_type: ExecutorType = ExecutorType.PROCESS,
num_idle_processes: int = 2,
initialize_timeout: float = 10.0,
close_timeout: float = 60.0,
memory_warn_mb: float = 500.0,
memory_limit_mb: float = 0.0,
ping_interval: float = 30.0,
max_processes: int = 10,
agent_id: str = 'VideoSDKAgent',
auth_token: str | None = None,
worker_type: WorkerType = WorkerType.ROOM,
permissions: WorkerPermissions = <factory>,
max_retry: int = 16,
load_threshold: float = 0.75,
register: bool = False,
signaling_base_url: str = 'api.videosdk.live',
host: str = '0.0.0.0',
port: int = 8081,
log_level: str = 'INFO')
Expand source code
@dataclass
class WorkerOptions:
    """Configuration options for the VideoSDK worker."""

    entrypoint_fnc: Callable[[JobContext], Any]
    """Entrypoint function that will be called when a job is assigned to this worker."""

    request_fnc: Optional[Callable[["JobRequest"], Any]] = None
    """Function to handle job requests and decide whether to accept them."""

    initialize_process_fnc: Optional[Callable[[Any], Any]] = None
    """A function to perform any necessary initialization before the job starts."""

    executor_type: ExecutorType = _default_executor_type
    """Which executor to use to run jobs. Automatically selected based on platform."""

    num_idle_processes: int = 2
    """Number of idle processes/threads to keep warm."""

    initialize_timeout: float = 10.0
    """Maximum amount of time to wait for a process/thread to initialize/prewarm"""

    close_timeout: float = 60.0
    """Maximum amount of time to wait for a job to shut down gracefully"""

    memory_warn_mb: float = 500.0
    """Memory warning threshold in MB."""

    memory_limit_mb: float = 0.0
    """Maximum memory usage for a job in MB. Defaults to 0 (disabled)."""

    ping_interval: float = 30.0
    """Interval between health check pings."""

    max_processes: int = 10
    """Maximum number of processes/threads."""

    agent_id: str = "VideoSDKAgent"
    """ID of the agent."""

    auth_token: Optional[str] = None
    """VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided. 
    This token is used for both VideoSDK services and registry authentication."""

    worker_type: WorkerType = WorkerType.ROOM
    """Type of worker (room or publisher)."""

    permissions: WorkerPermissions = field(default_factory=WorkerPermissions)
    """Permissions for the agent participant."""

    max_retry: int = 16
    """Maximum number of times to retry connecting to VideoSDK."""

    load_threshold: float = 0.75
    """Load threshold above which worker is marked as unavailable."""

    register: bool = False
    """Whether to register with the backend. Defaults to False for local development."""

    signaling_base_url: str = "api.videosdk.live"
    """Signaling base URL for VideoSDK services. Defaults to api.videosdk.live."""

    host: str = "0.0.0.0"
    """Host for the debug HTTP server."""

    port: int = 8081
    """Port for the debug HTTP server."""

    log_level: str = "INFO"
    """Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO."""

    def __post_init__(self):
        """Post-initialization setup."""
        if not self.auth_token:
            self.auth_token = os.getenv("VIDEOSDK_AUTH_TOKEN")

        # Log the selected executor type
        logger.info(f"Worker configured with {self.executor_type.value} executor")

Configuration options for the VideoSDK worker.

Instance variables

var agent_id : str

ID of the agent.

var auth_token : str | None

VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided. This token is used for both VideoSDK services and registry authentication.

var close_timeout : float

Maximum amount of time to wait for a job to shut down gracefully

var entrypoint_fnc : Callable[[JobContext], Any]

Entrypoint function that will be called when a job is assigned to this worker.

var executor_typeExecutorType

Which executor to use to run jobs. Automatically selected based on platform.

var host : str

Host for the debug HTTP server.

var initialize_process_fnc : Callable[[Any], Any] | None

A function to perform any necessary initialization before the job starts.

var initialize_timeout : float

Maximum amount of time to wait for a process/thread to initialize/prewarm

var load_threshold : float

Load threshold above which worker is marked as unavailable.

var log_level : str

Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO.

var max_processes : int

Maximum number of processes/threads.

var max_retry : int

Maximum number of times to retry connecting to VideoSDK.

var memory_limit_mb : float

Maximum memory usage for a job in MB. Defaults to 0 (disabled).

var memory_warn_mb : float

Memory warning threshold in MB.

var num_idle_processes : int

Number of idle processes/threads to keep warm.

var permissionsWorkerPermissions

Permissions for the agent participant.

var ping_interval : float

Interval between health check pings.

var port : int

Port for the debug HTTP server.

var register : bool

Whether to register with the backend. Defaults to False for local development.

var request_fnc : Callable[[JobRequest], Any] | None

Function to handle job requests and decide whether to accept them.

var signaling_base_url : str

Signaling base URL for VideoSDK services. Defaults to api.videosdk.live.

var worker_typeWorkerType

Type of worker (room or publisher).

class WorkerPermissions (can_publish: bool = True,
can_subscribe: bool = True,
can_publish_data: bool = True,
can_update_metadata: bool = True,
hidden: bool = False)
Expand source code
@dataclass
class WorkerPermissions:
    """Permissions for the agent participant."""

    can_publish: bool = True
    can_subscribe: bool = True
    can_publish_data: bool = True
    can_update_metadata: bool = True
    hidden: bool = False

Permissions for the agent participant.

Instance variables

var can_publish : bool
var can_publish_data : bool
var can_subscribe : bool
var can_update_metadata : bool
var hidden : bool
class WorkerType (*args, **kwds)
Expand source code
class WorkerType(Enum):
    ROOM = "room"

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var ROOM