Module agents.job

Functions

def get_current_job_context() ‑> JobContext | None
Expand source code
def get_current_job_context() -> Optional["JobContext"]:
    """Get the current job context (used by pipeline constructors)"""
    return _current_job_context.get()

Get the current job context (used by pipeline constructors)

def resolve_video_sdk_recording(room_options: RoomOptions) ‑> tuple[bool | None, bool]
Expand source code
def resolve_video_sdk_recording(
    room_options: "RoomOptions",
) -> tuple[Optional[bool], bool]:
    """
    Map RoomOptions recording fields to VideoSDKHandler inputs.

    Returns:
        (record_audio, record_screen_share)
        - record_audio: None → participant recording (audio+video composite API);
          True → track recording, kind=audio only.
        - record_screen_share: whether to start screen_* track recording APIs.
    """
    if not room_options.recording:
        return None, False

    ro = room_options.recording_options
    if ro is None:
        return True, False

    if ro.video:
        return None, False
    if ro.screen_share:
        return True, True
    return True, False

Map RoomOptions recording fields to VideoSDKHandler inputs.

Returns

(record_audio, record_screen_share) - record_audio: None → participant recording (audio+video composite API); True → track recording, kind=audio only. - record_screen_share: whether to start screen_* track recording APIs.

def validate_room_options_recording(room_options: RoomOptions) ‑> None
Expand source code
def validate_room_options_recording(room_options: "RoomOptions") -> None:
    """Raise ValueError if recording-related options are inconsistent."""
    if not room_options.recording:
        return
    ro = room_options.recording_options
    if isinstance(ro, dict):
        room_options.recording_options = _coerce_recording_options_dict(ro)
        ro = room_options.recording_options
    if ro is None:
        return
    if ro.screen_share and not room_options.vision:
        raise ValueError(
            "RoomOptions: recording_options.screen_share=True requires vision=True "
            "(vision subscribes to video/share streams required for screen recording)."
        )

Raise ValueError if recording-related options are inconsistent.

Classes

class JobAcceptArguments (identity: str, name: str, metadata: str = '')
Expand source code
@dataclass
class JobAcceptArguments:
    """Holds identity, name, and metadata used when accepting a job from the worker pool."""

    identity: str
    name: str
    metadata: str = ""

Holds identity, name, and metadata used when accepting a job from the worker pool.

Instance variables

