
A2A Implementation Guide

This guide shows you how to build a complete Agent-to-Agent (A2A) system using the concepts from the A2A Overview. We'll create a banking customer service system in which a main customer service agent hands loan questions to a loan specialist agent.

Implementation Overview

We'll build a system with:

  • Customer Service Agent: Voice-enabled interface agent that users interact with
  • Loan Specialist Agent: Text-based domain expert for loan-related queries
  • Intelligent Routing: Automatic detection and forwarding of loan queries
  • Seamless Communication: Users get expert responses without knowing about the routing

Structure of the project

A2A
├── agents/
│   ├── customer_agent.py   # CustomerServiceAgent definition
│   └── loan_agent.py       # LoanAgent definition
├── session_manager.py      # Handles session creation, pipeline setup, meeting join/leave
└── main.py                 # Entry point: runs main() and starts agents

Sequence Diagram

[Figure: A2A Architecture sequence diagram]

Step 1: Create the Customer Service Agent

  • Interface Agent: Creates CustomerServiceAgent as the main user-facing agent with voice capabilities and customer service instructions.
  • Function Tool: Implements forward_to_specialist(), decorated with @function_tool, which uses A2A discovery to find a domain specialist and route the query to it.
  • Response Relay: Includes a handle_specialist_response() method that receives specialist responses and relays them back to the user.
agents/customer_agent.py
from videosdk.agents import Agent, AgentCard, A2AMessage, function_tool
import asyncio
from typing import Dict, Any

class CustomerServiceAgent(Agent):
    def __init__(self):
        super().__init__(
            agent_id="customer_service_1",
            instructions=(
                "You are a helpful bank customer service agent. "
                "For general banking queries (account balances, transactions, basic services), answer directly. "
                "For ANY loan-related queries, questions, or follow-ups, ALWAYS use the forward_to_specialist function "
                "with domain set to 'loan'. This includes initial loan questions AND all follow-up questions about loans. "
                "Do NOT attempt to answer loan questions yourself - always forward them to the specialist. "
                "After forwarding a loan query, stay engaged and automatically relay any response you receive from the specialist. "
                "When you receive responses from specialists, immediately relay them naturally to the customer."
            )
        )

    @function_tool
    async def forward_to_specialist(self, query: str, domain: str) -> Dict[str, Any]:
        """Forward queries to domain specialist agents using A2A discovery"""

        # Use A2A discovery to find specialists by domain
        specialists = self.a2a.registry.find_agents_by_domain(domain)
        id_of_target_agent = specialists[0] if specialists else None

        if not id_of_target_agent:
            return {"error": f"No specialist found for domain {domain}"}

        # Send A2A message to the specialist
        await self.a2a.send_message(
            to_agent=id_of_target_agent,
            message_type="specialist_query",
            content={"query": query}
        )

        return {
            "status": "forwarded",
            "specialist": id_of_target_agent,
            "message": "Let me get that information for you from our loan specialist..."
        }

    async def handle_specialist_response(self, message: A2AMessage) -> None:
        """Handle responses from specialist agents and relay to user"""
        response = message.content.get("response")
        if response:
            # Brief pause for natural conversation flow
            await asyncio.sleep(0.5)

            # Try multiple methods to relay the response to the user;
            # the first one that succeeds wins
            prompt = f"The loan specialist has responded: {response}"
            methods_to_try = [
                (self.session.pipeline.send_text_message, prompt),
                (self.session.pipeline.model.send_message, response),
                (self.session.say, response)
            ]

            for method, arg in methods_to_try:
                try:
                    await method(arg)
                    break
                except Exception as e:
                    print(f"Error with {method.__name__}: {e}")

    async def on_enter(self):
        # Register this agent with the A2A system
        await self.register_a2a(AgentCard(
            id="customer_service_1",
            name="Customer Service Agent",
            domain="customer_service",
            capabilities=["query_handling", "specialist_coordination"],
            description="Handles customer queries and coordinates with specialists",
            metadata={"version": "1.0", "type": "interface"}
        ))

        # Greet the user
        await self.session.say("Hello! I am your customer service agent. How can I help you?")

        # Set up message listener for specialist responses
        self.a2a.on_message("specialist_response", self.handle_specialist_response)

    async def on_exit(self):
        print("Customer agent left the meeting")
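
The lookup-then-fallback logic inside forward_to_specialist() can be tried without the SDK. The sketch below is a minimal illustration, assuming a registry that simply filters registered cards by domain; ToyCard, ToyRegistry, and pick_specialist are hypothetical names invented for this example and are not part of videosdk.agents.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyCard:
    """Hypothetical stand-in for an agent card: just an id and a domain."""
    id: str
    domain: str


@dataclass
class ToyRegistry:
    """Hypothetical in-memory registry; not the VideoSDK implementation."""
    cards: Dict[str, ToyCard] = field(default_factory=dict)

    def register(self, card: ToyCard) -> None:
        self.cards[card.id] = card

    def find_agents_by_domain(self, domain: str) -> List[str]:
        # Return ids in registration order (dicts preserve insertion order)
        return [c.id for c in self.cards.values() if c.domain == domain]


def pick_specialist(registry: ToyRegistry, domain: str) -> dict:
    """Mirror the lookup-then-fallback logic of forward_to_specialist()."""
    specialists = registry.find_agents_by_domain(domain)
    if not specialists:
        return {"error": f"No specialist found for domain {domain}"}
    return {"status": "forwarded", "specialist": specialists[0]}


registry = ToyRegistry()
registry.register(ToyCard(id="customer_service_1", domain="customer_service"))
registry.register(ToyCard(id="specialist_1", domain="loan"))

print(pick_specialist(registry, "loan"))      # a loan specialist is registered
print(pick_specialist(registry, "mortgage"))  # nobody handles this domain
```

Returning an error dictionary instead of raising keeps the tool result readable by the model, which can then tell the customer that no specialist is available.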

Step 2: Create the Loan Specialist Agent

  • Specialist Agent Setup: Creates the LoanAgent class with specialized loan expertise instructions and agent_id "specialist_1".
  • Message Handlers: Implements handle_specialist_query() to process incoming queries and handle_model_response() to send responses back.
  • Registration: Registers with the A2A system under the domain "loan" so that other agents needing loan expertise can discover it.
agents/loan_agent.py
from videosdk.agents import Agent, AgentCard, A2AMessage

class LoanAgent(Agent):
    def __init__(self):
        super().__init__(
            agent_id="specialist_1",
            instructions=(
                "You are a specialized loan expert at a bank. "
                "Provide detailed, helpful information about loans including interest rates, terms, and requirements. "
                "Give complete answers with specific details when possible. "
                "You can discuss personal loans, car loans, home loans, and business loans. "
                "Provide helpful guidance and next steps for loan applications. "
                "Be friendly and professional in your responses. "
                "Keep responses concise within 5-7 lines and easily understandable."
            )
        )

    async def handle_specialist_query(self, message: A2AMessage):
        """Process incoming queries from customer service agent"""
        query = message.content.get("query")
        if query:
            # Send the query to our AI model for processing
            await self.session.pipeline.send_text_message(query)

    async def handle_model_response(self, message: A2AMessage):
        """Send processed responses back to requesting agent"""
        response = message.content.get("response")
        requesting_agent = message.to_agent
        if response and requesting_agent:
            # Send the specialist response back to the customer service agent
            await self.a2a.send_message(
                to_agent=requesting_agent,
                message_type="specialist_response",
                content={"response": response}
            )

    async def on_enter(self):
        print("LoanAgent joined the system")

        # Register this agent with the A2A system
        await self.register_a2a(AgentCard(
            id="specialist_1",
            name="Loan Specialist Agent",
            domain="loan",
            capabilities=["loan_consultation", "loan_information", "interest_rates"],
            description="Handles loan queries with specialized expertise",
            metadata={"version": "1.0", "type": "specialist"}
        ))

        # Set up message listeners for different message types
        self.a2a.on_message("specialist_query", self.handle_specialist_query)
        self.a2a.on_message("model_response", self.handle_model_response)

    async def on_exit(self):
        print("LoanAgent left the system")
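
Both agents communicate by registering handlers per message type and sending typed messages. The following sketch shows that query/response round trip in isolation, assuming a simple in-process bus; ToyBus and round_trip are hypothetical names for illustration, and the "model" is faked with a formatted string rather than a real LLM call.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class ToyBus:
    """Hypothetical message bus keyed by message type; for illustration only."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def on_message(self, message_type: str, handler: Handler) -> None:
        self._handlers[message_type].append(handler)

    async def send_message(self, message_type: str, content: Dict[str, Any]) -> None:
        # Deliver the content to every handler registered for this type
        for handler in self._handlers[message_type]:
            await handler(content)


async def round_trip(query: str) -> List[str]:
    bus = ToyBus()
    relayed: List[str] = []

    async def handle_specialist_query(content: Dict[str, Any]) -> None:
        # A real specialist would call its model here; we fake an answer
        answer = f"Answer to: {content['query']}"
        await bus.send_message("specialist_response", {"response": answer})

    async def handle_specialist_response(content: Dict[str, Any]) -> None:
        # The interface agent would speak this to the user; we just record it
        relayed.append(content["response"])

    bus.on_message("specialist_query", handle_specialist_query)
    bus.on_message("specialist_response", handle_specialist_response)

    await bus.send_message("specialist_query", {"query": query})
    return relayed


print(asyncio.run(round_trip("What are current car loan rates?")))
```

Keying handlers by message type keeps each agent unaware of the other's class: the customer agent only needs to know the "specialist_query" and "specialist_response" type names.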

Step 3: Configure Session Management

  • Pipeline Creation: Sets up a different pipeline configuration per role: the customer agent gets an audio-enabled Gemini model for voice interaction, while the specialist gets a text-only model for efficiency.
  • Session Factory: Provides create_pipeline() and create_session() functions that configure each agent's session according to its role.
  • Modality Separation: Ensures the customer agent can handle voice while the specialist processes text in the background.
session_manager.py
from videosdk.agents import AgentSession, RealTimePipeline
from videosdk.plugins.google import GeminiRealtime, GeminiLiveConfig
from typing import Dict

def create_pipeline(agent_type: str) -> RealTimePipeline:
    """Create appropriate pipeline based on agent type"""

    if agent_type == "customer":
        # Customer agent: Audio-enabled for real-time voice interaction
        model = GeminiRealtime(
            model="gemini-2.0-flash-live-001",
            config=GeminiLiveConfig(
                voice="Leda",  # Available voices: Puck, Charon, Kore, Fenrir, Aoede, Leda, Orus, Zephyr
                response_modalities=["AUDIO"]
            )
        )
    else:
        # Specialist agent: Text-only for efficient processing
        model = GeminiRealtime(
            model="gemini-2.0-flash-live-001",
            config=GeminiLiveConfig(response_modalities=["TEXT"])
        )

    return RealTimePipeline(model=model)

def create_session(agent, pipeline, context: Dict) -> AgentSession:
    """Create agent session with given configuration"""
    return AgentSession(agent=agent, pipeline=pipeline, context=context)

Info: You can use OpenAI's text and audio features, Gemini's text and audio features, or a combination of both.
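
Note that create_pipeline() treats every agent_type other than "customer" as a specialist, so a misspelled role would quietly receive a text-only pipeline. One possible way to make the role-to-modality mapping explicit is sketched below; MODALITIES_BY_ROLE and modalities_for are hypothetical helpers invented for this example, and the sketch deliberately stops short of constructing any real model object.

```python
from typing import Dict, List

# Hypothetical role-to-modality table; the values mirror the
# audio/text split used in create_pipeline()
MODALITIES_BY_ROLE: Dict[str, List[str]] = {
    "customer": ["AUDIO"],
    "specialist": ["TEXT"],
}


def modalities_for(agent_type: str) -> List[str]:
    """Return response modalities for a role, rejecting unknown roles."""
    try:
        # Copy the list so callers cannot mutate the shared table
        return list(MODALITIES_BY_ROLE[agent_type])
    except KeyError:
        known = ", ".join(sorted(MODALITIES_BY_ROLE))
        raise ValueError(
            f"Unknown agent type {agent_type!r}; expected one of: {known}"
        ) from None


print(modalities_for("customer"))
print(modalities_for("specialist"))
```

Whether strict validation is preferable to the permissive else branch depends on how many roles the system is expected to grow; with only two, either approach is workable.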

Step 4: Deploy A2A System on VideoSDK Platform

  • Meeting Setup: The customer agent joins a VideoSDK meeting for user interaction while the specialist runs in background mode. This requires the environment variables VIDEOSDK_AUTH_TOKEN and GOOGLE_API_KEY, plus a meeting ID in the session context.
  • System Orchestration: Initializes both agents, creates their respective pipelines, and starts their sessions with the appropriate meeting configuration.
  • Resource Management: Handles the startup sequence, keeps the system running, and shuts down cleanly, including A2A unregistration.
main.py
import asyncio
from agents.customer_agent import CustomerServiceAgent
from agents.loan_agent import LoanAgent
from session_manager import create_pipeline, create_session

async def main():
    # Initialize both customer and specialist agents
    customer_agent = CustomerServiceAgent()
    specialist_agent = LoanAgent()

    # Create pipelines for different agent types
    customer_pipeline = create_pipeline("customer")
    specialist_pipeline = create_pipeline("specialist")

    # Create sessions with appropriate configurations
    customer_session = create_session(customer_agent, customer_pipeline, {
        "name": "Customer Service Assistant",
        "meetingId": "YOUR_MEETING_ID",  # Replace with your meeting ID
        "join_meeting": True  # Customer agent joins the meeting for user interaction
    })

    specialist_session = create_session(specialist_agent, specialist_pipeline, {
        "join_meeting": False  # Specialist agent runs in background
    })

    try:
        # Start both agent sessions
        await customer_session.start()
        await specialist_session.start()
        # Keep the system running until manually terminated
        await asyncio.Event().wait()

    except KeyboardInterrupt:
        print("\nShutting down A2A system...")
    finally:
        # Clean up resources
        await customer_session.close()
        await specialist_session.close()
        await customer_agent.unregister_a2a()
        await specialist_agent.unregister_a2a()
        print("System shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
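
The start, wait, and clean-up sequence in main() relies on try/finally so that sessions are closed however the wait ends. The sketch below isolates that pattern, assuming stand-in sessions that only record their lifecycle calls; ToySession, run_system, and demo are hypothetical names, and an asyncio.Event is set programmatically to stand in for a manual shutdown.

```python
import asyncio
from typing import List


class ToySession:
    """Hypothetical session that records lifecycle calls; not AgentSession."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    async def start(self) -> None:
        self.log.append(f"{self.name}:start")

    async def close(self) -> None:
        self.log.append(f"{self.name}:close")


async def run_system(log: List[str], stop: asyncio.Event) -> None:
    sessions = [ToySession("customer", log), ToySession("specialist", log)]
    try:
        for session in sessions:
            await session.start()
        # Block until something asks the system to stop
        await stop.wait()
    finally:
        # Runs on normal exit, on errors, and on task cancellation alike
        for session in sessions:
            await session.close()


async def demo() -> List[str]:
    log: List[str] = []
    stop = asyncio.Event()
    task = asyncio.create_task(run_system(log, stop))
    await asyncio.sleep(0)  # let the task start both sessions
    stop.set()              # simulate a shutdown request
    await task
    return log


print(asyncio.run(demo()))
```

Using a named event rather than an anonymous asyncio.Event().wait() gives other code, such as a signal handler, a handle for requesting shutdown.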

Running the Application

cd A2A
python main.py
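
Step 4 lists VIDEOSDK_AUTH_TOKEN and GOOGLE_API_KEY as required environment variables. A small pre-flight check, sketched here under the assumption that both are read from the process environment, can surface a missing variable before any agent starts; REQUIRED_VARS and missing_env_vars are hypothetical helpers, not part of the SDK.

```python
import os
from typing import List, Mapping

# Variable names taken from the Meeting Setup requirements in Step 4
REQUIRED_VARS = ("VIDEOSDK_AUTH_TOKEN", "GOOGLE_API_KEY")


def missing_env_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_env_vars()
print("Missing:", ", ".join(missing) if missing else "none")
```

Accepting the environment as a parameter makes the check easy to exercise with a plain dictionary, without touching the real process environment.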
Quick Start

Get the complete working example at A2A Quick Start Repository with all the code ready to run.
