DRAFT Agentic Design Patterns - Inter-agent Communication (A2A)

agentsa2atypescriptlanggraphvercelaimulti-agent
By sko X opus 4.19/21/202532 min read

Learn how to build production-ready multi-agent systems with Agent-to-Agent communication using TypeScript, LangGraph, and Vercel's serverless platform.

Mental Model: The Orchestra Conductor Pattern

Think of A2A communication like a distributed orchestra where each agent is a specialized musician. The conductor (supervisor agent) doesn't play every instrument but coordinates the performance. Musicians (specialized agents) can pass sheet music (context/tasks) directly to each other or through the conductor. Some pieces require all musicians to play together (synchronous), while others allow soloists to perform independently and join when ready (asynchronous). The venue (Vercel's platform) provides the acoustics and infrastructure, while the score (LangGraph) defines how the music flows. Just as musicians use standard notation to communicate regardless of their instrument, A2A protocols enable agents built with different frameworks to collaborate seamlessly.

Basic Example: Simple Agent Handoff Pattern

1. Project Setup with Core Dependencies

# Initialize NextJS 15 project with TypeScript
npx create-next-app@latest a2a-agents --typescript --tailwind --app --no-src-dir
cd a2a-agents

# Install LangGraph and agent communication packages
npm install @langchain/langgraph @langchain/core @langchain/community
npm install @langchain/google-genai zod uuid
npm install @tanstack/react-query es-toolkit daisyui
npm install @vercel/kv bullmq ioredis

Initializes a Next.js 15 project optimized for multi-agent development with all necessary dependencies for agent orchestration and communication.

2. Define Agent Communication Protocol

// lib/protocols/a2a-protocol.ts
import { z } from 'zod';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
import { pipe, map, filter } from 'es-toolkit';

// Define the A2A message schema
export const A2AMessageSchema = z.object({
  id: z.string().uuid(),
  from: z.string(),
  to: z.string(),
  timestamp: z.string().datetime(),
  protocol: z.literal('a2a/v1'),
  task: z.object({
    id: z.string().uuid(),
    type: z.enum(['request', 'response', 'handoff', 'error']),
    skill: z.string().optional(),
    context: z.record(z.any()),
    payload: z.any(),
    metadata: z.object({
      priority: z.enum(['low', 'medium', 'high']).default('medium'),
      timeout: z.number().default(30000),
      retries: z.number().default(3),
    }).optional(),
  }),
});

export type A2AMessage = z.infer<typeof A2AMessageSchema>;

// Agent Card definition (agent capabilities)
export const AgentCardSchema = z.object({
  name: z.string(),
  description: z.string(),
  url: z.string().url(),
  version: z.string(),
  capabilities: z.object({
    streaming: z.boolean(),
    async: z.boolean(),
    maxConcurrent: z.number(),
  }),
  skills: z.array(z.object({
    id: z.string(),
    name: z.string(),
    description: z.string(),
    inputSchema: z.any(),
    outputSchema: z.any(),
  })),
  authentication: z.object({
    type: z.enum(['none', 'apiKey', 'oauth2']),
    config: z.record(z.any()).optional(),
  }),
});

export type AgentCard = z.infer<typeof AgentCardSchema>;

// Message factory using es-toolkit
export class A2AMessageFactory {
  static createHandoffMessage(
    from: string,
    to: string,
    task: any,
    context: Record<string, any>
  ): A2AMessage {
    return pipe(
      {
        id: crypto.randomUUID(),
        from,
        to,
        timestamp: new Date().toISOString(),
        protocol: 'a2a/v1' as const,
        task: {
          id: crypto.randomUUID(),
          type: 'handoff' as const,
          context,
          payload: task,
        },
      },
      (msg) => A2AMessageSchema.parse(msg)
    );
  }
}

Establishes a type-safe A2A protocol with message schemas, agent cards for capability discovery, and factory methods for creating standardized messages.

3. Create Base Agent with Communication Interface

// lib/agents/base-agent.ts
import { StateGraph, StateGraphArgs } from '@langchain/langgraph';
import { BaseMessage } from '@langchain/core/messages';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { A2AMessage, AgentCard } from '@/lib/protocols/a2a-protocol';
import { debounce, throttle } from 'es-toolkit';

export interface AgentState {
  messages: BaseMessage[];
  currentAgent: string;
  context: Record<string, any>;
  pendingHandoffs: A2AMessage[];
}

export abstract class BaseA2Agent {
  protected name: string;
  protected model: ChatGoogleGenerativeAI;
  protected graph: StateGraph<AgentState>;
  protected card: AgentCard;
  
  constructor(name: string, card: AgentCard) {
    this.name = name;
    this.card = card;
    
    // Initialize with Gemini Flash for fast responses
    this.model = new ChatGoogleGenerativeAI({
      modelName: 'gemini-2.5-flash',
      temperature: 0.7,
      streaming: true,
      maxOutputTokens: 2048,
    });
    
    // Setup state graph for agent workflow
    this.graph = new StateGraph<AgentState>({
      channels: {
        messages: {
          value: (old: BaseMessage[], next: BaseMessage[]) => [...old, ...next],
        },
        currentAgent: {
          value: (old: string, next: string) => next,
        },
        context: {
          value: (old: Record<string, any>, next: Record<string, any>) => ({
            ...old,
            ...next,
          }),
        },
        pendingHandoffs: {
          value: (old: A2AMessage[], next: A2AMessage[]) => [...old, ...next],
        },
      },
    });
    
    this.setupGraph();
  }
  
  // Abstract method for subclasses to implement
  protected abstract setupGraph(): void;
  
  // Process incoming A2A messages
  async processMessage(message: A2AMessage): Promise<A2AMessage> {
    const startTime = Date.now();
    
    try {
      // Validate message is for this agent
      if (message.to !== this.name) {
        throw new Error(`Message not for this agent: ${message.to}`);
      }
      
      // Process based on task type
      const result = await this.handleTask(message.task);
      
      // Create response message
      return {
        id: crypto.randomUUID(),
        from: this.name,
        to: message.from,
        timestamp: new Date().toISOString(),
        protocol: 'a2a/v1',
        task: {
          id: message.task.id,
          type: 'response',
          context: {
            ...message.task.context,
            processingTime: Date.now() - startTime,
          },
          payload: result,
        },
      };
    } catch (error) {
      return this.createErrorResponse(message, error);
    }
  }
  
  // Handle different task types
  protected async handleTask(task: A2AMessage['task']): Promise<any> {
    switch (task.type) {
      case 'request':
        return await this.processRequest(task);
      case 'handoff':
        return await this.acceptHandoff(task);
      default:
        throw new Error(`Unknown task type: ${task.type}`);
    }
  }
  
  protected abstract processRequest(task: A2AMessage['task']): Promise<any>;
  protected abstract acceptHandoff(task: A2AMessage['task']): Promise<any>;
  
  // Throttled error response creation
  protected createErrorResponse = throttle(
    (message: A2AMessage, error: any): A2AMessage => {
      return {
        id: crypto.randomUUID(),
        from: this.name,
        to: message.from,
        timestamp: new Date().toISOString(),
        protocol: 'a2a/v1',
        task: {
          id: message.task.id,
          type: 'error',
          context: message.task.context,
          payload: {
            error: error.message || 'Unknown error',
            stack: process.env.NODE_ENV === 'development' ? error.stack : undefined,
          },
        },
      };
    },
    1000 // Throttle error responses to prevent spam
  );
}

Provides an abstract base class for agents with built-in A2A message processing, state management via LangGraph, and error handling with es-toolkit utilities.

4. Implement Specialized Agents

// lib/agents/research-agent.ts
import { BaseA2Agent } from './base-agent';
import { WebBrowser } from '@langchain/community/tools/webbrowser';
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { groupBy, chunk } from 'es-toolkit';

export class ResearchAgent extends BaseA2Agent {
  private browser: WebBrowser;
  private embeddings: GoogleGenerativeAIEmbeddings;
  
  constructor() {
    const card = {
      name: 'research-agent',
      description: 'Specialized in web research and information gathering',
      url: process.env.VERCEL_URL ? 
        `https://${process.env.VERCEL_URL}/api/agents/research` : 
        'http://localhost:3000/api/agents/research',
      version: '1.0.0',
      capabilities: {
        streaming: true,
        async: true,
        maxConcurrent: 5,
      },
      skills: [
        {
          id: 'web-search',
          name: 'Web Search',
          description: 'Search and extract information from the web',
          inputSchema: { query: 'string' },
          outputSchema: { results: 'array' },
        },
        {
          id: 'summarize',
          name: 'Summarize',
          description: 'Create concise summaries of research findings',
          inputSchema: { content: 'string' },
          outputSchema: { summary: 'string' },
        },
      ],
      authentication: {
        type: 'apiKey',
        config: { header: 'X-API-Key' },
      },
    };
    
    super('research-agent', card);
    
    this.embeddings = new GoogleGenerativeAIEmbeddings({
      modelName: 'embedding-001',
    });
    
    this.browser = new WebBrowser({ 
      model: this.model, 
      embeddings: this.embeddings 
    });
  }
  
