A2A Implementation Guide
This guide shows you how to build a complete Agent-to-Agent (A2A) system using the concepts from the A2A Overview. We'll create a banking customer service system with a main customer service agent and a loan specialist.
Implementation Overview
We'll build a system with:
- Customer Service Agent: Voice-enabled interface agent using RealTimePipeline for low-latency voice interactions
- Loan Specialist Agent: Text-based domain expert using CascadingPipeline for efficient text processing
- Intelligent Routing: Automatic detection and forwarding of loan queries
- Seamless Communication: Users get expert responses without knowing about the routing
Structure of the project
A2A
├── agents/
│ ├── customer_agent.py # CustomerServiceAgent definition
│ ├── loan_agent.py # LoanAgent definition
│
├── session_manager.py # Handles session creation, pipeline setup, meeting join/leave
└── main.py # Entry point: runs main() and starts agents
Sequence Diagram
Step 1: Create the Customer Service Agent
- **Interface Agent**: Creates `CustomerServiceAgent` as the main user-facing agent with voice capabilities and customer service instructions.
- **Function Tool**: Implements `@function_tool forward_to_specialist()`, which uses A2A discovery to find domain specialists and route queries to them.
- **Response Relay**: Includes a `handle_specialist_response()` method that automatically receives specialist responses and relays them back to users.
from videosdk.agents import Agent, AgentCard, A2AMessage, function_tool
import asyncio
from typing import Dict, Any
class CustomerServiceAgent(Agent):
    """User-facing bank customer service agent.

    Answers general banking questions directly and forwards loan-related
    queries to a specialist found through the A2A registry. Replies that
    arrive as "specialist_response" A2A messages are relayed back to the user.
    """

    def __init__(self):
        super().__init__(
            agent_id="customer_service_1",
            instructions=(
                "You are a helpful bank customer service agent. "
                "For general banking queries (account balances, transactions, basic services), answer directly. "
                "For ANY loan-related queries, questions, or follow-ups, ALWAYS use the forward_to_specialist function "
                "with domain set to 'loan'. This includes initial loan questions AND all follow-up questions about loans. "
                "Do NOT attempt to answer loan questions yourself - always forward them to the specialist. "
                "After forwarding a loan query, stay engaged and automatically relay any response you receive from the specialist. "
                "When you receive responses from specialists, immediately relay them naturally to the customer."
            ),
        )

    @function_tool
    async def forward_to_specialist(self, query: str, domain: str) -> Dict[str, Any]:
        """Forward queries to domain specialist agents using A2A discovery"""
        # NOTE: the docstring above is kept short and unchanged on purpose;
        # presumably @function_tool exposes it to the LLM as the tool
        # description (confirm against the videosdk docs before editing it).

        # Use A2A discovery to find specialists by domain; the first match wins.
        specialists = self.a2a.registry.find_agents_by_domain(domain)
        id_of_target_agent = specialists[0] if specialists else None
        if not id_of_target_agent:
            return {"error": f"No specialist found for domain {domain}"}

        # Send the query to the specialist. The answer arrives asynchronously
        # as a "specialist_response" message handled by
        # handle_specialist_response().
        await self.a2a.send_message(
            to_agent=id_of_target_agent,
            message_type="specialist_query",
            content={"query": query},
        )
        return {
            "status": "forwarded",
            "specialist": id_of_target_agent,
            "message": "Let me get that information for you from our loan specialist...",
        }

    async def handle_specialist_response(self, message: A2AMessage) -> None:
        """Relay a specialist's answer to the user.

        Registered in ``on_enter`` for "specialist_response" A2A messages.
        Delivery methods are tried in order, and the first one that succeeds
        ends the relay.

        Args:
            message: A2A message whose ``content["response"]`` holds the
                specialist's text. Messages without a response are ignored.
        """
        response = message.content.get("response")
        if not response:
            return

        # Brief pause for natural conversation flow
        await asyncio.sleep(0.5)

        prompt = f"The loan specialist has responded: {response}"
        pipeline = self.session.pipeline
        # Resolve the candidates lazily with getattr. Evaluating attributes
        # like ``pipeline.model.send_message`` eagerly would raise
        # AttributeError outside the try/except on a pipeline that lacks them,
        # and session.say would never be tried. (A CascadingPipeline may not
        # expose ``model``/``send_text_message``; TODO confirm.)
        methods_to_try = [
            (getattr(pipeline, "send_text_message", None), prompt),
            (getattr(getattr(pipeline, "model", None), "send_message", None), response),
            (self.session.say, response),
        ]
        for method, arg in methods_to_try:
            if method is None:
                continue  # not supported by this pipeline type
            try:
                await method(arg)
                return
            except Exception as e:
                # Best-effort relay: log the failure and try the next method.
                print(f"Error with {getattr(method, '__name__', method)}: {e}")
        print("Could not relay the specialist response to the user")

    async def on_enter(self):
        """Register with A2A, subscribe to specialist replies, then greet."""
        # Register this agent with the A2A system
        await self.register_a2a(AgentCard(
            id="customer_service_1",
            name="Customer Service Agent",
            domain="customer_service",
            capabilities=["query_handling", "specialist_coordination"],
            description="Handles customer queries and coordinates with specialists",
        ))
        # Subscribe before the (awaited) greeting, so the reply handler is in
        # place before any user interaction can trigger a forward.
        self.a2a.on_message("specialist_response", self.handle_specialist_response)
        await self.session.say("Hello! I am your customer service agent. How can I help you?")

    async def on_exit(self):
        print("Customer agent left the meeting")