var identity : str
var metadata : str
var name : str
class JobContext (*,
room_options: RoomOptions,
metadata: dict | None = None,
loop: asyncio.events.AbstractEventLoop | None = None)
Expand source code
class JobContext:
    """Holds the runtime state for a single job, including room connection, pipeline, and shutdown lifecycle management."""

    def __init__(
        self,
        *,
        room_options: RoomOptions,
        metadata: Optional[dict] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self.room_options = room_options
        self.metadata = metadata or {}
        self._loop = loop or asyncio.get_event_loop()
        self._pipeline: Optional["Pipeline"] = None
        self.videosdk_auth = self.room_options.auth_token or os.getenv(
            "VIDEOSDK_AUTH_TOKEN"
        )
        self.room: Optional["BaseTransportHandler"] = None
        self._shutdown_callbacks: list[Callable[[], Coroutine[None, None, None]]] = []
        self._is_shutting_down: bool = False
        self._meeting_joined_event: asyncio.Event = asyncio.Event()
        self._wait_for_meeting_join: bool = False
        self.want_console = len(sys.argv) > 1 and sys.argv[1].lower() == "console"
        self.playground_manager: Optional["PlaygroundManager"] = None
        
        from .metrics import metrics_collector
        self.metrics_collector = metrics_collector
        
        self._log_manager = None
        self._job_logger = None
        
    def _set_pipeline_internal(self, pipeline: Any) -> None:
        """Internal method called by pipeline constructors"""
        self._pipeline = pipeline
        if self.room:
            self.room.pipeline = pipeline
            if hasattr(pipeline, "_set_loop_and_audio_track"):
                pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track)

            # Ensure our lambda function fix is preserved after pipeline setup
            # This prevents the pipeline from overriding our event handlers
            if hasattr(self.room, "meeting") and self.room.meeting:
                # Re-apply our lambda function fix to ensure it's not overridden
                self.room.meeting.add_event_listener(
                    self.room._create_meeting_handler()
                )

    async def connect(self) -> None:
        """Connect to the room"""
        if self.room_options:
            custom_camera_video_track = None
            custom_microphone_audio_track = None
            sinks = []

            avatar = self.room_options.avatar
            if not avatar and self._pipeline and hasattr(self._pipeline, "avatar"):
                avatar = self._pipeline.avatar

            if avatar:
                if not self.room_options.room_id:
                    self.room_options.room_id = self.get_room_id()
                room_id = self.room_options.room_id

                from .avatar import AvatarAudioOut, generate_avatar_credentials

                if isinstance(avatar, AvatarAudioOut):
                    avatar.set_room_id(room_id)
                    await avatar.connect()
                    audio_out = avatar
                else:
                    _api_key = os.getenv("VIDEOSDK_API_KEY")
                    _secret_key = os.getenv("VIDEOSDK_SECRET_KEY")
                    credentials = generate_avatar_credentials(
                        _api_key, _secret_key, participant_id=avatar.participant_id
                    )
                    await avatar.connect(room_id, credentials.token)
                    audio_out = AvatarAudioOut(credentials=credentials, room_id=room_id)
                    await audio_out.connect()  # no-op (no dispatcher_url)

                custom_camera_video_track = getattr(avatar, 'video_track', None)
                custom_microphone_audio_track = getattr(avatar, 'audio_track', None)
                sinks.append(audio_out)
                self._cloud_avatar = avatar if not isinstance(avatar, AvatarAudioOut) else None
                self._avatar_audio_out = audio_out
                if self._pipeline:
                    self._pipeline.avatar = audio_out

            if self.want_console:
                from .console_mode import setup_console_voice_for_ctx

                if not self._pipeline:
                    raise RuntimeError(
                        "Pipeline must be constructed before ctx.connect() in console mode"
                    )
                cleanup_callback = await setup_console_voice_for_ctx(self)
                self.add_shutdown_callback(cleanup_callback)
            else:
                self.metrics_collector.transport_mode = self.room_options.transport_mode
                self.metrics_collector.analytics_client.configure(self.room_options.metrics)
                if self.room_options.transport_mode == TransportMode.VIDEOSDK:
                    from .room.room import VideoSDKHandler
                    
                    if not self.room_options.room_id:
                        self.room_options.room_id = self.get_room_id()
                    if self.room_options.send_logs_to_dashboard or (self.room_options.logs and self.room_options.logs.enabled):
                        from .metrics.logger_handler import LogManager, JobLogger
                        self._log_manager = LogManager()
                        self._log_manager.start(auth_token=self.videosdk_auth or "")
                        self._job_logger = JobLogger(
                            queue=self._log_manager.get_queue(),
                            room_id=self.room_options.room_id or "",
                            peer_id=self.room_options.agent_participant_id or "agent",
                            auth_token=self.videosdk_auth or "",
                            dashboard_log_level=self.room_options.dashboard_log_level if not (self.room_options.logs and self.room_options.logs.level) else self.room_options.logs.level,
                            send_logs_to_dashboard=True,
                        )

                    if self.room_options.join_meeting:
                        validate_room_options_recording(self.room_options)
                        record_audio_resolved, record_screen_share = resolve_video_sdk_recording(
                            self.room_options
                        )
                        agent_id = self._pipeline.agent.id if self._pipeline and hasattr(self._pipeline, 'agent') else None
                        self.room = VideoSDKHandler(
                            meeting_id=self.room_options.room_id,
                            auth_token=self.videosdk_auth,
                            name=self.room_options.name,
                            agent_participant_id=self.room_options.agent_participant_id,
                            agent_id=agent_id,
                            pipeline=self._pipeline,
                            loop=self._loop,
                            vision=self.room_options.vision,
                            recording=self.room_options.recording,
                            record_audio=record_audio_resolved,
                            record_screen_share=record_screen_share,
                            custom_camera_video_track=custom_camera_video_track,
                            custom_microphone_audio_track=custom_microphone_audio_track,
                            audio_sinks=sinks,
                            background_audio=self.room_options.background_audio,
                            on_room_error=self.room_options.on_room_error,
                            auto_end_session=self.room_options.auto_end_session,
                            session_timeout_seconds=self.room_options.session_timeout_seconds,
                            no_participant_timeout_seconds=self.room_options.no_participant_timeout_seconds,
                            signaling_base_url=self.room_options.signaling_base_url,
                            job_logger=self._job_logger,
                            traces_options=self.room_options.traces,
                            metrics_options=self.room_options.metrics,
                            logs_options=self.room_options.logs,
                            avatar_participant_id=avatar.participant_id if avatar and hasattr(avatar, 'participant_id') else None,
                        )
                    if self._pipeline and hasattr(
                        self._pipeline, "_set_loop_and_audio_track"
                    ):
                        self._pipeline._set_loop_and_audio_track(
                            self._loop, self.room.audio_track
                        )

                elif self.room_options.transport_mode == TransportMode.WEBSOCKET:
                    if not self.room_options.websocket:
                        raise ValueError("WebSocket configuration (websocket) is required when mode is WEBSOCKET")
                    
                    if self.room_options.webrtc and (self.room_options.webrtc.signaling_url or self.room_options.webrtc.ice_servers != [{"urls": "stun:stun.l.google.com:19302"}]):
                        logger.warning("WebRTC configuration provided but transport mode is set to WEBSOCKET. WebRTC config will be ignored.")

                    from .transports.websocket_handler import WebSocketTransportHandler
                    self.room = WebSocketTransportHandler(
                        loop=self._loop,
                        pipeline=self._pipeline,
                        port=self.room_options.websocket.port,
                        path=self.room_options.websocket.path
                    )
                elif self.room_options.transport_mode == TransportMode.WEBRTC:
                    if not self.room_options.webrtc:
                        raise ValueError("WebRTC configuration (webrtc) is required when mode is WEBRTC")
                    
                    if not self.room_options.webrtc.signaling_url:
                        raise ValueError("WebRTC signaling_url is required when mode is WEBRTC")

                    if self.room_options.websocket and (self.room_options.websocket.port != 8080 or self.room_options.websocket.path != "/ws"):
                        logger.warning("WebSocket configuration provided but connection mode is set to WEBRTC. WebSocket config will be ignored.")

                    from .transports.webrtc_handler import WebRTCTransportHandler
                    self.room = WebRTCTransportHandler(
                        loop=self._loop,
                        pipeline=self._pipeline,
                        signaling_url=self.room_options.webrtc.signaling_url,
                        ice_servers=self.room_options.webrtc.ice_servers
                    )
                
                elif self.room_options.transport_mode == TransportMode.VIDEOSDK:
                    if self.room_options.websocket and (self.room_options.websocket.port != 8080 or self.room_options.websocket.path != "/ws"):
                         logger.warning("WebSocket configuration provided but transport mode is VIDEOSDK. WebSocket config will be ignored.")
                    if self.room_options.webrtc and (self.room_options.webrtc.signaling_url or self.room_options.webrtc.ice_servers != [{"urls": "stun:stun.l.google.com:19302"}]):
                         logger.warning("WebRTC configuration provided but transport mode is VIDEOSDK. WebRTC config will be ignored.")

        if self.room:
            await self.room.connect()

            # For Non-VideoSDK modes, we still need to ensure audio track is linked if not done inside constructor
            if (
                self.room_options.transport_mode != TransportMode.VIDEOSDK
                and self._pipeline
                and hasattr(self._pipeline, "_set_loop_and_audio_track")
            ):
                # BaseTransportHandler subclasses now initialize self.audio_track
                if self.room.audio_track:
                    self._pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track)

        if (
            self.room_options.playground
            and self.room_options.join_meeting
            and not self.want_console
            and self.room_options.transport_mode == TransportMode.VIDEOSDK
        ):
            if self.videosdk_auth:
                playground_url = f"https://playground.videosdk.live?token={self.videosdk_auth}&meetingId={self.room_options.room_id}"
                print(f"\033[1;36m" + "Agent started in playground mode" + "\033[0m")
                print("\033[1;75m" + "Interact with agent here at:" + "\033[0m")
                print("\033[1;4;94m" + playground_url + "\033[0m")
            else:
                raise ValueError("VIDEOSDK_AUTH_TOKEN environment variable not found")

    async def shutdown(self) -> None:
        """Called by Worker during graceful shutdown"""
        if self._is_shutting_down:
            logger.info("JobContext already shutting down")
            return
        self._is_shutting_down = True
        logger.info("JobContext shutting down")
        for callback in self._shutdown_callbacks:
            try:
                await callback()
            except Exception as e:
                logger.error(f"Error in shutdown callback: {e}")

        if self._pipeline:
            try:
                await self._pipeline.cleanup()
            except Exception as e:
                logger.error(f"Error during pipeline cleanup: {e}")
            self._pipeline = None

        cloud_avatar = getattr(self, '_cloud_avatar', None)
        if cloud_avatar and hasattr(cloud_avatar, 'aclose'):
            try:
                await cloud_avatar.aclose()
            except Exception as e:
                logger.error(f"Error during cloud avatar aclose: {e}")
        audio_out = getattr(self, '_avatar_audio_out', None)
        if audio_out:
            try:
                await audio_out.aclose()
            except Exception as e:
                logger.error(f"Error during avatar audio_out aclose: {e}")

        if self._job_logger:
            try:
                self._job_logger.cleanup()
            except Exception as e:
                logger.error(f"Error during job logger cleanup: {e}")
            self._job_logger = None
        if self._log_manager:
            try:
                self._log_manager.stop()
            except Exception as e:
                logger.error(f"Error during log manager stop: {e}")
            self._log_manager = None

        if self.room:
            try:
                if not getattr(self.room, "_left", False):
                    await self.room.leave()
                else:
                    logger.info("Room already left, skipping room.leave()")
            except Exception as e:
                logger.error(f"Error during room leave: {e}")
            try:
                if hasattr(self.room, "cleanup"):
                    await self.room.cleanup()
            except Exception as e:
                logger.error(f"Error during room cleanup: {e}")
            self.room = None

        self.room_options = None
        self._loop = None
        self.videosdk_auth = None
        self._shutdown_callbacks.clear()
        logger.info("JobContext cleaned up")

    def add_shutdown_callback(
        self, callback: Callable[[], Coroutine[None, None, None]]
    ) -> None:
        """Add a callback to be called during shutdown"""
        self._shutdown_callbacks.append(callback)

    def notify_meeting_joined(self) -> None:
        """Called when the agent successfully joins the meeting."""
        self._meeting_joined_event.set()
        audio_out = getattr(self, '_avatar_audio_out', None)
        if audio_out and self.room and self.room.meeting:
            audio_out._set_meeting(self.room.meeting)

    async def wait_for_meeting_joined(self, timeout: float = 30.0) -> bool:
        """Wait until the meeting is joined or timeout. Returns True if joined."""
        try:
            await asyncio.wait_for(self._meeting_joined_event.wait(), timeout=timeout)
            return True
        except asyncio.TimeoutError:
            logger.warning(f"Timeout waiting for meeting join after {timeout}s")
            return False

    async def wait_for_participant(self, participant_id: str | None = None) -> str:
        if self.room:
            return await self.room.wait_for_participant(participant_id)
        else:
            raise ValueError("Room not initialized")

    async def run_until_shutdown(
        self,
        session: Any = None,
        wait_for_participant: bool = False,
    ) -> None:
        """
        Simplified helper that handles all cleanup boilerplate.

        This method:
        1. Connects to the room
        2. Sets up session end callbacks
        3. Waits for participant (optional)
        4. Starts the session
        5. Waits for shutdown signal
        6. Cleans up gracefully

        Args:
            session: AgentSession to manage (will call session.start() and session.close())
            wait_for_participant: Whether to wait for a participant before starting

        Example:
            ```python
            async def entrypoint(ctx: JobContext):
                session = AgentSession(agent=agent, pipeline=pipeline)
                await ctx.run_until_shutdown(session=session, wait_for_participant=True)
            ```
        """
        shutdown_event = asyncio.Event()

        if session:

            async def cleanup_session():
                logger.info("Cleaning up session...")
                try:
                    await session.close()
                except Exception as e:
                    logger.error(f"Error closing session in cleanup: {e}")
                shutdown_event.set()

            self.add_shutdown_callback(cleanup_session)
        else:

            async def cleanup_no_session():
                logger.info("Shutdown called, no session to clean up")
                shutdown_event.set()

            self.add_shutdown_callback(cleanup_no_session)

        def on_session_end(reason: str):
            logger.info(f"Session ended: {reason}")
            asyncio.create_task(self.shutdown())

        try:
            try:
                await self.connect()
            except Exception as e:
                logger.error(f"Error connecting to room: {e}")
                raise

            if self.room:
                try:
                    self.room.setup_session_end_callback(on_session_end)
                    logger.info("Session end callback configured")
                except Exception as e:
                    logger.warning(f"Error setting up session end callback: {e}")
            else:
                logger.warning(
                    "Room not available, session end callback not configured"
                )

            if wait_for_participant and self.room:
                try:
                    logger.info("Waiting for participant...")
                    participant_id = await self.room.wait_for_participant()
                    if participant_id is None:
                        logger.info("Session ended before any participant joined, shutting down")
                        return
                    logger.info("Participant joined")
                except Exception as e:
                    logger.error(f"Error waiting for participant: {e}")
                    raise

            if session:
                try:
                    await session.start()
                    logger.info("Agent session started")
                except Exception as e:
                    logger.error(f"Error starting session: {e}")
                    raise

            logger.info(
                "Agent is running... (will exit when session ends or on interrupt)"
            )
            await shutdown_event.wait()
            logger.info("Shutdown event received, exiting gracefully...")

        except KeyboardInterrupt:
            logger.info("Keyboard interrupt received, shutting down...")
        except Exception as e:
            logger.error(f"Unexpected error in run_until_shutdown: {e}")
            raise
        finally:
            if session:
                try:
                    await session.close()
                except Exception as e:
                    logger.error(f"Error closing session in finally: {e}")

            try:
                await self.shutdown()
            except Exception as e:
                logger.error(f"Error in ctx.shutdown: {e}")

    def get_room_id(self) -> str:
        """
        Creates a new room using the VideoSDK API and returns the room ID.

        Raises:
            ValueError: If the VIDEOSDK_AUTH_TOKEN is missing.
            RuntimeError: If the API request fails or the response is invalid.
        """
        if self.want_console:
            return None

        if self.videosdk_auth:
            base_url = self.room_options.signaling_base_url
            url = f"https://{base_url}/v2/rooms"
            headers = {"Authorization": self.videosdk_auth}

            try:
                response = requests.post(url, headers=headers)
                response.raise_for_status()
            except requests.RequestException as e:
                raise RuntimeError(f"Failed to create room: {e}") from e

            data = response.json()
            room_id = data.get("roomId")
            if not room_id:
                raise RuntimeError(f"Unexpected API response, missing roomId: {data}")

            return room_id
        else:
            raise ValueError(
                "VIDEOSDK_AUTH_TOKEN not found. "
                "Set it as an environment variable or provide it in room options via auth_token."
            )

