Running Multiple Agents

The worker system provides a way to run multiple tasks in parallel using Python's multiprocessing. It's particularly useful for running multiple instances of the same task concurrently, such as handling multiple voice agent sessions.

Key Components

1. WorkerJob

WorkerJob is the main class that defines a task to be executed. It takes two parameters:

  • job_func: The function to be executed
  • jobctx: Context data for the job, given either as a static value or as a callable that returns it

job = WorkerJob(job_func=my_function, jobctx=my_context)
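
The "static or callable" choice for jobctx can be illustrated without the SDK. The sketch below is only an assumption about how such a value might be resolved; resolve_context is a hypothetical helper, not part of videosdk.agents, and the library's actual handling may differ.

```python
from typing import Any, Callable, Union

# Hypothetical type alias: a context is either a ready-made value
# or a zero-argument callable that builds one on demand.
JobContext = Union[Any, Callable[[], Any]]


def resolve_context(jobctx: JobContext) -> Any:
    """Return jobctx as-is, or call it first if it is callable."""
    return jobctx() if callable(jobctx) else jobctx


# A static context: the same dict is reused every time.
static_ctx = {"meetingId": "<Meeting-ID>", "name": "<Agent-Name>"}


# A callable context: a fresh dict is built on each call.
def make_context() -> dict:
    return {"meetingId": "<Meeting-ID>", "name": "<Agent-Name>"}


resolved_static = resolve_context(static_ctx)
resolved_dynamic = resolve_context(make_context)
```

A callable is the more flexible option when each new process should receive its own freshly built context rather than a shared value.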

2. Worker

Worker manages the execution of jobs and provides a command-line interface to control them. It supports the following commands:

  • n: Start a new worker process
  • l: List all running worker processes
  • q <pid>: Stop a specific process
  • q: Stop all running processes
  • x: Exit the worker interface
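
To make the command set above concrete, here is a minimal sketch of how such commands could be parsed and mapped onto processes using only the standard library. It is not the Worker implementation: parse_command and ProcessRegistry are hypothetical names, and the real class may track and stop processes differently.

```python
import multiprocessing as mp
from typing import Callable, Dict, List, Optional, Tuple


def parse_command(line: str) -> Tuple[str, Optional[int]]:
    """Map one line of input to an (action, pid) pair."""
    parts = line.split()
    if not parts:
        return ("noop", None)
    cmd, args = parts[0], parts[1:]
    if cmd == "n" and not args:
        return ("new", None)
    if cmd == "l" and not args:
        return ("list", None)
    if cmd == "x" and not args:
        return ("exit", None)
    if cmd == "q" and not args:
        return ("stop_all", None)
    if cmd == "q" and len(args) == 1 and args[0].isdigit():
        return ("stop_one", int(args[0]))
    return ("unknown", None)


class ProcessRegistry:
    """Hypothetical bookkeeping for worker processes, keyed by PID."""

    def __init__(self, target: Callable[[], None]) -> None:
        self._target = target
        self._procs: Dict[int, mp.Process] = {}

    def start(self) -> int:
        # Launch one more process running the same target function.
        proc = mp.Process(target=self._target)
        proc.start()
        self._procs[proc.pid] = proc
        return proc.pid

    def pids(self) -> List[int]:
        # Forget processes that have already exited, then report the rest.
        self._procs = {pid: p for pid, p in self._procs.items() if p.is_alive()}
        return sorted(self._procs)

    def stop(self, pid: int) -> bool:
        # Terminate a single process; return False if the PID is unknown.
        proc = self._procs.pop(pid, None)
        if proc is None:
            return False
        proc.terminate()
        proc.join(timeout=5)
        return True

    def stop_all(self) -> None:
        for pid in list(self._procs):
            self.stop(pid)
```

Keeping the parser separate from the registry means the command grammar can be checked without starting any processes.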

Usage Example

Here's a complete example of how to use the worker system with a voice agent:

import asyncio
from videosdk.plugins.openai import OpenAIRealtime, OpenAIRealtimeConfig
from videosdk.agents import Agent, AgentSession, RealTimePipeline, WorkerJob

class VoiceAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="You are a helpful voice assistant that can answer questions and help with tasks.",
        )

    async def on_enter(self) -> None:
        await self.session.say("Hi there! How can I help you today?")

async def start_session(jobctx):
    model = OpenAIRealtime(
        model="gpt-4o-realtime-preview",
    )
    pipeline = RealTimePipeline(model=model)
    session = AgentSession(
        agent=VoiceAgent(),
        pipeline=pipeline,
        context=jobctx
    )

    try:
        await session.start()
        # Wait indefinitely so the session stays open until the process is stopped
        await asyncio.Event().wait()
    finally:
        # Always close the session, even if startup or the wait is interrupted
        await session.close()

# Synchronous entry point that runs the async session to completion
def entryPoint(jobctx):
    asyncio.run(start_session(jobctx))

# Create job context
def make_context():
    return {"meetingId": "<Meeting-ID>", "name": "<Agent-Name>"}

# Start a worker job
job = WorkerJob(job_func=entryPoint, jobctx=make_context)
job.start()
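
The example wraps its coroutine in a plain function that calls asyncio.run, so the job function itself is synchronous. That pattern can be tried without the SDK. In the sketch below, fake_session and entry_point are hypothetical stand-ins for start_session and entryPoint, and the events list exists only to show that the finally block runs.

```python
import asyncio
from typing import List

# Records what happened, so the cleanup step is observable.
events: List[str] = []


async def fake_session(jobctx: dict) -> str:
    """Stand-in for an agent session: start, do some work, always clean up."""
    try:
        events.append("started")
        await asyncio.sleep(0)  # yield to the event loop once
        return f"{jobctx['name']} joined {jobctx['meetingId']}"
    finally:
        events.append("closed")


def entry_point(jobctx: dict) -> str:
    """Synchronous wrapper: creates an event loop and runs the coroutine."""
    return asyncio.run(fake_session(jobctx))


result = entry_point({"meetingId": "m1", "name": "bot"})
```

Because asyncio.run creates and closes its own event loop, each process that calls such an entry point gets an independent loop.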