A2A Implementation Guide
This guide shows you how to build a complete Agent to Agent (A2A) system using the concepts from the A2A Overview. We'll create a banking customer service system with a main customer service agent and a loan specialist.
Implementation Overview
We'll build a system with:
- Customer Service Agent: Voice-enabled interface agent using a cascading Pipeline (STT + LLM + TTS) for voice interactions
- Loan Specialist Agent: Text-based domain expert using an LLM-only Pipeline for efficient background text processing
- Intelligent Routing: Automatic detection and forwarding of loan queries
- Seamless Communication: Users get expert responses without knowing about the routing
Supported Pipeline Configurations
| Configuration | Customer Agent | Specialist Agent | Description |
|---|---|---|---|
| Cascading + Cascading (LLM-only) | STT + LLM + TTS + VAD | LLM only | Customer uses full voice pipeline, specialist processes text in background |
| Realtime + Cascading (LLM-only) | Realtime model (e.g., Gemini) | LLM only | Customer uses realtime model for voice, specialist processes text in background |
Structure of the project
A2A
├── agents/
│ ├── customer_agent.py # CustomerServiceAgent definition
│ ├── loan_agent.py # LoanAgent definition
│
├── session_manager.py # Handles session creation, pipeline setup
└── main.py # Entry point: runs main() and starts agents
Sequence Diagram