Holds the runtime state for a single job, including room connection, pipeline, and shutdown lifecycle management.

Methods

def add_shutdown_callback(self, callback: Callable[[], Coroutine[None, None, None]]) ‑> None
Expand source code
def add_shutdown_callback(
    self, callback: Callable[[], Coroutine[None, None, None]]
) -> None:
    """Add a callback to be called during shutdown"""
    self._shutdown_callbacks.append(callback)

Add a callback to be called during shutdown

async def connect(self) ‑> None
Expand source code
async def connect(self) -> None:
    """Connect to the room"""
    if self.room_options:
        custom_camera_video_track = None
        custom_microphone_audio_track = None
        sinks = []

        avatar = self.room_options.avatar
        if not avatar and self._pipeline and hasattr(self._pipeline, "avatar"):
            avatar = self._pipeline.avatar

        if avatar:
            if not self.room_options.room_id:
                self.room_options.room_id = self.get_room_id()
            room_id = self.room_options.room_id

            from .avatar import AvatarAudioOut, generate_avatar_credentials

            if isinstance(avatar, AvatarAudioOut):
                avatar.set_room_id(room_id)
                await avatar.connect()
                audio_out = avatar
            else:
                _api_key = os.getenv("VIDEOSDK_API_KEY")
                _secret_key = os.getenv("VIDEOSDK_SECRET_KEY")
                credentials = generate_avatar_credentials(
                    _api_key, _secret_key, participant_id=avatar.participant_id
                )
                await avatar.connect(room_id, credentials.token)
                audio_out = AvatarAudioOut(credentials=credentials, room_id=room_id)
                await audio_out.connect()  # no-op (no dispatcher_url)

            custom_camera_video_track = getattr(avatar, 'video_track', None)
            custom_microphone_audio_track = getattr(avatar, 'audio_track', None)
            sinks.append(audio_out)
            self._cloud_avatar = avatar if not isinstance(avatar, AvatarAudioOut) else None
            self._avatar_audio_out = audio_out
            if self._pipeline:
                self._pipeline.avatar = audio_out

        if self.want_console:
            from .console_mode import setup_console_voice_for_ctx

            if not self._pipeline:
                raise RuntimeError(
                    "Pipeline must be constructed before ctx.connect() in console mode"
                )
            cleanup_callback = await setup_console_voice_for_ctx(self)
            self.add_shutdown_callback(cleanup_callback)
        else:
            self.metrics_collector.transport_mode = self.room_options.transport_mode
            self.metrics_collector.analytics_client.configure(self.room_options.metrics)
            if self.room_options.transport_mode == TransportMode.VIDEOSDK:
                from .room.room import VideoSDKHandler
                
                if not self.room_options.room_id:
                    self.room_options.room_id = self.get_room_id()
                if self.room_options.send_logs_to_dashboard or (self.room_options.logs and self.room_options.logs.enabled):
                    from .metrics.logger_handler import LogManager, JobLogger
                    self._log_manager = LogManager()
                    self._log_manager.start(auth_token=self.videosdk_auth or "")
                    self._job_logger = JobLogger(
                        queue=self._log_manager.get_queue(),
                        room_id=self.room_options.room_id or "",
                        peer_id=self.room_options.agent_participant_id or "agent",
                        auth_token=self.videosdk_auth or "",
                        dashboard_log_level=self.room_options.dashboard_log_level if not (self.room_options.logs and self.room_options.logs.level) else self.room_options.logs.level,
                        send_logs_to_dashboard=True,
                    )

                if self.room_options.join_meeting:
                    validate_room_options_recording(self.room_options)
                    record_audio_resolved, record_screen_share = resolve_video_sdk_recording(
                        self.room_options
                    )
                    agent_id = self._pipeline.agent.id if self._pipeline and hasattr(self._pipeline, 'agent') else None
                    self.room = VideoSDKHandler(
                        meeting_id=self.room_options.room_id,
                        auth_token=self.videosdk_auth,
                        name=self.room_options.name,
                        agent_participant_id=self.room_options.agent_participant_id,
                        agent_id=agent_id,
                        pipeline=self._pipeline,
                        loop=self._loop,
                        vision=self.room_options.vision,
                        recording=self.room_options.recording,
                        record_audio=record_audio_resolved,
                        record_screen_share=record_screen_share,
                        custom_camera_video_track=custom_camera_video_track,
                        custom_microphone_audio_track=custom_microphone_audio_track,
                        audio_sinks=sinks,
                        background_audio=self.room_options.background_audio,
                        on_room_error=self.room_options.on_room_error,
                        auto_end_session=self.room_options.auto_end_session,
                        session_timeout_seconds=self.room_options.session_timeout_seconds,
                        no_participant_timeout_seconds=self.room_options.no_participant_timeout_seconds,
                        signaling_base_url=self.room_options.signaling_base_url,
                        job_logger=self._job_logger,
                        traces_options=self.room_options.traces,
                        metrics_options=self.room_options.metrics,
                        logs_options=self.room_options.logs,
                        avatar_participant_id=avatar.participant_id if avatar and hasattr(avatar, 'participant_id') else None,
                    )
                if self._pipeline and hasattr(
                    self._pipeline, "_set_loop_and_audio_track"
                ):
                    self._pipeline._set_loop_and_audio_track(
                        self._loop, self.room.audio_track
                    )

            elif self.room_options.transport_mode == TransportMode.WEBSOCKET:
                if not self.room_options.websocket:
                    raise ValueError("WebSocket configuration (websocket) is required when mode is WEBSOCKET")
                
                if self.room_options.webrtc and (self.room_options.webrtc.signaling_url or self.room_options.webrtc.ice_servers != [{"urls": "stun:stun.l.google.com:19302"}]):
                    logger.warning("WebRTC configuration provided but transport mode is set to WEBSOCKET. WebRTC config will be ignored.")

                from .transports.websocket_handler import WebSocketTransportHandler
                self.room = WebSocketTransportHandler(
                    loop=self._loop,
                    pipeline=self._pipeline,
                    port=self.room_options.websocket.port,
                    path=self.room_options.websocket.path
                )
            elif self.room_options.transport_mode == TransportMode.WEBRTC:
                if not self.room_options.webrtc:
                    raise ValueError("WebRTC configuration (webrtc) is required when mode is WEBRTC")
                
                if not self.room_options.webrtc.signaling_url:
                    raise ValueError("WebRTC signaling_url is required when mode is WEBRTC")

                if self.room_options.websocket and (self.room_options.websocket.port != 8080 or self.room_options.websocket.path != "/ws"):
                    logger.warning("WebSocket configuration provided but connection mode is set to WEBRTC. WebSocket config will be ignored.")

                from .transports.webrtc_handler import WebRTCTransportHandler
                self.room = WebRTCTransportHandler(
                    loop=self._loop,
                    pipeline=self._pipeline,
                    signaling_url=self.room_options.webrtc.signaling_url,
                    ice_servers=self.room_options.webrtc.ice_servers
                )
            
            elif self.room_options.transport_mode == TransportMode.VIDEOSDK:
                if self.room_options.websocket and (self.room_options.websocket.port != 8080 or self.room_options.websocket.path != "/ws"):
                     logger.warning("WebSocket configuration provided but transport mode is VIDEOSDK. WebSocket config will be ignored.")
                if self.room_options.webrtc and (self.room_options.webrtc.signaling_url or self.room_options.webrtc.ice_servers != [{"urls": "stun:stun.l.google.com:19302"}]):
                     logger.warning("WebRTC configuration provided but transport mode is VIDEOSDK. WebRTC config will be ignored.")

    if self.room:
        await self.room.connect()

        # For Non-VideoSDK modes, we still need to ensure audio track is linked if not done inside constructor
        if (
            self.room_options.transport_mode != TransportMode.VIDEOSDK
            and self._pipeline
            and hasattr(self._pipeline, "_set_loop_and_audio_track")
        ):
            # BaseTransportHandler subclasses now initialize self.audio_track
            if self.room.audio_track:
                self._pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track)

    if (
        self.room_options.playground
        and self.room_options.join_meeting
        and not self.want_console
        and self.room_options.transport_mode == TransportMode.VIDEOSDK
    ):
        if self.videosdk_auth:
            playground_url = f"https://playground.videosdk.live?token={self.videosdk_auth}&meetingId={self.room_options.room_id}"
            print(f"\033[1;36m" + "Agent started in playground mode" + "\033[0m")
            print("\033[1;75m" + "Interact with agent here at:" + "\033[0m")
            print("\033[1;4;94m" + playground_url + "\033[0m")
        else:
            raise ValueError("VIDEOSDK_AUTH_TOKEN environment variable not found")

