DRAFT Agentic Design Patterns - Inter-agent Communication (A2A)
Learn how to build production-ready multi-agent systems with Agent-to-Agent communication using TypeScript, LangGraph, and Vercel's serverless platform.
Mental Model: The Orchestra Conductor Pattern
Think of A2A communication like a distributed orchestra where each agent is a specialized musician. The conductor (supervisor agent) doesn't play every instrument but coordinates the performance. Musicians (specialized agents) can pass sheet music (context/tasks) directly to each other or through the conductor. Some pieces require all musicians to play together (synchronous), while others allow soloists to perform independently and join when ready (asynchronous). The venue (Vercel's platform) provides the acoustics and infrastructure, while the score (LangGraph) defines how the music flows. Just as musicians use standard notation to communicate regardless of their instrument, A2A protocols enable agents built with different frameworks to collaborate seamlessly.
Basic Example: Simple Agent Handoff Pattern
1. Project Setup with Core Dependencies
# Initialize NextJS 15 project with TypeScript
npx create-next-app@latest a2a-agents --typescript --tailwind --app --no-src-dir
cd a2a-agents
# Install LangGraph and agent communication packages
npm install @langchain/langgraph @langchain/core @langchain/community
npm install @langchain/google-genai zod uuid
npm install @tanstack/react-query es-toolkit daisyui
npm install @vercel/kv bullmq ioredis
Initializes a Next.js 15 project optimized for multi-agent development with all necessary dependencies for agent orchestration and communication.
2. Define Agent Communication Protocol
// lib/protocols/a2a-protocol.ts
import { z } from 'zod';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
import { pipe, map, filter } from 'es-toolkit';
// Define the A2A message schema
export const A2AMessageSchema = z.object({
id: z.string().uuid(),
from: z.string(),
to: z.string(),
timestamp: z.string().datetime(),
protocol: z.literal('a2a/v1'),
task: z.object({
id: z.string().uuid(),
type: z.enum(['request', 'response', 'handoff', 'error']),
skill: z.string().optional(),
context: z.record(z.any()),
payload: z.any(),
metadata: z.object({
priority: z.enum(['low', 'medium', 'high']).default('medium'),
timeout: z.number().default(30000),
retries: z.number().default(3),
}).optional(),
}),
});
export type A2AMessage = z.infer<typeof A2AMessageSchema>;
// Agent Card definition (agent capabilities)
export const AgentCardSchema = z.object({
name: z.string(),
description: z.string(),
url: z.string().url(),
version: z.string(),
capabilities: z.object({
streaming: z.boolean(),
async: z.boolean(),
maxConcurrent: z.number(),
}),
skills: z.array(z.object({
id: z.string(),
name: z.string(),
description: z.string(),
inputSchema: z.any(),
outputSchema: z.any(),
})),
authentication: z.object({
type: z.enum(['none', 'apiKey', 'oauth2']),
config: z.record(z.any()).optional(),
}),
});
export type AgentCard = z.infer<typeof AgentCardSchema>;
// Message factory using es-toolkit
export class A2AMessageFactory {
static createHandoffMessage(
from: string,
to: string,
task: any,
context: Record<string, any>
): A2AMessage {
return pipe(
{
id: crypto.randomUUID(),
from,
to,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1' as const,
task: {
id: crypto.randomUUID(),
type: 'handoff' as const,
context,
payload: task,
},
},
(msg) => A2AMessageSchema.parse(msg)
);
}
}
Establishes a type-safe A2A protocol with message schemas, agent cards for capability discovery, and factory methods for creating standardized messages.
3. Create Base Agent with Communication Interface
// lib/agents/base-agent.ts
import { StateGraph, StateGraphArgs } from '@langchain/langgraph';
import { BaseMessage } from '@langchain/core/messages';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { A2AMessage, AgentCard } from '@/lib/protocols/a2a-protocol';
import { debounce, throttle } from 'es-toolkit';
export interface AgentState {
messages: BaseMessage[];
currentAgent: string;
context: Record<string, any>;
pendingHandoffs: A2AMessage[];
}
export abstract class BaseA2Agent {
protected name: string;
protected model: ChatGoogleGenerativeAI;
protected graph: StateGraph<AgentState>;
protected card: AgentCard;
constructor(name: string, card: AgentCard) {
this.name = name;
this.card = card;
// Initialize with Gemini Flash for fast responses
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.7,
streaming: true,
maxOutputTokens: 2048,
});
// Setup state graph for agent workflow
this.graph = new StateGraph<AgentState>({
channels: {
messages: {
value: (old: BaseMessage[], next: BaseMessage[]) => [...old, ...next],
},
currentAgent: {
value: (old: string, next: string) => next,
},
context: {
value: (old: Record<string, any>, next: Record<string, any>) => ({
...old,
...next,
}),
},
pendingHandoffs: {
value: (old: A2AMessage[], next: A2AMessage[]) => [...old, ...next],
},
},
});
this.setupGraph();
}
// Abstract method for subclasses to implement
protected abstract setupGraph(): void;
// Process incoming A2A messages
async processMessage(message: A2AMessage): Promise<A2AMessage> {
const startTime = Date.now();
try {
// Validate message is for this agent
if (message.to !== this.name) {
throw new Error(`Message not for this agent: ${message.to}`);
}
// Process based on task type
const result = await this.handleTask(message.task);
// Create response message
return {
id: crypto.randomUUID(),
from: this.name,
to: message.from,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: message.task.id,
type: 'response',
context: {
...message.task.context,
processingTime: Date.now() - startTime,
},
payload: result,
},
};
} catch (error) {
return this.createErrorResponse(message, error);
}
}
// Handle different task types
protected async handleTask(task: A2AMessage['task']): Promise<any> {
switch (task.type) {
case 'request':
return await this.processRequest(task);
case 'handoff':
return await this.acceptHandoff(task);
default:
throw new Error(`Unknown task type: ${task.type}`);
}
}
protected abstract processRequest(task: A2AMessage['task']): Promise<any>;
protected abstract acceptHandoff(task: A2AMessage['task']): Promise<any>;
// Throttled error response creation
protected createErrorResponse = throttle(
(message: A2AMessage, error: any): A2AMessage => {
return {
id: crypto.randomUUID(),
from: this.name,
to: message.from,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: message.task.id,
type: 'error',
context: message.task.context,
payload: {
error: error.message || 'Unknown error',
stack: process.env.NODE_ENV === 'development' ? error.stack : undefined,
},
},
};
},
1000 // Throttle error responses to prevent spam
);
}
Provides an abstract base class for agents with built-in A2A message processing, state management via LangGraph, and error handling with es-toolkit utilities.
4. Implement Specialized Agents
// lib/agents/research-agent.ts
import { BaseA2Agent } from './base-agent';
import { WebBrowser } from '@langchain/community/tools/webbrowser';
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { groupBy, chunk } from 'es-toolkit';
export class ResearchAgent extends BaseA2Agent {
private browser: WebBrowser;
private embeddings: GoogleGenerativeAIEmbeddings;
constructor() {
const card = {
name: 'research-agent',
description: 'Specialized in web research and information gathering',
url: process.env.VERCEL_URL ?
`https://${process.env.VERCEL_URL}/api/agents/research` :
'http://localhost:3000/api/agents/research',
version: '1.0.0',
capabilities: {
streaming: true,
async: true,
maxConcurrent: 5,
},
skills: [
{
id: 'web-search',
name: 'Web Search',
description: 'Search and extract information from the web',
inputSchema: { query: 'string' },
outputSchema: { results: 'array' },
},
{
id: 'summarize',
name: 'Summarize',
description: 'Create concise summaries of research findings',
inputSchema: { content: 'string' },
outputSchema: { summary: 'string' },
},
],
authentication: {
type: 'apiKey',
config: { header: 'X-API-Key' },
},
};
super('research-agent', card);
this.embeddings = new GoogleGenerativeAIEmbeddings({
modelName: 'embedding-001',
});
this.browser = new WebBrowser({
model: this.model,
embeddings: this.embeddings
});
}
protected setupGraph(): void {
// Define the research workflow
this.graph
.addNode('analyze', this.analyzeRequest.bind(this))
.addNode('search', this.performSearch.bind(this))
.addNode('synthesize', this.synthesizeResults.bind(this))
.addNode('decide_handoff', this.decideHandoff.bind(this))
.addEdge('__start__', 'analyze')
.addEdge('analyze', 'search')
.addEdge('search', 'synthesize')
.addEdge('synthesize', 'decide_handoff');
}
private async analyzeRequest(state: any) {
const lastMessage = state.messages[state.messages.length - 1];
const analysis = await this.model.invoke([
new HumanMessage(`Analyze this request and identify key search terms: ${lastMessage.content}`)
]);
return {
context: {
...state.context,
searchTerms: this.extractSearchTerms(analysis.content as string),
},
};
}
private async performSearch(state: any) {
const { searchTerms } = state.context;
// Batch search queries for efficiency
const searchBatches = chunk(searchTerms, 3);
const results = [];
for (const batch of searchBatches) {
const batchResults = await Promise.all(
batch.map(term => this.browser.invoke(term))
);
results.push(...batchResults);
}
return {
context: {
...state.context,
searchResults: results,
},
};
}
private async synthesizeResults(state: any) {
const { searchResults } = state.context;
// Group results by relevance
const grouped = groupBy(searchResults, (result: any) =>
result.relevance > 0.8 ? 'high' : result.relevance > 0.5 ? 'medium' : 'low'
);
const synthesis = await this.model.invoke([
new HumanMessage(`Synthesize these search results into a comprehensive answer:
${JSON.stringify(grouped.high || [])}`)
]);
return {
messages: [new AIMessage(synthesis.content as string)],
context: {
...state.context,
synthesis: synthesis.content,
confidence: grouped.high ? 'high' : 'medium',
},
};
}
private async decideHandoff(state: any) {
const { confidence, synthesis } = state.context;
// Decide if we need to handoff to another agent
if (confidence === 'low' || synthesis.includes('need more analysis')) {
return {
pendingHandoffs: [{
id: crypto.randomUUID(),
from: this.name,
to: 'analyst-agent',
timestamp: new Date().toISOString(),
protocol: 'a2a/v1' as const,
task: {
id: crypto.randomUUID(),
type: 'handoff' as const,
context: state.context,
payload: {
request: 'Deep analysis required',
preliminaryFindings: synthesis,
},
},
}],
};
}
return { pendingHandoffs: [] };
}
protected async processRequest(task: any): Promise<any> {
const result = await this.graph.invoke({
messages: [new HumanMessage(task.payload.query)],
currentAgent: this.name,
context: task.context || {},
pendingHandoffs: [],
});
return {
results: result.messages[result.messages.length - 1].content,
handoffs: result.pendingHandoffs,
};
}
protected async acceptHandoff(task: any): Promise<any> {
// Process handoff from another agent
return this.processRequest(task);
}
private extractSearchTerms(analysis: string): string[] {
// Simple extraction logic - in production use NLP
return analysis.match(/["'](.*?)["']/g)?.map(term =>
term.replace(/["']/g, '')
) || [];
}
}
Implements a research agent with web browsing capabilities, search batching for efficiency, and intelligent handoff decisions based on confidence levels.
5. Create Supervisor Agent for Orchestration
// lib/agents/supervisor-agent.ts
import { BaseA2Agent } from './base-agent';
import { A2AMessage, A2AMessageFactory } from '@/lib/protocols/a2a-protocol';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { sortBy, uniqBy } from 'es-toolkit';
import { kv } from '@vercel/kv';
interface AgentRegistry {
[key: string]: {
card: any;
endpoint: string;
status: 'active' | 'inactive' | 'busy';
lastSeen: number;
};
}
export class SupervisorAgent extends BaseA2Agent {
private agentRegistry: AgentRegistry = {};
private taskQueue: A2AMessage[] = [];
constructor() {
const card = {
name: 'supervisor-agent',
description: 'Orchestrates and coordinates multiple specialized agents',
url: process.env.VERCEL_URL ?
`https://${process.env.VERCEL_URL}/api/agents/supervisor` :
'http://localhost:3000/api/agents/supervisor',
version: '1.0.0',
capabilities: {
streaming: true,
async: true,
maxConcurrent: 10,
},
skills: [
{
id: 'orchestrate',
name: 'Orchestrate',
description: 'Coordinate multiple agents to complete complex tasks',
inputSchema: { task: 'string', agents: 'array' },
outputSchema: { result: 'any', trace: 'array' },
},
{
id: 'delegate',
name: 'Delegate',
description: 'Assign tasks to appropriate specialized agents',
inputSchema: { task: 'string' },
outputSchema: { assignedTo: 'string', taskId: 'string' },
},
],
authentication: {
type: 'apiKey',
config: { header: 'X-Supervisor-Key' },
},
};
super('supervisor-agent', card);
this.initializeRegistry();
}
private async initializeRegistry() {
// Load agent registry from Vercel KV
try {
const registry = await kv.get<AgentRegistry>('agent-registry');
if (registry) {
this.agentRegistry = registry;
}
} catch (error) {
console.log('No existing registry, starting fresh');
}
// Register default agents
this.registerAgent('research-agent', {
card: { /* Research agent card */ },
endpoint: '/api/agents/research',
status: 'active',
lastSeen: Date.now(),
});
this.registerAgent('analyst-agent', {
card: { /* Analyst agent card */ },
endpoint: '/api/agents/analyst',
status: 'active',
lastSeen: Date.now(),
});
// Persist registry
await kv.set('agent-registry', this.agentRegistry);
}
protected setupGraph(): void {
this.graph
.addNode('analyze_task', this.analyzeTask.bind(this))
.addNode('select_agents', this.selectAgents.bind(this))
.addNode('delegate_tasks', this.delegateTasks.bind(this))
.addNode('monitor_progress', this.monitorProgress.bind(this))
.addNode('aggregate_results', this.aggregateResults.bind(this))
.addEdge('__start__', 'analyze_task')
.addEdge('analyze_task', 'select_agents')
.addEdge('select_agents', 'delegate_tasks')
.addEdge('delegate_tasks', 'monitor_progress')
.addEdge('monitor_progress', 'aggregate_results');
}
private async analyzeTask(state: any) {
const lastMessage = state.messages[state.messages.length - 1];
// Use LLM to understand task requirements
const analysis = await this.model.invoke([
new HumanMessage(`
Analyze this task and determine:
1. What type of expertise is needed
2. Whether it needs sequential or parallel processing
3. Estimated complexity (simple/moderate/complex)
Task: ${lastMessage.content}
`)
]);
return {
context: {
...state.context,
taskAnalysis: this.parseTaskAnalysis(analysis.content as string),
originalTask: lastMessage.content,
},
};
}
private async selectAgents(state: any) {
const { taskAnalysis } = state.context;
// Get active agents sorted by relevance
const activeAgents = Object.entries(this.agentRegistry)
.filter(([_, agent]) => agent.status === 'active')
.map(([name, agent]) => ({ name, ...agent }));
// Score agents based on task requirements
const scoredAgents = activeAgents.map(agent => ({
...agent,
score: this.calculateAgentScore(agent, taskAnalysis),
}));
// Select top agents for the task
const selectedAgents = sortBy(scoredAgents, 'score')
.reverse()
.slice(0, taskAnalysis.complexity === 'complex' ? 3 : 2);
return {
context: {
...state.context,
selectedAgents: selectedAgents.map(a => a.name),
},
};
}
private async delegateTasks(state: any) {
const { selectedAgents, taskAnalysis, originalTask } = state.context;
const delegatedTasks: A2AMessage[] = [];
for (const agentName of selectedAgents) {
const message = A2AMessageFactory.createHandoffMessage(
this.name,
agentName,
{
task: originalTask,
requirements: taskAnalysis,
deadline: Date.now() + 30000, // 30 second deadline
},
state.context
);
delegatedTasks.push(message);
this.taskQueue.push(message);
}
// Store task delegation in KV for tracking
await kv.set(`tasks:${state.context.sessionId}`, delegatedTasks, {
ex: 3600, // Expire after 1 hour
});
return {
context: {
...state.context,
delegatedTasks: delegatedTasks.map(t => t.id),
},
};
}
private async monitorProgress(state: any) {
const { delegatedTasks } = state.context;
const responses: any[] = [];
const timeout = 30000; // 30 seconds
const startTime = Date.now();
// Poll for responses with timeout
while (responses.length < delegatedTasks.length) {
if (Date.now() - startTime > timeout) {
console.log('Timeout waiting for agent responses');
break;
}
// Check for responses in KV
for (const taskId of delegatedTasks) {
const response = await kv.get(`response:${taskId}`);
if (response && !responses.find(r => r.taskId === taskId)) {
responses.push(response);
}
}
// Wait before next poll
await new Promise(resolve => setTimeout(resolve, 1000));
}
return {
context: {
...state.context,
agentResponses: responses,
},
};
}
private async aggregateResults(state: any) {
const { agentResponses } = state.context;
// Remove duplicate information
const uniqueResponses = uniqBy(agentResponses, (r: any) =>
JSON.stringify(r.payload)
);
// Use LLM to synthesize all responses
const synthesis = await this.model.invoke([
new HumanMessage(`
Synthesize these agent responses into a comprehensive answer:
${JSON.stringify(uniqueResponses)}
Create a unified response that combines all insights.
`)
]);
return {
messages: [new AIMessage(synthesis.content as string)],
context: {
...state.context,
finalResult: synthesis.content,
contributingAgents: uniqueResponses.map((r: any) => r.from),
},
};
}
private registerAgent(name: string, info: any) {
this.agentRegistry[name] = info;
}
private parseTaskAnalysis(analysis: string): any {
// Simple parsing - in production use structured output
return {
expertise: analysis.includes('research') ? ['research'] : ['general'],
processing: analysis.includes('parallel') ? 'parallel' : 'sequential',
complexity: analysis.includes('complex') ? 'complex' : 'simple',
};
}
private calculateAgentScore(agent: any, taskAnalysis: any): number {
// Simple scoring algorithm
let score = 0;
// Check if agent has required skills
if (agent.card?.skills) {
score += agent.card.skills.length * 10;
}
// Prefer recently active agents
const hoursSinceActive = (Date.now() - agent.lastSeen) / 3600000;
score -= hoursSinceActive * 5;
// Boost score if agent matches expertise
if (taskAnalysis.expertise?.some((exp: string) =>
agent.name.includes(exp)
)) {
score += 50;
}
return Math.max(0, score);
}
protected async processRequest(task: any): Promise<any> {
const result = await this.graph.invoke({
messages: [new HumanMessage(task.payload.query)],
currentAgent: this.name,
context: {
...task.context,
sessionId: task.id,
},
pendingHandoffs: [],
});
return {
result: result.context.finalResult,
trace: result.context.contributingAgents,
metrics: {
totalAgents: result.context.selectedAgents.length,
processingTime: Date.now() - result.context.startTime,
},
};
}
protected async acceptHandoff(task: any): Promise<any> {
// Supervisor doesn't typically accept handoffs
return { error: 'Supervisor does not accept handoffs' };
}
}
Implements a supervisor agent that analyzes tasks, selects appropriate agents, delegates work, monitors progress, and aggregates results using Vercel KV for state persistence.
6. API Routes for Agent Endpoints
// app/api/agents/supervisor/route.ts
import { SupervisorAgent } from '@/lib/agents/supervisor-agent';
import { A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { NextResponse } from 'next/server';
import { kv } from '@vercel/kv';
export const runtime = 'nodejs';
export const maxDuration = 300;
const supervisor = new SupervisorAgent();
export async function POST(req: Request) {
try {
// Parse and validate incoming A2A message
const rawMessage = await req.json();
const message = A2AMessageSchema.parse(rawMessage);
// Check authentication
const apiKey = req.headers.get('X-Supervisor-Key');
if (!apiKey || apiKey !== process.env.SUPERVISOR_API_KEY) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
}
// Process message with supervisor
const response = await supervisor.processMessage(message);
// Store response for async retrieval
await kv.set(`response:${message.task.id}`, response, {
ex: 3600, // Expire after 1 hour
});
return NextResponse.json(response);
} catch (error: any) {
console.error('Supervisor error:', error);
return NextResponse.json(
{ error: error.message || 'Internal server error' },
{ status: 500 }
);
}
}
// Agent discovery endpoint
export async function GET(req: Request) {
const agent = new SupervisorAgent();
return NextResponse.json(agent.card);
}
Creates RESTful API endpoints for the supervisor agent with authentication, message validation, and agent discovery support.
7. Client Component with React Query
// components/MultiAgentChat.tsx
'use client';
import { useState } from 'react';
import { useMutation, useQuery } from '@tanstack/react-query';
import { debounce } from 'es-toolkit';
interface AgentResponse {
result: string;
trace: string[];
metrics: {
totalAgents: number;
processingTime: number;
};
}
async function sendToSupervisor(query: string): Promise<AgentResponse> {
const message = {
id: crypto.randomUUID(),
from: 'user',
to: 'supervisor-agent',
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: crypto.randomUUID(),
type: 'request',
context: {},
payload: { query },
},
};
const response = await fetch('/api/agents/supervisor', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Supervisor-Key': process.env.NEXT_PUBLIC_SUPERVISOR_KEY || '',
},
body: JSON.stringify(message),
});
if (!response.ok) {
throw new Error('Failed to process request');
}
const data = await response.json();
return data.task.payload;
}
export default function MultiAgentChat() {
const [input, setInput] = useState('');
const [messages, setMessages] = useState<Array<{
role: 'user' | 'assistant';
content: string;
metadata?: any;
}>>([]);
const mutation = useMutation({
mutationFn: sendToSupervisor,
onSuccess: (data) => {
setMessages(prev => [...prev, {
role: 'assistant',
content: data.result,
metadata: {
agents: data.trace,
processingTime: data.metrics.processingTime,
},
}]);
},
});
const handleSubmit = debounce(async (e: React.FormEvent) => {
e.preventDefault();
if (!input.trim()) return;
const userMessage = input;
setInput('');
setMessages(prev => [...prev, {
role: 'user',
content: userMessage,
}]);
mutation.mutate(userMessage);
}, 500);
return (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Multi-Agent System</h2>
{/* Chat messages */}
<div className="h-96 overflow-y-auto space-y-4 p-4 bg-base-200 rounded-lg">
{messages.map((msg, idx) => (
<div key={idx} className={`chat ${msg.role === 'user' ? 'chat-end' : 'chat-start'}`}>
<div className="chat-bubble">
{msg.content}
{msg.metadata && (
<div className="text-xs mt-2 opacity-70">
Processed by: {msg.metadata.agents.join(', ')}
({msg.metadata.processingTime}ms)
</div>
)}
</div>
</div>
))}
{mutation.isPending && (
<div className="chat chat-start">
<div className="chat-bubble">
<span className="loading loading-dots loading-sm"></span>
</div>
</div>
)}
</div>
{/* Input form */}
<form onSubmit={handleSubmit} className="join w-full">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Ask the agents..."
className="input input-bordered join-item flex-1"
disabled={mutation.isPending}
/>
<button
type="submit"
className="btn btn-primary join-item"
disabled={mutation.isPending || !input.trim()}
>
Send
</button>
</form>
</div>
</div>
);
}
Creates a React component for interacting with the multi-agent system, displaying agent traces and processing metrics with real-time updates.
Advanced Example: Event-Driven Agent Swarm
1. Setup Message Queue Infrastructure
// lib/queue/agent-queue.ts
import { Queue, Worker, Job } from 'bullmq';
import Redis from 'ioredis';
import { A2AMessage, A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { pipe, groupBy, partition } from 'es-toolkit';
// Create Redis connection for BullMQ
const connection = new Redis(process.env.REDIS_URL || 'redis://localhost:6379', {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
// Define queue names for different agent types
export const QUEUE_NAMES = {
RESEARCH: 'research-queue',
ANALYSIS: 'analysis-queue',
SYNTHESIS: 'synthesis-queue',
SUPERVISOR: 'supervisor-queue',
PRIORITY: 'priority-queue',
} as const;
// Create typed queue wrapper
export class AgentQueue {
private queues: Map<string, Queue<A2AMessage>> = new Map();
private workers: Map<string, Worker<A2AMessage>> = new Map();
constructor() {
this.initializeQueues();
}
private initializeQueues() {
// Create a queue for each agent type
Object.values(QUEUE_NAMES).forEach(queueName => {
this.queues.set(queueName, new Queue<A2AMessage>(queueName, {
connection,
defaultJobOptions: {
removeOnComplete: { count: 100 },
removeOnFail: { count: 500 },
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
},
}));
});
}
// Add message to appropriate queue based on routing rules
async routeMessage(message: A2AMessage): Promise<Job<A2AMessage>> {
// Validate message
const validated = A2AMessageSchema.parse(message);
// Determine queue based on destination agent
const queueName = this.getQueueForAgent(validated.to);
const queue = this.queues.get(queueName);
if (!queue) {
throw new Error(`No queue found for agent: ${validated.to}`);
}
// Add to queue with priority
const priority = this.calculatePriority(validated);
return await queue.add(
`${validated.to}:${validated.task.type}`,
validated,
{
priority,
delay: validated.task.metadata?.delay || 0,
}
);
}
// Batch route multiple messages efficiently
async batchRoute(messages: A2AMessage[]): Promise<Job<A2AMessage>[]> {
// Group messages by destination queue
const grouped = groupBy(messages, msg => this.getQueueForAgent(msg.to));
const jobs: Job<A2AMessage>[] = [];
for (const [queueName, msgs] of Object.entries(grouped)) {
const queue = this.queues.get(queueName);
if (!queue) continue;
// Bulk add to queue
const bulkJobs = await queue.addBulk(
msgs.map(msg => ({
name: `${msg.to}:${msg.task.type}`,
data: msg,
opts: {
priority: this.calculatePriority(msg),
},
}))
);
jobs.push(...bulkJobs);
}
return jobs;
}
// Create worker for processing queue
createWorker(
queueName: string,
processor: (job: Job<A2AMessage>) => Promise<any>
): Worker<A2AMessage> {
const worker = new Worker<A2AMessage>(
queueName,
async (job) => {
console.log(`Processing job ${job.id} in ${queueName}`);
return await processor(job);
},
{
connection,
concurrency: 5,
limiter: {
max: 10,
duration: 1000,
},
}
);
// Add event listeners
worker.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
worker.on('failed', (job, err) => {
console.error(`Job ${job?.id} failed:`, err);
});
this.workers.set(queueName, worker);
return worker;
}
private getQueueForAgent(agentName: string): string {
// Route to appropriate queue based on agent type
if (agentName.includes('research')) return QUEUE_NAMES.RESEARCH;
if (agentName.includes('analysis')) return QUEUE_NAMES.ANALYSIS;
if (agentName.includes('synthesis')) return QUEUE_NAMES.SYNTHESIS;
if (agentName === 'supervisor-agent') return QUEUE_NAMES.SUPERVISOR;
return QUEUE_NAMES.PRIORITY; // Default high-priority queue
}
private calculatePriority(message: A2AMessage): number {
// Lower number = higher priority
const basePriority = message.task.metadata?.priority === 'high' ? 1 :
message.task.metadata?.priority === 'low' ? 10 : 5;
// Adjust based on task type
if (message.task.type === 'error') return 0; // Highest priority
if (message.task.type === 'handoff') return basePriority - 1;
return basePriority;
}
// Get queue metrics
async getMetrics() {
const metrics: Record<string, any> = {};
for (const [name, queue] of this.queues.entries()) {
const counts = await queue.getJobCounts();
metrics[name] = {
waiting: counts.waiting,
active: counts.active,
completed: counts.completed,
failed: counts.failed,
delayed: counts.delayed,
};
}
return metrics;
}
// Graceful shutdown
async close() {
// Close all workers
await Promise.all(
Array.from(this.workers.values()).map(worker => worker.close())
);
// Close all queues
await Promise.all(
Array.from(this.queues.values()).map(queue => queue.close())
);
await connection.quit();
}
}
Implements a robust message queue system using BullMQ for asynchronous agent communication with priority routing, batch processing, and automatic retries.
2. Event-Driven Agent Base
// lib/agents/event-agent.ts
import { EventEmitter } from 'events';
import { BaseA2Agent } from './base-agent';
import { A2AMessage } from '@/lib/protocols/a2a-protocol';
import { AgentQueue } from '@/lib/queue/agent-queue';
import { throttle, retry } from 'es-toolkit';
export interface AgentEvent {
type: 'task_received' | 'task_completed' | 'task_failed' | 'handoff_initiated';
agentId: string;
taskId: string;
timestamp: number;
data?: any;
}
export abstract class EventDrivenAgent extends BaseA2Agent {
protected events: EventEmitter;
protected queue: AgentQueue;
private subscriptions: Map<string, Function> = new Map();
constructor(name: string, card: any) {
super(name, card);
this.events = new EventEmitter();
this.queue = new AgentQueue();
this.setupEventHandlers();
this.startQueueWorker();
}
private setupEventHandlers() {
// Emit events for monitoring
this.events.on('task_received', throttle((event: AgentEvent) => {
console.log(`[${this.name}] Task received:`, event.taskId);
this.broadcastEvent(event);
}, 100));
this.events.on('task_completed', (event: AgentEvent) => {
console.log(`[${this.name}] Task completed:`, event.taskId);
this.broadcastEvent(event);
});
this.events.on('task_failed', (event: AgentEvent) => {
console.error(`[${this.name}] Task failed:`, event.taskId, event.data);
this.broadcastEvent(event);
});
this.events.on('handoff_initiated', (event: AgentEvent) => {
console.log(`[${this.name}] Handoff initiated to:`, event.data?.targetAgent);
this.broadcastEvent(event);
});
}
private startQueueWorker() {
// Create worker for this agent's queue
const queueName = `${this.name}-queue`;
this.queue.createWorker(queueName, async (job) => {
const message = job.data;
// Emit task received event
this.events.emit('task_received', {
type: 'task_received',
agentId: this.name,
taskId: message.task.id,
timestamp: Date.now(),
});
try {
// Process with retry logic
const result = await retry(
() => this.processMessage(message),
{ retries: 3, delay: 1000 }
);
// Emit completion event
this.events.emit('task_completed', {
type: 'task_completed',
agentId: this.name,
taskId: message.task.id,
timestamp: Date.now(),
data: result,
});
return result;
} catch (error) {
// Emit failure event
this.events.emit('task_failed', {
type: 'task_failed',
agentId: this.name,
taskId: message.task.id,
timestamp: Date.now(),
data: { error: error.message },
});
throw error;
}
});
}
// Subscribe to events from other agents
subscribe(agentId: string, eventType: string, handler: Function) {
const key = `${agentId}:${eventType}`;
this.subscriptions.set(key, handler);
// Set up SSE or WebSocket connection to receive events
this.setupEventStream(agentId, eventType);
}
private async setupEventStream(agentId: string, eventType: string) {
// In production, use WebSocket or SSE for real-time events
// For serverless, poll from KV or database
const pollInterval = setInterval(async () => {
try {
const event = await this.fetchLatestEvent(agentId, eventType);
if (event) {
const handler = this.subscriptions.get(`${agentId}:${eventType}`);
if (handler) {
handler(event);
}
}
} catch (error) {
console.error('Error polling events:', error);
}
}, 1000);
// Store interval for cleanup
this.subscriptions.set(`${agentId}:${eventType}:interval`, pollInterval);
}
private async fetchLatestEvent(agentId: string, eventType: string): Promise<AgentEvent | null> {
// Fetch from KV or database
// This is a simplified example
return null;
}
// Broadcast events to subscribers
private async broadcastEvent(event: AgentEvent) {
// In serverless, store in KV with TTL
const key = `events:${event.agentId}:${event.type}:${event.timestamp}`;
await fetch('/api/events', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ key, event }),
});
}
// Initiate handoff to another agent
protected async handoffTo(
targetAgent: string,
task: any,
context: Record<string, any>
): Promise<void> {
const handoffMessage: A2AMessage = {
id: crypto.randomUUID(),
from: this.name,
to: targetAgent,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: crypto.randomUUID(),
type: 'handoff',
context: {
...context,
previousAgent: this.name,
handoffReason: task.reason || 'Task delegation',
},
payload: task,
},
};
// Queue the handoff message
await this.queue.routeMessage(handoffMessage);
// Emit handoff event
this.events.emit('handoff_initiated', {
type: 'handoff_initiated',
agentId: this.name,
taskId: handoffMessage.task.id,
timestamp: Date.now(),
data: { targetAgent, task },
});
}
// Cleanup on shutdown
async cleanup() {
// Clear all intervals
this.subscriptions.forEach((value, key) => {
if (key.endsWith(':interval')) {
clearInterval(value as any);
}
});
// Remove all event listeners
this.events.removeAllListeners();
// Close queue connection
await this.queue.close();
}
}
Creates an event-driven base agent with pub/sub capabilities, queue integration, automatic retry logic, and event broadcasting for monitoring.
3. Swarm Coordinator
// lib/agents/swarm-coordinator.ts
import { EventDrivenAgent } from './event-agent';
import { A2AMessage } from '@/lib/protocols/a2a-protocol';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { partition, chunk, shuffle } from 'es-toolkit';
interface SwarmState {
activeAgents: Set<string>;
taskDistribution: Map<string, string[]>; // agent -> taskIds
consensusResults: Map<string, any[]>; // taskId -> results
swarmMetrics: {
totalTasks: number;
completedTasks: number;
averageResponseTime: number;
consensusAgreement: number;
};
}
export class SwarmCoordinator extends EventDrivenAgent {
private swarmState: SwarmState = {
activeAgents: new Set(),
taskDistribution: new Map(),
consensusResults: new Map(),
swarmMetrics: {
totalTasks: 0,
completedTasks: 0,
averageResponseTime: 0,
consensusAgreement: 0,
},
};
constructor() {
const card = {
name: 'swarm-coordinator',
description: 'Coordinates distributed agent swarms for parallel processing',
url: process.env.VERCEL_URL ?
`https://${process.env.VERCEL_URL}/api/agents/swarm` :
'http://localhost:3000/api/agents/swarm',
version: '1.0.0',
capabilities: {
streaming: true,
async: true,
maxConcurrent: 20,
},
skills: [
{
id: 'swarm_execute',
name: 'Swarm Execute',
description: 'Execute task across multiple agents in parallel',
inputSchema: { task: 'string', agents: 'number' },
outputSchema: { consensus: 'any', votes: 'array' },
},
{
id: 'adaptive_routing',
name: 'Adaptive Routing',
description: 'Dynamically route tasks based on agent performance',
inputSchema: { task: 'string' },
outputSchema: { route: 'string' },
},
],
authentication: {
type: 'apiKey',
config: { header: 'X-Swarm-Key' },
},
};
super('swarm-coordinator', card);
this.initializeSwarm();
}
private async initializeSwarm() {
// Discover available agents
const agents = await this.discoverAgents();
agents.forEach(agent => this.swarmState.activeAgents.add(agent));
// Subscribe to agent events for monitoring
for (const agent of agents) {
this.subscribe(agent, 'task_completed', (event: any) => {
this.handleAgentCompletion(agent, event);
});
this.subscribe(agent, 'task_failed', (event: any) => {
this.handleAgentFailure(agent, event);
});
}
}
protected setupGraph(): void {
this.graph
.addNode('analyze_for_swarm', this.analyzeForSwarm.bind(this))
.addNode('distribute_tasks', this.distributeTasks.bind(this))
.addNode('monitor_swarm', this.monitorSwarm.bind(this))
.addNode('achieve_consensus', this.achieveConsensus.bind(this))
.addNode('adaptive_rebalance', this.adaptiveRebalance.bind(this))
.addEdge('__start__', 'analyze_for_swarm')
.addEdge('analyze_for_swarm', 'distribute_tasks')
.addEdge('distribute_tasks', 'monitor_swarm')
.addConditionalEdges(
'monitor_swarm',
(state: any) => {
const completion = state.context.completionRate || 0;
if (completion < 0.5) return 'adaptive_rebalance';
if (completion >= 0.8) return 'achieve_consensus';
return 'monitor_swarm'; // Continue monitoring
},
{
'adaptive_rebalance': 'adaptive_rebalance',
'achieve_consensus': 'achieve_consensus',
'monitor_swarm': 'monitor_swarm',
}
)
.addEdge('adaptive_rebalance', 'monitor_swarm');
}
private async analyzeForSwarm(state: any) {
const lastMessage = state.messages[state.messages.length - 1];
// Determine optimal swarm size and strategy
const analysis = await this.model.invoke([
new HumanMessage(`
Analyze this task for swarm processing:
1. Can it be parallelized?
2. How many agents should process it?
3. Does it need consensus or first-response-wins?
Task: ${lastMessage.content}
`)
]);
const swarmConfig = this.parseSwarmConfig(analysis.content as string);
// Update metrics
this.swarmState.swarmMetrics.totalTasks++;
return {
context: {
...state.context,
swarmConfig,
originalTask: lastMessage.content,
startTime: Date.now(),
},
};
}
private async distributeTasks(state: any) {
const { swarmConfig, originalTask } = state.context;
const { agentCount, strategy } = swarmConfig;
// Select agents for the swarm
const availableAgents = Array.from(this.swarmState.activeAgents);
const selectedAgents = shuffle(availableAgents).slice(0, agentCount);
// Create task variations for diversity
const taskVariations = await this.createTaskVariations(
originalTask,
selectedAgents.length
);
// Distribute tasks to agents
const distributions: A2AMessage[] = [];
for (let i = 0; i < selectedAgents.length; i++) {
const agent = selectedAgents[i];
const variation = taskVariations[i] || originalTask;
const message: A2AMessage = {
id: crypto.randomUUID(),
from: this.name,
to: agent,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: crypto.randomUUID(),
type: 'request',
context: {
...state.context,
swarmId: state.context.sessionId,
variation: i,
},
payload: {
query: variation,
strategy,
},
},
};
distributions.push(message);
// Track distribution
const tasks = this.swarmState.taskDistribution.get(agent) || [];
tasks.push(message.task.id);
this.swarmState.taskDistribution.set(agent, tasks);
}
// Batch route all messages
await this.queue.batchRoute(distributions);
return {
context: {
...state.context,
distributions: distributions.map(d => ({
agent: d.to,
taskId: d.task.id,
})),
expectedResponses: selectedAgents.length,
},
};
}
private async monitorSwarm(state: any) {
const { distributions, expectedResponses, startTime } = state.context;
const timeout = 30000; // 30 seconds
const checkInterval = 500; // Check every 500ms
const responses: any[] = [];
let iterations = 0;
while (responses.length < expectedResponses * 0.8) { // 80% threshold
if (Date.now() - startTime > timeout) {
console.log('Swarm timeout reached');
break;
}
// Check consensus results
for (const dist of distributions) {
const results = this.swarmState.consensusResults.get(dist.taskId) || [];
if (results.length > 0 && !responses.find(r => r.taskId === dist.taskId)) {
responses.push(...results);
}
}
// Update completion rate
const completionRate = responses.length / expectedResponses;
// Emit monitoring event every 10 iterations
if (iterations % 10 === 0) {
this.events.emit('swarm_progress', {
completionRate,
responseCount: responses.length,
expectedCount: expectedResponses,
});
}
iterations++;
await new Promise(resolve => setTimeout(resolve, checkInterval));
}
return {
context: {
...state.context,
responses,
completionRate: responses.length / expectedResponses,
monitoringTime: Date.now() - startTime,
},
};
}
private async achieveConsensus(state: any) {
const { responses, swarmConfig } = state.context;
if (swarmConfig.strategy === 'first-wins') {
// Return first valid response
const firstValid = responses.find((r: any) => r.status === 'success');
return {
messages: [new AIMessage(firstValid?.content || 'No valid response')],
context: {
...state.context,
consensus: firstValid,
consensusType: 'first-wins',
},
};
}
// Majority voting consensus
const [successful, failed] = partition(
responses,
(r: any) => r.status === 'success'
);
if (successful.length === 0) {
return {
messages: [new AIMessage('Swarm failed to reach consensus')],
context: {
...state.context,
consensus: null,
consensusType: 'failed',
},
};
}
// Use LLM to synthesize consensus
const consensus = await this.model.invoke([
new HumanMessage(`
Synthesize these swarm responses into a consensus answer:
${JSON.stringify(successful.slice(0, 5))}
Identify common patterns and create a unified response.
`)
]);
// Calculate agreement score
const agreementScore = this.calculateAgreement(successful);
// Update metrics
this.swarmState.swarmMetrics.completedTasks++;
this.swarmState.swarmMetrics.consensusAgreement = agreementScore;
return {
messages: [new AIMessage(consensus.content as string)],
context: {
...state.context,
consensus: consensus.content,
consensusType: 'majority',
agreementScore,
participantCount: successful.length,
},
};
}
private async adaptiveRebalance(state: any) {
const { distributions, completionRate } = state.context;
// Identify slow or failed agents
const slowAgents: string[] = [];
const failedAgents: string[] = [];
for (const dist of distributions) {
const results = this.swarmState.consensusResults.get(dist.taskId) || [];
if (results.length === 0) {
failedAgents.push(dist.agent);
} else if (results[0]?.responseTime > 10000) {
slowAgents.push(dist.agent);
}
}
// Remove failed agents from active pool
failedAgents.forEach(agent => {
this.swarmState.activeAgents.delete(agent);
});
// Redistribute tasks from failed agents
if (failedAgents.length > 0) {
const healthyAgents = Array.from(this.swarmState.activeAgents);
const redistributions = [];
for (const failedAgent of failedAgents) {
const tasks = this.swarmState.taskDistribution.get(failedAgent) || [];
const targetAgent = healthyAgents[Math.floor(Math.random() * healthyAgents.length)];
for (const taskId of tasks) {
redistributions.push({
from: failedAgent,
to: targetAgent,
taskId,
});
}
}
// Queue redistributed tasks
await this.redistributeTasks(redistributions, state.context.originalTask);
}
return {
context: {
...state.context,
rebalanced: true,
removedAgents: failedAgents,
slowAgents,
},
};
}
private async createTaskVariations(task: string, count: number): Promise<string[]> {
// Create slight variations to get diverse responses
const prompts = [
`${task}`,
`Please help with: ${task}`,
`I need assistance with: ${task}`,
`Can you solve: ${task}`,
`Analyze and respond to: ${task}`,
];
return Array(count).fill(0).map((_, i) => prompts[i % prompts.length]);
}
private async redistributeTasks(
redistributions: any[],
originalTask: string
) {
const messages = redistributions.map(r => ({
id: crypto.randomUUID(),
from: this.name,
to: r.to,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1' as const,
task: {
id: r.taskId,
type: 'request' as const,
context: { redistributed: true, originalAgent: r.from },
payload: { query: originalTask },
},
}));
await this.queue.batchRoute(messages);
}
private calculateAgreement(responses: any[]): number {
// Simple agreement calculation
if (responses.length < 2) return 1;
// Compare response similarity (simplified)
const contents = responses.map(r => r.content?.toLowerCase() || '');
let agreements = 0;
let comparisons = 0;
for (let i = 0; i < contents.length - 1; i++) {
for (let j = i + 1; j < contents.length; j++) {
comparisons++;
// Check if responses share common keywords
const words1 = new Set(contents[i].split(' '));
const words2 = new Set(contents[j].split(' '));
const intersection = new Set([...words1].filter(x => words2.has(x)));
const similarity = intersection.size / Math.max(words1.size, words2.size);
if (similarity > 0.5) agreements++;
}
}
return comparisons > 0 ? agreements / comparisons : 0;
}
private async discoverAgents(): Promise<string[]> {
// In production, use service discovery or registry
return ['research-agent-1', 'research-agent-2', 'analyst-agent-1', 'analyst-agent-2'];
}
private async handleAgentCompletion(agent: string, event: any) {
// Store result for consensus
const results = this.swarmState.consensusResults.get(event.taskId) || [];
results.push({
agent,
content: event.data?.result,
status: 'success',
responseTime: event.timestamp - event.startTime,
});
this.swarmState.consensusResults.set(event.taskId, results);
// Update metrics
const responseTimes = results.map(r => r.responseTime);
this.swarmState.swarmMetrics.averageResponseTime =
responseTimes.reduce((a, b) => a + b, 0) / responseTimes.length;
}
private async handleAgentFailure(agent: string, event: any) {
// Mark agent as potentially unreliable
const results = this.swarmState.consensusResults.get(event.taskId) || [];
results.push({
agent,
error: event.data?.error,
status: 'failed',
responseTime: event.timestamp - event.startTime,
});
this.swarmState.consensusResults.set(event.taskId, results);
// Consider removing agent from active pool if too many failures
const agentTasks = this.swarmState.taskDistribution.get(agent) || [];
const failureRate = results.filter(r => r.agent === agent && r.status === 'failed').length / agentTasks.length;
if (failureRate > 0.5) {
this.swarmState.activeAgents.delete(agent);
console.log(`Removed agent ${agent} due to high failure rate`);
}
}
private parseSwarmConfig(analysis: string): any {
// Extract configuration from LLM response
const config = {
parallelizable: analysis.includes('parallel'),
agentCount: 3, // Default
strategy: 'consensus' as 'consensus' | 'first-wins',
};
// Extract agent count if mentioned
const countMatch = analysis.match(/(\d+)\s*agents?/i);
if (countMatch) {
config.agentCount = Math.min(parseInt(countMatch[1]), 10);
}
// Determine strategy
if (analysis.includes('first') || analysis.includes('speed')) {
config.strategy = 'first-wins';
}
return config;
}
protected async processRequest(task: any): Promise<any> {
const result = await this.graph.invoke({
messages: [new HumanMessage(task.payload.query)],
currentAgent: this.name,
context: {
...task.context,
sessionId: task.id,
},
pendingHandoffs: [],
});
return {
consensus: result.context.consensus,
metrics: this.swarmState.swarmMetrics,
participantAgents: Array.from(this.swarmState.activeAgents),
agreementScore: result.context.agreementScore,
};
}
protected async acceptHandoff(task: any): Promise<any> {
// Swarm coordinator can accept handoffs for complex parallel tasks
return this.processRequest(task);
}
}
Implements a sophisticated swarm coordinator that distributes tasks across multiple agents, monitors progress, achieves consensus, and adaptively rebalances work based on agent performance.
4. Streaming API with Server-Sent Events
// app/api/agents/stream/route.ts
import { A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { SwarmCoordinator } from '@/lib/agents/swarm-coordinator';
import { ResearchAgent } from '@/lib/agents/research-agent';
export const runtime = 'nodejs';
export const maxDuration = 300;
// Initialize agents
const swarmCoordinator = new SwarmCoordinator();
const researchAgent = new ResearchAgent();
export async function POST(req: Request) {
const { message } = await req.json();
// Validate message
const validated = A2AMessageSchema.parse(message);
// Create SSE stream
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
// Process in background
(async () => {
try {
// Determine which agent to use
const agent = validated.to === 'swarm-coordinator' ?
swarmCoordinator : researchAgent;
// Subscribe to agent events for streaming
agent.events.on('task_received', async (event) => {
await writer.write(encoder.encode(
`event: status\ndata: ${JSON.stringify({
type: 'task_received',
taskId: event.taskId,
agent: event.agentId,
})}\n\n`
));
});
agent.events.on('swarm_progress', async (event) => {
await writer.write(encoder.encode(
`event: progress\ndata: ${JSON.stringify({
type: 'swarm_progress',
completion: event.completionRate,
responses: event.responseCount,
})}\n\n`
));
});
agent.events.on('task_completed', async (event) => {
await writer.write(encoder.encode(
`event: completed\ndata: ${JSON.stringify({
type: 'task_completed',
result: event.data,
})}\n\n`
));
});
// Process the message
const result = await agent.processMessage(validated);
// Send final result
await writer.write(encoder.encode(
`event: result\ndata: ${JSON.stringify(result)}\n\n`
));
// Send done signal
await writer.write(encoder.encode('event: done\ndata: {}\n\n'));
} catch (error) {
await writer.write(encoder.encode(
`event: error\ndata: ${JSON.stringify({
error: error.message,
})}\n\n`
));
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Content-Type-Options': 'nosniff',
},
});
}
Creates a streaming API endpoint using Server-Sent Events to provide real-time updates on agent processing, swarm progress, and task completion.
5. React Hook for Agent Communication
// hooks/useA2ACommunication.ts
import { useState, useCallback, useEffect, useRef } from 'react';
import { useMutation } from '@tanstack/react-query';
import { A2AMessage } from '@/lib/protocols/a2a-protocol';
import { debounce } from 'es-toolkit';
interface UseA2AOptions {
targetAgent: string;
streaming?: boolean;
onProgress?: (progress: any) => void;
onComplete?: (result: any) => void;
onError?: (error: any) => void;
}
interface A2AState {
isProcessing: boolean;
progress: number;
events: any[];
result: any;
error: any;
}
export function useA2ACommunication(options: UseA2AOptions) {
const [state, setState] = useState<A2AState>({
isProcessing: false,
progress: 0,
events: [],
result: null,
error: null,
});
const eventSourceRef = useRef<EventSource | null>(null);
const abortControllerRef = useRef<AbortController | null>(null);
// Send message via traditional POST
const sendMessage = useMutation({
mutationFn: async (query: string) => {
const message: A2AMessage = {
id: crypto.randomUUID(),
from: 'user',
to: options.targetAgent,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: crypto.randomUUID(),
type: 'request',
context: {},
payload: { query },
},
};
const response = await fetch(`/api/agents/${options.targetAgent}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(message),
});
if (!response.ok) {
throw new Error('Failed to send message');
}
return response.json();
},
onSuccess: (data) => {
setState(prev => ({ ...prev, result: data, isProcessing: false }));
options.onComplete?.(data);
},
onError: (error) => {
setState(prev => ({ ...prev, error, isProcessing: false }));
options.onError?.(error);
},
});
// Send message with streaming
const sendStreamingMessage = useCallback(
debounce(async (query: string) => {
// Clean up previous stream
if (eventSourceRef.current) {
eventSourceRef.current.close();
}
setState(prev => ({
...prev,
isProcessing: true,
progress: 0,
events: [],
error: null,
}));
const message: A2AMessage = {
id: crypto.randomUUID(),
from: 'user',
to: options.targetAgent,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: crypto.randomUUID(),
type: 'request',
context: {},
payload: { query },
},
};
// Use fetch with streaming
abortControllerRef.current = new AbortController();
try {
const response = await fetch('/api/agents/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
signal: abortControllerRef.current.signal,
});
if (!response.ok) {
throw new Error('Stream request failed');
}
const reader = response.body?.getReader();
const decoder = new TextDecoder();
if (!reader) {
throw new Error('No response body');
}
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('event:')) {
const eventType = line.substring(6).trim();
const dataLine = lines[lines.indexOf(line) + 1];
if (dataLine?.startsWith('data:')) {
const data = JSON.parse(dataLine.substring(5));
switch (eventType) {
case 'progress':
setState(prev => ({
...prev,
progress: data.completion * 100,
}));
options.onProgress?.(data);
break;
case 'status':
setState(prev => ({
...prev,
events: [...prev.events, data],
}));
break;
case 'result':
setState(prev => ({
...prev,
result: data,
isProcessing: false,
}));
options.onComplete?.(data);
break;
case 'error':
setState(prev => ({
...prev,
error: data.error,
isProcessing: false,
}));
options.onError?.(data.error);
break;
case 'done':
setState(prev => ({
...prev,
isProcessing: false,
}));
break;
}
}
}
}
}
} catch (error: any) {
if (error.name !== 'AbortError') {
setState(prev => ({
...prev,
error,
isProcessing: false,
}));
options.onError?.(error);
}
}
}, 500),
[options]
);
// Send message based on mode
const send = useCallback(
async (query: string) => {
if (options.streaming) {
await sendStreamingMessage(query);
} else {
sendMessage.mutate(query);
}
},
[options.streaming, sendMessage, sendStreamingMessage]
);
// Cancel ongoing operation
const cancel = useCallback(() => {
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
if (abortControllerRef.current) {
abortControllerRef.current.abort();
abortControllerRef.current = null;
}
setState(prev => ({
...prev,
isProcessing: false,
}));
}, []);
// Cleanup on unmount
useEffect(() => {
return () => {
cancel();
};
}, [cancel]);
return {
send,
cancel,
state,
isProcessing: state.isProcessing || sendMessage.isPending,
};
}
Provides a React hook for seamless agent communication with support for both traditional request-response and streaming modes, progress tracking, and cancellation.
6. Dashboard for Multi-Agent Monitoring
// app/dashboard/page.tsx
'use client';
import { useState, useEffect } from 'react';
import { useA2ACommunication } from '@/hooks/useA2ACommunication';
import { useQuery } from '@tanstack/react-query';
import { Line, Bar } from 'react-chartjs-2';
async function fetchAgentMetrics() {
const response = await fetch('/api/agents/metrics');
if (!response.ok) throw new Error('Failed to fetch metrics');
return response.json();
}
export default function Dashboard() {
const [selectedAgent, setSelectedAgent] = useState('swarm-coordinator');
const [query, setQuery] = useState('');
const [progressData, setProgressData] = useState<any[]>([]);
// Fetch real-time metrics
const { data: metrics, refetch } = useQuery({
queryKey: ['agent-metrics'],
queryFn: fetchAgentMetrics,
refetchInterval: 5000, // Refresh every 5 seconds
});
// Setup agent communication with streaming
const { send, cancel, state, isProcessing } = useA2ACommunication({
targetAgent: selectedAgent,
streaming: true,
onProgress: (progress) => {
setProgressData(prev => [...prev, {
time: Date.now(),
completion: progress.completion * 100,
responses: progress.responses,
}]);
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (query.trim()) {
setProgressData([]);
send(query);
}
};
return (
<div className="min-h-screen bg-base-200 p-4">
<div className="container mx-auto">
<h1 className="text-4xl font-bold mb-8">Multi-Agent Dashboard</h1>
{/* Agent Status Grid */}
<div className="grid grid-cols-1 md:grid-cols-3 gap-4 mb-8">
{metrics?.agents?.map((agent: any) => (
<div key={agent.name} className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">{agent.name}</h2>
<div className="stats stats-vertical">
<div className="stat">
<div className="stat-title">Status</div>
<div className={`stat-value text-sm ${
agent.status === 'active' ? 'text-success' : 'text-warning'
}`}>
{agent.status}
</div>
</div>
<div className="stat">
<div className="stat-title">Tasks</div>
<div className="stat-value text-sm">{agent.taskCount}</div>
</div>
<div className="stat">
<div className="stat-title">Avg Response</div>
<div className="stat-value text-sm">{agent.avgResponseTime}ms</div>
</div>
</div>
</div>
</div>
))}
</div>
{/* Queue Metrics */}
{metrics?.queues && (
<div className="card bg-base-100 shadow-xl mb-8">
<div className="card-body">
<h2 className="card-title">Queue Status</h2>
<div className="overflow-x-auto">
<table className="table">
<thead>
<tr>
<th>Queue</th>
<th>Waiting</th>
<th>Active</th>
<th>Completed</th>
<th>Failed</th>
</tr>
</thead>
<tbody>
{Object.entries(metrics.queues).map(([name, stats]: [string, any]) => (
<tr key={name}>
<td>{name}</td>
<td>{stats.waiting}</td>
<td>{stats.active}</td>
<td>{stats.completed}</td>
<td>{stats.failed}</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
</div>
)}
{/* Agent Interaction */}
<div className="card bg-base-100 shadow-xl mb-8">
<div className="card-body">
<h2 className="card-title">Test Agent Communication</h2>
{/* Agent selector */}
<select
className="select select-bordered w-full max-w-xs mb-4"
value={selectedAgent}
onChange={(e) => setSelectedAgent(e.target.value)}
>
<option value="swarm-coordinator">Swarm Coordinator</option>
<option value="supervisor-agent">Supervisor Agent</option>
<option value="research-agent">Research Agent</option>
</select>
{/* Query form */}
<form onSubmit={handleSubmit} className="join w-full">
<input
type="text"
value={query}
onChange={(e) => setQuery(e.target.value)}
placeholder="Enter your query..."
className="input input-bordered join-item flex-1"
disabled={isProcessing}
/>
<button
type="submit"
className="btn btn-primary join-item"
disabled={isProcessing || !query.trim()}
>
{isProcessing ? 'Processing...' : 'Send'}
</button>
{isProcessing && (
<button
type="button"
onClick={cancel}
className="btn btn-error join-item"
>
Cancel
</button>
)}
</form>
{/* Progress visualization */}
{progressData.length > 0 && (
<div className="mt-4">
<progress
className="progress progress-primary w-full"
value={progressData[progressData.length - 1]?.completion || 0}
max="100"
></progress>
<p className="text-sm mt-2">
Progress: {Math.round(progressData[progressData.length - 1]?.completion || 0)}%
</p>
</div>
)}
{/* Events log */}
{state.events.length > 0 && (
<div className="mt-4">
<h3 className="font-bold mb-2">Processing Events:</h3>
<div className="max-h-40 overflow-y-auto bg-base-200 p-2 rounded">
{state.events.map((event, idx) => (
<div key={idx} className="text-xs mb-1">
[{event.type}] {event.agent}: Task {event.taskId}
</div>
))}
</div>
</div>
)}
{/* Result display */}
{state.result && (
<div className="alert alert-success mt-4">
<div>
<h3 className="font-bold">Result:</h3>
<pre className="text-sm">{JSON.stringify(state.result, null, 2)}</pre>
</div>
</div>
)}
{/* Error display */}
{state.error && (
<div className="alert alert-error mt-4">
<div>
<h3 className="font-bold">Error:</h3>
<p>{state.error.message || 'An error occurred'}</p>
</div>
</div>
)}
</div>
</div>
</div>
</div>
);
}
Creates a comprehensive dashboard for monitoring multi-agent systems with real-time metrics, queue status, agent testing interface, and progress visualization.
7. Deployment Configuration
// vercel.json
{
"functions": {
"app/api/agents/*/route.ts": {
"maxDuration": 300
},
"app/api/agents/stream/route.ts": {
"maxDuration": 300
}
},
"env": {
"REDIS_URL": "@redis-url",
"GOOGLE_API_KEY": "@google-api-key",
"SUPERVISOR_API_KEY": "@supervisor-api-key"
},
"crons": [
{
"path": "/api/agents/health",
"schedule": "*/5 * * * *"
}
]
}
Configures Vercel deployment with extended timeouts for agent operations, environment variables, and health check crons.
Conclusion
Inter-agent Communication (A2A) represents a fundamental shift in how we build AI systems, moving from isolated agents to collaborative swarms that can tackle complex, multi-faceted problems. By leveraging TypeScript's type safety, LangGraph's orchestration capabilities, and Vercel's serverless infrastructure with 800-second execution times, developers can now build production-ready multi-agent systems that were technically impossible just two years ago.
The patterns demonstrated here—from simple handoff mechanisms to sophisticated swarm coordination with consensus algorithms—provide a foundation for building scalable, resilient agent networks. The combination of event-driven architectures, message queues, and streaming interfaces ensures these systems can handle real-world production loads while maintaining observability and control.
As the ecosystem continues to evolve with standardized protocols like Google's A2A and Anthropic's MCP, the ability for agents built with different frameworks to seamlessly collaborate will unlock new possibilities for AI applications. The future of AI lies not in singular, monolithic models, but in orchestrated networks of specialized agents working in concert—and the tools and patterns to build these systems are available today.