Module agents.job
Functions
def get_current_job_context() ‑> JobContext | None-
Expand source code
def get_current_job_context() -> Optional["JobContext"]: """Get the current job context (used by pipeline constructors)""" return _current_job_context.get()Get the current job context (used by pipeline constructors)
Classes
class JobAcceptArguments (identity: str, name: str, metadata: str = '')-
Expand source code
@dataclass class JobAcceptArguments: """Arguments for accepting a job.""" identity: str name: str metadata: str = ""Arguments for accepting a job.
Instance variables
var identity : strvar metadata : strvar name : str
class JobContext (*,
room_options: RoomOptions,
metadata: dict | None = None,
loop: asyncio.events.AbstractEventLoop | None = None)-
Expand source code
class JobContext: def __init__( self, *, room_options: RoomOptions, metadata: Optional[dict] = None, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: self.room_options = room_options self.metadata = metadata or {} self._loop = loop or asyncio.get_event_loop() self._pipeline: Optional[Pipeline] = None self.videosdk_auth = self.room_options.auth_token or os.getenv( "VIDEOSDK_AUTH_TOKEN" ) self.room: Optional[VideoSDKHandler] = None self._shutdown_callbacks: list[Callable[[], Coroutine[None, None, None]]] = [] self._is_shutting_down: bool = False self.want_console = len(sys.argv) > 1 and sys.argv[1].lower() == "console" def _set_pipeline_internal(self, pipeline: Any) -> None: """Internal method called by pipeline constructors""" self._pipeline = pipeline if self.room: self.room.pipeline = pipeline if hasattr(pipeline, "_set_loop_and_audio_track"): pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track) # Ensure our lambda function fix is preserved after pipeline setup # This prevents the pipeline from overriding our event handlers if hasattr(self.room, "meeting") and self.room.meeting: # Re-apply our lambda function fix to ensure it's not overridden self.room.meeting.add_event_listener( self.room._create_meeting_handler() ) async def connect(self) -> None: """Connect to the room""" if self.room_options: if not self.room_options.room_id: self.room_options.room_id = self.get_room_id() custom_camera_video_track = None custom_microphone_audio_track = None sinks = [] avatar = self.room_options.avatar if not avatar and self._pipeline and hasattr(self._pipeline, "avatar"): avatar = self._pipeline.avatar if avatar: await avatar.connect() custom_camera_video_track = avatar.video_track custom_microphone_audio_track = avatar.audio_track sinks.append(avatar) if self.want_console: from .console_mode import setup_console_voice_for_ctx if not self._pipeline: raise RuntimeError( "Pipeline must be constructed before ctx.connect() in console mode" ) cleanup_callback = await setup_console_voice_for_ctx(self) self.add_shutdown_callback(cleanup_callback) else: if self.room_options.join_meeting: self.room = VideoSDKHandler( meeting_id=self.room_options.room_id, auth_token=self.videosdk_auth, name=self.room_options.name, agent_participant_id=self.room_options.agent_participant_id, pipeline=self._pipeline, loop=self._loop, vision=self.room_options.vision, recording=self.room_options.recording, custom_camera_video_track=custom_camera_video_track, custom_microphone_audio_track=custom_microphone_audio_track, audio_sinks=sinks, background_audio=self.room_options.background_audio, on_room_error=self.room_options.on_room_error, auto_end_session=self.room_options.auto_end_session, session_timeout_seconds=self.room_options.session_timeout_seconds, signaling_base_url=self.room_options.signaling_base_url, ) if self._pipeline and hasattr( self._pipeline, "_set_loop_and_audio_track" ): self._pipeline._set_loop_and_audio_track( self._loop, self.room.audio_track ) if self.room and self.room_options.join_meeting: self.room.init_meeting() await self.room.join() if ( self.room_options.playground and self.room_options.join_meeting and not self.want_console ): if self.videosdk_auth: playground_url = f"https://playground.videosdk.live?token={self.videosdk_auth}&meetingId={self.room_options.room_id}" print(f"\033[1;36m" + "Agent started in playground mode" + "\033[0m") print("\033[1;75m" + "Interact with agent here at:" + "\033[0m") print("\033[1;4;94m" + playground_url + "\033[0m") else: raise ValueError("VIDEOSDK_AUTH_TOKEN environment variable not found") async def shutdown(self) -> None: """Called by Worker during graceful shutdown""" if self._is_shutting_down: logger.info("JobContext already shutting down") return self._is_shutting_down = True logger.info("JobContext shutting down") for callback in self._shutdown_callbacks: try: await callback() except Exception as e: logger.error(f"Error in shutdown callback: {e}") if self._pipeline: try: await self._pipeline.cleanup() except Exception as e: logger.error(f"Error during pipeline cleanup: {e}") self._pipeline = None if self.room: try: if not getattr(self.room, "_left", False): await self.room.leave() else: logger.info("Room already left, skipping room.leave()") except Exception as e: logger.error(f"Error during room leave: {e}") try: if hasattr(self.room, "cleanup"): await self.room.cleanup() except Exception as e: logger.error(f"Error during room cleanup: {e}") self.room = None self.room_options = None self._loop = None self.videosdk_auth = None self._shutdown_callbacks.clear() logger.info("JobContext cleaned up") def add_shutdown_callback( self, callback: Callable[[], Coroutine[None, None, None]] ) -> None: """Add a callback to be called during shutdown""" self._shutdown_callbacks.append(callback) async def wait_for_participant(self, participant_id: str | None = None) -> str: if self.room: return await self.room.wait_for_participant(participant_id) else: raise ValueError("Room not initialized") async def run_until_shutdown( self, session: Any = None, wait_for_participant: bool = False, ) -> None: """ Simplified helper that handles all cleanup boilerplate. This method: 1. Connects to the room 2. Sets up session end callbacks 3. Waits for participant (optional) 4. Starts the session 5. Waits for shutdown signal 6. Cleans up gracefully Args: session: AgentSession to manage (will call session.start() and session.close()) wait_for_participant: Whether to wait for a participant before starting Example: ```python async def entrypoint(ctx: JobContext): session = AgentSession(agent=agent, pipeline=pipeline) await ctx.run_until_shutdown(session=session, wait_for_participant=True) ``` """ shutdown_event = asyncio.Event() if session: async def cleanup_session(): logger.info("Cleaning up session...") try: await session.close() except Exception as e: logger.error(f"Error closing session in cleanup: {e}") shutdown_event.set() self.add_shutdown_callback(cleanup_session) else: async def cleanup_no_session(): logger.info("Shutdown called, no session to clean up") shutdown_event.set() self.add_shutdown_callback(cleanup_no_session) def on_session_end(reason: str): logger.info(f"Session ended: {reason}") asyncio.create_task(self.shutdown()) try: try: await self.connect() except Exception as e: logger.error(f"Error connecting to room: {e}") raise if self.room: try: self.room.setup_session_end_callback(on_session_end) logger.info("Session end callback configured") except Exception as e: logger.warning(f"Error setting up session end callback: {e}") else: logger.warning( "Room not available, session end callback not configured" ) if wait_for_participant and self.room: try: logger.info("Waiting for participant...") await self.room.wait_for_participant() logger.info("Participant joined") except Exception as e: logger.error(f"Error waiting for participant: {e}") raise if session: try: await session.start() logger.info("Agent session started") except Exception as e: logger.error(f"Error starting session: {e}") raise logger.info( "Agent is running... (will exit when session ends or on interrupt)" ) await shutdown_event.wait() logger.info("Shutdown event received, exiting gracefully...") except KeyboardInterrupt: logger.info("Keyboard interrupt received, shutting down...") except Exception as e: logger.error(f"Unexpected error in run_until_shutdown: {e}") raise finally: if session: try: await session.close() except Exception as e: logger.error(f"Error closing session in finally: {e}") try: await self.shutdown() except Exception as e: logger.error(f"Error in ctx.shutdown: {e}") def get_room_id(self) -> str: """ Creates a new room using the VideoSDK API and returns the room ID. Raises: ValueError: If the VIDEOSDK_AUTH_TOKEN is missing. RuntimeError: If the API request fails or the response is invalid. """ if self.want_console: return None if self.videosdk_auth: url = "https://api.videosdk.live/v2/rooms" headers = {"Authorization": self.videosdk_auth} try: response = requests.post(url, headers=headers) response.raise_for_status() except requests.RequestException as e: raise RuntimeError(f"Failed to create room: {e}") from e data = response.json() room_id = data.get("roomId") if not room_id: raise RuntimeError(f"Unexpected API response, missing roomId: {data}") return room_id else: raise ValueError( "VIDEOSDK_AUTH_TOKEN not found. " "Set it as an environment variable or provide it in room options via auth_token." )Methods
def add_shutdown_callback(self, callback: Callable[[], Coroutine[None, None, None]]) ‑> None-
Expand source code
def add_shutdown_callback( self, callback: Callable[[], Coroutine[None, None, None]] ) -> None: """Add a callback to be called during shutdown""" self._shutdown_callbacks.append(callback)Add a callback to be called during shutdown
async def connect(self) ‑> None-
Expand source code
async def connect(self) -> None: """Connect to the room""" if self.room_options: if not self.room_options.room_id: self.room_options.room_id = self.get_room_id() custom_camera_video_track = None custom_microphone_audio_track = None sinks = [] avatar = self.room_options.avatar if not avatar and self._pipeline and hasattr(self._pipeline, "avatar"): avatar = self._pipeline.avatar if avatar: await avatar.connect() custom_camera_video_track = avatar.video_track custom_microphone_audio_track = avatar.audio_track sinks.append(avatar) if self.want_console: from .console_mode import setup_console_voice_for_ctx if not self._pipeline: raise RuntimeError( "Pipeline must be constructed before ctx.connect() in console mode" ) cleanup_callback = await setup_console_voice_for_ctx(self) self.add_shutdown_callback(cleanup_callback) else: if self.room_options.join_meeting: self.room = VideoSDKHandler( meeting_id=self.room_options.room_id, auth_token=self.videosdk_auth, name=self.room_options.name, agent_participant_id=self.room_options.agent_participant_id, pipeline=self._pipeline, loop=self._loop, vision=self.room_options.vision, recording=self.room_options.recording, custom_camera_video_track=custom_camera_video_track, custom_microphone_audio_track=custom_microphone_audio_track, audio_sinks=sinks, background_audio=self.room_options.background_audio, on_room_error=self.room_options.on_room_error, auto_end_session=self.room_options.auto_end_session, session_timeout_seconds=self.room_options.session_timeout_seconds, signaling_base_url=self.room_options.signaling_base_url, ) if self._pipeline and hasattr( self._pipeline, "_set_loop_and_audio_track" ): self._pipeline._set_loop_and_audio_track( self._loop, self.room.audio_track ) if self.room and self.room_options.join_meeting: self.room.init_meeting() await self.room.join() if ( self.room_options.playground and self.room_options.join_meeting and not self.want_console ): if self.videosdk_auth: playground_url = f"https://playground.videosdk.live?token={self.videosdk_auth}&meetingId={self.room_options.room_id}" print(f"\033[1;36m" + "Agent started in playground mode" + "\033[0m") print("\033[1;75m" + "Interact with agent here at:" + "\033[0m") print("\033[1;4;94m" + playground_url + "\033[0m") else: raise ValueError("VIDEOSDK_AUTH_TOKEN environment variable not found")Connect to the room
def get_room_id(self) ‑> str-
Expand source code
def get_room_id(self) -> str: """ Creates a new room using the VideoSDK API and returns the room ID. Raises: ValueError: If the VIDEOSDK_AUTH_TOKEN is missing. RuntimeError: If the API request fails or the response is invalid. """ if self.want_console: return None if self.videosdk_auth: url = "https://api.videosdk.live/v2/rooms" headers = {"Authorization": self.videosdk_auth} try: response = requests.post(url, headers=headers) response.raise_for_status() except requests.RequestException as e: raise RuntimeError(f"Failed to create room: {e}") from e data = response.json() room_id = data.get("roomId") if not room_id: raise RuntimeError(f"Unexpected API response, missing roomId: {data}") return room_id else: raise ValueError( "VIDEOSDK_AUTH_TOKEN not found. " "Set it as an environment variable or provide it in room options via auth_token." )Creates a new room using the VideoSDK API and returns the room ID.
Raises
ValueError- If the VIDEOSDK_AUTH_TOKEN is missing.
RuntimeError- If the API request fails or the response is invalid.
async def run_until_shutdown(self, session: Any = None, wait_for_participant: bool = False) ‑> None-
Expand source code
async def run_until_shutdown( self, session: Any = None, wait_for_participant: bool = False, ) -> None: """ Simplified helper that handles all cleanup boilerplate. This method: 1. Connects to the room 2. Sets up session end callbacks 3. Waits for participant (optional) 4. Starts the session 5. Waits for shutdown signal 6. Cleans up gracefully Args: session: AgentSession to manage (will call session.start() and session.close()) wait_for_participant: Whether to wait for a participant before starting Example: ```python async def entrypoint(ctx: JobContext): session = AgentSession(agent=agent, pipeline=pipeline) await ctx.run_until_shutdown(session=session, wait_for_participant=True) ``` """ shutdown_event = asyncio.Event() if session: async def cleanup_session(): logger.info("Cleaning up session...") try: await session.close() except Exception as e: logger.error(f"Error closing session in cleanup: {e}") shutdown_event.set() self.add_shutdown_callback(cleanup_session) else: async def cleanup_no_session(): logger.info("Shutdown called, no session to clean up") shutdown_event.set() self.add_shutdown_callback(cleanup_no_session) def on_session_end(reason: str): logger.info(f"Session ended: {reason}") asyncio.create_task(self.shutdown()) try: try: await self.connect() except Exception as e: logger.error(f"Error connecting to room: {e}") raise if self.room: try: self.room.setup_session_end_callback(on_session_end) logger.info("Session end callback configured") except Exception as e: logger.warning(f"Error setting up session end callback: {e}") else: logger.warning( "Room not available, session end callback not configured" ) if wait_for_participant and self.room: try: logger.info("Waiting for participant...") await self.room.wait_for_participant() logger.info("Participant joined") except Exception as e: logger.error(f"Error waiting for participant: {e}") raise if session: try: await session.start() logger.info("Agent session started") except Exception as e: logger.error(f"Error starting session: {e}") raise logger.info( "Agent is running... (will exit when session ends or on interrupt)" ) await shutdown_event.wait() logger.info("Shutdown event received, exiting gracefully...") except KeyboardInterrupt: logger.info("Keyboard interrupt received, shutting down...") except Exception as e: logger.error(f"Unexpected error in run_until_shutdown: {e}") raise finally: if session: try: await session.close() except Exception as e: logger.error(f"Error closing session in finally: {e}") try: await self.shutdown() except Exception as e: logger.error(f"Error in ctx.shutdown: {e}")Simplified helper that handles all cleanup boilerplate.
This method: 1. Connects to the room 2. Sets up session end callbacks 3. Waits for participant (optional) 4. Starts the session 5. Waits for shutdown signal 6. Cleans up gracefully
Args
session- AgentSession to manage (will call session.start() and session.close())
wait_for_participant- Whether to wait for a participant before starting
Example
async def entrypoint(ctx: JobContext): session = AgentSession(agent=agent, pipeline=pipeline) await ctx.run_until_shutdown(session=session, wait_for_participant=True) async def shutdown(self) ‑> None-
Expand source code
async def shutdown(self) -> None: """Called by Worker during graceful shutdown""" if self._is_shutting_down: logger.info("JobContext already shutting down") return self._is_shutting_down = True logger.info("JobContext shutting down") for callback in self._shutdown_callbacks: try: await callback() except Exception as e: logger.error(f"Error in shutdown callback: {e}") if self._pipeline: try: await self._pipeline.cleanup() except Exception as e: logger.error(f"Error during pipeline cleanup: {e}") self._pipeline = None if self.room: try: if not getattr(self.room, "_left", False): await self.room.leave() else: logger.info("Room already left, skipping room.leave()") except Exception as e: logger.error(f"Error during room leave: {e}") try: if hasattr(self.room, "cleanup"): await self.room.cleanup() except Exception as e: logger.error(f"Error during room cleanup: {e}") self.room = None self.room_options = None self._loop = None self.videosdk_auth = None self._shutdown_callbacks.clear() logger.info("JobContext cleaned up")Called by Worker during graceful shutdown
async def wait_for_participant(self, participant_id: str | None = None) ‑> str-
Expand source code
async def wait_for_participant(self, participant_id: str | None = None) -> str: if self.room: return await self.room.wait_for_participant(participant_id) else: raise ValueError("Room not initialized")
class JobExecutorType (*args, **kwds)-
Expand source code
@unique class JobExecutorType(Enum): PROCESS = "process" THREAD = "thread"Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum): ... RED = 1 ... BLUE = 2 ... GREEN = 3Access them by:
- attribute access:
Color.RED
- value lookup:
Color(1)
- name lookup:
Color['RED']
Enumerations can be iterated over, and know how many members they have:
>>> len(Color) 3>>> list(Color) [<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
Ancestors
- enum.Enum
Class variables
var PROCESSvar THREAD
class Options (executor_type: Any = None,
num_idle_processes: int = 1,
initialize_timeout: float = 10.0,
close_timeout: float = 60.0,
memory_warn_mb: float = 500.0,
memory_limit_mb: float = 0.0,
ping_interval: float = 30.0,
max_processes: int = 1,
agent_id: str = 'VideoSDKAgent',
auth_token: str | None = None,
permissions: Any = None,
max_retry: int = 16,
load_threshold: float = 0.75,
register: bool = False,
signaling_base_url: str = 'api.videosdk.live',
host: str = '0.0.0.0',
port: int = 8081,
log_level: str = 'INFO')-
Expand source code
@dataclass class Options: """Configuration options for WorkerJob execution.""" executor_type: Any = None # Will be set in __post_init__ """Which executor to use to run jobs. Automatically selected based on platform.""" num_idle_processes: int = 1 """Number of idle processes/threads to keep warm.""" initialize_timeout: float = 10.0 """Maximum amount of time to wait for a process/thread to initialize/prewarm""" close_timeout: float = 60.0 """Maximum amount of time to wait for a job to shut down gracefully""" memory_warn_mb: float = 500.0 """Memory warning threshold in MB.""" memory_limit_mb: float = 0.0 """Maximum memory usage for a job in MB. Defaults to 0 (disabled).""" ping_interval: float = 30.0 """Interval between health check pings.""" max_processes: int = 1 """Maximum number of processes/threads.""" agent_id: str = "VideoSDKAgent" """ID of the agent.""" auth_token: Optional[str] = None """VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided.""" permissions: Any = None # Will be set in __post_init__ """Permissions for the agent participant.""" max_retry: int = 16 """Maximum number of times to retry connecting to VideoSDK.""" load_threshold: float = 0.75 """Load threshold above which worker is marked as unavailable.""" register: bool = False """Whether to register with the backend. Defaults to False for local development.""" signaling_base_url: str = "api.videosdk.live" """Signaling base URL for VideoSDK services. Defaults to api.videosdk.live.""" host: str = "0.0.0.0" """Host for the debug HTTP server.""" port: int = 8081 """Port for the debug HTTP server.""" log_level: str = "INFO" """Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO.""" def __post_init__(self): """Post-initialization setup.""" # Import here to avoid circular imports from .worker import ExecutorType, WorkerPermissions, _default_executor_type if self.executor_type is None: self.executor_type = _default_executor_type if self.permissions is None: self.permissions = WorkerPermissions() if not self.auth_token: self.auth_token = os.getenv("VIDEOSDK_AUTH_TOKEN")Configuration options for WorkerJob execution.
Instance variables
var agent_id : str-
ID of the agent.
var auth_token : str | None-
VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided.
var close_timeout : float-
Maximum amount of time to wait for a job to shut down gracefully
var executor_type : Any-
Which executor to use to run jobs. Automatically selected based on platform.
var host : str-
Host for the debug HTTP server.
var initialize_timeout : float-
Maximum amount of time to wait for a process/thread to initialize/prewarm
var load_threshold : float-
Load threshold above which worker is marked as unavailable.
var log_level : str-
Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO.
var max_processes : int-
Maximum number of processes/threads.
var max_retry : int-
Maximum number of times to retry connecting to VideoSDK.
var memory_limit_mb : float-
Maximum memory usage for a job in MB. Defaults to 0 (disabled).
var memory_warn_mb : float-
Memory warning threshold in MB.
var num_idle_processes : int-
Number of idle processes/threads to keep warm.
var permissions : Any-
Permissions for the agent participant.
var ping_interval : float-
Interval between health check pings.
var port : int-
Port for the debug HTTP server.
var register : bool-
Whether to register with the backend. Defaults to False for local development.
var signaling_base_url : str-
Signaling base URL for VideoSDK services. Defaults to api.videosdk.live.
class RoomOptions (room_id: str | None = None,
auth_token: str | None = None,
name: str | None = 'Agent',
agent_participant_id: str | None = None,
playground: bool = True,
vision: bool = False,
recording: bool = False,
avatar: Any | None = None,
join_meeting: bool | None = True,
on_room_error: Callable[[Any], None] | None = None,
auto_end_session: bool = True,
session_timeout_seconds: int | None = 5,
signaling_base_url: str | None = None,
background_audio: bool = False)-
Expand source code
@dataclass class RoomOptions: room_id: Optional[str] = None auth_token: Optional[str] = None name: Optional[str] = "Agent" agent_participant_id: Optional[str] = None playground: bool = True vision: bool = False recording: bool = False avatar: Optional[Any] = None join_meeting: Optional[bool] = True on_room_error: Optional[Callable[[Any], None]] = None # Session management options auto_end_session: bool = True session_timeout_seconds: Optional[int] = 5 # VideoSDK connection options signaling_base_url: Optional[str] = None background_audio: bool = FalseRoomOptions(room_id: Optional[str] = None, auth_token: Optional[str] = None, name: Optional[str] = 'Agent', agent_participant_id: Optional[str] = None, playground: bool = True, vision: bool = False, recording: bool = False, avatar: Optional[Any] = None, join_meeting: Optional[bool] = True, on_room_error: Optional[Callable[[Any], NoneType]] = None, auto_end_session: bool = True, session_timeout_seconds: Optional[int] = 5, signaling_base_url: Optional[str] = None, background_audio: bool = False)
Instance variables
var agent_participant_id : str | Nonevar auth_token : str | Nonevar auto_end_session : boolvar avatar : Any | Nonevar background_audio : boolvar join_meeting : bool | Nonevar name : str | Nonevar on_room_error : Callable[[Any], None] | Nonevar playground : boolvar recording : boolvar room_id : str | Nonevar session_timeout_seconds : int | Nonevar signaling_base_url : str | Nonevar vision : bool
class RunningJobInfo (accept_arguments: JobAcceptArguments,
job: JobContext,
url: str,
token: str,
worker_id: str)-
Expand source code
@dataclass class RunningJobInfo: """Information about a running job.""" accept_arguments: JobAcceptArguments job: JobContext url: str token: str worker_id: str async def _run(self): # Placeholder for job execution logic if needed in the future passInformation about a running job.
Instance variables
var accept_arguments : JobAcceptArgumentsvar job : JobContextvar token : strvar url : strvar worker_id : str
class WorkerJob (entrypoint,
jobctx=None,
options: Options | None = None)-
Expand source code
class WorkerJob: def __init__(self, entrypoint, jobctx=None, options: Optional[Options] = None): """ :param entrypoint: An async function accepting one argument: jobctx :param jobctx: A static object or a callable that returns a context per job :param options: Configuration options for job execution """ if not asyncio.iscoroutinefunction(entrypoint): raise TypeError("entrypoint must be a coroutine function") self.entrypoint = entrypoint self.jobctx = jobctx self.options = options or Options() def start(self): from .worker import Worker, WorkerOptions # Convert JobOptions to WorkerOptions for compatibility worker_options = WorkerOptions( entrypoint_fnc=self.entrypoint, agent_id=self.options.agent_id, auth_token=self.options.auth_token, executor_type=self.options.executor_type, num_idle_processes=self.options.num_idle_processes, initialize_timeout=self.options.initialize_timeout, close_timeout=self.options.close_timeout, memory_warn_mb=self.options.memory_warn_mb, memory_limit_mb=self.options.memory_limit_mb, ping_interval=self.options.ping_interval, max_processes=self.options.max_processes, permissions=self.options.permissions, max_retry=self.options.max_retry, load_threshold=self.options.load_threshold, register=self.options.register, signaling_base_url=self.options.signaling_base_url, host=self.options.host, port=self.options.port, log_level=self.options.log_level, ) # If register=True, run the worker in backend mode (don't execute entrypoint immediately) if self.options.register: default_room_options = None if self.jobctx: if callable(self.jobctx): job_context = self.jobctx() else: job_context = self.jobctx default_room_options = job_context.room_options # Run the worker normally (for backend registration mode) Worker.run_worker( options=worker_options, default_room_options=default_room_options ) else: # Direct mode - run entrypoint immediately if we have a job context if self.jobctx: if callable(self.jobctx): job_context = self.jobctx() else: job_context = self.jobctx # Set the current job context and run the entrypoint token = _set_current_job_context(job_context) try: asyncio.run(self.entrypoint(job_context)) finally: _reset_current_job_context(token) else: # No job context provided, run worker normally Worker.run_worker(worker_options):param entrypoint: An async function accepting one argument: jobctx :param jobctx: A static object or a callable that returns a context per job :param options: Configuration options for job execution
Methods
def start(self)-
Expand source code
def start(self): from .worker import Worker, WorkerOptions # Convert JobOptions to WorkerOptions for compatibility worker_options = WorkerOptions( entrypoint_fnc=self.entrypoint, agent_id=self.options.agent_id, auth_token=self.options.auth_token, executor_type=self.options.executor_type, num_idle_processes=self.options.num_idle_processes, initialize_timeout=self.options.initialize_timeout, close_timeout=self.options.close_timeout, memory_warn_mb=self.options.memory_warn_mb, memory_limit_mb=self.options.memory_limit_mb, ping_interval=self.options.ping_interval, max_processes=self.options.max_processes, permissions=self.options.permissions, max_retry=self.options.max_retry, load_threshold=self.options.load_threshold, register=self.options.register, signaling_base_url=self.options.signaling_base_url, host=self.options.host, port=self.options.port, log_level=self.options.log_level, ) # If register=True, run the worker in backend mode (don't execute entrypoint immediately) if self.options.register: default_room_options = None if self.jobctx: if callable(self.jobctx): job_context = self.jobctx() else: job_context = self.jobctx default_room_options = job_context.room_options # Run the worker normally (for backend registration mode) Worker.run_worker( options=worker_options, default_room_options=default_room_options ) else: # Direct mode - run entrypoint immediately if we have a job context if self.jobctx: if callable(self.jobctx): job_context = self.jobctx() else: job_context = self.jobctx # Set the current job context and run the entrypoint token = _set_current_job_context(job_context) try: asyncio.run(self.entrypoint(job_context)) finally: _reset_current_job_context(token) else: # No job context provided, run worker normally Worker.run_worker(worker_options)