Module agents.job

Functions

def get_current_job_context() ‑> JobContext | None
Expand source code
def get_current_job_context() -> Optional["JobContext"]:
    """Get the current job context (used by pipeline constructors)"""
    return _current_job_context.get()

Get the current job context (used by pipeline constructors)

Classes

class JobAcceptArguments (identity: str, name: str, metadata: str = '')
Expand source code
@dataclass
class JobAcceptArguments:
    """Arguments for accepting a job."""

    identity: str
    name: str
    metadata: str = ""

Arguments for accepting a job.

Instance variables

var identity : str
var metadata : str
var name : str
class JobContext (*,
room_options: RoomOptions,
metadata: dict | None = None,
loop: asyncio.events.AbstractEventLoop | None = None)
Expand source code
class JobContext:
    def __init__(
        self,
        *,
        room_options: RoomOptions,
        metadata: Optional[dict] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self.room_options = room_options
        self.metadata = metadata or {}
        self._loop = loop or asyncio.get_event_loop()
        self._pipeline: Optional[Pipeline] = None
        self.videosdk_auth = self.room_options.auth_token or os.getenv(
            "VIDEOSDK_AUTH_TOKEN"
        )
        self.room: Optional["BaseTransportHandler"] = None
        self._shutdown_callbacks: list[Callable[[], Coroutine[None, None, None]]] = []
        self._is_shutting_down: bool = False
        self.want_console = len(sys.argv) > 1 and sys.argv[1].lower() == "console"
        self.playground_manager: Optional["PlaygroundManager"] = None
    def _set_pipeline_internal(self, pipeline: Any) -> None:
        """Internal method called by pipeline constructors"""
        self._pipeline = pipeline
        if self.room:
            self.room.pipeline = pipeline
            if hasattr(pipeline, "_set_loop_and_audio_track"):
                pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track)

            # Ensure our lambda function fix is preserved after pipeline setup
            # This prevents the pipeline from overriding our event handlers
            if hasattr(self.room, "meeting") and self.room.meeting:
                # Re-apply our lambda function fix to ensure it's not overridden
                self.room.meeting.add_event_listener(
                    self.room._create_meeting_handler()
                )

    async def connect(self) -> None:
        """Connect to the room"""
        if self.room_options:
            custom_camera_video_track = None
            custom_microphone_audio_track = None
            sinks = []

            avatar = self.room_options.avatar
            if not avatar and self._pipeline and hasattr(self._pipeline, "avatar"):
                avatar = self._pipeline.avatar

            if avatar:
                await avatar.connect()
                custom_camera_video_track = avatar.video_track
                custom_microphone_audio_track = avatar.audio_track
                sinks.append(avatar)

            if self.want_console:
                from .console_mode import setup_console_voice_for_ctx

                if not self._pipeline:
                    raise RuntimeError(
                        "Pipeline must be constructed before ctx.connect() in console mode"
                    )
                cleanup_callback = await setup_console_voice_for_ctx(self)
                self.add_shutdown_callback(cleanup_callback)
            else:
                if self.room_options.transport_mode == TransportMode.VIDEOSDK:
                    if not self.room_options.room_id:
                        self.room_options.room_id = self.get_room_id()
                    if self.room_options.join_meeting:
                        agent_id = self._pipeline.agent.id if self._pipeline and hasattr(self._pipeline, 'agent') else None
                        self.room = VideoSDKHandler(
                            meeting_id=self.room_options.room_id,
                            auth_token=self.videosdk_auth,
                            name=self.room_options.name,
                            agent_participant_id=self.room_options.agent_participant_id,
                            agent_id=agent_id,
                            pipeline=self._pipeline,
                            loop=self._loop,
                            vision=self.room_options.vision,
                            recording=self.room_options.recording,
                            custom_camera_video_track=custom_camera_video_track,
                            custom_microphone_audio_track=custom_microphone_audio_track,
                            audio_sinks=sinks,
                            background_audio=self.room_options.background_audio,
                            on_room_error=self.room_options.on_room_error,
                            auto_end_session=self.room_options.auto_end_session,
                            session_timeout_seconds=self.room_options.session_timeout_seconds,
                            signaling_base_url=self.room_options.signaling_base_url,
                        )
                    if self._pipeline and hasattr(
                        self._pipeline, "_set_loop_and_audio_track"
                    ):
                        self._pipeline._set_loop_and_audio_track(
                            self._loop, self.room.audio_track
                        )

                elif self.room_options.transport_mode == TransportMode.WEBSOCKET:
                    if not self.room_options.websocket:
                        raise ValueError("WebSocket configuration (websocket) is required when mode is WEBSOCKET")
                    
                    if self.room_options.webrtc and (self.room_options.webrtc.signaling_url or self.room_options.webrtc.ice_servers != [{"urls": "stun:stun.l.google.com:19302"}]):
                        logger.warning("WebRTC configuration provided but transport mode is set to WEBSOCKET. WebRTC config will be ignored.")

                    from .transports.websocket_handler import WebSocketTransportHandler
                    self.room = WebSocketTransportHandler(
                        loop=self._loop,
                        pipeline=self._pipeline,
                        port=self.room_options.websocket.port,
                        path=self.room_options.websocket.path
                    )
                elif self.room_options.transport_mode == TransportMode.WEBRTC:
                    if not self.room_options.webrtc:
                        raise ValueError("WebRTC configuration (webrtc) is required when mode is WEBRTC")
                    
                    if not self.room_options.webrtc.signaling_url:
                        raise ValueError("WebRTC signaling_url is required when mode is WEBRTC")

                    if self.room_options.websocket and (self.room_options.websocket.port != 8080 or self.room_options.websocket.path != "/ws"):
                        logger.warning("WebSocket configuration provided but connection mode is set to WEBRTC. WebSocket config will be ignored.")

                    from .transports.webrtc_handler import WebRTCTransportHandler
                    self.room = WebRTCTransportHandler(
                        loop=self._loop,
                        pipeline=self._pipeline,
                        signaling_url=self.room_options.webrtc.signaling_url,
                        ice_servers=self.room_options.webrtc.ice_servers
                    )
                
                elif self.room_options.transport_mode == TransportMode.VIDEOSDK:
                    if self.room_options.websocket and (self.room_options.websocket.port != 8080 or self.room_options.websocket.path != "/ws"):
                         logger.warning("WebSocket configuration provided but transport mode is VIDEOSDK. WebSocket config will be ignored.")
                    if self.room_options.webrtc and (self.room_options.webrtc.signaling_url or self.room_options.webrtc.ice_servers != [{"urls": "stun:stun.l.google.com:19302"}]):
                         logger.warning("WebRTC configuration provided but transport mode is VIDEOSDK. WebRTC config will be ignored.")

        if self.room:
            await self.room.connect()

            # For Non-VideoSDK modes, we still need to ensure audio track is linked if not done inside constructor
            if (
                self.room_options.transport_mode != TransportMode.VIDEOSDK
                and self._pipeline
                and hasattr(self._pipeline, "_set_loop_and_audio_track")
            ):
                # BaseTransportHandler subclasses now initialize self.audio_track
                if self.room.audio_track:
                    self._pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track)

        if (
            self.room_options.playground
            and self.room_options.join_meeting
            and not self.want_console
            and self.room_options.transport_mode == TransportMode.VIDEOSDK
        ):
            if self.videosdk_auth:
                playground_url = f"https://playground.videosdk.live?token={self.videosdk_auth}&meetingId={self.room_options.room_id}"
                print(f"\033[1;36m" + "Agent started in playground mode" + "\033[0m")
                print("\033[1;75m" + "Interact with agent here at:" + "\033[0m")
                print("\033[1;4;94m" + playground_url + "\033[0m")
            else:
                raise ValueError("VIDEOSDK_AUTH_TOKEN environment variable not found")

    async def shutdown(self) -> None:
        """Called by Worker during graceful shutdown"""
        if self._is_shutting_down:
            logger.info("JobContext already shutting down")
            return
        self._is_shutting_down = True
        logger.info("JobContext shutting down")
        for callback in self._shutdown_callbacks:
            try:
                await callback()
            except Exception as e:
                logger.error(f"Error in shutdown callback: {e}")

        if self._pipeline:
            try:
                await self._pipeline.cleanup()
            except Exception as e:
                logger.error(f"Error during pipeline cleanup: {e}")
            self._pipeline = None

        if self.room:
            try:
                if not getattr(self.room, "_left", False):
                    await self.room.leave()
                else:
                    logger.info("Room already left, skipping room.leave()")
            except Exception as e:
                logger.error(f"Error during room leave: {e}")
            try:
                if hasattr(self.room, "cleanup"):
                    await self.room.cleanup()
            except Exception as e:
                logger.error(f"Error during room cleanup: {e}")
            self.room = None

        self.room_options = None
        self._loop = None
        self.videosdk_auth = None
        self._shutdown_callbacks.clear()
        logger.info("JobContext cleaned up")

    def add_shutdown_callback(
        self, callback: Callable[[], Coroutine[None, None, None]]
    ) -> None:
        """Add a callback to be called during shutdown"""
        self._shutdown_callbacks.append(callback)

    async def wait_for_participant(self, participant_id: str | None = None) -> str:
        if self.room:
            return await self.room.wait_for_participant(participant_id)
        else:
            raise ValueError("Room not initialized")

    async def run_until_shutdown(
        self,
        session: Any = None,
        wait_for_participant: bool = False,
    ) -> None:
        """
        Simplified helper that handles all cleanup boilerplate.

        This method:
        1. Connects to the room
        2. Sets up session end callbacks
        3. Waits for participant (optional)
        4. Starts the session
        5. Waits for shutdown signal
        6. Cleans up gracefully

        Args:
            session: AgentSession to manage (will call session.start() and session.close())
            wait_for_participant: Whether to wait for a participant before starting

        Example:
            ```python
            async def entrypoint(ctx: JobContext):
                session = AgentSession(agent=agent, pipeline=pipeline)
                await ctx.run_until_shutdown(session=session, wait_for_participant=True)
            ```
        """
        shutdown_event = asyncio.Event()

        if session:

            async def cleanup_session():
                logger.info("Cleaning up session...")
                try:
                    await session.close()
                except Exception as e:
                    logger.error(f"Error closing session in cleanup: {e}")
                shutdown_event.set()

            self.add_shutdown_callback(cleanup_session)
        else:

            async def cleanup_no_session():
                logger.info("Shutdown called, no session to clean up")
                shutdown_event.set()

            self.add_shutdown_callback(cleanup_no_session)

        def on_session_end(reason: str):
            logger.info(f"Session ended: {reason}")
            asyncio.create_task(self.shutdown())

        try:
            try:
                await self.connect()
            except Exception as e:
                logger.error(f"Error connecting to room: {e}")
                raise

            if self.room:
                try:
                    self.room.setup_session_end_callback(on_session_end)
                    logger.info("Session end callback configured")
                except Exception as e:
                    logger.warning(f"Error setting up session end callback: {e}")
            else:
                logger.warning(
                    "Room not available, session end callback not configured"
                )

            if wait_for_participant and self.room:
                try:
                    logger.info("Waiting for participant...")
                    await self.room.wait_for_participant()
                    logger.info("Participant joined")
                except Exception as e:
                    logger.error(f"Error waiting for participant: {e}")
                    raise

            if session:
                try:
                    await session.start()
                    logger.info("Agent session started")
                except Exception as e:
                    logger.error(f"Error starting session: {e}")
                    raise

            logger.info(
                "Agent is running... (will exit when session ends or on interrupt)"
            )
            await shutdown_event.wait()
            logger.info("Shutdown event received, exiting gracefully...")

        except KeyboardInterrupt:
            logger.info("Keyboard interrupt received, shutting down...")
        except Exception as e:
            logger.error(f"Unexpected error in run_until_shutdown: {e}")
            raise
        finally:
            if session:
                try:
                    await session.close()
                except Exception as e:
                    logger.error(f"Error closing session in finally: {e}")

            try:
                await self.shutdown()
            except Exception as e:
                logger.error(f"Error in ctx.shutdown: {e}")

    def get_room_id(self) -> str:
        """
        Creates a new room using the VideoSDK API and returns the room ID.

        Raises:
            ValueError: If the VIDEOSDK_AUTH_TOKEN is missing.
            RuntimeError: If the API request fails or the response is invalid.
        """
        if self.want_console:
            return None

        if self.videosdk_auth:
            base_url = self.room_options.signaling_base_url
            url = f"https://{base_url}/v2/rooms"
            headers = {"Authorization": self.videosdk_auth}

            try:
                response = requests.post(url, headers=headers)
                response.raise_for_status()
            except requests.RequestException as e:
                raise RuntimeError(f"Failed to create room: {e}") from e

            data = response.json()
            room_id = data.get("roomId")
            if not room_id:
                raise RuntimeError(f"Unexpected API response, missing roomId: {data}")

            return room_id
        else:
            raise ValueError(
                "VIDEOSDK_AUTH_TOKEN not found. "
                "Set it as an environment variable or provide it in room options via auth_token."
            )