  protected setupGraph(): void {
    // Define the research workflow
    this.graph
      .addNode('analyze', this.analyzeRequest.bind(this))
      .addNode('search', this.performSearch.bind(this))
      .addNode('synthesize', this.synthesizeResults.bind(this))
      .addNode('decide_handoff', this.decideHandoff.bind(this))
      .addEdge('__start__', 'analyze')
      .addEdge('analyze', 'search')
      .addEdge('search', 'synthesize')
      .addEdge('synthesize', 'decide_handoff');
  }
  
  private async analyzeRequest(state: any) {
    const lastMessage = state.messages[state.messages.length - 1];
    const analysis = await this.model.invoke([
      new HumanMessage(`Analyze this request and identify key search terms: ${lastMessage.content}`)
    ]);
    
    return {
      context: {
        ...state.context,
        searchTerms: this.extractSearchTerms(analysis.content as string),
      },
    };
  }
  
  private async performSearch(state: any) {
    const { searchTerms } = state.context;
    
    // Batch search queries for efficiency
    const searchBatches = chunk(searchTerms, 3);
    const results = [];
    
    for (const batch of searchBatches) {
      const batchResults = await Promise.all(
        batch.map(term => this.browser.invoke(term))
      );
      results.push(...batchResults);
    }
    
    return {
      context: {
        ...state.context,
        searchResults: results,
      },
    };
  }
  
  private async synthesizeResults(state: any) {
    const { searchResults } = state.context;
    
    // Group results by relevance
    const grouped = groupBy(searchResults, (result: any) => 
      result.relevance > 0.8 ? 'high' : result.relevance > 0.5 ? 'medium' : 'low'
    );
    
    const synthesis = await this.model.invoke([
      new HumanMessage(`Synthesize these search results into a comprehensive answer: 
        ${JSON.stringify(grouped.high || [])}`)
    ]);
    
    return {
      messages: [new AIMessage(synthesis.content as string)],
      context: {
        ...state.context,
        synthesis: synthesis.content,
        confidence: grouped.high ? 'high' : 'medium',
      },
    };
  }
  
  private async decideHandoff(state: any) {
    const { confidence, synthesis } = state.context;
    
    // Decide if we need to handoff to another agent
    if (confidence === 'low' || synthesis.includes('need more analysis')) {
      return {
        pendingHandoffs: [{
          id: crypto.randomUUID(),
          from: this.name,
          to: 'analyst-agent',
          timestamp: new Date().toISOString(),
          protocol: 'a2a/v1' as const,
          task: {
            id: crypto.randomUUID(),
            type: 'handoff' as const,
            context: state.context,
            payload: {
              request: 'Deep analysis required',
              preliminaryFindings: synthesis,
            },
          },
        }],
      };
    }
    
    return { pendingHandoffs: [] };
  }
  
  protected async processRequest(task: any): Promise<any> {
    const result = await this.graph.invoke({
      messages: [new HumanMessage(task.payload.query)],
      currentAgent: this.name,
      context: task.context || {},
      pendingHandoffs: [],
    });
    
    return {
      results: result.messages[result.messages.length - 1].content,
      handoffs: result.pendingHandoffs,
    };
  }
  
  protected async acceptHandoff(task: any): Promise<any> {
    // Process handoff from another agent
    return this.processRequest(task);
  }
  
