Skip to main content

Running Agents with Worker

The worker system provides a robust way to run AI agent instances using Python's multiprocessing. It offers process isolation, proper lifecycle management, and a clean separation between agent logic and infrastructure concerns.

Key Components

1. WorkerJob

WorkerJob is the main class that defines an agent task to be executed in a separate process. It takes two parameters:

  • entrypoint: An async function that accepts a JobContext parameter
  • jobctx: A JobContext object or a callable that returns a JobContext
job = WorkerJob(entrypoint=my_function, jobctx=my_context)

2. JobContext

JobContext provides the runtime environment for your agent, including:

  • Room Management: Handles VideoSDK room connections
  • Shutdown Callbacks: Allows cleanup operations
  • Process Isolation: Each job runs in its own process

3. Worker

Worker manages the execution of jobs in separate processes, providing:

  • Process isolation for each agent instance
  • Automatic cleanup on shutdown
  • Error handling and logging

Usage Example

Here's a complete example of how to use the worker system with a voice agent:

import asyncio
import aiohttp
from videosdk.plugins.google import GeminiRealtime, GeminiLiveConfig
from videosdk.agents import Agent, AgentSession, RealTimePipeline, WorkerJob, JobContext, RoomOptions
class MyVoiceAgent(Agent):
def __init__(self):
super().__init__(
instructions="You are a helpful voice assistant that can answer questions and help with tasks.",
)

async def on_enter(self) -> None:
await self.session.say("Hello, how can I help you today?")

async def on_exit(self) -> None:
await self.session.say("Goodbye!")

async def entrypoint(ctx: JobContext):
model = GeminiRealtime(
model="gemini-2.0-flash-live-001",
config=GeminiLiveConfig(
voice="Leda",
response_modalities=["AUDIO"]
)
)

pipeline = RealTimePipeline(model=model)
agent = MyVoiceAgent(ctx)

session = AgentSession(
agent=agent,
pipeline=pipeline,
)

async def cleanup_session():
print("Cleaning up session...")

ctx.add_shutdown_callback(cleanup_session)

try:
# connect to the room
await ctx.connect()
await ctx.room.wait_for_participant()
await session.start()
await asyncio.Event().wait()
except KeyboardInterrupt:
print("Shutting down...")
finally:
await session.close()
await ctx.shutdown()

def make_context() -> JobContext:
room_options = RoomOptions(
room_id="<meeting_id>",
name="Sandbox Agent",
playground=True
)

return JobContext(room_options=room_options)

if __name__ == "__main__":
job = WorkerJob(entrypoint=entrypoint, jobctx=make_context)
job.start()

Configuration Options

RoomOptions

  • room_id: The VideoSDK meeting ID
  • auth_token: Authentication token (or use VIDEOSDK_AUTH_TOKEN env var)
  • name: Agent name displayed in the meeting
  • playground: Enable playground mode for testing
  • vision: Enable vision capabilities
  • avatar: Use virtual avatars from available providers

Best Practices

  1. Always use cleanup callbacks: Register shutdown callbacks to ensure proper resource cleanup
  2. Handle exceptions gracefully: Use try-finally blocks to ensure cleanup happens
  3. Use playground mode for testing: Set playground=True for easy testing and debugging
  4. Set environment variables: Use VIDEOSDK_AUTH_TOKEN for authentication
  5. Wait for participants: Use wait_for_participant() to ensure agent waits for a participant

The worker system provides a production-ready way to deploy AI agents with proper isolation, lifecycle management, and error handling.

Got a Question? Ask us on discord