Methods

def add_shutdown_callback(self, callback: Callable[[], Coroutine[None, None, None]]) ‑> None
Expand source code
def add_shutdown_callback(
    self, callback: Callable[[], Coroutine[None, None, None]]
) -> None:
    """Add a callback to be called during shutdown"""
    self._shutdown_callbacks.append(callback)

Add a callback to be called during shutdown

async def connect(self) ‑> None
Expand source code
async def connect(self) -> None:
    """Connect to the room"""
    if self.room_options:
        custom_camera_video_track = None
        custom_microphone_audio_track = None
        sinks = []

        avatar = self.room_options.avatar
        if not avatar and self._pipeline and hasattr(self._pipeline, "avatar"):
            avatar = self._pipeline.avatar

        if avatar:
            await avatar.connect()
            custom_camera_video_track = avatar.video_track
            custom_microphone_audio_track = avatar.audio_track
            sinks.append(avatar)

        if self.want_console:
            from .console_mode import setup_console_voice_for_ctx

            if not self._pipeline:
                raise RuntimeError(
                    "Pipeline must be constructed before ctx.connect() in console mode"
                )
            cleanup_callback = await setup_console_voice_for_ctx(self)
            self.add_shutdown_callback(cleanup_callback)
        else:
            if self.room_options.transport_mode == TransportMode.VIDEOSDK:
                if not self.room_options.room_id:
                    self.room_options.room_id = self.get_room_id()
                if self.room_options.join_meeting:
                    agent_id = self._pipeline.agent.id if self._pipeline and hasattr(self._pipeline, 'agent') else None
                    self.room = VideoSDKHandler(
                        meeting_id=self.room_options.room_id,
                        auth_token=self.videosdk_auth,
                        name=self.room_options.name,
                        agent_participant_id=self.room_options.agent_participant_id,
                        agent_id=agent_id,
                        pipeline=self._pipeline,
                        loop=self._loop,
                        vision=self.room_options.vision,
                        recording=self.room_options.recording,
                        custom_camera_video_track=custom_camera_video_track,
                        custom_microphone_audio_track=custom_microphone_audio_track,
                        audio_sinks=sinks,
                        background_audio=self.room_options.background_audio,
                        on_room_error=self.room_options.on_room_error,
                        auto_end_session=self.room_options.auto_end_session,
                        session_timeout_seconds=self.room_options.session_timeout_seconds,
                        signaling_base_url=self.room_options.signaling_base_url,
                    )
                if self._pipeline and hasattr(
                    self._pipeline, "_set_loop_and_audio_track"
                ):
                    self._pipeline._set_loop_and_audio_track(
                        self._loop, self.room.audio_track
                    )

            elif self.room_options.transport_mode == TransportMode.WEBSOCKET:
                if not self.room_options.websocket:
                    raise ValueError("WebSocket configuration (websocket) is required when mode is WEBSOCKET")
                
                if self.room_options.webrtc and (self.room_options.webrtc.signaling_url or self.room_options.webrtc.ice_servers != [{"urls": "stun:stun.l.google.com:19302"}]):
                    logger.warning("WebRTC configuration provided but transport mode is set to WEBSOCKET. WebRTC config will be ignored.")

                from .transports.websocket_handler import WebSocketTransportHandler
                self.room = WebSocketTransportHandler(
                    loop=self._loop,
                    pipeline=self._pipeline,
                    port=self.room_options.websocket.port,
                    path=self.room_options.websocket.path
                )
            elif self.room_options.transport_mode == TransportMode.WEBRTC:
                if not self.room_options.webrtc:
                    raise ValueError("WebRTC configuration (webrtc) is required when mode is WEBRTC")
                
                if not self.room_options.webrtc.signaling_url:
                    raise ValueError("WebRTC signaling_url is required when mode is WEBRTC")

                if self.room_options.websocket and (self.room_options.websocket.port != 8080 or self.room_options.websocket.path != "/ws"):
                    logger.warning("WebSocket configuration provided but connection mode is set to WEBRTC. WebSocket config will be ignored.")

                from .transports.webrtc_handler import WebRTCTransportHandler
                self.room = WebRTCTransportHandler(
                    loop=self._loop,
                    pipeline=self._pipeline,
                    signaling_url=self.room_options.webrtc.signaling_url,
                    ice_servers=self.room_options.webrtc.ice_servers
                )
            
            elif self.room_options.transport_mode == TransportMode.VIDEOSDK:
                if self.room_options.websocket and (self.room_options.websocket.port != 8080 or self.room_options.websocket.path != "/ws"):
                     logger.warning("WebSocket configuration provided but transport mode is VIDEOSDK. WebSocket config will be ignored.")
                if self.room_options.webrtc and (self.room_options.webrtc.signaling_url or self.room_options.webrtc.ice_servers != [{"urls": "stun:stun.l.google.com:19302"}]):
                     logger.warning("WebRTC configuration provided but transport mode is VIDEOSDK. WebRTC config will be ignored.")

    if self.room:
        await self.room.connect()

        # For Non-VideoSDK modes, we still need to ensure audio track is linked if not done inside constructor
        if (
            self.room_options.transport_mode != TransportMode.VIDEOSDK
            and self._pipeline
            and hasattr(self._pipeline, "_set_loop_and_audio_track")
        ):
            # BaseTransportHandler subclasses now initialize self.audio_track
            if self.room.audio_track:
                self._pipeline._set_loop_and_audio_track(self._loop, self.room.audio_track)

    if (
        self.room_options.playground
        and self.room_options.join_meeting
        and not self.want_console
        and self.room_options.transport_mode == TransportMode.VIDEOSDK
    ):
        if self.videosdk_auth:
            playground_url = f"https://playground.videosdk.live?token={self.videosdk_auth}&meetingId={self.room_options.room_id}"
            print(f"\033[1;36m" + "Agent started in playground mode" + "\033[0m")
            print("\033[1;75m" + "Interact with agent here at:" + "\033[0m")
            print("\033[1;4;94m" + playground_url + "\033[0m")
        else:
            raise ValueError("VIDEOSDK_AUTH_TOKEN environment variable not found")

