
A2A Implementation Guide

This guide shows you how to build a complete Agent-to-Agent (A2A) system using the concepts from the A2A Overview. We'll create a banking customer service system with a main customer service agent and a loan specialist.

Implementation Overview

We'll build a system with:

  • Customer Service Agent: Voice-enabled interface agent using RealTimePipeline for low-latency voice interactions
  • Loan Specialist Agent: Text-based domain expert using CascadingPipeline for efficient text processing
  • Intelligent Routing: Automatic detection and forwarding of loan queries
  • Seamless Communication: Users get expert responses without knowing about the routing

Structure of the project

A2A
├── agents/
│   ├── customer_agent.py    # CustomerServiceAgent definition
│   └── loan_agent.py        # LoanAgent definition
├── session_manager.py       # Handles session creation, pipeline setup, meeting join/leave
└── main.py                  # Entry point: runs main() and starts agents

Sequence Diagram

[Figure: A2A Architecture]

Step 1: Create the Customer Service Agent

  • Interface Agent: Creates CustomerServiceAgent as the main user-facing agent with voice capabilities and customer service instructions.
  • Function Tool: Implements the @function_tool forward_to_specialist(), which uses A2A discovery to find domain specialists and route queries to them.
  • Response Relay: Includes handle_specialist_response() method that automatically receives and relays specialist responses back to users.
agents/customer_agent.py
from videosdk.agents import Agent, AgentCard, A2AMessage, function_tool
import asyncio
from typing import Dict, Any

class CustomerServiceAgent(Agent):
    def __init__(self):
        super().__init__(
            agent_id="customer_service_1",
            instructions=(
                "You are a helpful bank customer service agent. "
                "For general banking queries (account balances, transactions, basic services), answer directly. "
                "For ANY loan-related queries, questions, or follow-ups, ALWAYS use the forward_to_specialist function "
                "with domain set to 'loan'. This includes initial loan questions AND all follow-up questions about loans. "
                "Do NOT attempt to answer loan questions yourself - always forward them to the specialist. "
                "After forwarding a loan query, stay engaged and automatically relay any response you receive from the specialist. "
                "When you receive responses from specialists, immediately relay them naturally to the customer."
            )
        )

    @function_tool
    async def forward_to_specialist(self, query: str, domain: str) -> Dict[str, Any]:
        """Forward queries to domain specialist agents using A2A discovery"""

        # Use A2A discovery to find specialists by domain
        specialists = self.a2a.registry.find_agents_by_domain(domain)
        id_of_target_agent = specialists[0] if specialists else None

        if not id_of_target_agent:
            return {"error": f"No specialist found for domain {domain}"}

        # Send A2A message to the specialist
        await self.a2a.send_message(
            to_agent=id_of_target_agent,
            message_type="specialist_query",
            content={"query": query}
        )

        return {
            "status": "forwarded",
            "specialist": id_of_target_agent,
            "message": "Let me get that information for you from our loan specialist..."
        }

    async def handle_specialist_response(self, message: A2AMessage) -> None:
        """Handle responses from specialist agents and relay to user"""
        response = message.content.get("response")
        if response:
            # Brief pause for natural conversation flow
            await asyncio.sleep(0.5)

            # Try multiple methods to relay the response to the user;
            # the first one that succeeds wins
            prompt = f"The loan specialist has responded: {response}"
            methods_to_try = [
                # Comment out the next two entries if the main agent uses CascadingPipeline
                (self.session.pipeline.send_text_message, prompt),
                (self.session.pipeline.model.send_message, response),
                (self.session.say, response)
            ]

            for method, arg in methods_to_try:
                try:
                    await method(arg)
                    break
                except Exception as e:
                    print(f"Error with {method.__name__}: {e}")

    async def on_enter(self):
        # Register this agent with the A2A system
        await self.register_a2a(AgentCard(
            id="customer_service_1",
            name="Customer Service Agent",
            domain="customer_service",
            capabilities=["query_handling", "specialist_coordination"],
            description="Handles customer queries and coordinates with specialists"
        ))
        await self.session.say("Hello! I am your customer service agent. How can I help you?")

        # Set up message listener for specialist responses
        self.a2a.on_message("specialist_response", self.handle_specialist_response)

    async def on_exit(self):
        print("Customer agent left the meeting")
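
To make the discovery step in forward_to_specialist() easier to reason about, the following is a minimal, framework-free sketch of a domain-keyed lookup. InMemoryRegistry and pick_specialist are hypothetical names invented for illustration; they are not part of videosdk.agents, and the sketch assumes that find_agents_by_domain returns a list of agent IDs, as the listing above suggests.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class InMemoryRegistry:
    """Hypothetical stand-in for an A2A registry; not the VideoSDK class."""

    # Maps a domain name to the agent IDs registered under it, in registration order.
    _by_domain: Dict[str, List[str]] = field(default_factory=dict)

    def register(self, agent_id: str, domain: str) -> None:
        self._by_domain.setdefault(domain, []).append(agent_id)

    def find_agents_by_domain(self, domain: str) -> List[str]:
        # Return a copy so callers cannot mutate registry state.
        return list(self._by_domain.get(domain, []))


def pick_specialist(registry: InMemoryRegistry, domain: str) -> Dict[str, Any]:
    """Mirror the routing decision in forward_to_specialist(), minus the send."""
    specialists = registry.find_agents_by_domain(domain)
    if not specialists:
        return {"error": f"No specialist found for domain {domain}"}
    # Like the guide's code, take the first registered specialist.
    return {"status": "forwarded", "specialist": specialists[0]}


registry = InMemoryRegistry()
registry.register("specialist_1", "loan")
print(pick_specialist(registry, "loan"))      # {'status': 'forwarded', 'specialist': 'specialist_1'}
print(pick_specialist(registry, "mortgage"))  # {'error': 'No specialist found for domain mortgage'}
```

The second call shows the error branch: a domain with no registered agent yields an error dictionary rather than an exception, which lets the calling model explain the failure to the user.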

Step 2: Create the Loan Specialist Agent

  • Specialist Agent Setup: Creates LoanAgent class with specialized loan expertise instructions and agent_id "specialist_1".
  • Message Handlers: Implements handle_specialist_query() to process incoming queries and handle_model_response() to send responses back.
  • Registration: Registers with A2A system using domain "loan" so it can be discovered by other agents needing loan expertise.
agents/loan_agent.py
from videosdk.agents import Agent, AgentCard, A2AMessage

class LoanAgent(Agent):
    def __init__(self):
        super().__init__(
            agent_id="specialist_1",
            instructions=(
                "You are a specialized loan expert at a bank. "
                "Provide detailed, helpful information about loans including interest rates, terms, and requirements. "
                "Give complete answers with specific details when possible. "
                "You can discuss personal loans, car loans, home loans, and business loans. "
                "Provide helpful guidance and next steps for loan applications. "
                "Be friendly and professional in your responses. "
                "Keep responses concise within 5-7 lines and easily understandable."
            )
        )

    async def handle_specialist_query(self, message: A2AMessage):
        """Process incoming queries from customer service agent"""
        query = message.content.get("query")
        if query:
            # Send the query to our AI model for processing
            await self.session.pipeline.send_text_message(query)

    async def handle_model_response(self, message: A2AMessage):
        """Send processed responses back to requesting agent"""
        response = message.content.get("response")
        requesting_agent = message.to_agent
        if response and requesting_agent:
            # Send the specialist response back to the customer service agent
            await self.a2a.send_message(
                to_agent=requesting_agent,
                message_type="specialist_response",
                content={"response": response}
            )

    async def on_enter(self):
        await self.register_a2a(AgentCard(
            id="specialist_1",
            name="Loan Specialist Agent",
            domain="loan",
            capabilities=["loan_consultation", "loan_information", "interest_rates"],
            description="Handles loan queries"
        ))
        self.a2a.on_message("specialist_query", self.handle_specialist_query)
        self.a2a.on_message("model_response", self.handle_model_response)

    async def on_exit(self):
        print("LoanAgent Left")
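
The query and response handlers in the two agents form a round trip keyed on message type. The sketch below imitates that flow with a toy in-process bus so the sequence can be run without the framework. MiniBus, Message, and round_trip are hypothetical and do not reflect how videosdk.agents delivers messages internally; in particular, the toy replies to the sender recorded on the message, and this guide does not show how the real framework populates its message fields.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Tuple


@dataclass
class Message:
    """Hypothetical message shape, loosely modeled on the fields used above."""
    from_agent: str
    to_agent: str
    type: str
    content: Dict[str, Any]


Handler = Callable[[Message], Awaitable[None]]


class MiniBus:
    """Toy bus: routes a message to the handler registered for (agent, type)."""

    def __init__(self) -> None:
        self._handlers: Dict[Tuple[str, str], Handler] = {}

    def on_message(self, agent_id: str, message_type: str, handler: Handler) -> None:
        self._handlers[(agent_id, message_type)] = handler

    async def send_message(self, from_agent: str, to_agent: str,
                           message_type: str, content: Dict[str, Any]) -> None:
        handler = self._handlers.get((to_agent, message_type))
        if handler is not None:  # Messages with no listener are silently dropped.
            await handler(Message(from_agent, to_agent, message_type, content))


async def round_trip(query: str) -> List[str]:
    bus = MiniBus()
    relayed: List[str] = []  # What the customer agent would say to the user.

    async def specialist_on_query(msg: Message) -> None:
        # A real specialist would call its LLM here; this fakes the answer.
        answer = f"[loan answer to: {msg.content['query']}]"
        await bus.send_message("specialist_1", msg.from_agent,
                               "specialist_response", {"response": answer})

    async def customer_on_response(msg: Message) -> None:
        relayed.append(msg.content["response"])

    bus.on_message("specialist_1", "specialist_query", specialist_on_query)
    bus.on_message("customer_service_1", "specialist_response", customer_on_response)

    await bus.send_message("customer_service_1", "specialist_1",
                           "specialist_query", {"query": query})
    return relayed


print(asyncio.run(round_trip("What is the rate on a car loan?")))
# ['[loan answer to: What is the rate on a car loan?]']
```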

Step 3: Configure Session Management

  • Pipeline Architecture: Uses RealTimePipeline for customer agent (audio-enabled Gemini for voice interaction) and CascadingPipeline for specialist agent (text-only OpenAI for efficient processing).
  • Session Factory: Provides create_pipeline() and create_session() functions to properly configure agent sessions based on their roles.
  • Modality Separation: Ensures customer agent can handle voice while specialist processes text in background.
session_manager.py
from videosdk.agents import AgentSession, CascadingPipeline, RealTimePipeline, ConversationFlow
from videosdk.plugins.openai import OpenAILLM
from videosdk.plugins.google import GeminiRealtime, GeminiLiveConfig
import os

class MyConversationFlow(ConversationFlow):
    async def on_turn_start(self, transcript: str) -> None:
        pass

    async def on_turn_end(self) -> None:
        pass

def create_pipeline(agent_type: str):
    if agent_type == "customer":
        # Customer agent: RealTimePipeline for voice interaction
        return RealTimePipeline(
            model=GeminiRealtime(
                model="gemini-2.0-flash-live-001",
                config=GeminiLiveConfig(
                    voice="Leda",
                    response_modalities=["AUDIO"]
                )
            )
        )
    else:
        # Specialist agent: CascadingPipeline for text processing
        return CascadingPipeline(
            llm=OpenAILLM(api_key=os.getenv("OPENAI_API_KEY")),
        )

def create_session(agent, pipeline) -> AgentSession:
    return AgentSession(
        agent=agent,
        pipeline=pipeline,
        conversation_flow=MyConversationFlow(agent=agent),
    )

Pipeline Support: The VideoSDK AI Agents framework supports both RealTimePipeline and CascadingPipeline, enabling flexible configurations for voice and text processing with A2A. You can run a full RealTimePipeline or CascadingPipeline for both modalities, or create a hybrid setup that combines the two. This allows you to tailor the use of STT, TTS, and LLM to suit your specific use case, whether for low-latency interactions, complex processing flows, or a mix of both.
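
One way to keep track of which pipeline kind each role uses is a small validated record. This is only an illustrative sketch: A2ALayout and the "realtime"/"cascading" labels are hypothetical names chosen here, not framework identifiers. Unlike an if/else fallthrough, it rejects unknown pipeline kinds instead of silently treating them as a default.

```python
from dataclasses import dataclass

# Labels invented for this sketch; they stand for RealTimePipeline and CascadingPipeline.
VALID_PIPELINES = frozenset({"realtime", "cascading"})


@dataclass(frozen=True)
class A2ALayout:
    """Hypothetical description of which pipeline kind each agent role uses."""
    customer: str
    specialist: str

    def __post_init__(self) -> None:
        # Fail fast on typos such as "realtme" rather than falling back to a default.
        for role, kind in (("customer", self.customer), ("specialist", self.specialist)):
            if kind not in VALID_PIPELINES:
                raise ValueError(f"Unknown pipeline kind for {role}: {kind!r}")

    @property
    def is_hybrid(self) -> bool:
        # Hybrid means the two roles use different pipeline kinds.
        return self.customer != self.specialist


# The setup in this guide: voice-facing realtime agent, text-only cascading specialist.
guide_layout = A2ALayout(customer="realtime", specialist="cascading")
print(guide_layout.is_hybrid)  # True
```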

Step 4: Deploy A2A System on VideoSDK Platform

  • Meeting Setup: Customer agent joins VideoSDK meeting for user interaction while specialist runs in background mode. Requires environment variables: VIDEOSDK_AUTH_TOKEN, GOOGLE_API_KEY, and OPENAI_API_KEY.
  • System Orchestration: Uses JobContext and WorkerJob to manage the meeting lifecycle and agent coordination.
  • Resource Management: Handles the startup sequence, keeps the system running, and provides a clean shutdown with proper A2A unregistration.
main.py
import asyncio
from contextlib import suppress
from agents.customer_agent import CustomerServiceAgent
from agents.loan_agent import LoanAgent
from session_manager import create_pipeline, create_session
from videosdk.agents import JobContext, RoomOptions, WorkerJob

async def main(ctx: JobContext):
    specialist_agent = LoanAgent()
    specialist_pipeline = create_pipeline("specialist")
    specialist_session = create_session(specialist_agent, specialist_pipeline)

    customer_agent = CustomerServiceAgent()
    customer_pipeline = create_pipeline("customer")
    customer_session = create_session(customer_agent, customer_pipeline)

    specialist_task = asyncio.create_task(specialist_session.start())

    try:
        await ctx.connect()
        await customer_session.start()
        await asyncio.Event().wait()
    except (KeyboardInterrupt, asyncio.CancelledError):
        print("Shutting down...")
    finally:
        specialist_task.cancel()
        with suppress(asyncio.CancelledError):
            await specialist_task

        await specialist_session.close()
        await customer_session.close()

        await specialist_agent.unregister_a2a()
        await customer_agent.unregister_a2a()
        await ctx.shutdown()


def customer_agent_context() -> JobContext:
    room_options = RoomOptions(room_id="<meeting_id>", name="Customer Service Agent", playground=True)

    return JobContext(
        room_options=room_options
    )


if __name__ == "__main__":
    job = WorkerJob(entrypoint=main, jobctx=customer_agent_context)
    job.start()
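
The startup and shutdown sequence in main() follows a common asyncio pattern: start the background worker as a task, do the foreground work, then cancel the task and await it before closing resources. The sketch below isolates that pattern using a dummy session so it can be run on its own. FakeSession and run_for are hypothetical stand-ins, not VideoSDK classes.

```python
import asyncio
from contextlib import suppress
from typing import List


class FakeSession:
    """Hypothetical stand-in for an agent session; records lifecycle events."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    async def start(self) -> None:
        self.log.append(f"{self.name}:start")
        await asyncio.Event().wait()  # Block until the task is cancelled.

    async def close(self) -> None:
        self.log.append(f"{self.name}:close")


async def run_for(seconds: float) -> List[str]:
    log: List[str] = []
    specialist = FakeSession("specialist", log)
    background = asyncio.create_task(specialist.start())
    try:
        # Stand-in for the foreground work (connect, start the customer session, wait).
        await asyncio.sleep(seconds)
    finally:
        # Cancel first, then await, so the task has fully unwound before closing.
        background.cancel()
        with suppress(asyncio.CancelledError):
            await background
        await specialist.close()
    return log


print(asyncio.run(run_for(0.01)))  # ['specialist:start', 'specialist:close']
```

Awaiting the cancelled task inside suppress(asyncio.CancelledError) matters: without it, close() could run while the background coroutine is still unwinding.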

Running the Application

Set the required environment variables:

export VIDEOSDK_AUTH_TOKEN="your_videosdk_token"
export GOOGLE_API_KEY="your_google_api_key"
export OPENAI_API_KEY="your_openai_api_key"
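
A missing key usually surfaces only when the corresponding service is first called, so it can help to check for all three variables up front. The helper below is a small sketch of such a check; missing_env_vars is a hypothetical name and is not part of the example repository.

```python
import os
from typing import List, Mapping

# The three variables this guide requires.
REQUIRED_VARS = ("VIDEOSDK_AUTH_TOKEN", "GOOGLE_API_KEY", "OPENAI_API_KEY")


def missing_env_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or empty in env."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Demo with an explicit mapping: one key set, one empty, one absent.
print(missing_env_vars({"VIDEOSDK_AUTH_TOKEN": "t", "GOOGLE_API_KEY": ""}))
# ['GOOGLE_API_KEY', 'OPENAI_API_KEY']
```

Calling missing_env_vars() with no argument checks the real process environment, for example at the top of main.py before any agent is created.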

Replace <meeting_id> in the code with your actual meeting ID, then run:

cd A2A
python main.py

Quick Start

Get the complete working example at A2A Quick Start Repository with all the code ready to run.