Step 2: Create the Loan Specialist Agent
- **Specialist Agent Setup**: Creates the `LoanAgent` class with specialized loan-expertise instructions and the agent_id `"specialist_1"`.
- **Message Handlers**: Implements `handle_specialist_query()` to process incoming queries and `handle_model_response()` to send responses back.
- **Registration**: Registers with the A2A system using the domain `"loan"`, so other agents that need loan expertise can discover it.
from videosdk.agents import Agent, AgentCard, A2AMessage
class LoanAgent(Agent):
    """Loan specialist reachable over A2A under the "loan" domain.

    Handles "specialist_query" messages by feeding the query text to its own
    pipeline. Handles "model_response" messages by forwarding the response
    text as a "specialist_response" message.
    """

    def __init__(self):
        loan_instructions = (
            "You are a specialized loan expert at a bank. "
            "Provide detailed, helpful information about loans including interest rates, terms, and requirements. "
            "Give complete answers with specific details when possible. "
            "You can discuss personal loans, car loans, home loans, and business loans. "
            "Provide helpful guidance and next steps for loan applications. "
            "Be friendly and professional in your responses. "
            "Keep responses concise within 5-7 lines and easily understandable."
        )
        super().__init__(agent_id="specialist_1", instructions=loan_instructions)

    async def handle_specialist_query(self, message: A2AMessage):
        """Pass an incoming customer query (``content["query"]``) to the model."""
        incoming_query = message.content.get("query")
        if not incoming_query:
            return
        await self.session.pipeline.send_text_message(incoming_query)

    async def handle_model_response(self, message: A2AMessage):
        """Send the model's answer (``content["response"]``) back as a specialist_response."""
        answer = message.content.get("response")
        # NOTE(review): the reply target is read from message.to_agent;
        # presumably the framework addresses "model_response" messages to the
        # original requester. Confirm against the videosdk A2A docs.
        recipient = message.to_agent
        if not (answer and recipient):
            return
        await self.a2a.send_message(
            to_agent=recipient,
            message_type="specialist_response",
            content={"response": answer},
        )

    async def on_enter(self):
        """Register under the "loan" domain and subscribe to A2A messages."""
        loan_card = AgentCard(
            id="specialist_1",
            name="Loan Specialist Agent",
            domain="loan",
            capabilities=["loan_consultation", "loan_information", "interest_rates"],
            description="Handles loan queries",
        )
        await self.register_a2a(loan_card)
        subscriptions = (
            ("specialist_query", self.handle_specialist_query),
            ("model_response", self.handle_model_response),
        )
        for message_type, handler in subscriptions:
            self.a2a.on_message(message_type, handler)

    async def on_exit(self):
        print("LoanAgent Left")
Step 3: Configure Session Management
- **Pipeline Architecture**: Uses `RealTimePipeline` for the customer agent (audio-enabled Gemini for voice interaction) and `CascadingPipeline` for the specialist agent (text-only OpenAI for efficient processing).
- **Session Factory**: Provides the `create_pipeline()` and `create_session()` functions, which configure agent sessions according to their roles.
- **Modality Separation**: Lets the customer agent handle voice while the specialist processes text in the background.
from videosdk.agents import AgentSession, CascadingPipeline, RealTimePipeline, ConversationFlow
from videosdk.plugins.openai import OpenAILLM
from videosdk.plugins.google import GeminiRealtime, GeminiLiveConfig
import os
class MyConversationFlow(ConversationFlow):
    """Conversation flow whose turn hooks are intentionally left empty."""

    async def on_turn_start(self, transcript: str) -> None:
        """Turn-start hook; receives the transcript and does nothing with it."""
        return None

    async def on_turn_end(self) -> None:
        """Turn-end hook; no custom behaviour."""
        return None
def create_pipeline(agent_type: str):
    """Build the pipeline for an agent role.

    Args:
        agent_type: ``"customer"`` selects a voice ``RealTimePipeline`` backed
            by Gemini Live (audio responses, "Leda" voice). Any other value
            selects a text ``CascadingPipeline`` backed by OpenAI.

    Returns:
        The configured pipeline instance.
    """
    if agent_type != "customer":
        # Specialist agent: text-only LLM, key taken from OPENAI_API_KEY.
        openai_key = os.getenv("OPENAI_API_KEY")
        return CascadingPipeline(llm=OpenAILLM(api_key=openai_key))

    # Customer agent: realtime voice model.
    live_config = GeminiLiveConfig(voice="Leda", response_modalities=["AUDIO"])
    voice_model = GeminiRealtime(model="gemini-2.0-flash-live-001", config=live_config)
    return RealTimePipeline(model=voice_model)