  private extractSearchTerms(analysis: string): string[] {
    // Simple extraction logic - in production use NLP
    return analysis.match(/["'](.*?)["']/g)?.map(term => 
      term.replace(/["']/g, '')
    ) || [];
  }
}

Implements a research agent with web browsing capabilities, search batching for efficiency, and intelligent handoff decisions based on confidence levels.

5. Create Supervisor Agent for Orchestration

// lib/agents/supervisor-agent.ts
import { BaseA2Agent } from './base-agent';
import { A2AMessage, A2AMessageFactory } from '@/lib/protocols/a2a-protocol';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { sortBy, uniqBy } from 'es-toolkit';
import { kv } from '@vercel/kv';

interface AgentRegistry {
  [key: string]: {
    card: any;
    endpoint: string;
    status: 'active' | 'inactive' | 'busy';
    lastSeen: number;
  };
}

export class SupervisorAgent extends BaseA2Agent {
  private agentRegistry: AgentRegistry = {};
  private taskQueue: A2AMessage[] = [];
  
  constructor() {
    const card = {
      name: 'supervisor-agent',
      description: 'Orchestrates and coordinates multiple specialized agents',
      url: process.env.VERCEL_URL ? 
        `https://${process.env.VERCEL_URL}/api/agents/supervisor` : 
        'http://localhost:3000/api/agents/supervisor',
      version: '1.0.0',
      capabilities: {
        streaming: true,
        async: true,
        maxConcurrent: 10,
      },
      skills: [
        {
          id: 'orchestrate',
          name: 'Orchestrate',
          description: 'Coordinate multiple agents to complete complex tasks',
          inputSchema: { task: 'string', agents: 'array' },
          outputSchema: { result: 'any', trace: 'array' },
        },
        {
          id: 'delegate',
          name: 'Delegate',
          description: 'Assign tasks to appropriate specialized agents',
          inputSchema: { task: 'string' },
          outputSchema: { assignedTo: 'string', taskId: 'string' },
        },
      ],
      authentication: {
        type: 'apiKey',
        config: { header: 'X-Supervisor-Key' },
      },
    };
    
    super('supervisor-agent', card);
    this.initializeRegistry();
  }
  
  private async initializeRegistry() {
    // Load agent registry from Vercel KV
    try {
      const registry = await kv.get<AgentRegistry>('agent-registry');
      if (registry) {
        this.agentRegistry = registry;
      }
    } catch (error) {
      console.log('No existing registry, starting fresh');
    }
    
    // Register default agents
    this.registerAgent('research-agent', {
      card: { /* Research agent card */ },
      endpoint: '/api/agents/research',
      status: 'active',
      lastSeen: Date.now(),
    });
    
    this.registerAgent('analyst-agent', {
      card: { /* Analyst agent card */ },
      endpoint: '/api/agents/analyst',
      status: 'active',
      lastSeen: Date.now(),
    });
    
    // Persist registry
    await kv.set('agent-registry', this.agentRegistry);
  }
  
  protected setupGraph(): void {
    this.graph
      .addNode('analyze_task', this.analyzeTask.bind(this))
      .addNode('select_agents', this.selectAgents.bind(this))
      .addNode('delegate_tasks', this.delegateTasks.bind(this))
      .addNode('monitor_progress', this.monitorProgress.bind(this))
      .addNode('aggregate_results', this.aggregateResults.bind(this))
      .addEdge('__start__', 'analyze_task')
      .addEdge('analyze_task', 'select_agents')
      .addEdge('select_agents', 'delegate_tasks')
      .addEdge('delegate_tasks', 'monitor_progress')
      .addEdge('monitor_progress', 'aggregate_results');
  }
  
  private async analyzeTask(state: any) {
    const lastMessage = state.messages[state.messages.length - 1];
    
    // Use LLM to understand task requirements
    const analysis = await this.model.invoke([
      new HumanMessage(`
        Analyze this task and determine:
        1. What type of expertise is needed
        2. Whether it needs sequential or parallel processing
        3. Estimated complexity (simple/moderate/complex)
        
        Task: ${lastMessage.content}
      `)
    ]);
    
    return {
      context: {
        ...state.context,
        taskAnalysis: this.parseTaskAnalysis(analysis.content as string),
        originalTask: lastMessage.content,
      },
    };
  }
  
  private async selectAgents(state: any) {
    const { taskAnalysis } = state.context;
    
    // Get active agents sorted by relevance
    const activeAgents = Object.entries(this.agentRegistry)
      .filter(([_, agent]) => agent.status === 'active')
      .map(([name, agent]) => ({ name, ...agent }));
    
    // Score agents based on task requirements
    const scoredAgents = activeAgents.map(agent => ({
      ...agent,
      score: this.calculateAgentScore(agent, taskAnalysis),
    }));
    
    // Select top agents for the task
    const selectedAgents = sortBy(scoredAgents, 'score')
      .reverse()
      .slice(0, taskAnalysis.complexity === 'complex' ? 3 : 2);
    
    return {
      context: {
        ...state.context,
        selectedAgents: selectedAgents.map(a => a.name),
      },
    };
  }
  
  private async delegateTasks(state: any) {
    const { selectedAgents, taskAnalysis, originalTask } = state.context;
    const delegatedTasks: A2AMessage[] = [];
    
    for (const agentName of selectedAgents) {
      const message = A2AMessageFactory.createHandoffMessage(
        this.name,
        agentName,
        {
          task: originalTask,
          requirements: taskAnalysis,
          deadline: Date.now() + 30000, // 30 second deadline
        },
        state.context
      );
      
      delegatedTasks.push(message);
      this.taskQueue.push(message);
    }
    
    // Store task delegation in KV for tracking
    await kv.set(`tasks:${state.context.sessionId}`, delegatedTasks, {
      ex: 3600, // Expire after 1 hour
    });
    
    return {
      context: {
        ...state.context,
        delegatedTasks: delegatedTasks.map(t => t.id),
      },
    };
  }
  
  private async monitorProgress(state: any) {
    const { delegatedTasks } = state.context;
    const responses: any[] = [];
    const timeout = 30000; // 30 seconds
    const startTime = Date.now();
    
    // Poll for responses with timeout
    while (responses.length < delegatedTasks.length) {
      if (Date.now() - startTime > timeout) {
        console.log('Timeout waiting for agent responses');
        break;
      }
      
      // Check for responses in KV
      for (const taskId of delegatedTasks) {
        const response = await kv.get(`response:${taskId}`);
        if (response && !responses.find(r => r.taskId === taskId)) {
          responses.push(response);
        }
      }
      
      // Wait before next poll
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
    
    return {
      context: {
        ...state.context,
        agentResponses: responses,
      },
    };
  }
  
  private async aggregateResults(state: any) {
    const { agentResponses } = state.context;
    
    // Remove duplicate information
    const uniqueResponses = uniqBy(agentResponses, (r: any) => 
      JSON.stringify(r.payload)
    );
    
    // Use LLM to synthesize all responses
    const synthesis = await this.model.invoke([
      new HumanMessage(`
        Synthesize these agent responses into a comprehensive answer:
        ${JSON.stringify(uniqueResponses)}
        
        Create a unified response that combines all insights.
      `)
    ]);
    
    return {
      messages: [new AIMessage(synthesis.content as string)],
      context: {
        ...state.context,
        finalResult: synthesis.content,
        contributingAgents: uniqueResponses.map((r: any) => r.from),
      },
    };
  }
  
  private registerAgent(name: string, info: any) {
    this.agentRegistry[name] = info;
  }
  
  private parseTaskAnalysis(analysis: string): any {
    // Simple parsing - in production use structured output
    return {
      expertise: analysis.includes('research') ? ['research'] : ['general'],
      processing: analysis.includes('parallel') ? 'parallel' : 'sequential',
      complexity: analysis.includes('complex') ? 'complex' : 'simple',
    };
  }
  
  private calculateAgentScore(agent: any, taskAnalysis: any): number {
    // Simple scoring algorithm
    let score = 0;
    
    // Check if agent has required skills
    if (agent.card?.skills) {
      score += agent.card.skills.length * 10;
    }
    
    // Prefer recently active agents
    const hoursSinceActive = (Date.now() - agent.lastSeen) / 3600000;
    score -= hoursSinceActive * 5;
    
    // Boost score if agent matches expertise
    if (taskAnalysis.expertise?.some((exp: string) => 
      agent.name.includes(exp)
    )) {
      score += 50;
    }
    
    return Math.max(0, score);
  }
  
  protected async processRequest(task: any): Promise<any> {
    const result = await this.graph.invoke({
      messages: [new HumanMessage(task.payload.query)],
      currentAgent: this.name,
      context: {
        ...task.context,
        sessionId: task.id,
      },
      pendingHandoffs: [],
    });
    
    return {
      result: result.context.finalResult,
      trace: result.context.contributingAgents,
      metrics: {
        totalAgents: result.context.selectedAgents.length,
        processingTime: Date.now() - result.context.startTime,
      },
    };
  }
  
  protected async acceptHandoff(task: any): Promise<any> {
    // Supervisor doesn't typically accept handoffs
    return { error: 'Supervisor does not accept handoffs' };
  }
}

Implements a supervisor agent that analyzes tasks, selects appropriate agents, delegates work, monitors progress, and aggregates results using Vercel KV for state persistence.

6. API Routes for Agent Endpoints

// app/api/agents/supervisor/route.ts
import { SupervisorAgent } from '@/lib/agents/supervisor-agent';
import { A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { NextResponse } from 'next/server';
import { kv } from '@vercel/kv';

export const runtime = 'nodejs';
export const maxDuration = 300;

const supervisor = new SupervisorAgent();

export async function POST(req: Request) {
  try {
    // Parse and validate incoming A2A message
    const rawMessage = await req.json();
    const message = A2AMessageSchema.parse(rawMessage);
    
    // Check authentication
    const apiKey = req.headers.get('X-Supervisor-Key');
    if (!apiKey || apiKey !== process.env.SUPERVISOR_API_KEY) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
    }
    
    // Process message with supervisor
    const response = await supervisor.processMessage(message);
    
    // Store response for async retrieval
    await kv.set(`response:${message.task.id}`, response, {
      ex: 3600, // Expire after 1 hour
    });
    
    return NextResponse.json(response);
  } catch (error: any) {
    console.error('Supervisor error:', error);
    return NextResponse.json(
      { error: error.message || 'Internal server error' },
      { status: 500 }
    );
  }
}

// Agent discovery endpoint
export async function GET(req: Request) {
  const agent = new SupervisorAgent();
  return NextResponse.json(agent.card);
}

Creates RESTful API endpoints for the supervisor agent with authentication, message validation, and agent discovery support.

7. Client Component with React Query

// components/MultiAgentChat.tsx
'use client';

import { useState } from 'react';
import { useMutation, useQuery } from '@tanstack/react-query';
import { debounce } from 'es-toolkit';

interface AgentResponse {
  result: string;
  trace: string[];
  metrics: {
    totalAgents: number;
    processingTime: number;
  };
}

async function sendToSupervisor(query: string): Promise<AgentResponse> {
  const message = {
    id: crypto.randomUUID(),
    from: 'user',
    to: 'supervisor-agent',
    timestamp: new Date().toISOString(),
    protocol: 'a2a/v1',
    task: {
      id: crypto.randomUUID(),
      type: 'request',
      context: {},
      payload: { query },
    },
  };
  
  const response = await fetch('/api/agents/supervisor', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Supervisor-Key': process.env.NEXT_PUBLIC_SUPERVISOR_KEY || '',
    },
    body: JSON.stringify(message),
  });
  
  if (!response.ok) {
    throw new Error('Failed to process request');
  }
  
  const data = await response.json();
  return data.task.payload;
}

export default function MultiAgentChat() {
  const [input, setInput] = useState('');
  const [messages, setMessages] = useState<Array<{
    role: 'user' | 'assistant';
    content: string;
    metadata?: any;
  }>>([]);
  
  const mutation = useMutation({
    mutationFn: sendToSupervisor,
    onSuccess: (data) => {
      setMessages(prev => [...prev, {
        role: 'assistant',
        content: data.result,
        metadata: {
          agents: data.trace,
          processingTime: data.metrics.processingTime,
        },
      }]);
    },
  });
  
  const handleSubmit = debounce(async (e: React.FormEvent) => {
    e.preventDefault();
    if (!input.trim()) return;
    
    const userMessage = input;
    setInput('');
    setMessages(prev => [...prev, {
      role: 'user',
      content: userMessage,
    }]);
    
    mutation.mutate(userMessage);
  }, 500);
  
  return (
    <div className="card bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">Multi-Agent System</h2>
        
        {/* Chat messages */}
        <div className="h-96 overflow-y-auto space-y-4 p-4 bg-base-200 rounded-lg">
          {messages.map((msg, idx) => (
            <div key={idx} className={`chat ${msg.role === 'user' ? 'chat-end' : 'chat-start'}`}>
              <div className="chat-bubble">
                {msg.content}
                {msg.metadata && (
                  <div className="text-xs mt-2 opacity-70">
                    Processed by: {msg.metadata.agents.join(', ')} 
                    ({msg.metadata.processingTime}ms)
                  </div>
                )}
              </div>
            </div>
          ))}
          
          {mutation.isPending && (
            <div className="chat chat-start">
              <div className="chat-bubble">
                <span className="loading loading-dots loading-sm"></span>
              </div>
            </div>
          )}
        </div>
        
        {/* Input form */}
        <form onSubmit={handleSubmit} className="join w-full">
          <input
            type="text"
            value={input}
            onChange={(e) => setInput(e.target.value)}
            placeholder="Ask the agents..."
            className="input input-bordered join-item flex-1"
            disabled={mutation.isPending}
          />
          <button 
            type="submit" 
            className="btn btn-primary join-item"
            disabled={mutation.isPending || !input.trim()}
          >
            Send
          </button>
        </form>
      </div>
    </div>
  );
}

Creates a React component for interacting with the multi-agent system, displaying agent traces and processing metrics with real-time updates.

Advanced Example: Event-Driven Agent Swarm

1. Setup Message Queue Infrastructure

// lib/queue/agent-queue.ts
import { Queue, Worker, Job } from 'bullmq';
import Redis from 'ioredis';
import { A2AMessage, A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { pipe, groupBy, partition } from 'es-toolkit';

// Create Redis connection for BullMQ
const connection = new Redis(process.env.REDIS_URL || 'redis://localhost:6379', {
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
});

// Define queue names for different agent types
export const QUEUE_NAMES = {
  RESEARCH: 'research-queue',
  ANALYSIS: 'analysis-queue',
  SYNTHESIS: 'synthesis-queue',
  SUPERVISOR: 'supervisor-queue',
  PRIORITY: 'priority-queue',
} as const;

// Create typed queue wrapper
export class AgentQueue {
  private queues: Map<string, Queue<A2AMessage>> = new Map();
  private workers: Map<string, Worker<A2AMessage>> = new Map();
  
  constructor() {
    this.initializeQueues();
  }
  
  private initializeQueues() {
    // Create a queue for each agent type
    Object.values(QUEUE_NAMES).forEach(queueName => {
      this.queues.set(queueName, new Queue<A2AMessage>(queueName, {
        connection,
        defaultJobOptions: {
          removeOnComplete: { count: 100 },
          removeOnFail: { count: 500 },
          attempts: 3,
          backoff: {
            type: 'exponential',
            delay: 2000,
          },
        },
      }));
    });
  }
  
  // Add message to appropriate queue based on routing rules
  async routeMessage(message: A2AMessage): Promise<Job<A2AMessage>> {
    // Validate message
    const validated = A2AMessageSchema.parse(message);
    
    // Determine queue based on destination agent
    const queueName = this.getQueueForAgent(validated.to);
    const queue = this.queues.get(queueName);
    
    if (!queue) {
      throw new Error(`No queue found for agent: ${validated.to}`);
    }
    
    // Add to queue with priority
    const priority = this.calculatePriority(validated);
    return await queue.add(
      `${validated.to}:${validated.task.type}`,
      validated,
      {
        priority,
        delay: validated.task.metadata?.delay || 0,
      }
    );
  }
  
  // Batch route multiple messages efficiently
  async batchRoute(messages: A2AMessage[]): Promise<Job<A2AMessage>[]> {
    // Group messages by destination queue
    const grouped = groupBy(messages, msg => this.getQueueForAgent(msg.to));
    
    const jobs: Job<A2AMessage>[] = [];
    
    for (const [queueName, msgs] of Object.entries(grouped)) {
      const queue = this.queues.get(queueName);
      if (!queue) continue;
      
      // Bulk add to queue
      const bulkJobs = await queue.addBulk(
        msgs.map(msg => ({
          name: `${msg.to}:${msg.task.type}`,
          data: msg,
          opts: {
            priority: this.calculatePriority(msg),
          },
        }))
      );
      
      jobs.push(...bulkJobs);
    }
    
    return jobs;
  }
  
  // Create worker for processing queue
  createWorker(
    queueName: string,
    processor: (job: Job<A2AMessage>) => Promise<any>
  ): Worker<A2AMessage> {
    const worker = new Worker<A2AMessage>(
      queueName,
      async (job) => {
        console.log(`Processing job ${job.id} in ${queueName}`);
        return await processor(job);
      },
      {
        connection,
        concurrency: 5,
        limiter: {
          max: 10,
          duration: 1000,
        },
      }
    );
    
    // Add event listeners
    worker.on('completed', (job) => {
      console.log(`Job ${job.id} completed`);
    });
    
    worker.on('failed', (job, err) => {
      console.error(`Job ${job?.id} failed:`, err);
    });
    
    this.workers.set(queueName, worker);
    return worker;
  }
  
  private getQueueForAgent(agentName: string): string {
    // Route to appropriate queue based on agent type
    if (agentName.includes('research')) return QUEUE_NAMES.RESEARCH;
    if (agentName.includes('analysis')) return QUEUE_NAMES.ANALYSIS;
    if (agentName.includes('synthesis')) return QUEUE_NAMES.SYNTHESIS;
    if (agentName === 'supervisor-agent') return QUEUE_NAMES.SUPERVISOR;
    return QUEUE_NAMES.PRIORITY; // Default high-priority queue
  }
  
  private calculatePriority(message: A2AMessage): number {
    // Lower number = higher priority
    const basePriority = message.task.metadata?.priority === 'high' ? 1 :
                        message.task.metadata?.priority === 'low' ? 10 : 5;
    
    // Adjust based on task type
    if (message.task.type === 'error') return 0; // Highest priority
    if (message.task.type === 'handoff') return basePriority - 1;
    
    return basePriority;
  }
  
  // Get queue metrics
  async getMetrics() {
    const metrics: Record<string, any> = {};
    
    for (const [name, queue] of this.queues.entries()) {
      const counts = await queue.getJobCounts();
      metrics[name] = {
        waiting: counts.waiting,
        active: counts.active,
        completed: counts.completed,
        failed: counts.failed,
        delayed: counts.delayed,
      };
    }
    
    return metrics;
  }
  
  // Graceful shutdown
  async close() {
    // Close all workers
    await Promise.all(
      Array.from(this.workers.values()).map(worker => worker.close())
    );
    
    // Close all queues
    await Promise.all(
      Array.from(this.queues.values()).map(queue => queue.close())
    );
    
    await connection.quit();
  }
}

Implements a robust message queue system using BullMQ for asynchronous agent communication with priority routing, batch processing, and automatic retries.

2. Event-Driven Agent Base

// lib/agents/event-agent.ts
import { EventEmitter } from 'events';
import { BaseA2Agent } from './base-agent';
import { A2AMessage } from '@/lib/protocols/a2a-protocol';
import { AgentQueue } from '@/lib/queue/agent-queue';
import { throttle, retry } from 'es-toolkit';

export interface AgentEvent {
  type: 'task_received' | 'task_completed' | 'task_failed' | 'handoff_initiated';
  agentId: string;
  taskId: string;
  timestamp: number;
  data?: any;
}

export abstract class EventDrivenAgent extends BaseA2Agent {
  protected events: EventEmitter;
  protected queue: AgentQueue;
  private subscriptions: Map<string, Function> = new Map();
  
  constructor(name: string, card: any) {
    super(name, card);
    this.events = new EventEmitter();
    this.queue = new AgentQueue();
    
    this.setupEventHandlers();
    this.startQueueWorker();
  }
  
  private setupEventHandlers() {
    // Emit events for monitoring
    this.events.on('task_received', throttle((event: AgentEvent) => {
      console.log(`[${this.name}] Task received:`, event.taskId);
      this.broadcastEvent(event);
    }, 100));
    
    this.events.on('task_completed', (event: AgentEvent) => {
      console.log(`[${this.name}] Task completed:`, event.taskId);
      this.broadcastEvent(event);
    });
    
    this.events.on('task_failed', (event: AgentEvent) => {
      console.error(`[${this.name}] Task failed:`, event.taskId, event.data);
      this.broadcastEvent(event);
    });
    
    this.events.on('handoff_initiated', (event: AgentEvent) => {
      console.log(`[${this.name}] Handoff initiated to:`, event.data?.targetAgent);
      this.broadcastEvent(event);
    });
  }
  
  private startQueueWorker() {
    // Create worker for this agent's queue
    const queueName = `${this.name}-queue`;
    
    this.queue.createWorker(queueName, async (job) => {
      const message = job.data;
      
      // Emit task received event
      this.events.emit('task_received', {
        type: 'task_received',
        agentId: this.name,
        taskId: message.task.id,
        timestamp: Date.now(),
      });
      
      try {
        // Process with retry logic
        const result = await retry(
          () => this.processMessage(message),
          { retries: 3, delay: 1000 }
        );
        
        // Emit completion event
        this.events.emit('task_completed', {
          type: 'task_completed',
          agentId: this.name,
          taskId: message.task.id,
          timestamp: Date.now(),
          data: result,
        });
        
        return result;
      } catch (error) {
        // Emit failure event
        this.events.emit('task_failed', {
          type: 'task_failed',
          agentId: this.name,
          taskId: message.task.id,
          timestamp: Date.now(),
          data: { error: error.message },
        });
        
        throw error;
      }
    });
  }
  
  // Subscribe to events from other agents
  subscribe(agentId: string, eventType: string, handler: Function) {
    const key = `${agentId}:${eventType}`;
    this.subscriptions.set(key, handler);
    
    // Set up SSE or WebSocket connection to receive events
    this.setupEventStream(agentId, eventType);
  }
  
  private async setupEventStream(agentId: string, eventType: string) {
    // In production, use WebSocket or SSE for real-time events
    // For serverless, poll from KV or database
    const pollInterval = setInterval(async () => {
      try {
        const event = await this.fetchLatestEvent(agentId, eventType);
        if (event) {
          const handler = this.subscriptions.get(`${agentId}:${eventType}`);
          if (handler) {
            handler(event);
          }
        }
      } catch (error) {
        console.error('Error polling events:', error);
      }
    }, 1000);
    
    // Store interval for cleanup
    this.subscriptions.set(`${agentId}:${eventType}:interval`, pollInterval);
  }
  
  private async fetchLatestEvent(agentId: string, eventType: string): Promise<AgentEvent | null> {
    // Fetch from KV or database
    // This is a simplified example
    return null;
  }
  
  // Broadcast events to subscribers
  private async broadcastEvent(event: AgentEvent) {
    // In serverless, store in KV with TTL
    const key = `events:${event.agentId}:${event.type}:${event.timestamp}`;
    await fetch('/api/events', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ key, event }),
    });
  }
  
