Module agents.backend

Backend communication module for VideoSDK Agents.

This module provides WebSocket connection and protocol handling for communicating with the VideoSDK backend server.

Sub-modules

agents.backend.connection
agents.backend.protocol

Protocol definitions for VideoSDK Agent backend communication …

Classes

class AvailabilityRequest (type: str = 'availability_request',
job_id: str = '',
job_type: str = '',
room_id: str = '',
room_name: str = '',
agent_name: str = '',
namespace: str = '')
Expand source code
@dataclass
class AvailabilityRequest:
    """Request from server asking if worker is available for a job."""

    type: str = "availability_request"
    job_id: str = ""
    job_type: str = ""
    room_id: str = ""
    room_name: str = ""
    agent_name: str = ""
    namespace: str = ""

Request from server asking if worker is available for a job.

Instance variables

var agent_name : str
var job_id : str
var job_type : str
var namespace : str
var room_id : str
var room_name : str
var type : str
class AvailabilityResponse (type: str = 'availability_response',
job_id: str = '',
available: bool = False,
error: str | None = None,
token: str | None = None)
Expand source code
@dataclass
class AvailabilityResponse:
    """Response from worker indicating availability."""

    type: str = "availability_response"
    job_id: str = ""
    available: bool = False
    error: Optional[str] = None
    token: Optional[str] = None  # Worker's auth token when accepting job

    def dict(self) -> Dict[str, Any]:
        """Convert to dictionary, excluding None values."""
        result = {}
        for key, value in self.__dict__.items():
            if value is not None:
                result[key] = value
        return result

Response from worker indicating availability.

Instance variables

var available : bool
var error : str | None
var job_id : str
var token : str | None
var type : str

Methods

def dict(self) ‑> Dict[str, Any]
Expand source code
def dict(self) -> Dict[str, Any]:
    """Convert to dictionary, excluding None values."""
    result = {}
    for key, value in self.__dict__.items():
        if value is not None:
            result[key] = value
    return result

Convert to dictionary, excluding None values.