Connect to the room

def get_room_id(self) ‑> str
Expand source code
def get_room_id(self) -> str:
    """
    Creates a new room using the VideoSDK API and returns the room ID.

    Raises:
        ValueError: If the VIDEOSDK_AUTH_TOKEN is missing.
        RuntimeError: If the API request fails or the response is invalid.
    """
    if self.want_console:
        return None

    if self.videosdk_auth:
        base_url = self.room_options.signaling_base_url
        url = f"https://{base_url}/v2/rooms"
        headers = {"Authorization": self.videosdk_auth}

        try:
            response = requests.post(url, headers=headers)
            response.raise_for_status()
        except requests.RequestException as e:
            raise RuntimeError(f"Failed to create room: {e}") from e

        data = response.json()
        room_id = data.get("roomId")
        if not room_id:
            raise RuntimeError(f"Unexpected API response, missing roomId: {data}")

        return room_id
    else:
        raise ValueError(
            "VIDEOSDK_AUTH_TOKEN not found. "
            "Set it as an environment variable or provide it in room options via auth_token."
        )

Creates a new room using the VideoSDK API and returns the room ID.

Raises

ValueError
If the VIDEOSDK_AUTH_TOKEN is missing.
RuntimeError
If the API request fails or the response is invalid.
def notify_meeting_joined(self) ‑> None
Expand source code
def notify_meeting_joined(self) -> None:
    """Called when the agent successfully joins the meeting."""
    self._meeting_joined_event.set()
    audio_out = getattr(self, '_avatar_audio_out', None)
    if audio_out and self.room and self.room.meeting:
        audio_out._set_meeting(self.room.meeting)