  // Initiate handoff to another agent
  protected async handoffTo(
    targetAgent: string,
    task: any,
    context: Record<string, any>
  ): Promise<void> {
    const handoffMessage: A2AMessage = {
      id: crypto.randomUUID(),
      from: this.name,
      to: targetAgent,
      timestamp: new Date().toISOString(),
      protocol: 'a2a/v1',
      task: {
        id: crypto.randomUUID(),
        type: 'handoff',
        context: {
          ...context,
          previousAgent: this.name,
          handoffReason: task.reason || 'Task delegation',
        },
        payload: task,
      },
    };
    
    // Queue the handoff message
    await this.queue.routeMessage(handoffMessage);
    
    // Emit handoff event
    this.events.emit('handoff_initiated', {
      type: 'handoff_initiated',
      agentId: this.name,
      taskId: handoffMessage.task.id,
      timestamp: Date.now(),
      data: { targetAgent, task },
    });
  }
  
  // Cleanup on shutdown
  async cleanup() {
    // Clear all intervals
    this.subscriptions.forEach((value, key) => {
      if (key.endsWith(':interval')) {
        clearInterval(value as any);
      }
    });
    
    // Remove all event listeners
    this.events.removeAllListeners();
    
    // Close queue connection
    await this.queue.close();
  }
}