Connect to the room

def get_room_id(self) ‑> str
Expand source code
def get_room_id(self) -> str:
    """
    Creates a new room using the VideoSDK API and returns the room ID.

    Raises:
        ValueError: If the VIDEOSDK_AUTH_TOKEN is missing.
        RuntimeError: If the API request fails or the response is invalid.
    """
    if self.want_console:
        return None

    if self.videosdk_auth:
        base_url = self.room_options.signaling_base_url
        url = f"https://{base_url}/v2/rooms"
        headers = {"Authorization": self.videosdk_auth}

        try:
            response = requests.post(url, headers=headers)
            response.raise_for_status()
        except requests.RequestException as e:
            raise RuntimeError(f"Failed to create room: {e}") from e

        data = response.json()
        room_id = data.get("roomId")
        if not room_id:
            raise RuntimeError(f"Unexpected API response, missing roomId: {data}")

        return room_id
    else:
        raise ValueError(
            "VIDEOSDK_AUTH_TOKEN not found. "
            "Set it as an environment variable or provide it in room options via auth_token."
        )

Creates a new room using the VideoSDK API and returns the room ID.

Raises

ValueError
If the VIDEOSDK_AUTH_TOKEN is missing.
RuntimeError
If the API request fails or the response is invalid.
async def run_until_shutdown(self, session: Any = None, wait_for_participant: bool = False) ‑> None
Expand source code
async def run_until_shutdown(
    self,
    session: Any = None,
    wait_for_participant: bool = False,
) -> None:
    """
    Simplified helper that handles all cleanup boilerplate.

    This method:
    1. Connects to the room
    2. Sets up session end callbacks
    3. Waits for participant (optional)
    4. Starts the session
    5. Waits for shutdown signal
    6. Cleans up gracefully

    Args:
        session: AgentSession to manage (will call session.start() and session.close())
        wait_for_participant: Whether to wait for a participant before starting

    Example:
        ```python
        async def entrypoint(ctx: JobContext):
            session = AgentSession(agent=agent, pipeline=pipeline)
            await ctx.run_until_shutdown(session=session, wait_for_participant=True)
        ```
    """
    shutdown_event = asyncio.Event()

    if session:

        async def cleanup_session():
            logger.info("Cleaning up session...")
            try:
                await session.close()
            except Exception as e:
                logger.error(f"Error closing session in cleanup: {e}")
            shutdown_event.set()

        self.add_shutdown_callback(cleanup_session)
    else:

        async def cleanup_no_session():
            logger.info("Shutdown called, no session to clean up")
            shutdown_event.set()

        self.add_shutdown_callback(cleanup_no_session)

    def on_session_end(reason: str):
        logger.info(f"Session ended: {reason}")
        asyncio.create_task(self.shutdown())

    try:
        try:
            await self.connect()
        except Exception as e:
            logger.error(f"Error connecting to room: {e}")
            raise

        if self.room:
            try:
                self.room.setup_session_end_callback(on_session_end)
                logger.info("Session end callback configured")
            except Exception as e:
                logger.warning(f"Error setting up session end callback: {e}")
        else:
            logger.warning(
                "Room not available, session end callback not configured"
            )

        if wait_for_participant and self.room:
            try:
                logger.info("Waiting for participant...")
                await self.room.wait_for_participant()
                logger.info("Participant joined")
            except Exception as e:
                logger.error(f"Error waiting for participant: {e}")
                raise

        if session:
            try:
                await session.start()
                logger.info("Agent session started")
            except Exception as e:
                logger.error(f"Error starting session: {e}")
                raise

        logger.info(
            "Agent is running... (will exit when session ends or on interrupt)"
        )
        await shutdown_event.wait()
        logger.info("Shutdown event received, exiting gracefully...")

    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received, shutting down...")
    except Exception as e:
        logger.error(f"Unexpected error in run_until_shutdown: {e}")
        raise
    finally:
        if session:
            try:
                await session.close()
            except Exception as e:
                logger.error(f"Error closing session in finally: {e}")

        try:
            await self.shutdown()
        except Exception as e:
            logger.error(f"Error in ctx.shutdown: {e}")