Called when the agent successfully joins the meeting.

async def run_until_shutdown(self, session: Any = None, wait_for_participant: bool = False) ‑> None
Expand source code
async def run_until_shutdown(
    self,
    session: Any = None,
    wait_for_participant: bool = False,
) -> None:
    """
    Simplified helper that handles all cleanup boilerplate.

    This method:
    1. Connects to the room
    2. Sets up session end callbacks
    3. Waits for participant (optional)
    4. Starts the session
    5. Waits for shutdown signal
    6. Cleans up gracefully

    Args:
        session: AgentSession to manage (will call session.start() and session.close())
        wait_for_participant: Whether to wait for a participant before starting

    Example:
        ```python
        async def entrypoint(ctx: JobContext):
            session = AgentSession(agent=agent, pipeline=pipeline)
            await ctx.run_until_shutdown(session=session, wait_for_participant=True)
        ```
    """
    shutdown_event = asyncio.Event()

    if session:

        async def cleanup_session():
            logger.info("Cleaning up session...")
            try:
                await session.close()
            except Exception as e:
                logger.error(f"Error closing session in cleanup: {e}")
            shutdown_event.set()

        self.add_shutdown_callback(cleanup_session)
    else:

        async def cleanup_no_session():
            logger.info("Shutdown called, no session to clean up")
            shutdown_event.set()

        self.add_shutdown_callback(cleanup_no_session)

    def on_session_end(reason: str):
        logger.info(f"Session ended: {reason}")
        asyncio.create_task(self.shutdown())

    try:
        try:
            await self.connect()
        except Exception as e:
            logger.error(f"Error connecting to room: {e}")
            raise

        if self.room:
            try:
                self.room.setup_session_end_callback(on_session_end)
                logger.info("Session end callback configured")
            except Exception as e:
                logger.warning(f"Error setting up session end callback: {e}")
        else:
            logger.warning(
                "Room not available, session end callback not configured"
            )

        if wait_for_participant and self.room:
            try:
                logger.info("Waiting for participant...")
                participant_id = await self.room.wait_for_participant()
                if participant_id is None:
                    logger.info("Session ended before any participant joined, shutting down")
                    return
                logger.info("Participant joined")
            except Exception as e:
                logger.error(f"Error waiting for participant: {e}")
                raise

        if session:
            try:
                await session.start()
                logger.info("Agent session started")
            except Exception as e:
                logger.error(f"Error starting session: {e}")
                raise

        logger.info(
            "Agent is running... (will exit when session ends or on interrupt)"
        )
        await shutdown_event.wait()
        logger.info("Shutdown event received, exiting gracefully...")

    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received, shutting down...")
    except Exception as e:
        logger.error(f"Unexpected error in run_until_shutdown: {e}")
        raise
    finally:
        if session:
            try:
                await session.close()
            except Exception as e:
                logger.error(f"Error closing session in finally: {e}")

        try:
            await self.shutdown()
        except Exception as e:
            logger.error(f"Error in ctx.shutdown: {e}")

Simplified helper that handles all cleanup boilerplate.

This method: 1. Connects to the room 2. Sets up session end callbacks 3. Waits for participant (optional) 4. Starts the session 5. Waits for shutdown signal 6. Cleans up gracefully

Args

session
AgentSession to manage (will call session.start() and session.close())
wait_for_participant
Whether to wait for a participant before starting

Example

async def entrypoint(ctx: JobContext):
    session = AgentSession(agent=agent, pipeline=pipeline)
    await ctx.run_until_shutdown(session=session, wait_for_participant=True)
