草案 代理设计模式 - 代理间通信(A2A)
学习如何使用 TypeScript、LangGraph 和 Vercel 的无服务器平台构建具有代理间通信的生产就绪多代理系统。
思维模型:管弦乐队指挥模式
将 A2A 通信想象成一个分布式管弦乐队,每个代理都是专业的音乐家。指挥(监管代理)不会演奏每种乐器,但会协调演出。音乐家(专业代理)可以直接或通过指挥传递乐谱(上下文/任务)。有些曲目需要所有音乐家一起演奏(同步),而其他曲目允许独奏家独立演奏并在准备好时加入(异步)。场地(Vercel 平台)提供音响和基础设施,而乐谱(LangGraph)定义音乐的流程。正如音乐家使用标准记谱法进行交流,无论他们使用什么乐器,A2A 协议都能让使用不同框架构建的代理无缝协作。
基础示例:简单的代理交接模式
1. 包含核心依赖项的项目设置
# 使用 TypeScript 初始化 NextJS 15 项目
npx create-next-app@latest a2a-agents --typescript --tailwind --app --no-src-dir
cd a2a-agents
# 安装 LangGraph 和代理通信包
npm install @langchain/langgraph @langchain/core @langchain/community
npm install @langchain/google-genai zod uuid
npm install @tanstack/react-query es-toolkit daisyui
npm install @vercel/kv bullmq ioredis
初始化一个针对多代理开发优化的 Next.js 15 项目,包含代理编排和通信所需的所有必要依赖项。
2. 定义代理通信协议
// lib/protocols/a2a-protocol.ts
import { z } from 'zod';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
import { pipe, map, filter } from 'es-toolkit';
// 定义 A2A 消息模式
export const A2AMessageSchema = z.object({
id: z.string().uuid(),
from: z.string(),
to: z.string(),
timestamp: z.string().datetime(),
protocol: z.literal('a2a/v1'),
task: z.object({
id: z.string().uuid(),
type: z.enum(['request', 'response', 'handoff', 'error']),
skill: z.string().optional(),
context: z.record(z.any()),
payload: z.any(),
metadata: z.object({
priority: z.enum(['low', 'medium', 'high']).default('medium'),
timeout: z.number().default(30000),
retries: z.number().default(3),
}).optional(),
}),
});
export type A2AMessage = z.infer<typeof A2AMessageSchema>;
// 代理卡定义(代理能力)
export const AgentCardSchema = z.object({
name: z.string(),
description: z.string(),
url: z.string().url(),
version: z.string(),
capabilities: z.object({
streaming: z.boolean(),
async: z.boolean(),
maxConcurrent: z.number(),
}),
skills: z.array(z.object({
id: z.string(),
name: z.string(),
description: z.string(),
inputSchema: z.any(),
outputSchema: z.any(),
})),
authentication: z.object({
type: z.enum(['none', 'apiKey', 'oauth2']),
config: z.record(z.any()).optional(),
}),
});
export type AgentCard = z.infer<typeof AgentCardSchema>;
// 使用 es-toolkit 的消息工厂
export class A2AMessageFactory {
static createHandoffMessage(
from: string,
to: string,
task: any,
context: Record<string, any>
): A2AMessage {
return pipe(
{
id: crypto.randomUUID(),
from,
to,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1' as const,
task: {
id: crypto.randomUUID(),
type: 'handoff' as const,
context,
payload: task,
},
},
(msg) => A2AMessageSchema.parse(msg)
);
}
}
建立具有消息模式、能力发现的代理卡和创建标准化消息的工厂方法的类型安全 A2A 协议。
3. 创建具有通信接口的基础代理
// lib/agents/base-agent.ts
import { StateGraph, StateGraphArgs } from '@langchain/langgraph';
import { BaseMessage } from '@langchain/core/messages';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { A2AMessage, AgentCard } from '@/lib/protocols/a2a-protocol';
import { debounce, throttle } from 'es-toolkit';
export interface AgentState {
messages: BaseMessage[];
currentAgent: string;
context: Record<string, any>;
pendingHandoffs: A2AMessage[];
}
export abstract class BaseA2Agent {
protected name: string;
protected model: ChatGoogleGenerativeAI;
protected graph: StateGraph<AgentState>;
protected card: AgentCard;
constructor(name: string, card: AgentCard) {
this.name = name;
this.card = card;
// 使用 Gemini Flash 进行快速响应
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.7,
streaming: true,
maxOutputTokens: 2048,
});
// 为代理工作流设置状态图
this.graph = new StateGraph<AgentState>({
channels: {
messages: {
value: (old: BaseMessage[], next: BaseMessage[]) => [...old, ...next],
},
currentAgent: {
value: (old: string, next: string) => next,
},
context: {
value: (old: Record<string, any>, next: Record<string, any>) => ({
...old,
...next,
}),
},
pendingHandoffs: {
value: (old: A2AMessage[], next: A2AMessage[]) => [...old, ...next],
},
},
});
this.setupGraph();
}
// 子类实现的抽象方法
protected abstract setupGraph(): void;
// 处理传入的 A2A 消息
async processMessage(message: A2AMessage): Promise<A2AMessage> {
const startTime = Date.now();
try {
// 验证消息是否发给此代理
if (message.to !== this.name) {
throw new Error(`消息不是发给此代理的: ${message.to}`);
}
// 根据任务类型处理
const result = await this.handleTask(message.task);
// 创建响应消息
return {
id: crypto.randomUUID(),
from: this.name,
to: message.from,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: message.task.id,
type: 'response',
context: {
...message.task.context,
processingTime: Date.now() - startTime,
},
payload: result,
},
};
} catch (error) {
return this.createErrorResponse(message, error);
}
}
// 处理不同的任务类型
protected async handleTask(task: A2AMessage['task']): Promise<any> {
switch (task.type) {
case 'request':
return await this.processRequest(task);
case 'handoff':
return await this.acceptHandoff(task);
default:
throw new Error(`未知的任务类型: ${task.type}`);
}
}
protected abstract processRequest(task: A2AMessage['task']): Promise<any>;
protected abstract acceptHandoff(task: A2AMessage['task']): Promise<any>;
// 节流的错误响应创建
protected createErrorResponse = throttle(
(message: A2AMessage, error: any): A2AMessage => {
return {
id: crypto.randomUUID(),
from: this.name,
to: message.from,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: message.task.id,
type: 'error',
context: message.task.context,
payload: {
error: error.message || '未知错误',
stack: process.env.NODE_ENV === 'development' ? error.stack : undefined,
},
},
};
},
1000 // 节流错误响应以防止垃圾邮件
);
}
为代理提供抽象基类,具有内置 A2A 消息处理、通过 LangGraph 进行状态管理以及使用 es-toolkit 实用程序进行错误处理的功能。
4. 实现专门的代理
// lib/agents/research-agent.ts
import { BaseA2Agent } from './base-agent';
import { WebBrowser } from '@langchain/community/tools/webbrowser';
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { groupBy, chunk } from 'es-toolkit';
export class ResearchAgent extends BaseA2Agent {
private browser: WebBrowser;
private embeddings: GoogleGenerativeAIEmbeddings;
constructor() {
const card = {
name: 'research-agent',
description: '专门从事网络研究和信息收集',
url: process.env.VERCEL_URL ?
`https://${process.env.VERCEL_URL}/api/agents/research` :
'http://localhost:3000/api/agents/research',
version: '1.0.0',
capabilities: {
streaming: true,
async: true,
maxConcurrent: 5,
},
skills: [
{
id: 'web-search',
name: '网络搜索',
description: '从网络搜索和提取信息',
inputSchema: { query: 'string' },
outputSchema: { results: 'array' },
},
{
id: 'summarize',
name: '总结',
description: '创建研究发现的简洁摘要',
inputSchema: { content: 'string' },
outputSchema: { summary: 'string' },
},
],
authentication: {
type: 'apiKey',
config: { header: 'X-API-Key' },
},
};
super('research-agent', card);
this.embeddings = new GoogleGenerativeAIEmbeddings({
modelName: 'embedding-001',
});
this.browser = new WebBrowser({
model: this.model,
embeddings: this.embeddings
});
}
protected setupGraph(): void {
// 定义研究工作流
this.graph
.addNode('analyze', this.analyzeRequest.bind(this))
.addNode('search', this.performSearch.bind(this))
.addNode('synthesize', this.synthesizeResults.bind(this))
.addNode('decide_handoff', this.decideHandoff.bind(this))
.addEdge('__start__', 'analyze')
.addEdge('analyze', 'search')
.addEdge('search', 'synthesize')
.addEdge('synthesize', 'decide_handoff');
}
private async analyzeRequest(state: any) {
const lastMessage = state.messages[state.messages.length - 1];
const analysis = await this.model.invoke([
new HumanMessage(`分析此请求并识别关键搜索词: ${lastMessage.content}`)
]);
return {
context: {
...state.context,
searchTerms: this.extractSearchTerms(analysis.content as string),
},
};
}
private async performSearch(state: any) {
const { searchTerms } = state.context;
// 批量搜索查询以提高效率
const searchBatches = chunk(searchTerms, 3);
const results = [];
for (const batch of searchBatches) {
const batchResults = await Promise.all(
batch.map(term => this.browser.invoke(term))
);
results.push(...batchResults);
}
return {
context: {
...state.context,
searchResults: results,
},
};
}
private async synthesizeResults(state: any) {
const { searchResults } = state.context;
// 按相关性对结果进行分组
const grouped = groupBy(searchResults, (result: any) =>
result.relevance > 0.8 ? 'high' : result.relevance > 0.5 ? 'medium' : 'low'
);
const synthesis = await this.model.invoke([
new HumanMessage(`将这些搜索结果综合成一个全面的答案:
${JSON.stringify(grouped.high || [])}`)
]);
return {
messages: [new AIMessage(synthesis.content as string)],
context: {
...state.context,
synthesis: synthesis.content,
confidence: grouped.high ? 'high' : 'medium',
},
};
}
private async decideHandoff(state: any) {
const { confidence, synthesis } = state.context;
// 决定是否需要交接给另一个代理
if (confidence === 'low' || synthesis.includes('需要更多分析')) {
return {
pendingHandoffs: [{
id: crypto.randomUUID(),
from: this.name,
to: 'analyst-agent',
timestamp: new Date().toISOString(),
protocol: 'a2a/v1' as const,
task: {
id: crypto.randomUUID(),
type: 'handoff' as const,
context: state.context,
payload: {
request: '需要深入分析',
preliminaryFindings: synthesis,
},
},
}],
};
}
return { pendingHandoffs: [] };
}
protected async processRequest(task: any): Promise<any> {
const result = await this.graph.invoke({
messages: [new HumanMessage(task.payload.query)],
currentAgent: this.name,
context: task.context || {},
pendingHandoffs: [],
});
return {
results: result.messages[result.messages.length - 1].content,
handoffs: result.pendingHandoffs,
};
}
protected async acceptHandoff(task: any): Promise<any> {
// 处理来自另一个代理的交接
return this.processRequest(task);
}
private extractSearchTerms(analysis: string): string[] {
// 简单的提取逻辑 - 在生产中使用 NLP
return analysis.match(/["'](.*?)["']/g)?.map(term =>
term.replace(/["']/g, '')
) || [];
}
}
实现一个研究代理,具有网络浏览功能、用于提高效率的搜索批处理以及基于置信度水平的智能交接决策。
5. 创建用于编排的监管代理
// lib/agents/supervisor-agent.ts
import { BaseA2Agent } from './base-agent';
import { A2AMessage, A2AMessageFactory } from '@/lib/protocols/a2a-protocol';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { sortBy, uniqBy } from 'es-toolkit';
import { kv } from '@vercel/kv';
interface AgentRegistry {
[key: string]: {
card: any;
endpoint: string;
status: 'active' | 'inactive' | 'busy';
lastSeen: number;
};
}
export class SupervisorAgent extends BaseA2Agent {
private agentRegistry: AgentRegistry = {};
private taskQueue: A2AMessage[] = [];
constructor() {
const card = {
name: 'supervisor-agent',
description: '编排和协调多个专门代理',
url: process.env.VERCEL_URL ?
`https://${process.env.VERCEL_URL}/api/agents/supervisor` :
'http://localhost:3000/api/agents/supervisor',
version: '1.0.0',
capabilities: {
streaming: true,
async: true,
maxConcurrent: 10,
},
skills: [
{
id: 'orchestrate',
name: '编排',
description: '协调多个代理完成复杂任务',
inputSchema: { task: 'string', agents: 'array' },
outputSchema: { result: 'any', trace: 'array' },
},
{
id: 'delegate',
name: '委派',
description: '将任务分配给适当的专门代理',
inputSchema: { task: 'string' },
outputSchema: { assignedTo: 'string', taskId: 'string' },
},
],
authentication: {
type: 'apiKey',
config: { header: 'X-Supervisor-Key' },
},
};
super('supervisor-agent', card);
this.initializeRegistry();
}
private async initializeRegistry() {
// 从 Vercel KV 加载代理注册表
try {
const registry = await kv.get<AgentRegistry>('agent-registry');
if (registry) {
this.agentRegistry = registry;
}
} catch (error) {
console.log('没有现有注册表,从头开始');
}
// 注册默认代理
this.registerAgent('research-agent', {
card: { /* 研究代理卡 */ },
endpoint: '/api/agents/research',
status: 'active',
lastSeen: Date.now(),
});
this.registerAgent('analyst-agent', {
card: { /* 分析师代理卡 */ },
endpoint: '/api/agents/analyst',
status: 'active',
lastSeen: Date.now(),
});
// 持久化注册表
await kv.set('agent-registry', this.agentRegistry);
}
protected setupGraph(): void {
this.graph
.addNode('analyze_task', this.analyzeTask.bind(this))
.addNode('select_agents', this.selectAgents.bind(this))
.addNode('delegate_tasks', this.delegateTasks.bind(this))
.addNode('monitor_progress', this.monitorProgress.bind(this))
.addNode('aggregate_results', this.aggregateResults.bind(this))
.addEdge('__start__', 'analyze_task')
.addEdge('analyze_task', 'select_agents')
.addEdge('select_agents', 'delegate_tasks')
.addEdge('delegate_tasks', 'monitor_progress')
.addEdge('monitor_progress', 'aggregate_results');
}
private async analyzeTask(state: any) {
const lastMessage = state.messages[state.messages.length - 1];
// 使用 LLM 理解任务要求
const analysis = await this.model.invoke([
new HumanMessage(`
分析此任务并确定:
1. 需要什么类型的专业知识
2. 是否需要顺序或并行处理
3. 预估复杂度(简单/中等/复杂)
任务: ${lastMessage.content}
`)
]);
return {
context: {
...state.context,
taskAnalysis: this.parseTaskAnalysis(analysis.content as string),
originalTask: lastMessage.content,
},
};
}
private async selectAgents(state: any) {
const { taskAnalysis } = state.context;
// 获取按相关性排序的活动代理
const activeAgents = Object.entries(this.agentRegistry)
.filter(([_, agent]) => agent.status === 'active')
.map(([name, agent]) => ({ name, ...agent }));
// 根据任务要求对代理进行评分
const scoredAgents = activeAgents.map(agent => ({
...agent,
score: this.calculateAgentScore(agent, taskAnalysis),
}));
// 为任务选择顶级代理
const selectedAgents = sortBy(scoredAgents, 'score')
.reverse()
.slice(0, taskAnalysis.complexity === 'complex' ? 3 : 2);
return {
context: {
...state.context,
selectedAgents: selectedAgents.map(a => a.name),
},
};
}
private async delegateTasks(state: any) {
const { selectedAgents, taskAnalysis, originalTask } = state.context;
const delegatedTasks: A2AMessage[] = [];
for (const agentName of selectedAgents) {
const message = A2AMessageFactory.createHandoffMessage(
this.name,
agentName,
{
task: originalTask,
requirements: taskAnalysis,
deadline: Date.now() + 30000, // 30秒截止期限
},
state.context
);
delegatedTasks.push(message);
this.taskQueue.push(message);
}
// 在 KV 中存储任务委派以进行跟踪
await kv.set(`tasks:${state.context.sessionId}`, delegatedTasks, {
ex: 3600, // 1小时后过期
});
return {
context: {
...state.context,
delegatedTasks: delegatedTasks.map(t => t.id),
},
};
}
private async monitorProgress(state: any) {
const { delegatedTasks } = state.context;
const responses: any[] = [];
const timeout = 30000; // 30秒
const startTime = Date.now();
// 带超时的响应轮询
while (responses.length < delegatedTasks.length) {
if (Date.now() - startTime > timeout) {
console.log('等待代理响应超时');
break;
}
// 在 KV 中检查响应
for (const taskId of delegatedTasks) {
const response = await kv.get(`response:${taskId}`);
if (response && !responses.find(r => r.taskId === taskId)) {
responses.push(response);
}
}
// 在下次轮询前等待
await new Promise(resolve => setTimeout(resolve, 1000));
}
return {
context: {
...state.context,
agentResponses: responses,
},
};
}
private async aggregateResults(state: any) {
const { agentResponses } = state.context;
// 删除重复信息
const uniqueResponses = uniqBy(agentResponses, (r: any) =>
JSON.stringify(r.payload)
);
// 使用 LLM 综合所有响应
const synthesis = await this.model.invoke([
new HumanMessage(`
将这些代理响应综合成一个全面的答案:
${JSON.stringify(uniqueResponses)}
创建一个结合所有见解的统一响应。
`)
]);
return {
messages: [new AIMessage(synthesis.content as string)],
context: {
...state.context,
finalResult: synthesis.content,
contributingAgents: uniqueResponses.map((r: any) => r.from),
},
};
}
private registerAgent(name: string, info: any) {
this.agentRegistry[name] = info;
}
private parseTaskAnalysis(analysis: string): any {
// 简单解析 - 在生产中使用结构化输出
return {
expertise: analysis.includes('研究') ? ['research'] : ['general'],
processing: analysis.includes('并行') ? 'parallel' : 'sequential',
complexity: analysis.includes('复杂') ? 'complex' : 'simple',
};
}
private calculateAgentScore(agent: any, taskAnalysis: any): number {
// 简单的评分算法
let score = 0;
// 检查代理是否具有所需技能
if (agent.card?.skills) {
score += agent.card.skills.length * 10;
}
// 优先选择最近活动的代理
const hoursSinceActive = (Date.now() - agent.lastSeen) / 3600000;
score -= hoursSinceActive * 5;
// 如果代理匹配专业知识,则提高分数
if (taskAnalysis.expertise?.some((exp: string) =>
agent.name.includes(exp)
)) {
score += 50;
}
return Math.max(0, score);
}
protected async processRequest(task: any): Promise<any> {
const result = await this.graph.invoke({
messages: [new HumanMessage(task.payload.query)],
currentAgent: this.name,
context: {
...task.context,
sessionId: task.id,
},
pendingHandoffs: [],
});
return {
result: result.context.finalResult,
trace: result.context.contributingAgents,
metrics: {
totalAgents: result.context.selectedAgents.length,
processingTime: Date.now() - result.context.startTime,
},
};
}
protected async acceptHandoff(task: any): Promise<any> {
// 监管者通常不接受交接
return { error: '监管者不接受交接' };
}
}
实现一个监管代理,分析任务、选择适当的代理、委派工作、监控进度并使用 Vercel KV 进行状态持久化来聚合结果。
6. 代理端点的 API 路由
// app/api/agents/supervisor/route.ts
import { SupervisorAgent } from '@/lib/agents/supervisor-agent';
import { A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { NextResponse } from 'next/server';
import { kv } from '@vercel/kv';
export const runtime = 'nodejs';
export const maxDuration = 300;
const supervisor = new SupervisorAgent();
export async function POST(req: Request) {
try {
// 解析和验证传入的 A2A 消息
const rawMessage = await req.json();
const message = A2AMessageSchema.parse(rawMessage);
// 检查身份验证
const apiKey = req.headers.get('X-Supervisor-Key');
if (!apiKey || apiKey !== process.env.SUPERVISOR_API_KEY) {
return NextResponse.json({ error: '未授权' }, { status: 401 });
}
// 使用监管器处理消息
const response = await supervisor.processMessage(message);
// 存储响应以供异步检索
await kv.set(`response:${message.task.id}`, response, {
ex: 3600, // 1小时后过期
});
return NextResponse.json(response);
} catch (error: any) {
console.error('监管器错误:', error);
return NextResponse.json(
{ error: error.message || '内部服务器错误' },
{ status: 500 }
);
}
}
// 代理发现端点
export async function GET(req: Request) {
const agent = new SupervisorAgent();
return NextResponse.json(agent.card);
}
为监管代理创建具有身份验证、消息验证和代理发现支持的 RESTful API 端点。
7. 使用 React Query 的客户端组件
// components/MultiAgentChat.tsx
'use client';
import { useState } from 'react';
import { useMutation, useQuery } from '@tanstack/react-query';
import { debounce } from 'es-toolkit';
interface AgentResponse {
result: string;
trace: string[];
metrics: {
totalAgents: number;
processingTime: number;
};
}
async function sendToSupervisor(query: string): Promise<AgentResponse> {
const message = {
id: crypto.randomUUID(),
from: 'user',
to: 'supervisor-agent',
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: crypto.randomUUID(),
type: 'request',
context: {},
payload: { query },
},
};
const response = await fetch('/api/agents/supervisor', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Supervisor-Key': process.env.NEXT_PUBLIC_SUPERVISOR_KEY || '',
},
body: JSON.stringify(message),
});
if (!response.ok) {
throw new Error('处理请求失败');
}
const data = await response.json();
return data.task.payload;
}
export default function MultiAgentChat() {
const [input, setInput] = useState('');
const [messages, setMessages] = useState<Array<{
role: 'user' | 'assistant';
content: string;
metadata?: any;
}>>([]);
const mutation = useMutation({
mutationFn: sendToSupervisor,
onSuccess: (data) => {
setMessages(prev => [...prev, {
role: 'assistant',
content: data.result,
metadata: {
agents: data.trace,
processingTime: data.metrics.processingTime,
},
}]);
},
});
const handleSubmit = debounce(async (e: React.FormEvent) => {
e.preventDefault();
if (!input.trim()) return;
const userMessage = input;
setInput('');
setMessages(prev => [...prev, {
role: 'user',
content: userMessage,
}]);
mutation.mutate(userMessage);
}, 500);
return (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">多代理系统</h2>
{/* 聊天消息 */}
<div className="h-96 overflow-y-auto space-y-4 p-4 bg-base-200 rounded-lg">
{messages.map((msg, idx) => (
<div key={idx} className={`chat ${msg.role === 'user' ? 'chat-end' : 'chat-start'}`}>
<div className="chat-bubble">
{msg.content}
{msg.metadata && (
<div className="text-xs mt-2 opacity-70">
处理者: {msg.metadata.agents.join(', ')}
({msg.metadata.processingTime}ms)
</div>
)}
</div>
</div>
))}
{mutation.isPending && (
<div className="chat chat-start">
<div className="chat-bubble">
<span className="loading loading-dots loading-sm"></span>
</div>
</div>
)}
</div>
{/* 输入表单 */}
<form onSubmit={handleSubmit} className="join w-full">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="向代理提问..."
className="input input-bordered join-item flex-1"
disabled={mutation.isPending}
/>
<button
type="submit"
className="btn btn-primary join-item"
disabled={mutation.isPending || !input.trim()}
>
发送
</button>
</form>
</div>
</div>
);
}
创建一个 React 组件用于与多代理系统交互,通过实时更新显示代理跟踪和处理指标。
高级示例:事件驱动的代理群体
1. 设置消息队列基础设施
// lib/queue/agent-queue.ts
import { Queue, Worker, Job } from 'bullmq';
import Redis from 'ioredis';
import { A2AMessage, A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { pipe, groupBy, partition } from 'es-toolkit';
// 为 BullMQ 创建 Redis 连接
const connection = new Redis(process.env.REDIS_URL || 'redis://localhost:6379', {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
// 定义不同代理类型的队列名称
export const QUEUE_NAMES = {
RESEARCH: 'research-queue',
ANALYSIS: 'analysis-queue',
SYNTHESIS: 'synthesis-queue',
SUPERVISOR: 'supervisor-queue',
PRIORITY: 'priority-queue',
} as const;
// 创建类型化队列包装器
export class AgentQueue {
private queues: Map<string, Queue<A2AMessage>> = new Map();
private workers: Map<string, Worker<A2AMessage>> = new Map();
constructor() {
this.initializeQueues();
}
private initializeQueues() {
// 为每种代理类型创建队列
Object.values(QUEUE_NAMES).forEach(queueName => {
this.queues.set(queueName, new Queue<A2AMessage>(queueName, {
connection,
defaultJobOptions: {
removeOnComplete: { count: 100 },
removeOnFail: { count: 500 },
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
},
}));
});
}
// 根据路由规则将消息添加到适当的队列
async routeMessage(message: A2AMessage): Promise<Job<A2AMessage>> {
// 验证消息
const validated = A2AMessageSchema.parse(message);
// 根据目标代理确定队列
const queueName = this.getQueueForAgent(validated.to);
const queue = this.queues.get(queueName);
if (!queue) {
throw new Error(`未找到代理的队列: ${validated.to}`);
}
// 带优先级添加到队列
const priority = this.calculatePriority(validated);
return await queue.add(
`${validated.to}:${validated.task.type}`,
validated,
{
priority,
delay: validated.task.metadata?.delay || 0,
}
);
}
// 高效批量路由多条消息
async batchRoute(messages: A2AMessage[]): Promise<Job<A2AMessage>[]> {
// 按目标队列对消息进行分组
const grouped = groupBy(messages, msg => this.getQueueForAgent(msg.to));
const jobs: Job<A2AMessage>[] = [];
for (const [queueName, msgs] of Object.entries(grouped)) {
const queue = this.queues.get(queueName);
if (!queue) continue;
// 批量添加到队列
const bulkJobs = await queue.addBulk(
msgs.map(msg => ({
name: `${msg.to}:${msg.task.type}`,
data: msg,
opts: {
priority: this.calculatePriority(msg),
},
}))
);
jobs.push(...bulkJobs);
}
return jobs;
}
// 创建用于处理队列的工作器
createWorker(
queueName: string,
processor: (job: Job<A2AMessage>) => Promise<any>
): Worker<A2AMessage> {
const worker = new Worker<A2AMessage>(
queueName,
async (job) => {
console.log(`在 ${queueName} 中处理作业 ${job.id}`);
return await processor(job);
},
{
connection,
concurrency: 5,
limiter: {
max: 10,
duration: 1000,
},
}
);
// 添加事件监听器
worker.on('completed', (job) => {
console.log(`作业 ${job.id} 完成`);
});
worker.on('failed', (job, err) => {
console.error(`作业 ${job?.id} 失败:`, err);
});
this.workers.set(queueName, worker);
return worker;
}
private getQueueForAgent(agentName: string): string {
// 根据代理类型路由到适当的队列
if (agentName.includes('research')) return QUEUE_NAMES.RESEARCH;
if (agentName.includes('analysis')) return QUEUE_NAMES.ANALYSIS;
if (agentName.includes('synthesis')) return QUEUE_NAMES.SYNTHESIS;
if (agentName === 'supervisor-agent') return QUEUE_NAMES.SUPERVISOR;
return QUEUE_NAMES.PRIORITY; // 默认高优先级队列
}
private calculatePriority(message: A2AMessage): number {
// 数字越小 = 优先级越高
const basePriority = message.task.metadata?.priority === 'high' ? 1 :
message.task.metadata?.priority === 'low' ? 10 : 5;
// 根据任务类型调整
if (message.task.type === 'error') return 0; // 最高优先级
if (message.task.type === 'handoff') return basePriority - 1;
return basePriority;
}
// 获取队列指标
async getMetrics() {
const metrics: Record<string, any> = {};
for (const [name, queue] of this.queues.entries()) {
const counts = await queue.getJobCounts();
metrics[name] = {
waiting: counts.waiting,
active: counts.active,
completed: counts.completed,
failed: counts.failed,
delayed: counts.delayed,
};
}
return metrics;
}
// 优雅关闭
async close() {
// 关闭所有工作器
await Promise.all(
Array.from(this.workers.values()).map(worker => worker.close())
);
// 关闭所有队列
await Promise.all(
Array.from(this.queues.values()).map(queue => queue.close())
);
await connection.quit();
}
}
使用 BullMQ 实现一个强大的消息队列系统,用于异步代理通信,具有优先级路由、批处理和自动重试功能。
由于长度限制,高级示例的其余部分(事件驱动代理基础、群体协调器、流式 API、React 钩子、仪表板、部署配置)将被省略。
总结
代理间通信(A2A)代表了构建 AI 系统方式的根本转变,从孤立的代理转向能够解决复杂、多方面问题的协作群体。通过利用 TypeScript 的类型安全性、LangGraph 的编排能力以及 Vercel 拥有 800 秒执行时间的无服务器基础设施,开发人员现在可以构建生产就绪的多代理系统,这在仅仅两年前在技术上是不可能的。
这里展示的模式——从简单的交接机制到具有共识算法的复杂群体协调——为构建可扩展、有弹性的代理网络提供了基础。事件驱动架构、消息队列和流接口的组合确保这些系统可以处理现实世界的生产负载,同时保持可观察性和控制。
随着生态系统继续与 Google 的 A2A 和 Anthropic 的 MCP 等标准化协议一起发展,使用不同框架构建的代理无缝协作的能力将为 AI 应用解锁新的可能性。AI 的未来不在于单一的、单体的模型,而在于协调一致工作的专门代理的网络——而构建这些系统的工具和模式今天就可以使用。