Simplified helper that handles all cleanup boilerplate.

This method: 1. Connects to the room 2. Sets up session end callbacks 3. Waits for participant (optional) 4. Starts the session 5. Waits for shutdown signal 6. Cleans up gracefully

Args

session
AgentSession to manage (will call session.start() and session.close())
wait_for_participant
Whether to wait for a participant before starting

Example

async def entrypoint(ctx: JobContext):
    session = AgentSession(agent=agent, pipeline=pipeline)
    await ctx.run_until_shutdown(session=session, wait_for_participant=True)
async def shutdown(self) ‑> None
Expand source code
async def shutdown(self) -> None:
    """Called by Worker during graceful shutdown"""
    if self._is_shutting_down:
        logger.info("JobContext already shutting down")
        return
    self._is_shutting_down = True
    logger.info("JobContext shutting down")
    for callback in self._shutdown_callbacks:
        try:
            await callback()
        except Exception as e:
            logger.error(f"Error in shutdown callback: {e}")

    if self._pipeline:
        try:
            await self._pipeline.cleanup()
        except Exception as e:
            logger.error(f"Error during pipeline cleanup: {e}")
        self._pipeline = None

    if self.room:
        try:
            if not getattr(self.room, "_left", False):
                await self.room.leave()
            else:
                logger.info("Room already left, skipping room.leave()")
        except Exception as e:
            logger.error(f"Error during room leave: {e}")
        try:
            if hasattr(self.room, "cleanup"):
                await self.room.cleanup()
        except Exception as e:
            logger.error(f"Error during room cleanup: {e}")
        self.room = None

    self.room_options = None
    self._loop = None
    self.videosdk_auth = None
    self._shutdown_callbacks.clear()
    logger.info("JobContext cleaned up")