async def shutdown(self) ‑> None
Expand source code
async def shutdown(self) -> None:
    """Called by Worker during graceful shutdown"""
    if self._is_shutting_down:
        logger.info("JobContext already shutting down")
        return
    self._is_shutting_down = True
    logger.info("JobContext shutting down")
    for callback in self._shutdown_callbacks:
        try:
            await callback()
        except Exception as e:
            logger.error(f"Error in shutdown callback: {e}")

    if self._pipeline:
        try:
            await self._pipeline.cleanup()
        except Exception as e:
            logger.error(f"Error during pipeline cleanup: {e}")
        self._pipeline = None

    cloud_avatar = getattr(self, '_cloud_avatar', None)
    if cloud_avatar and hasattr(cloud_avatar, 'aclose'):
        try:
            await cloud_avatar.aclose()
        except Exception as e:
            logger.error(f"Error during cloud avatar aclose: {e}")
    audio_out = getattr(self, '_avatar_audio_out', None)
    if audio_out:
        try:
            await audio_out.aclose()
        except Exception as e:
            logger.error(f"Error during avatar audio_out aclose: {e}")

    if self._job_logger:
        try:
            self._job_logger.cleanup()
        except Exception as e:
            logger.error(f"Error during job logger cleanup: {e}")
        self._job_logger = None
    if self._log_manager:
        try:
            self._log_manager.stop()
        except Exception as e:
            logger.error(f"Error during log manager stop: {e}")
        self._log_manager = None

    if self.room:
        try:
            if not getattr(self.room, "_left", False):
                await self.room.leave()
            else:
                logger.info("Room already left, skipping room.leave()")
        except Exception as e:
            logger.error(f"Error during room leave: {e}")
        try:
            if hasattr(self.room, "cleanup"):
                await self.room.cleanup()
        except Exception as e:
            logger.error(f"Error during room cleanup: {e}")
        self.room = None

    self.room_options = None
    self._loop = None
    self.videosdk_auth = None
    self._shutdown_callbacks.clear()
    logger.info("JobContext cleaned up")

Called by Worker during graceful shutdown