Creates an event-driven base agent with pub/sub capabilities, queue integration, automatic retry logic, and event broadcasting for monitoring.

3. Swarm Coordinator

// lib/agents/swarm-coordinator.ts
import { EventDrivenAgent } from './event-agent';
import { A2AMessage } from '@/lib/protocols/a2a-protocol';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { partition, chunk, shuffle } from 'es-toolkit';

interface SwarmState {
  activeAgents: Set<string>;
  taskDistribution: Map<string, string[]>; // agent -> taskIds
  consensusResults: Map<string, any[]>; // taskId -> results
  swarmMetrics: {
    totalTasks: number;
    completedTasks: number;
    averageResponseTime: number;
    consensusAgreement: number;
  };
}

export class SwarmCoordinator extends EventDrivenAgent {
  private swarmState: SwarmState = {
    activeAgents: new Set(),
    taskDistribution: new Map(),
    consensusResults: new Map(),
    swarmMetrics: {
      totalTasks: 0,
      completedTasks: 0,
      averageResponseTime: 0,
      consensusAgreement: 0,
    },
  };
  
  constructor() {
    const card = {
      name: 'swarm-coordinator',
      description: 'Coordinates distributed agent swarms for parallel processing',
      url: process.env.VERCEL_URL ? 
        `https://${process.env.VERCEL_URL}/api/agents/swarm` : 
        'http://localhost:3000/api/agents/swarm',
      version: '1.0.0',
      capabilities: {
        streaming: true,
        async: true,
        maxConcurrent: 20,
      },
      skills: [
        {
          id: 'swarm_execute',
          name: 'Swarm Execute',
          description: 'Execute task across multiple agents in parallel',
          inputSchema: { task: 'string', agents: 'number' },
          outputSchema: { consensus: 'any', votes: 'array' },
        },
        {
          id: 'adaptive_routing',
          name: 'Adaptive Routing',
          description: 'Dynamically route tasks based on agent performance',
          inputSchema: { task: 'string' },
          outputSchema: { route: 'string' },
        },
      ],
      authentication: {
        type: 'apiKey',
        config: { header: 'X-Swarm-Key' },
      },
    };
    
    super('swarm-coordinator', card);
    this.initializeSwarm();
  }
  
  private async initializeSwarm() {
    // Discover available agents
    const agents = await this.discoverAgents();
    agents.forEach(agent => this.swarmState.activeAgents.add(agent));
    
    // Subscribe to agent events for monitoring
    for (const agent of agents) {
      this.subscribe(agent, 'task_completed', (event: any) => {
        this.handleAgentCompletion(agent, event);
      });
      
      this.subscribe(agent, 'task_failed', (event: any) => {
        this.handleAgentFailure(agent, event);
      });
    }
  }
  
  protected setupGraph(): void {
    this.graph
      .addNode('analyze_for_swarm', this.analyzeForSwarm.bind(this))
      .addNode('distribute_tasks', this.distributeTasks.bind(this))
      .addNode('monitor_swarm', this.monitorSwarm.bind(this))
      .addNode('achieve_consensus', this.achieveConsensus.bind(this))
      .addNode('adaptive_rebalance', this.adaptiveRebalance.bind(this))
      .addEdge('__start__', 'analyze_for_swarm')
      .addEdge('analyze_for_swarm', 'distribute_tasks')
      .addEdge('distribute_tasks', 'monitor_swarm')
      .addConditionalEdges(
        'monitor_swarm',
        (state: any) => {
          const completion = state.context.completionRate || 0;
          if (completion < 0.5) return 'adaptive_rebalance';
          if (completion >= 0.8) return 'achieve_consensus';
          return 'monitor_swarm'; // Continue monitoring
        },
        {
          'adaptive_rebalance': 'adaptive_rebalance',
          'achieve_consensus': 'achieve_consensus',
          'monitor_swarm': 'monitor_swarm',
        }
      )
      .addEdge('adaptive_rebalance', 'monitor_swarm');
  }
  
  private async analyzeForSwarm(state: any) {
    const lastMessage = state.messages[state.messages.length - 1];
    
    // Determine optimal swarm size and strategy
    const analysis = await this.model.invoke([
      new HumanMessage(`
        Analyze this task for swarm processing:
        1. Can it be parallelized? 
        2. How many agents should process it?
        3. Does it need consensus or first-response-wins?
        
        Task: ${lastMessage.content}
      `)
    ]);
    
    const swarmConfig = this.parseSwarmConfig(analysis.content as string);
    
    // Update metrics
    this.swarmState.swarmMetrics.totalTasks++;
    
    return {
      context: {
        ...state.context,
        swarmConfig,
        originalTask: lastMessage.content,
        startTime: Date.now(),
      },
    };
  }
  