Called by Worker during graceful shutdown

async def wait_for_participant(self, participant_id: str | None = None) ‑> str
Expand source code
async def wait_for_participant(self, participant_id: str | None = None) -> str:
    if self.room:
        return await self.room.wait_for_participant(participant_id)
    else:
        raise ValueError("Room not initialized")
class JobExecutorType (*args, **kwds)
Expand source code
@unique
class JobExecutorType(Enum):
    PROCESS = "process"
    THREAD = "thread"

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var PROCESS
var THREAD
class Options (executor_type: Any = None,
num_idle_processes: int = 1,
initialize_timeout: float = 10.0,
close_timeout: float = 60.0,
memory_warn_mb: float = 500.0,
memory_limit_mb: float = 0.0,
ping_interval: float = 30.0,
max_processes: int = 1,
agent_id: str = 'VideoSDKAgent',
auth_token: str | None = None,
permissions: Any = None,
max_retry: int = 16,
load_threshold: float = 0.75,
register: bool = False,
signaling_base_url: str = 'api.videosdk.live',
host: str = '0.0.0.0',
port: int = 8081,
log_level: str = 'INFO')
Expand source code
@dataclass
class Options:
    """Configuration options for WorkerJob execution."""

    executor_type: Any = None  # Will be set in __post_init__
    """Which executor to use to run jobs. Automatically selected based on platform."""

    num_idle_processes: int = 1
    """Number of idle processes/threads to keep warm."""

    initialize_timeout: float = 10.0
    """Maximum amount of time to wait for a process/thread to initialize/prewarm"""

    close_timeout: float = 60.0
    """Maximum amount of time to wait for a job to shut down gracefully"""

    memory_warn_mb: float = 500.0
    """Memory warning threshold in MB."""

    memory_limit_mb: float = 0.0
    """Maximum memory usage for a job in MB. Defaults to 0 (disabled)."""

    ping_interval: float = 30.0
    """Interval between health check pings."""

    max_processes: int = 1
    """Maximum number of processes/threads."""

    agent_id: str = "VideoSDKAgent"
    """ID of the agent."""

    auth_token: Optional[str] = None
    """VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided."""

    permissions: Any = None  # Will be set in __post_init__
    """Permissions for the agent participant."""

    max_retry: int = 16
    """Maximum number of times to retry connecting to VideoSDK."""

    load_threshold: float = 0.75
    """Load threshold above which worker is marked as unavailable."""

    register: bool = False
    """Whether to register with the backend. Defaults to False for local development."""

    signaling_base_url: str = "api.videosdk.live"
    """Signaling base URL for VideoSDK services. Defaults to api.videosdk.live."""

    host: str = "0.0.0.0"
    """Host for the debug HTTP server."""

    port: int = 8081
    """Port for the debug HTTP server."""

    log_level: str = "INFO"
    """Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO."""

    def __post_init__(self):
        """Post-initialization setup."""
        # Import here to avoid circular imports
        from .worker import ExecutorType, WorkerPermissions, _default_executor_type

        if self.executor_type is None:
            self.executor_type = _default_executor_type

        if self.permissions is None:
            self.permissions = WorkerPermissions()

        if not self.auth_token:
            self.auth_token = os.getenv("VIDEOSDK_AUTH_TOKEN")