Step 1: Create the Customer Service Agent
Interface Agent: CreatesCustomerServiceAgentas the main user-facing agent with voice capabilities and customer service instructions.Function Tool: Implements@function_tool forward_to_specialist()that uses A2A discovery to find and route queries to domain specialists.Response Relay: Includeshandle_specialist_response()method that automatically receives and relays specialist responses back to users viasession.say().
from videosdk.agents import Agent, AgentCard, A2AMessage, function_tool
import asyncio
from typing import Dict, Any
class CustomerServiceAgent(Agent):
def __init__(self):
super().__init__(
agent_id="customer_service_1",
instructions=(
"You are a helpful bank customer service agent. "
"For general banking queries (account balances, transactions, basic services), answer directly. "
"For ANY loan-related queries, questions, or follow-ups, ALWAYS use the forward_to_specialist function "
"with domain set to 'loan'. This includes initial loan questions AND all follow-up questions about loans. "
"Do NOT attempt to answer loan questions yourself - always forward them to the specialist. "
"After forwarding a loan query, stay engaged and automatically relay any response you receive from the specialist. "
"When you receive responses from specialists, immediately relay them naturally to the customer."
)
)
@function_tool
async def forward_to_specialist(self, query: str, domain: str) -> Dict[str, Any]:
"""Forward queries to domain specialist agents using A2A discovery"""
# Use A2A discovery to find specialists by domain
specialists = self.a2a.registry.find_agents_by_domain(domain)
id_of_target_agent = specialists[0] if specialists else None
if not id_of_target_agent:
return {"error": f"No specialist found for domain {domain}"}
# Send A2A message to the specialist
await self.a2a.send_message(
to_agent=id_of_target_agent,
message_type="specialist_query",
content={"query": query}
)
return {
"status": "forwarded",
"specialist": id_of_target_agent,
"message": "Let me get that information for you from our loan specialist..."
}
async def handle_specialist_response(self, message: A2AMessage) -> None:
"""Handle responses from specialist agents and relay to user via TTS directly"""
response = message.content.get("response")
if response:
# Brief pause for natural conversation flow
await asyncio.sleep(0.5)
await self.session.say(response)
async def on_enter(self):
# Register this agent with the A2A system
await self.register_a2a(AgentCard(
id="customer_service_1",
name="Customer Service Agent",
domain="customer_service",
capabilities=["query_handling", "specialist_coordination"],
description="Handles customer queries and coordinates with specialists"
))
await self.session.say("Hello! I am your customer service agent. How can I help you?")
# Set up message listener for specialist responses
self.a2a.on_message("specialist_response", self.handle_specialist_response)
async def on_exit(self):
print("Customer agent left the meeting")
Step 2: Create the Loan Specialist Agent
Specialist Agent Setup: CreatesLoanAgentclass with specialized loan expertise instructions and agent_id"specialist_1".Message Handlers: Implementshandle_specialist_query()to process incoming queries andhandle_model_response()to send responses back.Registration: Registers with A2A system using domain "loan" so it can bediscoveredby other agents needing loan expertise.
from videosdk.agents import Agent, AgentCard, A2AMessage
class LoanAgent(Agent):
def __init__(self):
super().__init__(
agent_id="specialist_1",
instructions=(
"You are a specialized loan expert at a bank. "
"Provide detailed, helpful information about loans including interest rates, terms, and requirements. "
"Give complete answers with specific details when possible. "
"You can discuss personal loans, car loans, home loans, and business loans. "
"Provide helpful guidance and next steps for loan applications. "
"Be friendly and professional in your responses. "
"Keep responses concise within 5-7 lines and easily understandable."
)
)
async def handle_specialist_query(self, message: A2AMessage):
"""Process incoming queries from customer service agent"""
query = message.content.get("query")
if query:
# Send the query to our AI model for processing
await self.session.pipeline.send_text_message(query)
async def handle_model_response(self, message: A2AMessage):
"""Send processed responses back to requesting agent"""
response = message.content.get("response")
requesting_agent = message.to_agent
if response and requesting_agent:
# Send the specialist response back to the customer service agent
await self.a2a.send_message(
to_agent=requesting_agent,
message_type="specialist_response",
content={"response": response}
)
async def on_enter(self):
await self.register_a2a(AgentCard(
id="specialist_1",
name="Loan Specialist Agent",
domain="loan",
capabilities=["loan_consultation", "loan_information", "interest_rates"],
description="Handles loan queries"
))
self.a2a.on_message("specialist_query", self.handle_specialist_query)
self.a2a.on_message("model_response", self.handle_model_response)
async def on_exit(self):
print("LoanAgent Left")
Step 3: Configure Session Management
Unified Pipeline: Uses a singlePipelineclass for both agents. The Pipeline auto-detects the mode based on the components you provide.Session Factory: Providescreate_pipeline()andcreate_session()functions to configure agent sessions based on their roles.Modality Separation: Ensures customer agent can handle voice while specialist processes text in background.
Option A: Cascading Customer + Cascading Specialist (Recommended)
The customer agent uses a full cascade (STT + LLM + TTS + VAD) for voice interaction, and the specialist uses an LLM-only pipeline for text processing. The specialist's response goes directly to the customer's TTS via session.say() — only 1 LLM call for the specialist query (no duplicate processing).
from videosdk.agents import AgentSession, Pipeline
from videosdk.plugins.openai import OpenAILLM
from videosdk.plugins.deepgram import DeepgramSTT
from videosdk.plugins.google import GoogleLLM, GoogleTTS
from videosdk.plugins.silero import SileroVAD
from videosdk.plugins.turn_detector import TurnDetector, pre_download_model
import os
pre_download_model()
def create_pipeline(agent_type: str) -> Pipeline:
if agent_type == "customer":
# Customer agent: Full cascade for voice interaction
return Pipeline(
stt=DeepgramSTT(),
llm=GoogleLLM(api_key=os.getenv("GOOGLE_API_KEY")),
tts=GoogleTTS(api_key=os.getenv("GOOGLE_API_KEY")),
vad=SileroVAD(),
turn_detector=TurnDetector(),
)
else:
# Specialist agent: LLM-only pipeline for background text processing
return Pipeline(
llm=OpenAILLM(api_key=os.getenv("OPENAI_API_KEY")),
)
def create_session(agent, pipeline) -> AgentSession:
return AgentSession(
agent=agent,
pipeline=pipeline,
)
Option B: Realtime Customer + Cascading Specialist
The customer agent uses a realtime model (Gemini) for low-latency voice interaction, and the specialist uses an LLM-only cascade.
from videosdk.agents import AgentSession, Pipeline
from videosdk.plugins.openai import OpenAILLM
from videosdk.plugins.google import GeminiRealtime, GeminiLiveConfig
import os
def create_pipeline(agent_type: str) -> Pipeline:
if agent_type == "customer":
# Customer agent: Realtime model for voice interaction
return Pipeline(
llm=GeminiRealtime(
model="gemini-3.1-flash-live-preview",
config=GeminiLiveConfig(
voice="Leda",
response_modalities=["AUDIO"]
)
)
)
else:
# Specialist agent: LLM-only pipeline for background text processing
return Pipeline(
llm=OpenAILLM(api_key=os.getenv("OPENAI_API_KEY")),
)
def create_session(agent, pipeline) -> AgentSession:
return AgentSession(
agent=agent,
pipeline=pipeline,
)
While setting up pipelines, make sure:
- The customer agent has voice capabilities (via cascading STT+LLM+TTS or a realtime model in the
Pipeline). - The specialist agent (Loan Agent) operates in text-only mode (via a standard LLM like
OpenAILLMin thePipeline).
Pipeline Support: The VideoSDK AI Agents framework uses a unified Pipeline class that automatically detects whether you're using a realtime model or cascading components. You can pass a realtime model (like GeminiRealtime or OpenAIRealtime) or cascading components (STT + LLM + TTS + VAD) to the same Pipeline class. This enables flexible configurations for voice and text processing with A2A.
Step 4: Deploy A2A System on VideoSDK Platform
Meeting Setup: Customer agent joins VideoSDK meeting for user interaction while specialist runs in background mode. Requires environment variables:VIDEOSDK_AUTH_TOKEN,GOOGLE_API_KEY, andOPENAI_API_KEY.System Orchestration: UsesJobContextandWorkerJobto manage the meeting lifecycle and agent coordination.Resource Management: Handles startup sequence, keeps system running, and provides clean shutdown with proper A2A unregistration
import asyncio
from contextlib import suppress
from agents.customer_agent import CustomerServiceAgent
from agents.loan_agent import LoanAgent
from session_manager import create_pipeline, create_session
from videosdk.agents import JobContext, RoomOptions, WorkerJob
async def main(ctx: JobContext):
specialist_agent = LoanAgent()
specialist_pipeline = create_pipeline("specialist")
specialist_session = create_session(specialist_agent, specialist_pipeline)
customer_agent = CustomerServiceAgent()
customer_pipeline = create_pipeline("customer")
customer_session = create_session(customer_agent, customer_pipeline)
specialist_task = asyncio.create_task(specialist_session.start())
try:
await ctx.connect()
await customer_session.start()
await asyncio.Event().wait()
except (KeyboardInterrupt, asyncio.CancelledError):
print("Shutting down...")
finally:
specialist_task.cancel()
with suppress(asyncio.CancelledError):
await specialist_task
await specialist_session.close()
await customer_session.close()
await specialist_agent.unregister_a2a()
await customer_agent.unregister_a2a()
await ctx.shutdown()
def customer_agent_context() -> JobContext:
room_options = RoomOptions(room_id="<meeting_id>", name="Customer Service Agent", playground=True)
return JobContext(
room_options=room_options
)
if __name__ == "__main__":
job = WorkerJob(entrypoint=main, jobctx=customer_agent_context)
job.start()
Ensure that the JobContext is created only for the primary (main) agent, i.e., the agent responsible for user-facing interaction (e.g., Customer Agent).
The background agent (e.g., Loan Agent) should not have its own context or initiate a separate connection.
Running the Application
Set the required environment variables:
export VIDEOSDK_AUTH_TOKEN="your_videosdk_token"
export GOOGLE_API_KEY="your_google_api_key"
export OPENAI_API_KEY="your_openai_api_key"
Replace <meeting_id> in the code with your actual meeting ID, then run:
cd A2A
python main.py
Get the complete working example at A2A Quick Start Repository with all the code ready to run.
Got a Question? Ask us on discord