  private async distributeTasks(state: any) {
    const { swarmConfig, originalTask } = state.context;
    const { agentCount, strategy } = swarmConfig;
    
    // Select agents for the swarm
    const availableAgents = Array.from(this.swarmState.activeAgents);
    const selectedAgents = shuffle(availableAgents).slice(0, agentCount);
    
    // Create task variations for diversity
    const taskVariations = await this.createTaskVariations(
      originalTask,
      selectedAgents.length
    );
    
    // Distribute tasks to agents
    const distributions: A2AMessage[] = [];
    
    for (let i = 0; i < selectedAgents.length; i++) {
      const agent = selectedAgents[i];
      const variation = taskVariations[i] || originalTask;
      
      const message: A2AMessage = {
        id: crypto.randomUUID(),
        from: this.name,
        to: agent,
        timestamp: new Date().toISOString(),
        protocol: 'a2a/v1',
        task: {
          id: crypto.randomUUID(),
          type: 'request',
          context: {
            ...state.context,
            swarmId: state.context.sessionId,
            variation: i,
          },
          payload: {
            query: variation,
            strategy,
          },
        },
      };
      
      distributions.push(message);
      
      // Track distribution
      const tasks = this.swarmState.taskDistribution.get(agent) || [];
      tasks.push(message.task.id);
      this.swarmState.taskDistribution.set(agent, tasks);
    }
    
    // Batch route all messages
    await this.queue.batchRoute(distributions);
    
    return {
      context: {
        ...state.context,
        distributions: distributions.map(d => ({
          agent: d.to,
          taskId: d.task.id,
        })),
        expectedResponses: selectedAgents.length,
      },
    };
  }
  
  private async monitorSwarm(state: any) {
    const { distributions, expectedResponses, startTime } = state.context;
    const timeout = 30000; // 30 seconds
    const checkInterval = 500; // Check every 500ms
    
    const responses: any[] = [];
    let iterations = 0;
    
    while (responses.length < expectedResponses * 0.8) { // 80% threshold
      if (Date.now() - startTime > timeout) {
        console.log('Swarm timeout reached');
        break;
      }
      
      // Check consensus results
      for (const dist of distributions) {
        const results = this.swarmState.consensusResults.get(dist.taskId) || [];
        if (results.length > 0 && !responses.find(r => r.taskId === dist.taskId)) {
          responses.push(...results);
        }
      }
      
      // Update completion rate
      const completionRate = responses.length / expectedResponses;
      
      // Emit monitoring event every 10 iterations
      if (iterations % 10 === 0) {
        this.events.emit('swarm_progress', {
          completionRate,
          responseCount: responses.length,
          expectedCount: expectedResponses,
        });
      }
      
      iterations++;
      await new Promise(resolve => setTimeout(resolve, checkInterval));
    }
    
    return {
      context: {
        ...state.context,
        responses,
        completionRate: responses.length / expectedResponses,
        monitoringTime: Date.now() - startTime,
      },
    };
  }
  
  private async achieveConsensus(state: any) {
    const { responses, swarmConfig } = state.context;
    
    if (swarmConfig.strategy === 'first-wins') {
      // Return first valid response
      const firstValid = responses.find((r: any) => r.status === 'success');
      return {
        messages: [new AIMessage(firstValid?.content || 'No valid response')],
        context: {
          ...state.context,
          consensus: firstValid,
          consensusType: 'first-wins',
        },
      };
    }
    
    // Majority voting consensus
    const [successful, failed] = partition(
      responses,
      (r: any) => r.status === 'success'
    );
    
    if (successful.length === 0) {
      return {
        messages: [new AIMessage('Swarm failed to reach consensus')],
        context: {
          ...state.context,
          consensus: null,
          consensusType: 'failed',
        },
      };
    }
    
    // Use LLM to synthesize consensus
    const consensus = await this.model.invoke([
      new HumanMessage(`
        Synthesize these swarm responses into a consensus answer:
        ${JSON.stringify(successful.slice(0, 5))} 
        
        Identify common patterns and create a unified response.
      `)
    ]);
    
    // Calculate agreement score
    const agreementScore = this.calculateAgreement(successful);
    
    // Update metrics
    this.swarmState.swarmMetrics.completedTasks++;
    this.swarmState.swarmMetrics.consensusAgreement = agreementScore;
    
    return {
      messages: [new AIMessage(consensus.content as string)],
      context: {
        ...state.context,
        consensus: consensus.content,
        consensusType: 'majority',
        agreementScore,
        participantCount: successful.length,
      },
    };
  }
  
  private async adaptiveRebalance(state: any) {
    const { distributions, completionRate } = state.context;
    
    // Identify slow or failed agents
    const slowAgents: string[] = [];
    const failedAgents: string[] = [];
    
    for (const dist of distributions) {
      const results = this.swarmState.consensusResults.get(dist.taskId) || [];
      if (results.length === 0) {
        failedAgents.push(dist.agent);
      } else if (results[0]?.responseTime > 10000) {
        slowAgents.push(dist.agent);
      }
    }
    
    // Remove failed agents from active pool
    failedAgents.forEach(agent => {
      this.swarmState.activeAgents.delete(agent);
    });
    
    // Redistribute tasks from failed agents
    if (failedAgents.length > 0) {
      const healthyAgents = Array.from(this.swarmState.activeAgents);
      const redistributions = [];
      
      for (const failedAgent of failedAgents) {
        const tasks = this.swarmState.taskDistribution.get(failedAgent) || [];
        const targetAgent = healthyAgents[Math.floor(Math.random() * healthyAgents.length)];
        
        for (const taskId of tasks) {
          redistributions.push({
            from: failedAgent,
            to: targetAgent,
            taskId,
          });
        }
      }
      
      // Queue redistributed tasks
      await this.redistributeTasks(redistributions, state.context.originalTask);
    }
    
    return {
      context: {
        ...state.context,
        rebalanced: true,
        removedAgents: failedAgents,
        slowAgents,
      },
    };
  }
  
  private async createTaskVariations(task: string, count: number): Promise<string[]> {
    // Create slight variations to get diverse responses
    const prompts = [
      `${task}`,
      `Please help with: ${task}`,
      `I need assistance with: ${task}`,
      `Can you solve: ${task}`,
      `Analyze and respond to: ${task}`,
    ];
    
    return Array(count).fill(0).map((_, i) => prompts[i % prompts.length]);
  }
  
  private async redistributeTasks(
    redistributions: any[],
    originalTask: string
  ) {
    const messages = redistributions.map(r => ({
      id: crypto.randomUUID(),
      from: this.name,
      to: r.to,
      timestamp: new Date().toISOString(),
      protocol: 'a2a/v1' as const,
      task: {
        id: r.taskId,
        type: 'request' as const,
        context: { redistributed: true, originalAgent: r.from },
        payload: { query: originalTask },
      },
    }));
    
    await this.queue.batchRoute(messages);
  }
  
  private calculateAgreement(responses: any[]): number {
    // Simple agreement calculation
    if (responses.length < 2) return 1;
    
    // Compare response similarity (simplified)
    const contents = responses.map(r => r.content?.toLowerCase() || '');
    let agreements = 0;
    let comparisons = 0;
    
    for (let i = 0; i < contents.length - 1; i++) {
      for (let j = i + 1; j < contents.length; j++) {
        comparisons++;
        // Check if responses share common keywords
        const words1 = new Set(contents[i].split(' '));
        const words2 = new Set(contents[j].split(' '));
        const intersection = new Set([...words1].filter(x => words2.has(x)));
        const similarity = intersection.size / Math.max(words1.size, words2.size);
        if (similarity > 0.5) agreements++;
      }
    }
    
    return comparisons > 0 ? agreements / comparisons : 0;
  }
  
  private async discoverAgents(): Promise<string[]> {
    // In production, use service discovery or registry
    return ['research-agent-1', 'research-agent-2', 'analyst-agent-1', 'analyst-agent-2'];
  }
  
  private async handleAgentCompletion(agent: string, event: any) {
    // Store result for consensus
    const results = this.swarmState.consensusResults.get(event.taskId) || [];
    results.push({
      agent,
      content: event.data?.result,
      status: 'success',
      responseTime: event.timestamp - event.startTime,
    });
    this.swarmState.consensusResults.set(event.taskId, results);
    
    // Update metrics
    const responseTimes = results.map(r => r.responseTime);
    this.swarmState.swarmMetrics.averageResponseTime = 
      responseTimes.reduce((a, b) => a + b, 0) / responseTimes.length;
  }
  
  private async handleAgentFailure(agent: string, event: any) {
    // Mark agent as potentially unreliable
    const results = this.swarmState.consensusResults.get(event.taskId) || [];
    results.push({
      agent,
      error: event.data?.error,
      status: 'failed',
      responseTime: event.timestamp - event.startTime,
    });
    this.swarmState.consensusResults.set(event.taskId, results);
    
    // Consider removing agent from active pool if too many failures
    const agentTasks = this.swarmState.taskDistribution.get(agent) || [];
    const failureRate = results.filter(r => r.agent === agent && r.status === 'failed').length / agentTasks.length;
    
    if (failureRate > 0.5) {
      this.swarmState.activeAgents.delete(agent);
      console.log(`Removed agent ${agent} due to high failure rate`);
    }
  }
  