Configuration options for WorkerJob execution.

Instance variables

var agent_id : str

ID of the agent.

var auth_token : str | None

VideoSDK authentication token. Uses VIDEOSDK_AUTH_TOKEN env var if not provided.

var close_timeout : float

Maximum amount of time to wait for a job to shut down gracefully

var executor_type : Any

Which executor to use to run jobs. Automatically selected based on platform.

var host : str

Host for the debug HTTP server.

var initialize_timeout : float

Maximum amount of time to wait for a process/thread to initialize/prewarm

var load_threshold : float

Load threshold above which worker is marked as unavailable.

var log_level : str

Log level for SDK logging. Options: DEBUG, INFO, WARNING, ERROR. Defaults to INFO.

var max_processes : int

Maximum number of processes/threads.

var max_retry : int

Maximum number of times to retry connecting to VideoSDK.

var memory_limit_mb : float

Maximum memory usage for a job in MB. Defaults to 0 (disabled).

var memory_warn_mb : float

Memory warning threshold in MB.

var num_idle_processes : int

Number of idle processes/threads to keep warm.

var permissions : Any

Permissions for the agent participant.

var ping_interval : float

Interval between health check pings.

var port : int

Port for the debug HTTP server.

var register : bool

Whether to register with the backend. Defaults to False for local development.