class BackendConnection (auth_token: str,
agent_id: str = '',
worker_type: str = 'room',
version: str = '1.0.0',
max_retry: int = 16,
http_proxy: str | None = None,
backend_url: str = None,
load_threshold: float = 0.75,
max_processes: int = 10)
Expand source code
class BackendConnection:
    """Manages WebSocket connection to the backend registry server."""

    def __init__(
        self,
        auth_token: str,
        agent_id: str = "",
        worker_type: str = "room",
        version: str = "1.0.0",
        max_retry: int = 16,
        http_proxy: Optional[str] = None,
        backend_url: str = None,
        load_threshold: float = 0.75,
        max_processes: int = 10,
    ):
        self.auth_token = auth_token
        self.agent_id = agent_id
        self.worker_type = worker_type
        self.version = version
        self.max_retry = max_retry
        self.http_proxy = http_proxy
        self.backend_url = backend_url
        self.load_threshold = load_threshold
        self.max_processes = max_processes

        # Connection state
        self._closed = True
        self._connecting = False
        self._ws: Optional[ClientWebSocketResponse] = None
        self._http_session: Optional[aiohttp.ClientSession] = None
        self._worker_id = "unregistered"
        self._retry_count = 0

        # Message handling
        self._msg_queue: asyncio.Queue[WorkerMessage] = asyncio.Queue()
        self._pending_assignments: Dict[str, asyncio.Future[JobAssignment]] = {}

        # Callbacks
        self._on_availability: Optional[Callable[[AvailabilityRequest], None]] = None
        self._on_assignment: Optional[Callable[[JobAssignment], None]] = None
        self._on_termination: Optional[Callable[[JobTermination], None]] = None
        self._on_register: Optional[Callable[[str, Dict[str, Any]], None]] = None
        self._on_pong: Optional[Callable[[WorkerPong], None]] = None

        # Tasks
        self._connection_task: Optional[asyncio.Task] = None
        self._send_task: Optional[asyncio.Task] = None
        self._recv_task: Optional[asyncio.Task] = None
        self._status_task: Optional[asyncio.Task] = None

    def _get_worker_id_file_path(self) -> str:
        """Get the path to the worker ID file."""
        # Use agent ID to create a unique file path
        safe_agent_id = "".join(
            c for c in self.agent_id if c.isalnum() or c in ("-", "_")
        ).rstrip()
        if not safe_agent_id:
            safe_agent_id = "default"

        # Create a directory for worker IDs if it doesn't exist
        worker_id_dir = os.path.expanduser("~/.videosdk-agents/worker-ids")
        os.makedirs(worker_id_dir, exist_ok=True)

        return os.path.join(worker_id_dir, f"{safe_agent_id}.worker_id")

    def _get_worker_id_env_key(self) -> str:
        """Get the environment variable key for worker ID."""
        safe_agent_id = "".join(
            c for c in self.agent_id if c.isalnum() or c in ("-", "_")
        ).rstrip()
        if not safe_agent_id:
            safe_agent_id = "default"
        return f"VIDEOSDK_WORKER_ID_{safe_agent_id.upper()}"

    def _load_memory_worker_id(self) -> Optional[str]:
        """Load worker ID from memory (environment variable only)."""
        env_key = self._get_worker_id_env_key()
        env_worker_id = os.environ.get(env_key)
        if env_worker_id and len(env_worker_id.strip()) > 0:
            logger.info(f"Loaded worker ID from memory: {env_worker_id}")
            return env_worker_id.strip()
        return None

    def _save_memory_worker_id(self, worker_id: str) -> None:
        """Save worker ID to memory only (environment variable)."""
        env_key = self._get_worker_id_env_key()
        os.environ[env_key] = worker_id
        logger.info(f"Saved worker ID to memory: {worker_id}")

    def _load_persistent_worker_id(self) -> Optional[str]:
        """Load worker ID from persistent storage (alias for memory-based method)."""
        return self._load_memory_worker_id()

    def _save_persistent_worker_id(self, worker_id: str) -> None:
        """Save worker ID to persistent storage (alias for memory-based method)."""
        self._save_memory_worker_id(worker_id)

    def _get_registry_assigned_worker_id(self) -> Optional[str]:
        """Get the worker ID that was previously assigned by the registry."""
        return self._load_memory_worker_id()

    def _generate_or_recover_worker_id(self) -> str:
        """Generate a new worker ID or recover existing one."""
        # Try to load existing worker ID
        existing_worker_id = self._load_persistent_worker_id()
        if existing_worker_id:
            logger.info(f"Using existing worker ID: {existing_worker_id}")
            return existing_worker_id

        # Generate new worker ID
        new_worker_id = str(uuid.uuid4())
        logger.info(f"Generated new worker ID: {new_worker_id}")

        # Save the new worker ID
        self._save_persistent_worker_id(new_worker_id)

        return new_worker_id

    @property
    def worker_id(self) -> str:
        """Get the worker ID assigned by the server."""
        return self._worker_id

    @property
    def is_connected(self) -> bool:
        """Check if connected to the backend."""
        return not self._closed and not self._connecting

    def on_availability(self, callback: Callable[[AvailabilityRequest], None]):
        """Set callback for availability requests."""
        self._on_availability = callback

    def on_assignment(self, callback: Callable[[JobAssignment], None]):
        """Set callback for job assignments."""
        self._on_assignment = callback

    def on_termination(self, callback: Callable[[JobTermination], None]):
        """Set callback for job terminations."""
        self._on_termination = callback

    def on_register(self, callback: Callable[[str, Dict[str, Any]], None]):
        """Set callback for registration responses."""
        self._on_register = callback

    def on_pong(self, callback: Callable[[WorkerPong], None]):
        """Set callback for pong responses."""
        self._on_pong = callback

    async def connect(self):
        """Connect to the backend server."""
        if self._closed:
            self._closed = False
            self._connection_task = asyncio.create_task(self._connection_loop())

    async def disconnect(self):
        """Disconnect from backend server."""
        logger.info("Disconnecting from backend server")
        self._closed = True

        # Cancel connection task FIRST to prevent reconnection
        if self._connection_task and not self._connection_task.done():
            logger.info("Cancelling connection task to prevent reconnection")
            self._connection_task.cancel()
            try:
                await self._connection_task
                logger.info("Connection task cancelled successfully")
            except asyncio.CancelledError:
                logger.info("Connection task was cancelled as expected")
            except Exception as e:
                logger.error(f"Error cancelling connection task: {e}")
        else:
            logger.info("Connection task was already done or doesn't exist")

        # Send final status update to inform registry of shutdown
        if self._ws and not self._ws.closed:
            try:
                shutdown_msg = WorkerMessage(
                    type="status_update",
                    worker_id=self._worker_id,
                    status="offline",
                    load=0.0,
                    job_count=0,
                )
                await self._ws.send_str(json.dumps(shutdown_msg.dict()))
                logger.info(
                    f"Sent shutdown notification to registry for worker: {self._worker_id}"
                )
            except Exception as e:
                logger.warning(f"Failed to send shutdown notification: {e}")

        # Close WebSocket connection properly
        if self._ws and not self._ws.closed:
            try:
                await self._ws.close()
                logger.info("WebSocket connection closed")
            except Exception as e:
                logger.warning(f"Error closing WebSocket: {e}")

        # Cancel other tasks
        for task in [
            self._send_task,
            self._recv_task,
            self._status_task,
        ]:
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

        # Close HTTP session if it exists
        if self._http_session:
            await self._http_session.close()

        logger.info("Backend disconnection complete")

    async def send_message(self, message: WorkerMessage):
        """Send a message to the backend."""
        if not self.is_connected:
            raise RuntimeError("Not connected to backend")

        await self._msg_queue.put(message)

    async def _connection_loop(self):
        """Main connection loop with retry logic."""
        logger.info("Connection loop started")
        while not self._closed:
            try:
                logger.debug("Attempting to establish connection")
                self._connecting = True
                await self._establish_connection()
                self._connecting = False
                self._retry_count = 0

                # Start message handling tasks
                self._send_task = asyncio.create_task(self._send_loop())
                self._recv_task = asyncio.create_task(self._recv_loop())
                self._status_task = asyncio.create_task(self._status_loop())

                # Wait for any task to complete (or fail)
                done, pending = await asyncio.wait(
                    [self._send_task, self._recv_task, self._status_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )

                # Cancel remaining tasks
                for task in pending:
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass

                # Check if we should exit the loop
                if self._closed:
                    logger.info("Connection loop exiting due to shutdown")
                    break

            except asyncio.CancelledError:
                logger.info("Connection loop cancelled")
                break
            except Exception as e:
                if self._closed:
                    logger.info(
                        "Connection loop exiting due to shutdown during exception"
                    )
                    break

                if self._retry_count >= self.max_retry:
                    logger.error(
                        f"Failed to connect after {self._retry_count} attempts"
                    )
                    raise RuntimeError(
                        f"Failed to connect to backend after {self._retry_count} attempts"
                    ) from e

                retry_delay = min(self._retry_count * 2, 10)
                self._retry_count += 1

                logger.warning(f"Connection failed, retrying in {retry_delay}s: {e}")
                await asyncio.sleep(retry_delay)

    async def _establish_connection(self):
        """Establish connection to the backend registry server."""
        logger.debug("Establishing connection to backend")

        if not self._http_session:
            self._http_session = aiohttp.ClientSession()

        # Parse backend URL
        parse = urlparse(self.backend_url)
        scheme = parse.scheme or "wss"
        if scheme.startswith("http"):
            scheme = scheme.replace("http", "wss")

        base = f"{scheme}://{parse.netloc}{parse.path}".rstrip("/") + "/"
        agent_url = urljoin(base, "agent")

        # Connect to WebSocket
        headers = {"Authorization": f"Bearer {self.auth_token}"}

        logger.debug(f"Connecting to WebSocket: {agent_url}")
        self._ws = await self._http_session.ws_connect(
            agent_url,
            headers=headers,
            autoping=True,
            proxy=self.http_proxy or None,
        )
        logger.debug("WebSocket connection established")

        # Get previously assigned worker ID from registry (if any)
        worker_id = self._get_registry_assigned_worker_id()
        if worker_id:
            logger.info(f"Using previously assigned worker ID: {worker_id}")
        else:
            logger.info(
                "No previously assigned worker ID found, requesting new assignment from registry"
            )
            worker_id = ""  # Empty string tells registry to assign a new ID

        register_msg = WorkerMessage(
            type="register",
            worker_id=worker_id,  # Empty string for new assignment, existing ID for reconnection
            agent_name=self.agent_id,
            namespace="default",
            version=self.version,
            capabilities=["room", "voice", "stt", "tts"],
            registry_uuid="default",
            token=self.auth_token,
            # Add workload configuration
            load_threshold=self.load_threshold,
            max_processes=self.max_processes,
        )

        logger.debug(
            f"Sending registration message for worker: {worker_id or 'NEW_ASSIGNMENT'}"
        )
        logger.debug(f"Registration message: {register_msg.dict()}")
        logger.debug(f"Agent ID: '{self.agent_id}', Worker type: '{self.worker_type}'")

        await self._ws.send_str(json.dumps(register_msg.dict()))

        # Wait for registration response
        msg = await self._ws.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            data = json.loads(msg.data)
            if data.get("type") == "register" and data.get("success"):
                assigned_worker_id = data.get("worker_id")
                self._worker_id = assigned_worker_id
                logger.info(f"Worker registered with backend: {self._worker_id}")

                # Store the assigned worker ID in memory for future use
                if assigned_worker_id and assigned_worker_id != worker_id:
                    logger.info(
                        f"Registry assigned new worker ID: {assigned_worker_id}"
                    )
                    self._save_memory_worker_id(assigned_worker_id)
                elif assigned_worker_id == worker_id:
                    logger.info(
                        f"Registry confirmed existing worker ID: {assigned_worker_id}"
                    )
                else:
                    logger.warning("Registry did not provide a worker ID")

                if self._on_register:
                    self._on_register(self._worker_id, data.get("payload", {}))
            else:
                raise RuntimeError(
                    f"Registration failed: {data.get('message', 'Unknown error')}"
                )
        else:
            raise RuntimeError("Unexpected message type during registration")

    async def _send_loop(self):
        """Send messages to the backend."""
        while not self._closed and self._ws:
            try:
                msg = await asyncio.wait_for(self._msg_queue.get(), timeout=1.0)
                await self._ws.send_str(json.dumps(msg.dict()))
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"Error sending message: {e}")
                break

    async def _recv_loop(self):
        """Receive messages from the backend."""
        while not self._closed and self._ws:
            try:
                msg = await self._ws.receive()

                if msg.type in (
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    logger.info("WebSocket connection closed")
                    break

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning(f"Unexpected message type: {msg.type}")
                    continue

                data = json.loads(msg.data)
                await self._handle_server_message(data)

            except Exception as e:
                logger.error(f"Error receiving message: {e}")
                break

    async def _handle_server_message(self, data: Dict[str, Any]):
        """Handle messages from the server."""
        msg_type = data.get("type")

        if msg_type == "availability_request":
            if self._on_availability:
                request = AvailabilityRequest(**data)
                self._on_availability(request)

        elif msg_type == "job_assignment":
            if self._on_assignment:
                assignment = JobAssignment(**data)
                self._on_assignment(assignment)

        elif msg_type == "job_termination":
            if self._on_termination:
                termination = JobTermination(**data)
                self._on_termination(termination)

        elif msg_type == "pong":
            if self._on_pong:
                pong = WorkerPong(**data)
                self._on_pong(pong)

        else:
            logger.warning(f"Unknown message type: {msg_type}")

    async def _status_loop(self):
        """Send periodic status updates."""
        # This method is deprecated - status updates are now handled by the worker
        # The worker sends status updates through the message queue
        while not self._closed:
            try:
                await asyncio.sleep(30)  # Just keep the loop alive
                # No status updates sent from here - worker handles them
            except Exception as e:
                logger.error(f"Error in status loop: {e}")
                break

    async def wait_for_assignment(
        self, job_id: str, timeout: float = 7.5
    ) -> JobAssignment:
        """Wait for a job assignment with timeout."""
        future = asyncio.Future()
        self._pending_assignments[job_id] = future

        try:
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            self._pending_assignments.pop(job_id, None)

Manages WebSocket connection to the backend registry server.

Instance variables

prop is_connected : bool
Expand source code
@property
def is_connected(self) -> bool:
    """Check if connected to the backend."""
    return not self._closed and not self._connecting

Check if connected to the backend.

prop worker_id : str
Expand source code
@property
def worker_id(self) -> str:
    """Get the worker ID assigned by the server."""
    return self._worker_id

Get the worker ID assigned by the server.

Methods

async def connect(self)
Expand source code
async def connect(self):
    """Connect to the backend server."""
    if self._closed:
        self._closed = False
        self._connection_task = asyncio.create_task(self._connection_loop())

Connect to the backend server.

async def disconnect(self)
Expand source code
async def disconnect(self):
    """Disconnect from backend server."""
    logger.info("Disconnecting from backend server")
    self._closed = True

    # Cancel connection task FIRST to prevent reconnection
    if self._connection_task and not self._connection_task.done():
        logger.info("Cancelling connection task to prevent reconnection")
        self._connection_task.cancel()
        try:
            await self._connection_task
            logger.info("Connection task cancelled successfully")
        except asyncio.CancelledError:
            logger.info("Connection task was cancelled as expected")
        except Exception as e:
            logger.error(f"Error cancelling connection task: {e}")
    else:
        logger.info("Connection task was already done or doesn't exist")

    # Send final status update to inform registry of shutdown
    if self._ws and not self._ws.closed:
        try:
            shutdown_msg = WorkerMessage(
                type="status_update",
                worker_id=self._worker_id,
                status="offline",
                load=0.0,
                job_count=0,
            )
            await self._ws.send_str(json.dumps(shutdown_msg.dict()))
            logger.info(
                f"Sent shutdown notification to registry for worker: {self._worker_id}"
            )
        except Exception as e:
            logger.warning(f"Failed to send shutdown notification: {e}")

    # Close WebSocket connection properly
    if self._ws and not self._ws.closed:
        try:
            await self._ws.close()
            logger.info("WebSocket connection closed")
        except Exception as e:
            logger.warning(f"Error closing WebSocket: {e}")

    # Cancel other tasks
    for task in [
        self._send_task,
        self._recv_task,
        self._status_task,
    ]:
        if task and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    # Close HTTP session if it exists
    if self._http_session:
        await self._http_session.close()

    logger.info("Backend disconnection complete")

Disconnect from backend server.

def on_assignment(self,
callback: Callable[[JobAssignment], None])
Expand source code
def on_assignment(self, callback: Callable[[JobAssignment], None]):
    """Set callback for job assignments."""
    self._on_assignment = callback

Set callback for job assignments.

def on_availability(self,
callback: Callable[[AvailabilityRequest], None])
Expand source code
def on_availability(self, callback: Callable[[AvailabilityRequest], None]):
    """Set callback for availability requests."""
    self._on_availability = callback

Set callback for availability requests.

def on_pong(self,
callback: Callable[[WorkerPong], None])
Expand source code
def on_pong(self, callback: Callable[[WorkerPong], None]):
    """Set callback for pong responses."""
    self._on_pong = callback

Set callback for pong responses.

def on_register(self, callback: Callable[[str, Dict[str, Any]], None])
Expand source code
def on_register(self, callback: Callable[[str, Dict[str, Any]], None]):
    """Set callback for registration responses."""
    self._on_register = callback

Set callback for registration responses.

def on_termination(self,
callback: Callable[[JobTermination], None])
Expand source code
def on_termination(self, callback: Callable[[JobTermination], None]):
    """Set callback for job terminations."""
    self._on_termination = callback

Set callback for job terminations.

async def send_message(self,
message: WorkerMessage)
Expand source code
async def send_message(self, message: WorkerMessage):
    """Send a message to the backend."""
    if not self.is_connected:
        raise RuntimeError("Not connected to backend")

    await self._msg_queue.put(message)

Send a message to the backend.

async def wait_for_assignment(self, job_id: str, timeout: float = 7.5) ‑> JobAssignment
Expand source code
async def wait_for_assignment(
    self, job_id: str, timeout: float = 7.5
) -> JobAssignment:
    """Wait for a job assignment with timeout."""
    future = asyncio.Future()
    self._pending_assignments[job_id] = future

    try:
        return await asyncio.wait_for(future, timeout=timeout)
    finally:
        self._pending_assignments.pop(job_id, None)

Wait for a job assignment with timeout.

class JobAssignment (type: str = 'job_assignment',
job_id: str = '',
job_type: str = '',
room_id: str = '',
room_name: str = '',
agent_name: str = '',
namespace: str = '',
token: str = '',
url: str = '',
room_options: Dict[str, Any] | None = None,
metadata: Dict[str, Any] | None = None)
Expand source code
@dataclass
class JobAssignment:
    """Job assignment from server to worker."""

    type: str = "job_assignment"
    job_id: str = ""
    job_type: str = ""
    room_id: str = ""
    room_name: str = ""
    agent_name: str = ""
    namespace: str = ""
    token: str = ""
    url: str = ""
    room_options: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None

    def dict(self) -> Dict[str, Any]:
        """Convert to dictionary, excluding None values."""
        result = {}
        for key, value in self.__dict__.items():
            if value is not None:
                result[key] = value
        return result

Job assignment from server to worker.

Instance variables

var agent_name : str
var job_id : str
var job_type : str
var metadata : Dict[str, Any] | None
var namespace : str
var room_id : str
var room_name : str
var room_options : Dict[str, Any] | None
var token : str
var type : str
var url : str

Methods

def dict(self) ‑> Dict[str, Any]
Expand source code
def dict(self) -> Dict[str, Any]:
    """Convert to dictionary, excluding None values."""
    result = {}
    for key, value in self.__dict__.items():
        if value is not None:
            result[key] = value
    return result

Convert to dictionary, excluding None values.

class JobStatus (*args, **kwds)
Expand source code
class JobStatus(str, Enum):
    """Job status enumeration."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

Job status enumeration.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var CANCELLED
var COMPLETED
var FAILED
var PENDING
var RUNNING
class JobTermination (type: str = 'job_termination', job_id: str = '', reason: str | None = None)
Expand source code
@dataclass
class JobTermination:
    """Job termination request from server."""

    type: str = "job_termination"
    job_id: str = ""
    reason: Optional[str] = None

Job termination request from server.

Instance variables

var job_id : str
var reason : str | None
var type : str
class JobUpdate (type: str = 'job_update',
job_id: str = '',
status: str = '',
error: str | None = None,
participant_identity: str | None = None,
participant_name: str | None = None,
participant_metadata: str | None = None)
Expand source code
@dataclass
class JobUpdate:
    """Update from worker about job status."""

    type: str = "job_update"
    job_id: str = ""
    status: str = ""
    error: Optional[str] = None
    participant_identity: Optional[str] = None
    participant_name: Optional[str] = None
    participant_metadata: Optional[str] = None

    def dict(self) -> Dict[str, Any]:
        """Convert to dictionary, excluding None values."""
        result = {}
        for key, value in self.__dict__.items():
            if value is not None:
                result[key] = value
        return result

Update from worker about job status.

Instance variables

var error : str | None
var job_id : str
var participant_identity : str | None
var participant_metadata : str | None
var participant_name : str | None
var status : str
var type : str

Methods

def dict(self) ‑> Dict[str, Any]
Expand source code
def dict(self) -> Dict[str, Any]:
    """Convert to dictionary, excluding None values."""
    result = {}
    for key, value in self.__dict__.items():
        if value is not None:
            result[key] = value
    return result

Convert to dictionary, excluding None values.

class ServerMessage (type: str,
worker_id: str | None = None,
success: bool | None = None,
message: str | None = None,
job_id: str | None = None,
job_type: str | None = None,
room_id: str | None = None,
room_name: str | None = None,
agent_name: str | None = None,
namespace: str | None = None,
token: str | None = None,
url: str | None = None)
Expand source code
@dataclass
class ServerMessage:
    """Base message from server to worker."""

    type: str
    worker_id: Optional[str] = None
    success: Optional[bool] = None
    message: Optional[str] = None
    job_id: Optional[str] = None
    job_type: Optional[str] = None
    room_id: Optional[str] = None
    room_name: Optional[str] = None
    agent_name: Optional[str] = None
    namespace: Optional[str] = None
    token: Optional[str] = None
    url: Optional[str] = None

    def dict(self) -> Dict[str, Any]:
        """Convert to dictionary, excluding None values."""
        result = {}
        for key, value in self.__dict__.items():
            if value is not None:
                result[key] = value
        return result

Base message from server to worker.

Instance variables

var agent_name : str | None
var job_id : str | None
var job_type : str | None
var message : str | None
var namespace : str | None
var room_id : str | None
var room_name : str | None
var success : bool | None
var token : str | None
var type : str
var url : str | None
var worker_id : str | None

Methods

def dict(self) ‑> Dict[str, Any]
Expand source code
def dict(self) -> Dict[str, Any]:
    """Convert to dictionary, excluding None values."""
    result = {}
    for key, value in self.__dict__.items():
        if value is not None:
            result[key] = value
    return result

Convert to dictionary, excluding None values.

class UpdateJobStatus (job_id: str,
status: JobStatus,
error: str | None = None,
participant_identity: str | None = None,
participant_name: str | None = None,
participant_metadata: str | None = None)
Expand source code
@dataclass
class UpdateJobStatus:
    """Update job status message."""

    job_id: str
    status: JobStatus
    error: Optional[str] = None
    participant_identity: Optional[str] = None
    participant_name: Optional[str] = None
    participant_metadata: Optional[str] = None

Update job status message.

Instance variables

var error : str | None
var job_id : str
var participant_identity : str | None
var participant_metadata : str | None
var participant_name : str | None
var statusJobStatus
class UpdateWorkerStatus (status: WorkerStatus,
load: float | None = None,
job_count: int | None = None,
error: str | None = None)
Expand source code
@dataclass
class UpdateWorkerStatus:
    """Update worker status message."""

    status: WorkerStatus
    load: Optional[float] = None
    job_count: Optional[int] = None
    error: Optional[str] = None

Update worker status message.

Instance variables

var error : str | None
var job_count : int | None
var load : float | None
var statusWorkerStatus
class WorkerMessage (type: str,
worker_id: str | None = None,
agent_name: str | None = None,
namespace: str | None = None,
version: str | None = None,
capabilities: List[str] | None = None,
status: str | None = None,
load: float | None = None,
job_count: int | None = None,
job_id: str | None = None,
available: bool | None = None,
participant_identity: str | None = None,
participant_name: str | None = None,
participant_metadata: str | None = None,
error: str | None = None,
registry_uuid: str | None = None,
max_capacity: int | None = None,
current_load: float | None = None,
token: str | None = None,
load_threshold: float | None = None,
max_processes: int | None = None)
Expand source code
@dataclass
class WorkerMessage:
    """Base message from worker to server."""

    type: str
    worker_id: Optional[str] = None
    agent_name: Optional[str] = None
    namespace: Optional[str] = None
    version: Optional[str] = None
    capabilities: Optional[List[str]] = None
    status: Optional[str] = None
    load: Optional[float] = None
    job_count: Optional[int] = None
    job_id: Optional[str] = None
    available: Optional[bool] = None
    participant_identity: Optional[str] = None
    participant_name: Optional[str] = None
    participant_metadata: Optional[str] = None
    error: Optional[str] = None
    registry_uuid: Optional[str] = None
    max_capacity: Optional[int] = None
    current_load: Optional[float] = None
    token: Optional[str] = None  # Authentication token for registry

    # Workload configuration from agent
    load_threshold: Optional[float] = None  # Agent's load threshold (e.g., 0.8)
    max_processes: Optional[int] = None  # Agent's max processes (e.g., 3)

    def dict(self) -> Dict[str, Any]:
        """Convert to dictionary, excluding None values."""
        result = {}
        for key, value in self.__dict__.items():
            if value is not None:
                result[key] = value
        return result

Base message from worker to server.

Instance variables

var agent_name : str | None
var available : bool | None
var capabilities : List[str] | None
var current_load : float | None
var error : str | None
var job_count : int | None
var job_id : str | None
var load : float | None
var load_threshold : float | None
var max_capacity : int | None
var max_processes : int | None
var namespace : str | None
var participant_identity : str | None
var participant_metadata : str | None
var participant_name : str | None
var registry_uuid : str | None
var status : str | None
var token : str | None
var type : str
var version : str | None
var worker_id : str | None

Methods

def dict(self) ‑> Dict[str, Any]
Expand source code
def dict(self) -> Dict[str, Any]:
    """Convert to dictionary, excluding None values."""
    result = {}
    for key, value in self.__dict__.items():
        if value is not None:
            result[key] = value
    return result

Convert to dictionary, excluding None values.

class WorkerPing (type: str = 'ping', timestamp: int | None = None)
Expand source code
@dataclass
class WorkerPing:
    """Ping message from worker."""

    type: str = "ping"
    timestamp: Optional[int] = None

Ping message from worker.

Instance variables

var timestamp : int | None
var type : str
class WorkerPong (type: str = 'pong', timestamp: int | None = None)
Expand source code
@dataclass
class WorkerPong:
    """Pong response from server."""

    type: str = "pong"
    timestamp: Optional[int] = None

Pong response from server.

Instance variables

var timestamp : int | None
var type : str
class WorkerStatus (*args, **kwds)
Expand source code
class WorkerStatus(str, Enum):
    """Worker status enumeration."""

    IDLE = "idle"
    BUSY = "busy"
    OFFLINE = "offline"
    ERROR = "error"

Worker status enumeration.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var BUSY
var ERROR
var IDLE
var OFFLINE