  private parseSwarmConfig(analysis: string): any {
    // Extract configuration from LLM response
    const config = {
      parallelizable: analysis.includes('parallel'),
      agentCount: 3, // Default
      strategy: 'consensus' as 'consensus' | 'first-wins',
    };
    
    // Extract agent count if mentioned
    const countMatch = analysis.match(/(\d+)\s*agents?/i);
    if (countMatch) {
      config.agentCount = Math.min(parseInt(countMatch[1]), 10);
    }
    
    // Determine strategy
    if (analysis.includes('first') || analysis.includes('speed')) {
      config.strategy = 'first-wins';
    }
    
    return config;
  }
  
  protected async processRequest(task: any): Promise<any> {
    const result = await this.graph.invoke({
      messages: [new HumanMessage(task.payload.query)],
      currentAgent: this.name,
      context: {
        ...task.context,
        sessionId: task.id,
      },
      pendingHandoffs: [],
    });
    
    return {
      consensus: result.context.consensus,
      metrics: this.swarmState.swarmMetrics,
      participantAgents: Array.from(this.swarmState.activeAgents),
      agreementScore: result.context.agreementScore,
    };
  }
  
  protected async acceptHandoff(task: any): Promise<any> {
    // Swarm coordinator can accept handoffs for complex parallel tasks
    return this.processRequest(task);
  }
}

Implements a sophisticated swarm coordinator that distributes tasks across multiple agents, monitors progress, achieves consensus, and adaptively rebalances work based on agent performance.

4. Streaming API with Server-Sent Events

// app/api/agents/stream/route.ts
import { A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { SwarmCoordinator } from '@/lib/agents/swarm-coordinator';
import { ResearchAgent } from '@/lib/agents/research-agent';

export const runtime = 'nodejs';
export const maxDuration = 300;

// Initialize agents
const swarmCoordinator = new SwarmCoordinator();
const researchAgent = new ResearchAgent();

export async function POST(req: Request) {
  const { message } = await req.json();
  
  // Validate message
  const validated = A2AMessageSchema.parse(message);
  
  // Create SSE stream
  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();
  
  // Process in background
  (async () => {
    try {
      // Determine which agent to use
      const agent = validated.to === 'swarm-coordinator' ? 
        swarmCoordinator : researchAgent;
      
      // Subscribe to agent events for streaming
      agent.events.on('task_received', async (event) => {
        await writer.write(encoder.encode(
          `event: status\ndata: ${JSON.stringify({
            type: 'task_received',
            taskId: event.taskId,
            agent: event.agentId,
          })}\n\n`
        ));
      });
      
      agent.events.on('swarm_progress', async (event) => {
        await writer.write(encoder.encode(
          `event: progress\ndata: ${JSON.stringify({
            type: 'swarm_progress',
            completion: event.completionRate,
            responses: event.responseCount,
          })}\n\n`
        ));
      });
      
      agent.events.on('task_completed', async (event) => {
        await writer.write(encoder.encode(
          `event: completed\ndata: ${JSON.stringify({
            type: 'task_completed',
            result: event.data,
          })}\n\n`
        ));
      });
      
      // Process the message
      const result = await agent.processMessage(validated);
      
      // Send final result
      await writer.write(encoder.encode(
        `event: result\ndata: ${JSON.stringify(result)}\n\n`
      ));
      
      // Send done signal
      await writer.write(encoder.encode('event: done\ndata: {}\n\n'));
    } catch (error) {
      await writer.write(encoder.encode(
        `event: error\ndata: ${JSON.stringify({
          error: error.message,
        })}\n\n`
      ));
    } finally {
      await writer.close();
    }
  })();
  
  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'X-Content-Type-Options': 'nosniff',
    },
  });
}

Creates a streaming API endpoint using Server-Sent Events to provide real-time updates on agent processing, swarm progress, and task completion.

5. React Hook for Agent Communication

// hooks/useA2ACommunication.ts
import { useState, useCallback, useEffect, useRef } from 'react';
import { useMutation } from '@tanstack/react-query';
import { A2AMessage } from '@/lib/protocols/a2a-protocol';
import { debounce } from 'es-toolkit';

interface UseA2AOptions {
  targetAgent: string;
  streaming?: boolean;
  onProgress?: (progress: any) => void;
  onComplete?: (result: any) => void;
  onError?: (error: any) => void;
}

interface A2AState {
  isProcessing: boolean;
  progress: number;
  events: any[];
  result: any;
  error: any;
}

export function useA2ACommunication(options: UseA2AOptions) {
  const [state, setState] = useState<A2AState>({
    isProcessing: false,
    progress: 0,
    events: [],
    result: null,
    error: null,
  });
  
  const eventSourceRef = useRef<EventSource | null>(null);
  const abortControllerRef = useRef<AbortController | null>(null);
  
  // Send message via traditional POST
  const sendMessage = useMutation({
    mutationFn: async (query: string) => {
      const message: A2AMessage = {
        id: crypto.randomUUID(),
        from: 'user',
        to: options.targetAgent,
        timestamp: new Date().toISOString(),
        protocol: 'a2a/v1',
        task: {
          id: crypto.randomUUID(),
          type: 'request',
          context: {},
          payload: { query },
        },
      };
      
      const response = await fetch(`/api/agents/${options.targetAgent}`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(message),
      });
      
      if (!response.ok) {
        throw new Error('Failed to send message');
      }
      
      return response.json();
    },
    onSuccess: (data) => {
      setState(prev => ({ ...prev, result: data, isProcessing: false }));
      options.onComplete?.(data);
    },
    onError: (error) => {
      setState(prev => ({ ...prev, error, isProcessing: false }));
      options.onError?.(error);
    },
  });
  
  // Send message with streaming
  const sendStreamingMessage = useCallback(
    debounce(async (query: string) => {
      // Clean up previous stream
      if (eventSourceRef.current) {
        eventSourceRef.current.close();
      }
      
      setState(prev => ({
        ...prev,
        isProcessing: true,
        progress: 0,
        events: [],
        error: null,
      }));
      
      const message: A2AMessage = {
        id: crypto.randomUUID(),
        from: 'user',
        to: options.targetAgent,
        timestamp: new Date().toISOString(),
        protocol: 'a2a/v1',
        task: {
          id: crypto.randomUUID(),
          type: 'request',
          context: {},
          payload: { query },
        },
      };
      
      // Use fetch with streaming
      abortControllerRef.current = new AbortController();
      
      try {
        const response = await fetch('/api/agents/stream', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ message }),
          signal: abortControllerRef.current.signal,
        });
        
        if (!response.ok) {
          throw new Error('Stream request failed');
        }
        
        const reader = response.body?.getReader();
        const decoder = new TextDecoder();
        
        if (!reader) {
          throw new Error('No response body');
        }
        
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          
          const chunk = decoder.decode(value);
          const lines = chunk.split('\n');
          
          for (const line of lines) {
            if (line.startsWith('event:')) {
              const eventType = line.substring(6).trim();
              const dataLine = lines[lines.indexOf(line) + 1];
              
              if (dataLine?.startsWith('data:')) {
                const data = JSON.parse(dataLine.substring(5));
                
                switch (eventType) {
                  case 'progress':
                    setState(prev => ({
                      ...prev,
                      progress: data.completion * 100,
                    }));
                    options.onProgress?.(data);
                    break;
                    
                  case 'status':
                    setState(prev => ({
                      ...prev,
                      events: [...prev.events, data],
                    }));
                    break;
                    
                  case 'result':
                    setState(prev => ({
                      ...prev,
                      result: data,
                      isProcessing: false,
                    }));
                    options.onComplete?.(data);
                    break;
                    
                  case 'error':
                    setState(prev => ({
                      ...prev,
                      error: data.error,
                      isProcessing: false,
                    }));
                    options.onError?.(data.error);
                    break;
                    
                  case 'done':
                    setState(prev => ({
                      ...prev,
                      isProcessing: false,
                    }));
                    break;
                }
              }
            }
          }
        }
      } catch (error: any) {
        if (error.name !== 'AbortError') {
          setState(prev => ({
            ...prev,
            error,
            isProcessing: false,
          }));
          options.onError?.(error);
        }
      }
    }, 500),
    [options]
  );
  
  // Send message based on mode
  const send = useCallback(
    async (query: string) => {
      if (options.streaming) {
        await sendStreamingMessage(query);
      } else {
        sendMessage.mutate(query);
      }
    },
    [options.streaming, sendMessage, sendStreamingMessage]
  );
  
  // Cancel ongoing operation
  const cancel = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }
    
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
      abortControllerRef.current = null;
    }
    
    setState(prev => ({
      ...prev,
      isProcessing: false,
    }));
  }, []);
  
  // Cleanup on unmount
  useEffect(() => {
    return () => {
      cancel();
    };
  }, [cancel]);
  
  return {
    send,
    cancel,
    state,
    isProcessing: state.isProcessing || sendMessage.isPending,
  };
}

