
Pub/Sub Messaging

Pub/Sub (Publish/Subscribe) messaging enables real-time, bidirectional communication between your AI agent and client applications within a VideoSDK meeting. This allows you to build interactive experiences where the client can send commands or data to the agent, and the agent can push updates or notifications back to the client, all without relying on voice.

Pub/Sub Architecture Diagram

Key Features

  • Send Messages: Agents can publish messages to any specified Pub/Sub topic, which can be received by any participant (including client applications) subscribed to that topic.
  • Receive Messages: Agents can subscribe to topics to receive messages published by client applications or other participants.
  • Bidirectional Flow: Communication is not one-way. Both the agent and the client can publish and subscribe, enabling a fully interactive loop.
  • Decoupled Communication: The client and agent do not need to know about each other's existence directly. They communicate through shared topics, which simplifies the architecture.
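
To make the decoupling concrete, here is a minimal in-memory sketch of the topic model. It is only an illustration: `InMemoryBus` is a hypothetical toy class, not part of VideoSDK, and the real service delivers messages over the network rather than through direct function calls.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class InMemoryBus:
    """Toy stand-in for a Pub/Sub service (hypothetical, not the VideoSDK API)."""

    def __init__(self) -> None:
        # Maps each topic name to the callbacks subscribed to it.
        self._subscribers: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, topic: str, cb: Callable[[str], None]) -> None:
        self._subscribers[topic].append(cb)

    def publish(self, topic: str, message: str) -> int:
        """Deliver the message to every subscriber of the topic; return the delivery count."""
        callbacks = list(self._subscribers.get(topic, []))
        for cb in callbacks:
            cb(message)
        return len(callbacks)


bus = InMemoryBus()
agent_inbox: List[str] = []
client_inbox: List[str] = []

# The agent listens on CHAT, the client listens on AGENT_STATUS.
bus.subscribe("CHAT", agent_inbox.append)
bus.subscribe("AGENT_STATUS", client_inbox.append)

# Neither side holds a reference to the other; they only share topic names.
bus.publish("CHAT", "mute the agent")
bus.publish("AGENT_STATUS", "muted")
```

Publishing to a topic with no subscribers simply delivers nothing, which is why neither side has to check whether the other exists.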

Implementation

Implementing Pub/Sub involves two main parts: subscribing to topics to receive messages and publishing messages. Subscribing is typically the first step on both the agent and client side.

The snippet below shows how to subscribe to a Pub/Sub topic from the AI agent.

Subscribe on Room Context
from videosdk import PubSubSubscribeConfig

def on_client_message(message):
    print(f"Received: {message}")

# Run this inside an async function where `ctx` (the JobContext) is available.
await ctx.room.subscribe_to_pubsub(PubSubSubscribeConfig(
    topic="CHAT",
    cb=on_client_message
))

The most effective way for an agent to publish messages is by exposing a function_tool. This allows the LLM to decide when to send a message based on the conversation.

To publish, you use PubSubPublishConfig and call the publish_to_pubsub method on the JobContext room object.

from videosdk import PubSubPublishConfig
from videosdk.agents import Agent, function_tool, JobContext

class MyPubSubAgent(Agent):
    def __init__(self, ctx: JobContext):
        super().__init__(
            # The tool name here matches the method name defined below.
            instructions="You can send messages to the client using the send_message_to_client tool."
        )
        self.ctx = ctx

    @function_tool
    async def send_message_to_client(self, message: str):
        """Sends a text message to the client application on the 'CHAT' topic."""
        publish_config = PubSubPublishConfig(
            topic="CHAT",
            message=message
        )
        await self.ctx.room.publish_to_pubsub(publish_config)
        return f"Message '{message}' sent to client."

To receive messages, the agent must subscribe to a topic using PubSubSubscribeConfig and the subscribe_to_pubsub method, which registers a callback function to handle incoming messages. This setup is typically done in your main entrypoint function after connecting to the room.

import asyncio
from videosdk import PubSubSubscribeConfig
from videosdk.agents import JobContext

# Define the callback function that will process incoming messages
def on_client_message(message):
    print(f"Received message from client: {message}")
    # Add your logic here to process the message.
    # For example, you could pass it to the agent's pipeline.

async def entrypoint(ctx: JobContext):
    # ... (agent and session setup)

    try:
        await ctx.connect()
        await ctx.room.wait_for_participant()

        # Configure the subscription
        subscribe_config = PubSubSubscribeConfig(
            topic="CHAT",
            cb=on_client_message
        )
        # Subscribe to the topic
        await ctx.room.subscribe_to_pubsub(subscribe_config)

        # Start the agent session
        await session.start()
        await asyncio.Event().wait()

    finally:
        await session.close()
        await ctx.shutdown()
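
One way to hand received messages to the rest of the agent is to have the callback only enqueue them and let a separate coroutine process them in order. The sketch below uses only the standard library. `make_queue_callback`, `drain`, and `demo` are hypothetical names, and it assumes the callback is invoked on the same thread as the asyncio event loop, which this page does not confirm.

```python
import asyncio
from typing import Any, Callable, List


def make_queue_callback(queue: "asyncio.Queue[Any]") -> Callable[[Any], None]:
    """Return a sync callback that only enqueues the message and returns immediately."""
    def on_client_message(message: Any) -> None:
        # Assumption: called on the event loop thread; put_nowait is not thread-safe.
        queue.put_nowait(message)
    return on_client_message


async def drain(queue: "asyncio.Queue[Any]", handled: List[Any]) -> None:
    """Process queued messages one at a time, in arrival order."""
    while True:
        message = await queue.get()
        handled.append(message)  # placeholder for real processing
        queue.task_done()


async def demo() -> List[Any]:
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    handled: List[Any] = []
    callback = make_queue_callback(queue)  # this is what you would pass as `cb`
    worker = asyncio.create_task(drain(queue, handled))

    # Simulate two incoming messages.
    callback("first")
    callback("second")

    await queue.join()  # wait until both messages have been processed
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return handled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A queue keeps processing sequential, which suits messages whose order matters, such as a stream of commands.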

Best Practices

  • Topic Naming Conventions: Use clear and consistent topic names (e.g., CHAT, AGENT_STATUS) to keep your application organized.
  • Structured Data: Use JSON for your message payloads. This makes messages easy to parse and allows for sending complex data structures.
  • Error Handling: Your callback function should gracefully handle malformed or unexpected messages to prevent crashes.
  • Asynchronous Callbacks: If your callback function performs long-running tasks, make sure it is async and consider running tasks in the background with asyncio.create_task() to avoid blocking the main event loop.
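
The practices above can be combined in a short sketch: a JSON envelope for payloads, a decoder that rejects malformed input instead of raising, and a callback that schedules slow work with asyncio.create_task(). Everything here is illustrative. The `type`/`data` envelope and the helper names are assumptions, and the sketch treats the incoming message as a raw string; with the real SDK you may first need to extract the text from the object passed to your callback.

```python
import asyncio
import json
from typing import Any, Callable, Dict, List, Optional, Set

# Keep references to running tasks so they are not garbage-collected mid-flight.
_background_tasks: Set["asyncio.Task[None]"] = set()


def encode_payload(kind: str, data: Dict[str, Any]) -> str:
    """Wrap data in a small JSON envelope (hypothetical shape)."""
    return json.dumps({"type": kind, "data": data})


def decode_payload(raw: Any) -> Optional[Dict[str, Any]]:
    """Return the parsed envelope, or None if the message is malformed."""
    if not isinstance(raw, (str, bytes, bytearray)):
        return None
    try:
        payload = json.loads(raw)
    except ValueError:  # covers invalid JSON and undecodable bytes
        return None
    if not isinstance(payload, dict):
        return None
    if not isinstance(payload.get("type"), str) or not isinstance(payload.get("data"), dict):
        return None
    return payload


async def handle_payload(payload: Dict[str, Any], results: List[str]) -> None:
    await asyncio.sleep(0)  # stand-in for long-running work
    results.append(payload["type"])


def make_callback(results: List[str]) -> Callable[[Any], None]:
    def on_client_message(raw: Any) -> None:
        payload = decode_payload(raw)
        if payload is None:
            return  # ignore malformed messages instead of crashing
        # Assumption: the callback runs on the event loop thread.
        task = asyncio.create_task(handle_payload(payload, results))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
    return on_client_message


async def demo() -> List[str]:
    results: List[str] = []
    callback = make_callback(results)
    callback(encode_payload("mute", {"enabled": True}))  # valid message
    callback("not json")                                 # invalid JSON, ignored
    callback('["wrong", "shape"]')                       # valid JSON, wrong shape, ignored
    await asyncio.gather(*_background_tasks)
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because create_task() returns immediately, the callback stays fast even when the handler is slow. The trade-off is that handlers may finish out of order.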

Example - Try It Yourself

Check out our quickstart repository for a complete, runnable example of an agent using Pub/Sub.