def create_session(agent, pipeline) -> AgentSession:
    """Wrap ``agent`` and ``pipeline`` in an AgentSession.

    Each session gets its own ``MyConversationFlow`` bound to the same agent.
    """
    flow = MyConversationFlow(agent=agent)
    return AgentSession(agent=agent, pipeline=pipeline, conversation_flow=flow)
**Pipeline Support:** The VideoSDK AI Agents framework supports both `RealTimePipeline` and `CascadingPipeline`, enabling flexible voice and text configurations with A2A. You can run a full `RealTimePipeline` or `CascadingPipeline` for both modalities, or create a hybrid setup that combines the two. This lets you tailor the use of STT, TTS, and LLM to your specific use case, whether you need low-latency interactions, complex processing flows, or a mix of both.
Step 4: Deploy A2A System on VideoSDK Platform
- **Meeting Setup**: The customer agent joins a VideoSDK meeting for user interaction while the specialist runs in background mode. Requires the environment variables `VIDEOSDK_AUTH_TOKEN`, `GOOGLE_API_KEY`, and `OPENAI_API_KEY`.
- **System Orchestration**: Uses `JobContext` and `WorkerJob` to manage the meeting lifecycle and agent coordination.
- **Resource Management**: Handles the startup sequence, keeps the system running, and shuts down cleanly with proper A2A unregistration.
import asyncio
from contextlib import suppress
from agents.customer_agent import CustomerServiceAgent
from agents.loan_agent import LoanAgent
from session_manager import create_pipeline, create_session
from videosdk.agents import JobContext, RoomOptions, WorkerJob
async def main(ctx: JobContext):
    """Run the customer-service + loan-specialist A2A system until cancelled.

    The specialist session is started as a background task. The customer
    session is started after ``ctx.connect()``. On shutdown, every cleanup
    step is attempted even if an earlier one fails, so sessions, A2A
    registrations and the job context are not left behind.
    """
    specialist_agent = LoanAgent()
    specialist_pipeline = create_pipeline("specialist")
    specialist_session = create_session(specialist_agent, specialist_pipeline)

    customer_agent = CustomerServiceAgent()
    customer_pipeline = create_pipeline("customer")
    customer_session = create_session(customer_agent, customer_pipeline)

    # The specialist session runs in the background; it is not awaited here.
    specialist_task = asyncio.create_task(specialist_session.start())
    try:
        await ctx.connect()
        await customer_session.start()
        # Block forever; the job ends only through cancellation.
        await asyncio.Event().wait()
    except (KeyboardInterrupt, asyncio.CancelledError):
        print("Shutting down...")
    finally:
        specialist_task.cancel()
        try:
            with suppress(asyncio.CancelledError):
                await specialist_task
        except Exception as exc:
            # If the background task crashed, awaiting it re-raises that error.
            # Log it instead of letting it abort the cleanup below.
            print(f"Specialist session task failed: {exc}")

        # Attempt every step independently. With plain sequential awaits, one
        # failing close() would skip the unregistrations and ctx.shutdown().
        cleanup_steps = (
            ("close specialist session", specialist_session.close),
            ("close customer session", customer_session.close),
            ("unregister specialist agent", specialist_agent.unregister_a2a),
            ("unregister customer agent", customer_agent.unregister_a2a),
            ("shut down job context", ctx.shutdown),
        )
        for label, step in cleanup_steps:
            try:
                await step()
            except Exception as exc:
                print(f"Cleanup step '{label}' failed: {exc}")
def customer_agent_context() -> JobContext:
    """Build the JobContext for the meeting the customer agent joins.

    Replace ``"<meeting_id>"`` with a real VideoSDK meeting ID before running.
    """
    return JobContext(
        room_options=RoomOptions(
            room_id="<meeting_id>",
            name="Customer Service Agent",
            playground=True,
        )
    )
if __name__ == "__main__":
    # Give the worker the async entrypoint and the factory for its JobContext.
    job = WorkerJob(
        entrypoint=main,
        jobctx=customer_agent_context,
    )
    job.start()
Running the Application
Set the required environment variables:
export VIDEOSDK_AUTH_TOKEN="your_videosdk_token"
export GOOGLE_API_KEY="your_google_api_key"
export OPENAI_API_KEY="your_openai_api_key"
Replace `<meeting_id>` in the code with your actual meeting ID, then run:
cd A2A
python main.py
Get the complete working example at A2A Quick Start Repository with all the code ready to run.
Got a question? Ask us on Discord.