Provides a React hook for seamless agent communication with support for both traditional request-response and streaming modes, progress tracking, and cancellation.

6. Dashboard for Multi-Agent Monitoring

// app/dashboard/page.tsx
'use client';

import { useState, useEffect } from 'react';
import { useA2ACommunication } from '@/hooks/useA2ACommunication';
import { useQuery } from '@tanstack/react-query';
import { Line, Bar } from 'react-chartjs-2';

async function fetchAgentMetrics() {
  const response = await fetch('/api/agents/metrics');
  if (!response.ok) throw new Error('Failed to fetch metrics');
  return response.json();
}

export default function Dashboard() {
  const [selectedAgent, setSelectedAgent] = useState('swarm-coordinator');
  const [query, setQuery] = useState('');
  const [progressData, setProgressData] = useState<any[]>([]);
  
  // Fetch real-time metrics
  const { data: metrics, refetch } = useQuery({
    queryKey: ['agent-metrics'],
    queryFn: fetchAgentMetrics,
    refetchInterval: 5000, // Refresh every 5 seconds
  });
  
  // Setup agent communication with streaming
  const { send, cancel, state, isProcessing } = useA2ACommunication({
    targetAgent: selectedAgent,
    streaming: true,
    onProgress: (progress) => {
      setProgressData(prev => [...prev, {
        time: Date.now(),
        completion: progress.completion * 100,
        responses: progress.responses,
      }]);
    },
  });
  
  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (query.trim()) {
      setProgressData([]);
      send(query);
    }
  };
  
  return (
    <div className="min-h-screen bg-base-200 p-4">
      <div className="container mx-auto">
        <h1 className="text-4xl font-bold mb-8">Multi-Agent Dashboard</h1>
        
        {/* Agent Status Grid */}
        <div className="grid grid-cols-1 md:grid-cols-3 gap-4 mb-8">
          {metrics?.agents?.map((agent: any) => (
            <div key={agent.name} className="card bg-base-100 shadow-xl">
              <div className="card-body">
                <h2 className="card-title">{agent.name}</h2>
                <div className="stats stats-vertical">
                  <div className="stat">
                    <div className="stat-title">Status</div>
                    <div className={`stat-value text-sm ${
                      agent.status === 'active' ? 'text-success' : 'text-warning'
                    }`}>
                      {agent.status}
                    </div>
                  </div>
                  <div className="stat">
                    <div className="stat-title">Tasks</div>
                    <div className="stat-value text-sm">{agent.taskCount}</div>
                  </div>
                  <div className="stat">
                    <div className="stat-title">Avg Response</div>
                    <div className="stat-value text-sm">{agent.avgResponseTime}ms</div>
                  </div>
                </div>
              </div>
            </div>
          ))}
        </div>
        
        {/* Queue Metrics */}
        {metrics?.queues && (
          <div className="card bg-base-100 shadow-xl mb-8">
            <div className="card-body">
              <h2 className="card-title">Queue Status</h2>
              <div className="overflow-x-auto">
                <table className="table">
                  <thead>
                    <tr>
                      <th>Queue</th>
                      <th>Waiting</th>
                      <th>Active</th>
                      <th>Completed</th>
                      <th>Failed</th>
                    </tr>
                  </thead>
                  <tbody>
                    {Object.entries(metrics.queues).map(([name, stats]: [string, any]) => (
                      <tr key={name}>
                        <td>{name}</td>
                        <td>{stats.waiting}</td>
                        <td>{stats.active}</td>
                        <td>{stats.completed}</td>
                        <td>{stats.failed}</td>
                      </tr>
                    ))}
                  </tbody>
                </table>
              </div>
            </div>
          </div>
        )}
        
        {/* Agent Interaction */}
        <div className="card bg-base-100 shadow-xl mb-8">
          <div className="card-body">
            <h2 className="card-title">Test Agent Communication</h2>
            
            {/* Agent selector */}
            <select 
              className="select select-bordered w-full max-w-xs mb-4"
              value={selectedAgent}
              onChange={(e) => setSelectedAgent(e.target.value)}
            >
              <option value="swarm-coordinator">Swarm Coordinator</option>
              <option value="supervisor-agent">Supervisor Agent</option>
              <option value="research-agent">Research Agent</option>
            </select>
            
            {/* Query form */}
            <form onSubmit={handleSubmit} className="join w-full">
              <input
                type="text"
                value={query}
                onChange={(e) => setQuery(e.target.value)}
                placeholder="Enter your query..."
                className="input input-bordered join-item flex-1"
                disabled={isProcessing}
              />
              <button 
                type="submit"
                className="btn btn-primary join-item"
                disabled={isProcessing || !query.trim()}
              >
                {isProcessing ? 'Processing...' : 'Send'}
              </button>
              {isProcessing && (
                <button
                  type="button"
                  onClick={cancel}
                  className="btn btn-error join-item"
                >
                  Cancel
                </button>
              )}
            </form>
            
            {/* Progress visualization */}
            {progressData.length > 0 && (
              <div className="mt-4">
                <progress 
                  className="progress progress-primary w-full"
                  value={progressData[progressData.length - 1]?.completion || 0}
                  max="100"
                ></progress>
                <p className="text-sm mt-2">
                  Progress: {Math.round(progressData[progressData.length - 1]?.completion || 0)}%
                </p>
              </div>
            )}
            
            {/* Events log */}
            {state.events.length > 0 && (
              <div className="mt-4">
                <h3 className="font-bold mb-2">Processing Events:</h3>
                <div className="max-h-40 overflow-y-auto bg-base-200 p-2 rounded">
                  {state.events.map((event, idx) => (
                    <div key={idx} className="text-xs mb-1">
                      [{event.type}] {event.agent}: Task {event.taskId}
                    </div>
                  ))}
                </div>
              </div>
            )}
            
            {/* Result display */}
            {state.result && (
              <div className="alert alert-success mt-4">
                <div>
                  <h3 className="font-bold">Result:</h3>
                  <pre className="text-sm">{JSON.stringify(state.result, null, 2)}</pre>
                </div>
              </div>
            )}
            
            {/* Error display */}
            {state.error && (
              <div className="alert alert-error mt-4">
                <div>
                  <h3 className="font-bold">Error:</h3>
                  <p>{state.error.message || 'An error occurred'}</p>
                </div>
              </div>
            )}
          </div>
        </div>
      </div>
    </div>
  );
}

Creates a comprehensive dashboard for monitoring multi-agent systems with real-time metrics, queue status, agent testing interface, and progress visualization.

7. Deployment Configuration

// vercel.json
{
  "functions": {
    "app/api/agents/*/route.ts": {
      "maxDuration": 300
    },
    "app/api/agents/stream/route.ts": {
      "maxDuration": 300
    }
  },
  "env": {
    "REDIS_URL": "@redis-url",
    "GOOGLE_API_KEY": "@google-api-key",
    "SUPERVISOR_API_KEY": "@supervisor-api-key"
  },
  "crons": [
    {
      "path": "/api/agents/health",
      "schedule": "*/5 * * * *"
    }
  ]
}

Configures Vercel deployment with extended timeouts for agent operations, environment variables, and health check crons.

Conclusion

Inter-agent Communication (A2A) represents a fundamental shift in how we build AI systems, moving from isolated agents to collaborative swarms that can tackle complex, multi-faceted problems. By leveraging TypeScript's type safety, LangGraph's orchestration capabilities, and Vercel's serverless infrastructure with 800-second execution times, developers can now build production-ready multi-agent systems that were technically impossible just two years ago.

The patterns demonstrated here—from simple handoff mechanisms to sophisticated swarm coordination with consensus algorithms—provide a foundation for building scalable, resilient agent networks. The combination of event-driven architectures, message queues, and streaming interfaces ensures these systems can handle real-world production loads while maintaining observability and control.

As the ecosystem continues to evolve with standardized protocols like Google's A2A and Anthropic's MCP, the ability for agents built with different frameworks to seamlessly collaborate will unlock new possibilities for AI applications. The future of AI lies not in singular, monolithic models, but in orchestrated networks of specialized agents working in concert—and the tools and patterns to build these systems are available today.