var signaling_base_url : str

Signaling base URL for VideoSDK services. Defaults to api.videosdk.live.

class RoomOptions (transport_mode: str | TransportMode | None = None,
websocket: WebSocketConfig | None = None,
webrtc: WebRTCConfig | None = None,
**kwargs)
Expand source code
@dataclass
class RoomOptions:
    room_id: Optional[str] = None
    auth_token: Optional[str] = None
    name: Optional[str] = "Agent"
    agent_participant_id: Optional[str] = None
    playground: bool = True
    vision: bool = False
    recording: bool = False
    avatar: Optional[Any] = None
    join_meeting: Optional[bool] = True
    on_room_error: Optional[Callable[[Any], None]] = None
    send_analytics_to_pubsub: Optional[bool] = False
    # Session management options
    auto_end_session: bool = True
    session_timeout_seconds: Optional[int] = 5
    # VideoSDK connection options
    signaling_base_url: Optional[str] = "api.videosdk.live"
    background_audio: bool = False

    # New Configuration Fields
    _transport_mode: TransportMode = field(default=TransportMode.VIDEOSDK, init=False, repr=False)

    # Structured configs
    websocket: Optional[WebSocketConfig] = None
    webrtc: Optional[WebRTCConfig] = None

    # Alias properties for easier usage as requested
    @property
    def transport_mode(self) -> TransportMode:
        return self._transport_mode

    @transport_mode.setter
    def transport_mode(self, value):
        if isinstance(value, str):
            try:
                self._transport_mode = TransportMode(value.lower())
            except ValueError:
                # Fallback for compatibility or custom modes
                pass
        elif isinstance(value, TransportMode):
            self._transport_mode = value

    def __init__(
        self,
        transport_mode: Optional[str | TransportMode] = None,
        websocket: Optional[WebSocketConfig] = None,
        webrtc: Optional[WebRTCConfig] = None,
        **kwargs,
    ):
        # Initialize internal field
        self._transport_mode = TransportMode.VIDEOSDK
        
        # Handle connection mode
        if transport_mode:
            if isinstance(transport_mode, str):
                try:
                    self._transport_mode = TransportMode(transport_mode.lower())
                except ValueError:
                    pass
            elif isinstance(transport_mode, TransportMode):
                self._transport_mode = transport_mode

        self.websocket = websocket or WebSocketConfig()
        self.webrtc = webrtc or WebRTCConfig()

        # Handle standard fields
        for key, value in kwargs.items():
            if hasattr(self, key):
                setattr(self, key, value)

RoomOptions(transport_mode: Union[str, agents.job.TransportMode, NoneType] = None, websocket: Optional[agents.job.WebSocketConfig] = None, webrtc: Optional[agents.job.WebRTCConfig] = None, **kwargs)

Instance variables

var agent_participant_id : str | None
var auth_token : str | None
var auto_end_session : bool
var avatar : Any | None
var background_audio : bool
var join_meeting : bool | None
var name : str | None
var on_room_error : Callable[[Any], None] | None
var playground : bool
var recording : bool
var room_id : str | None
var send_analytics_to_pubsub : bool | None
var session_timeout_seconds : int | None
var signaling_base_url : str | None
prop transport_modeTransportMode
Expand source code
@property
def transport_mode(self) -> TransportMode:
    return self._transport_mode
var vision : bool
var webrtcWebRTCConfig | None
var websocketWebSocketConfig | None
class RunningJobInfo (accept_arguments: JobAcceptArguments,
job: JobContext,
url: str,
token: str,
worker_id: str)
Expand source code
@dataclass
class RunningJobInfo:
    """Information about a running job."""

    accept_arguments: JobAcceptArguments
    job: JobContext
    url: str
    token: str
    worker_id: str

    async def _run(self):
        # Placeholder for job execution logic if needed in the future
        pass

Information about a running job.

Instance variables

var accept_argumentsJobAcceptArguments
var jobJobContext
var token : str
var url : str
var worker_id : str
class TransportMode (*args, **kwds)
Expand source code
class TransportMode(Enum):
    VIDEOSDK = "videosdk"
    WEBSOCKET = "websocket"
    WEBRTC = "webrtc"

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var VIDEOSDK
var WEBRTC
var WEBSOCKET
class WebRTCConfig (signaling_url: str | None = None,
signaling_type: str = 'websocket',
ice_servers: list | None = None)
Expand source code
@dataclass
class WebRTCConfig:
    signaling_url: Optional[str] = None
    signaling_type: str = "websocket"
    ice_servers: Optional[list] = None

    def __post_init__(self):
        if self.ice_servers is None:
            self.ice_servers = [{"urls": "stun:stun.l.google.com:19302"}]

WebRTCConfig(signaling_url: Optional[str] = None, signaling_type: str = 'websocket', ice_servers: Optional[list] = None)

