Running Agents with Worker
The worker system provides a robust way to run AI agent instances using Python's multiprocessing. It offers process isolation, proper lifecycle management, and a clean separation between agent logic and infrastructure concerns.
Key Components
1. WorkerJob
WorkerJob is the main class that defines an agent task to be executed in a separate process. It takes two parameters:
- entrypoint: An async function that accepts a JobContext parameter
- jobctx: A JobContext object or a callable that returns a JobContext
job = WorkerJob(entrypoint=my_function, jobctx=my_context)
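Since jobctx may be either a ready-made object or a callable that returns one, a factory lets the context be built lazily, only when it is needed. The following is a minimal sketch of that general pattern using a stand-in class; FakeContext and resolve_context are hypothetical names for illustration and are not part of the VideoSDK API.

```python
from dataclasses import dataclass
from typing import Callable, Union


@dataclass
class FakeContext:
    """Hypothetical stand-in for JobContext (not the real class)."""
    room_id: str


ContextOrFactory = Union[FakeContext, Callable[[], FakeContext]]


def resolve_context(jobctx: ContextOrFactory) -> FakeContext:
    """Return the context itself, or call the factory to build one."""
    if callable(jobctx):
        return jobctx()
    return jobctx


def make_context() -> FakeContext:
    return FakeContext(room_id="demo-room")


# Both forms resolve to an equivalent context object.
direct = resolve_context(FakeContext(room_id="demo-room"))
lazy = resolve_context(make_context)
print(direct == lazy)
```

Passing the factory rather than the object defers construction until the job is ready to run, which can help when the context holds resources that should not be created up front.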
2. JobContext
JobContext provides the runtime environment for your agent, including:
- Room Management: Handles VideoSDK room connections
- Shutdown Callbacks: Allows cleanup operations
- Process Isolation: Each job runs in its own process
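The shutdown-callback idea can be sketched in plain asyncio. The CallbackRegistry class below is a hypothetical stand-in, not the real JobContext; it only mirrors the add_shutdown_callback and shutdown names used in this guide. Continuing past a failing callback is a design choice of this sketch, not documented behavior of the library.

```python
import asyncio
from typing import Awaitable, Callable, List

ShutdownCallback = Callable[[], Awaitable[None]]


class CallbackRegistry:
    """Hypothetical stand-in showing the shutdown-callback pattern."""

    def __init__(self) -> None:
        self._callbacks: List[ShutdownCallback] = []

    def add_shutdown_callback(self, callback: ShutdownCallback) -> None:
        self._callbacks.append(callback)

    async def shutdown(self) -> None:
        # Run every callback in registration order, even if one raises.
        for callback in self._callbacks:
            try:
                await callback()
            except Exception as exc:
                print(f"shutdown callback failed: {exc!r}")


events: List[str] = []


async def close_connections() -> None:
    events.append("connections closed")


async def failing_cleanup() -> None:
    raise RuntimeError("boom")


async def flush_logs() -> None:
    events.append("logs flushed")


async def main() -> None:
    registry = CallbackRegistry()
    registry.add_shutdown_callback(close_connections)
    registry.add_shutdown_callback(failing_cleanup)
    registry.add_shutdown_callback(flush_logs)
    await registry.shutdown()


asyncio.run(main())
print(events)
```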
3. Worker
Worker manages the execution of jobs in separate processes, providing:
- Process isolation for each agent instance
- Automatic cleanup on shutdown
- Error handling and logging
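To illustrate what process isolation means in practice, here is a minimal sketch using only the standard library. It is not the Worker implementation; demo_entrypoint, run_job, and child_main are hypothetical names. Each child process gets its own event loop, and the parent only observes the exit code.

```python
import asyncio
import multiprocessing as mp
from typing import Any, Callable, Coroutine


async def demo_entrypoint(name: str) -> str:
    """Hypothetical agent body; a real one would join a room."""
    await asyncio.sleep(0)
    return f"{name} finished"


def run_job(
    entrypoint: Callable[[str], Coroutine[Any, Any, str]], name: str
) -> str:
    """Drive the async entrypoint to completion on a fresh event loop."""
    return asyncio.run(entrypoint(name))


def child_main(name: str) -> None:
    # Runs inside the child process; a crash here does not take down the parent.
    print(run_job(demo_entrypoint, name))


if __name__ == "__main__":
    process = mp.Process(target=child_main, args=("agent-1",))
    process.start()
    process.join()
    print("child exit code:", process.exitcode)
```

The `if __name__ == "__main__":` guard matters because some multiprocessing start methods re-import the main module in the child, and unguarded code would run again there.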
Usage Example
Here's a complete example of how to use the worker system with a voice agent:
import asyncio

from videosdk.plugins.google import GeminiRealtime, GeminiLiveConfig
from videosdk.agents import Agent, AgentSession, RealTimePipeline, WorkerJob, JobContext, RoomOptions


class MyVoiceAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="You are a helpful voice assistant that can answer questions and help with tasks.",
        )

    async def on_enter(self) -> None:
        await self.session.say("Hello, how can I help you today?")

    async def on_exit(self) -> None:
        await self.session.say("Goodbye!")


async def entrypoint(ctx: JobContext):
    # Configure the realtime model and wrap it in a pipeline
    model = GeminiRealtime(
        model="gemini-2.0-flash-live-001",
        config=GeminiLiveConfig(
            voice="Leda",
            response_modalities=["AUDIO"]
        )
    )
    pipeline = RealTimePipeline(model=model)

    # MyVoiceAgent.__init__ takes no arguments, so none are passed here
    agent = MyVoiceAgent()
    session = AgentSession(
        agent=agent,
        pipeline=pipeline,
    )

    async def cleanup_session():
        print("Cleaning up session...")

    # Register cleanup to run when the job shuts down
    ctx.add_shutdown_callback(cleanup_session)

    try:
        # Connect to the room
        await ctx.connect()
        await ctx.room.wait_for_participant()
        await session.start()
        # Block until the process is interrupted
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        await session.close()
        await ctx.shutdown()


def make_context() -> JobContext:
    room_options = RoomOptions(
        room_id="<meeting_id>",
        name="Sandbox Agent",
        playground=True
    )
    return JobContext(room_options=room_options)


if __name__ == "__main__":
    job = WorkerJob(entrypoint=entrypoint, jobctx=make_context)
    job.start()
Configuration Options
RoomOptions
- room_id: The VideoSDK meeting ID
- auth_token: Authentication token (or use the VIDEOSDK_AUTH_TOKEN env var)
- name: Agent name displayed in the meeting
- playground: Enable playground mode for testing
- vision: Enable vision capabilities
- avatar: Use virtual avatars from available providers
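The auth_token option can be supplied explicitly or through the VIDEOSDK_AUTH_TOKEN environment variable. A minimal sketch of that kind of fallback is shown below; resolve_auth_token is a hypothetical helper, and the precedence (explicit value first, then the environment) is an assumption rather than documented SDK behavior.

```python
import os
from typing import Mapping, Optional


def resolve_auth_token(
    explicit: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> str:
    """Prefer an explicit token; otherwise read VIDEOSDK_AUTH_TOKEN."""
    # Assumed precedence: explicit argument wins over the environment.
    token = explicit or env.get("VIDEOSDK_AUTH_TOKEN")
    if not token:
        raise ValueError("Pass auth_token or set VIDEOSDK_AUTH_TOKEN")
    return token


print(resolve_auth_token("explicit-token", env={}))
print(resolve_auth_token(env={"VIDEOSDK_AUTH_TOKEN": "token-from-env"}))
```

Failing early with a clear error when neither source is set tends to be easier to debug than a rejected connection later on.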
Best Practices
- Always use cleanup callbacks: Register shutdown callbacks to ensure proper resource cleanup
- Handle exceptions gracefully: Use try-finally blocks to ensure cleanup happens
- Use playground mode for testing: Set playground=True for easy testing and debugging
- Set environment variables: Use VIDEOSDK_AUTH_TOKEN for authentication
- Wait for participants: Use wait_for_participant() to ensure the agent waits for a participant
The worker system provides a production-ready way to deploy AI agents with proper isolation, lifecycle management, and error handling.