async def wait_for_meeting_joined(self, timeout: float = 30.0) ‑> bool
Expand source code
async def wait_for_meeting_joined(self, timeout: float = 30.0) -> bool:
    """Wait until the meeting is joined or timeout. Returns True if joined."""
    try:
        await asyncio.wait_for(self._meeting_joined_event.wait(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        logger.warning(f"Timeout waiting for meeting join after {timeout}s")
        return False

Wait until the meeting is joined or timeout. Returns True if joined.

async def wait_for_participant(self, participant_id: str | None = None) ‑> str
Expand source code
async def wait_for_participant(self, participant_id: str | None = None) -> str:
    if self.room:
        return await self.room.wait_for_participant(participant_id)
    else:
        raise ValueError("Room not initialized")
class JobExecutorType (*args, **kwds)
Expand source code
@unique
class JobExecutorType(Enum):
    """Enumeration of executor types for running jobs in separate processes or threads."""

    PROCESS = "process"
    THREAD = "thread"

Enumeration of executor types for running jobs in separate processes or threads.

Ancestors

  • enum.Enum

Class variables

var PROCESS
var THREAD
class LoggingOptions (enabled: bool = False,
level: str = 'INFO',
export_url: str | None = None,
export_headers: Dict[str, str] | None = None)
Expand source code
@dataclass
class LoggingOptions:
    """Configuration for log collection, level filtering, and export settings."""

    enabled: bool = False
    level: str = "INFO"
    export_url: Optional[str] = None
    export_headers: Optional[Dict[str, str]] = None

Configuration for log collection, level filtering, and export settings.

Instance variables

var enabled : bool
var export_headers : Dict[str, str] | None
var export_url : str | None
var level : str
class MetricsOptions (enabled: bool = True,
export_url: str | None = None,
export_headers: Dict[str, str] | None = None)
Expand source code
@dataclass
class MetricsOptions:
    """Configuration for metrics collection and export settings."""

    enabled: bool = True
    export_url: Optional[str] = None
    export_headers: Optional[Dict[str, str]] = None

Configuration for metrics collection and export settings.

Instance variables

var enabled : bool
var export_headers : Dict[str, str] | None
var export_url : str | None
class Options (executor_type: Any = None,
num_idle_processes: int = 1,
initialize_timeout: float = 10.0,
close_timeout: float = 60.0,
memory_warn_mb: float = 500.0,
memory_limit_mb: float = 0.0,
ping_interval: float = 30.0,
max_processes: int = 1,
agent_id: str = 'VideoSDKAgent',
auth_token: str | None = None,
permissions: Any = None,
max_retry: int = 16,
load_threshold: float = 0.75,
register: bool = False,
signaling_base_url: str = 'api.videosdk.live',
host: str = '0.0.0.0',
port: int = 8081,
log_level: str = 'INFO')
Expand source code
@dataclass
class Options:
    """Configuration options for WorkerJob execution."""

    executor_type: Any = None  # Will be set in __post_init__
    """Which executor to use to run jobs. Automatically selected based on platform."""

    num_idle_processes: int = 1
    """Number of idle processes/threads to keep warm."""

    initialize_timeout: float = 10.0
    """Maximum amount of time to wait for a process/thread to initialize/prewarm"""

    close_timeout: float = 60.0
    """Maximum amount of time to wait for a job to shut down gracefully"""

    memory_warn_mb: float = 500.0
    """Memory warning threshold in MB."""

    memory_limit_mb: float = 0.0
    """Maximum memory usage for a job in MB. Defaults to 0 (disabled)."""

    ping_interval: float = 30.0
    """Interval between health check pings."""

    max_processes: int = 1
    """Maximum number of processes/threads."""

    agent_id: str = "VideoSDKAgent"
    """ID of the agent."""

    auth_token: Optional[str] = None
    """VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided."""

    permissions: Any = None  # Will be set in __post_init__
    """Permissions for the agent participant."""

    max_retry: int = 16
    """Maximum number of times to retry connecting to VideoSDK."""

    load_threshold: float = 0.75
    """Load threshold above which worker is marked as unavailable."""

    register: bool = False
    """Whether to register with the backend. Defaults to False for local development."""

    signaling_base_url: str = "api.videosdk.live"
    """Signaling base URL for VideoSDK services. Defaults to api.videosdk.live."""

    host: str = "0.0.0.0"
    """Host for the debug HTTP server."""

    port: int = 8081
    """Port for the debug HTTP server."""

    log_level: str = "INFO"
    """Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO."""

    def __post_init__(self):
        """Post-initialization setup."""
        # Import here to avoid circular imports
        from .worker import ExecutorType, WorkerPermissions, _default_executor_type

        if self.executor_type is None:
            self.executor_type = _default_executor_type

        if self.permissions is None:
            self.permissions = WorkerPermissions()

        if not self.auth_token:
            self.auth_token = os.getenv("VIDEOSDK_AUTH_TOKEN")

Configuration options for WorkerJob execution.

Instance variables

var agent_id : str

ID of the agent.

var auth_token : str | None

VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided.

var close_timeout : float

Maximum amount of time to wait for a job to shut down gracefully

var executor_type : Any

Which executor to use to run jobs. Automatically selected based on platform.

var host : str

Host for the debug HTTP server.

var initialize_timeout : float

Maximum amount of time to wait for a process/thread to initialize/prewarm

var load_threshold : float

Load threshold above which worker is marked as unavailable.

var log_level : str

Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO.

var max_processes : int

Maximum number of processes/threads.

var max_retry : int

Maximum number of times to retry connecting to VideoSDK.

var memory_limit_mb : float

Maximum memory usage for a job in MB. Defaults to 0 (disabled).

var memory_warn_mb : float

Memory warning threshold in MB.

var num_idle_processes : int

Number of idle processes/threads to keep warm.

var permissions : Any

Permissions for the agent participant.

var ping_interval : float

Interval between health check pings.

var port : int

Port for the debug HTTP server.

var register : bool

Whether to register with the backend. Defaults to False for local development.

var signaling_base_url : str

Signaling base URL for VideoSDK services. Defaults to api.videosdk.live.

class RecordingOptions (video: bool = False, screen_share: bool = False)
Expand source code
@dataclass
class RecordingOptions:
    """
    Extra recording when RoomOptions.recording is True.

    Audio is always recorded when recording=True (track API, kind=audio).
    Set video and/or screen_share here only when you need them.
    screen_share=True requires RoomOptions.vision=True.
    """

    video: bool = False
    screen_share: bool = False

Extra recording when RoomOptions.recording is True.

Audio is always recorded when recording=True (track API, kind=audio). Set video and/or screen_share here only when you need them. screen_share=True requires RoomOptions.vision=True.

Instance variables

var screen_share : bool
var video : bool
class RoomOptions (transport_mode: str | TransportMode | None = None,
websocket: WebSocketConfig | None = None,
webrtc: WebRTCConfig | None = None,
traces: TracesOptions | None = None,
metrics: MetricsOptions | None = None,
logs: LoggingOptions | None = None,
**kwargs)
Expand source code
@dataclass
class RoomOptions:
    """Configuration options for connecting to and managing a VideoSDK room, including transport, telemetry, and session settings."""

    room_id: Optional[str] = None
    auth_token: Optional[str] = None
    name: Optional[str] = "Agent"
    agent_participant_id: Optional[str] = None
    playground: bool = True
    vision: bool = False
    recording: bool = False
    # recording=True → always record audio (track API). Optional RecordingOptions.video /
    # RecordingOptions.screen_share for camera video and/or screen share (see validate/resolve).
    recording_options: Optional[RecordingOptions] = None
    avatar: Optional[Any] = None
    join_meeting: Optional[bool] = True
    on_room_error: Optional[Callable[[Any], None]] = None
    send_analytics_to_pubsub: Optional[bool] = False
    # Session management options
    auto_end_session: bool = True
    session_timeout_seconds: Optional[int] = 5
    no_participant_timeout_seconds: Optional[int] = 90
    # VideoSDK connection options
    signaling_base_url: Optional[str] = "api.videosdk.live"
    background_audio: bool = False

    send_logs_to_dashboard: bool = False
    dashboard_log_level: str = "INFO"

    # Telemetry and logging configurations
    traces: Optional[TracesOptions] = None
    metrics: Optional[MetricsOptions] = None
    logs: Optional[LoggingOptions] = None

    # New Configuration Fields
    _transport_mode: TransportMode = field(default=TransportMode.VIDEOSDK, init=False, repr=False)

    # Structured configs
    websocket: Optional[WebSocketConfig] = None
    webrtc: Optional[WebRTCConfig] = None

    # Alias properties for easier usage as requested
    @property
    def transport_mode(self) -> TransportMode:
        return self._transport_mode

    @transport_mode.setter
    def transport_mode(self, value):
        if isinstance(value, str):
            try:
                self._transport_mode = TransportMode(value.lower())
            except ValueError:
                # Fallback for compatibility or custom modes
                pass
        elif isinstance(value, TransportMode):
            self._transport_mode = value

    def __init__(
        self,
        transport_mode: Optional[str | TransportMode] = None,
        websocket: Optional[WebSocketConfig] = None,
        webrtc: Optional[WebRTCConfig] = None,
        traces: Optional[TracesOptions] = None,
        metrics: Optional[MetricsOptions] = None,
        logs: Optional[LoggingOptions] = None,
        **kwargs,
    ):
        # Initialize internal field
        self._transport_mode = TransportMode.VIDEOSDK
        
        # Handle telemetry options
        self.traces = traces or TracesOptions()
        self.metrics = metrics or MetricsOptions()
        self.logs = logs or LoggingOptions()

        # Handle connection mode
        if transport_mode:
            if isinstance(transport_mode, str):
                try:
                    self._transport_mode = TransportMode(transport_mode.lower())
                except ValueError:
                    pass
            elif isinstance(transport_mode, TransportMode):
                self._transport_mode = transport_mode

        self.websocket = websocket or WebSocketConfig()
        self.webrtc = webrtc or WebRTCConfig()

        # Handle standard fields
        for key, value in kwargs.items():
            if hasattr(self, key):
                setattr(self, key, value)

Configuration options for connecting to and managing a VideoSDK room, including transport, telemetry, and session settings.

Instance variables

var agent_participant_id : str | None
var auth_token : str | None
var auto_end_session : bool
var avatar : typing.Any | None
var background_audio : bool
var dashboard_log_level : str
var join_meeting : bool | None
var logsLoggingOptions | None
var metricsMetricsOptions | None
var name : str | None
var no_participant_timeout_seconds : int | None
var on_room_error : Callable[[Any], None] | None
var playground : bool
var recording : bool
var recording_optionsRecordingOptions | None
var room_id : str | None
var send_analytics_to_pubsub : bool | None
var send_logs_to_dashboard : bool
var session_timeout_seconds : int | None
var signaling_base_url : str | None
var tracesTracesOptions | None
prop transport_modeTransportMode
Expand source code
@property
def transport_mode(self) -> TransportMode:
    return self._transport_mode
var vision : bool
var webrtcWebRTCConfig | None
var websocketWebSocketConfig | None
class RunningJobInfo (accept_arguments: JobAcceptArguments,
job: JobContext,
url: str,
token: str,
worker_id: str)
Expand source code
@dataclass
class RunningJobInfo:
    """Tracks a running job's context, connection details, and associated worker identity."""

    accept_arguments: JobAcceptArguments
    job: JobContext
    url: str
    token: str
    worker_id: str

    async def _run(self):
        # Placeholder for job execution logic if needed in the future
        pass

Tracks a running job's context, connection details, and associated worker identity.

Instance variables

var accept_argumentsJobAcceptArguments
var jobJobContext
var token : str
var url : str
var worker_id : str
class TracesOptions (enabled: bool = True,
export_url: str | None = None,
export_headers: Dict[str, str] | None = None)
Expand source code
@dataclass
class TracesOptions:
    """Configuration for OpenTelemetry trace export settings."""

    enabled: bool = True
    export_url: Optional[str] = None
    export_headers: Optional[Dict[str, str]] = None

Configuration for OpenTelemetry trace export settings.

Instance variables

var enabled : bool
var export_headers : Dict[str, str] | None
var export_url : str | None
class TransportMode (*args, **kwds)
Expand source code
class TransportMode(Enum):
    """Enumeration of supported transport modes for room connections."""

    VIDEOSDK = "videosdk"
    WEBSOCKET = "websocket"
    WEBRTC = "webrtc"

Enumeration of supported transport modes for room connections.

Ancestors

  • enum.Enum

Class variables

var VIDEOSDK
var WEBRTC
var WEBSOCKET
class WebRTCConfig (signaling_url: str | None = None,
signaling_type: str = 'websocket',
ice_servers: list | None = None)
Expand source code
@dataclass
class WebRTCConfig:
    """Configuration for WebRTC transport including signaling and ICE server settings."""

    signaling_url: Optional[str] = None
    signaling_type: str = "websocket"
    ice_servers: Optional[list] = None

    def __post_init__(self):
        if self.ice_servers is None:
            self.ice_servers = [{"urls": "stun:stun.l.google.com:19302"}]

Configuration for WebRTC transport including signaling and ICE server settings.

Instance variables

var ice_servers : list | None
var signaling_type : str
var signaling_url : str | None
class WebSocketConfig (port: int = 8080, path: str = '/ws')
Expand source code
@dataclass
class WebSocketConfig:
    """Configuration for WebSocket transport including port and endpoint path."""

    port: int = 8080
    path: str = "/ws"

Configuration for WebSocket transport including port and endpoint path.

Instance variables

var path : str
var port : int
class WorkerJob (entrypoint,
jobctx=None,
options: Options | None = None)
Expand source code
class WorkerJob:
    """Wraps an async entrypoint function and manages its execution either directly or via a Worker process."""

    def __init__(self, entrypoint, jobctx=None, options: Optional[Options] = None):
        """
        :param entrypoint: An async function accepting one argument: jobctx
        :param jobctx: A static object or a callable that returns a context per job
        :param options: Configuration options for job execution
        """
        if not asyncio.iscoroutinefunction(entrypoint):
            raise TypeError("entrypoint must be a coroutine function")
        self.entrypoint = entrypoint
        self.jobctx = jobctx
        self.options = options or Options()

    def start(self):
        from .worker import Worker, WorkerOptions

        # Convert JobOptions to WorkerOptions for compatibility
        worker_options = WorkerOptions(
            entrypoint_fnc=self.entrypoint,
            agent_id=self.options.agent_id,
            auth_token=self.options.auth_token,
            executor_type=self.options.executor_type,
            num_idle_processes=self.options.num_idle_processes,
            initialize_timeout=self.options.initialize_timeout,
            close_timeout=self.options.close_timeout,
            memory_warn_mb=self.options.memory_warn_mb,
            memory_limit_mb=self.options.memory_limit_mb,
            ping_interval=self.options.ping_interval,
            max_processes=self.options.max_processes,
            permissions=self.options.permissions,
            max_retry=self.options.max_retry,
            load_threshold=self.options.load_threshold,
            register=self.options.register,
            signaling_base_url=self.options.signaling_base_url,
            host=self.options.host,
            port=self.options.port,
            log_level=self.options.log_level,
        )

        # If register=True, run the worker in backend mode (don't execute entrypoint immediately)
        if self.options.register:
            default_room_options = None
            if self.jobctx:
                if callable(self.jobctx):
                    job_context = self.jobctx()
                else:
                    job_context = self.jobctx
                default_room_options = job_context.room_options
            # Run the worker normally (for backend registration mode)
            Worker.run_worker(
                options=worker_options, default_room_options=default_room_options
            )
        else:
            # Direct mode - run entrypoint immediately if we have a job context
            if self.jobctx:
                if callable(self.jobctx):
                    job_context = self.jobctx()
                else:
                    job_context = self.jobctx

                # Set the current job context and run the entrypoint
                token = _set_current_job_context(job_context)
                try:
                    asyncio.run(self.entrypoint(job_context))
                finally:
                    _reset_current_job_context(token)
            else:
                # No job context provided, run worker normally
                Worker.run_worker(worker_options)

Wraps an async entrypoint function and manages its execution either directly or via a Worker process.

:param entrypoint: An async function accepting one argument: jobctx :param jobctx: A static object or a callable that returns a context per job :param options: Configuration options for job execution

Methods

def start(self)
Expand source code
def start(self):
    from .worker import Worker, WorkerOptions

    # Convert JobOptions to WorkerOptions for compatibility
    worker_options = WorkerOptions(
        entrypoint_fnc=self.entrypoint,
        agent_id=self.options.agent_id,
        auth_token=self.options.auth_token,
        executor_type=self.options.executor_type,
        num_idle_processes=self.options.num_idle_processes,
        initialize_timeout=self.options.initialize_timeout,
        close_timeout=self.options.close_timeout,
        memory_warn_mb=self.options.memory_warn_mb,
        memory_limit_mb=self.options.memory_limit_mb,
        ping_interval=self.options.ping_interval,
        max_processes=self.options.max_processes,
        permissions=self.options.permissions,
        max_retry=self.options.max_retry,
        load_threshold=self.options.load_threshold,
        register=self.options.register,
        signaling_base_url=self.options.signaling_base_url,
        host=self.options.host,
        port=self.options.port,
        log_level=self.options.log_level,
    )

    # If register=True, run the worker in backend mode (don't execute entrypoint immediately)
    if self.options.register:
        default_room_options = None
        if self.jobctx:
            if callable(self.jobctx):
                job_context = self.jobctx()
            else:
                job_context = self.jobctx
            default_room_options = job_context.room_options
        # Run the worker normally (for backend registration mode)
        Worker.run_worker(
            options=worker_options, default_room_options=default_room_options
        )
    else:
        # Direct mode - run entrypoint immediately if we have a job context
        if self.jobctx:
            if callable(self.jobctx):
                job_context = self.jobctx()
            else:
                job_context = self.jobctx

            # Set the current job context and run the entrypoint
            token = _set_current_job_context(job_context)
            try:
                asyncio.run(self.entrypoint(job_context))
            finally:
                _reset_current_job_context(token)
        else:
            # No job context provided, run worker normally
            Worker.run_worker(worker_options)