Instance variables

var ice_servers : list | None
var signaling_type : str
var signaling_url : str | None
class WebSocketConfig (port: int = 8080, path: str = '/ws')
Expand source code
@dataclass
class WebSocketConfig:
    port: int = 8080
    path: str = "/ws"

WebSocketConfig(port: int = 8080, path: str = '/ws')

Instance variables

var path : str
var port : int
class WorkerJob (entrypoint,
jobctx=None,
options: Options | None = None)
Expand source code
class WorkerJob:
    def __init__(self, entrypoint, jobctx=None, options: Optional[Options] = None):
        """
        :param entrypoint: An async function accepting one argument: jobctx
        :param jobctx: A static object or a callable that returns a context per job
        :param options: Configuration options for job execution
        """
        if not asyncio.iscoroutinefunction(entrypoint):
            raise TypeError("entrypoint must be a coroutine function")
        self.entrypoint = entrypoint
        self.jobctx = jobctx
        self.options = options or Options()

    def start(self):
        from .worker import Worker, WorkerOptions

        # Convert JobOptions to WorkerOptions for compatibility
        worker_options = WorkerOptions(
            entrypoint_fnc=self.entrypoint,
            agent_id=self.options.agent_id,
            auth_token=self.options.auth_token,
            executor_type=self.options.executor_type,
            num_idle_processes=self.options.num_idle_processes,
            initialize_timeout=self.options.initialize_timeout,
            close_timeout=self.options.close_timeout,
            memory_warn_mb=self.options.memory_warn_mb,
            memory_limit_mb=self.options.memory_limit_mb,
            ping_interval=self.options.ping_interval,
            max_processes=self.options.max_processes,
            permissions=self.options.permissions,
            max_retry=self.options.max_retry,
            load_threshold=self.options.load_threshold,
            register=self.options.register,
            signaling_base_url=self.options.signaling_base_url,
            host=self.options.host,
            port=self.options.port,
            log_level=self.options.log_level,
        )

        # If register=True, run the worker in backend mode (don't execute entrypoint immediately)
        if self.options.register:
            default_room_options = None
            if self.jobctx:
                if callable(self.jobctx):
                    job_context = self.jobctx()
                else:
                    job_context = self.jobctx
                default_room_options = job_context.room_options
            # Run the worker normally (for backend registration mode)
            Worker.run_worker(
                options=worker_options, default_room_options=default_room_options
            )
        else:
            # Direct mode - run entrypoint immediately if we have a job context
            if self.jobctx:
                if callable(self.jobctx):
                    job_context = self.jobctx()
                else:
                    job_context = self.jobctx

                # Set the current job context and run the entrypoint
                token = _set_current_job_context(job_context)
                try:
                    asyncio.run(self.entrypoint(job_context))
                finally:
                    _reset_current_job_context(token)
            else:
                # No job context provided, run worker normally
                Worker.run_worker(worker_options)

:param entrypoint: An async function accepting one argument: jobctx :param jobctx: A static object or a callable that returns a context per job :param options: Configuration options for job execution

Methods

def start(self)
Expand source code
def start(self):
    from .worker import Worker, WorkerOptions

    # Convert JobOptions to WorkerOptions for compatibility
    worker_options = WorkerOptions(
        entrypoint_fnc=self.entrypoint,
        agent_id=self.options.agent_id,
        auth_token=self.options.auth_token,
        executor_type=self.options.executor_type,
        num_idle_processes=self.options.num_idle_processes,
        initialize_timeout=self.options.initialize_timeout,
        close_timeout=self.options.close_timeout,
        memory_warn_mb=self.options.memory_warn_mb,
        memory_limit_mb=self.options.memory_limit_mb,
        ping_interval=self.options.ping_interval,
        max_processes=self.options.max_processes,
        permissions=self.options.permissions,
        max_retry=self.options.max_retry,
        load_threshold=self.options.load_threshold,
        register=self.options.register,
        signaling_base_url=self.options.signaling_base_url,
        host=self.options.host,
        port=self.options.port,
        log_level=self.options.log_level,
    )

    # If register=True, run the worker in backend mode (don't execute entrypoint immediately)
    if self.options.register:
        default_room_options = None
        if self.jobctx:
            if callable(self.jobctx):
                job_context = self.jobctx()
            else:
                job_context = self.jobctx
            default_room_options = job_context.room_options
        # Run the worker normally (for backend registration mode)
        Worker.run_worker(
            options=worker_options, default_room_options=default_room_options
        )
    else:
        # Direct mode - run entrypoint immediately if we have a job context
        if self.jobctx:
            if callable(self.jobctx):
                job_context = self.jobctx()
            else:
                job_context = self.jobctx

            # Set the current job context and run the entrypoint
            token = _set_current_job_context(job_context)
            try:
                asyncio.run(self.entrypoint(job_context))
            finally:
                _reset_current_job_context(token)
        else:
            # No job context provided, run worker normally
            Worker.run_worker(worker_options)