ドラフト エージェント設計パターン - エージェント間通信(A2A)
TypeScript、LangGraph、Vercelのサーバーレスプラットフォームを使用して、エージェント間通信を持つプロダクション対応のマルチエージェントシステムを構築する方法を学びます。
メンタルモデル:オーケストラ指揮者パターン
A2A通信を、各エージェントが専門の演奏者である分散オーケストラとして考えてみましょう。指揮者(スーパーバイザーエージェント)はすべての楽器を演奏するわけではありませんが、演奏を調整します。演奏者(専門エージェント)は楽譜(コンテキスト/タスク)を直接、または指揮者を通じて互いに渡すことができます。一部の楽曲はすべての演奏者が一緒に演奏する必要があり(同期)、他の楽曲ではソリストが独立して演奏し、準備ができたら参加することができます(非同期)。会場(Vercelのプラットフォーム)は音響とインフラストラクチャを提供し、楽譜(LangGraph)は音楽の流れを定義します。演奏者が楽器に関係なく標準的な記譜法を使用してコミュニケーションを取るように、A2Aプロトコルは異なるフレームワークで構築されたエージェントがシームレスに協力することを可能にします。
基本例:シンプルなエージェントハンドオフパターン
1. コア依存関係を含むプロジェクトセットアップ
# TypeScriptでNextJS 15プロジェクトを初期化
npx create-next-app@latest a2a-agents --typescript --tailwind --app --no-src-dir
cd a2a-agents
# LangGraphとエージェント通信パッケージをインストール
npm install @langchain/langgraph @langchain/core @langchain/community
npm install @langchain/google-genai zod uuid
npm install @tanstack/react-query es-toolkit daisyui
npm install @vercel/kv bullmq ioredis
エージェントのオーケストレーションと通信に必要なすべての依存関係を持つマルチエージェント開発用に最適化されたNext.js 15プロジェクトを初期化します。
2. エージェント通信プロトコルの定義
// lib/protocols/a2a-protocol.ts
import { z } from 'zod';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
import { pipe, map, filter } from 'es-toolkit';
// A2Aメッセージスキーマを定義
export const A2AMessageSchema = z.object({
id: z.string().uuid(),
from: z.string(),
to: z.string(),
timestamp: z.string().datetime(),
protocol: z.literal('a2a/v1'),
task: z.object({
id: z.string().uuid(),
type: z.enum(['request', 'response', 'handoff', 'error']),
skill: z.string().optional(),
context: z.record(z.any()),
payload: z.any(),
metadata: z.object({
priority: z.enum(['low', 'medium', 'high']).default('medium'),
timeout: z.number().default(30000),
retries: z.number().default(3),
}).optional(),
}),
});
export type A2AMessage = z.infer<typeof A2AMessageSchema>;
// エージェントカードの定義(エージェント機能)
export const AgentCardSchema = z.object({
name: z.string(),
description: z.string(),
url: z.string().url(),
version: z.string(),
capabilities: z.object({
streaming: z.boolean(),
async: z.boolean(),
maxConcurrent: z.number(),
}),
skills: z.array(z.object({
id: z.string(),
name: z.string(),
description: z.string(),
inputSchema: z.any(),
outputSchema: z.any(),
})),
authentication: z.object({
type: z.enum(['none', 'apiKey', 'oauth2']),
config: z.record(z.any()).optional(),
}),
});
export type AgentCard = z.infer<typeof AgentCardSchema>;
// es-toolkitを使用したメッセージファクトリー
export class A2AMessageFactory {
static createHandoffMessage(
from: string,
to: string,
task: any,
context: Record<string, any>
): A2AMessage {
return pipe(
{
id: crypto.randomUUID(),
from,
to,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1' as const,
task: {
id: crypto.randomUUID(),
type: 'handoff' as const,
context,
payload: task,
},
},
(msg) => A2AMessageSchema.parse(msg)
);
}
}
メッセージスキーマ、機能検出用のエージェントカード、標準化されたメッセージ作成用のファクトリーメソッドを持つ型安全なA2Aプロトコルを確立します。
3. 通信インターフェースを持つベースエージェントの作成
// lib/agents/base-agent.ts
import { StateGraph, StateGraphArgs } from '@langchain/langgraph';
import { BaseMessage } from '@langchain/core/messages';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { A2AMessage, AgentCard } from '@/lib/protocols/a2a-protocol';
import { debounce, throttle } from 'es-toolkit';
export interface AgentState {
messages: BaseMessage[];
currentAgent: string;
context: Record<string, any>;
pendingHandoffs: A2AMessage[];
}
export abstract class BaseA2Agent {
protected name: string;
protected model: ChatGoogleGenerativeAI;
protected graph: StateGraph<AgentState>;
protected card: AgentCard;
constructor(name: string, card: AgentCard) {
this.name = name;
this.card = card;
// 高速応答のためGemini Flashで初期化
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.7,
streaming: true,
maxOutputTokens: 2048,
});
// エージェントワークフロー用の状態グラフをセットアップ
this.graph = new StateGraph<AgentState>({
channels: {
messages: {
value: (old: BaseMessage[], next: BaseMessage[]) => [...old, ...next],
},
currentAgent: {
value: (old: string, next: string) => next,
},
context: {
value: (old: Record<string, any>, next: Record<string, any>) => ({
...old,
...next,
}),
},
pendingHandoffs: {
value: (old: A2AMessage[], next: A2AMessage[]) => [...old, ...next],
},
},
});
this.setupGraph();
}
// サブクラスが実装する抽象メソッド
protected abstract setupGraph(): void;
// 受信したA2Aメッセージを処理
async processMessage(message: A2AMessage): Promise<A2AMessage> {
const startTime = Date.now();
try {
// メッセージがこのエージェント宛てかを検証
if (message.to !== this.name) {
throw new Error(`Message not for this agent: ${message.to}`);
}
// タスクタイプに基づいて処理
const result = await this.handleTask(message.task);
// 応答メッセージを作成
return {
id: crypto.randomUUID(),
from: this.name,
to: message.from,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: message.task.id,
type: 'response',
context: {
...message.task.context,
processingTime: Date.now() - startTime,
},
payload: result,
},
};
} catch (error) {
return this.createErrorResponse(message, error);
}
}
// 異なるタスクタイプを処理
protected async handleTask(task: A2AMessage['task']): Promise<any> {
switch (task.type) {
case 'request':
return await this.processRequest(task);
case 'handoff':
return await this.acceptHandoff(task);
default:
throw new Error(`Unknown task type: ${task.type}`);
}
}
protected abstract processRequest(task: A2AMessage['task']): Promise<any>;
protected abstract acceptHandoff(task: A2AMessage['task']): Promise<any>;
// スロットル付きエラー応答作成
protected createErrorResponse = throttle(
(message: A2AMessage, error: any): A2AMessage => {
return {
id: crypto.randomUUID(),
from: this.name,
to: message.from,
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: message.task.id,
type: 'error',
context: message.task.context,
payload: {
error: error.message || 'Unknown error',
stack: process.env.NODE_ENV === 'development' ? error.stack : undefined,
},
},
};
},
1000 // エラー応答をスパム防止のためスロットル
);
}
A2Aメッセージ処理、LangGraphによる状態管理、es-toolkitユーティリティによるエラー処理を組み込んだエージェントの抽象ベースクラスを提供します。
4. 専門エージェントの実装
// lib/agents/research-agent.ts
import { BaseA2Agent } from './base-agent';
import { WebBrowser } from '@langchain/community/tools/webbrowser';
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { groupBy, chunk } from 'es-toolkit';
export class ResearchAgent extends BaseA2Agent {
private browser: WebBrowser;
private embeddings: GoogleGenerativeAIEmbeddings;
constructor() {
const card = {
name: 'research-agent',
description: 'ウェブ研究と情報収集に特化',
url: process.env.VERCEL_URL ?
`https://${process.env.VERCEL_URL}/api/agents/research` :
'http://localhost:3000/api/agents/research',
version: '1.0.0',
capabilities: {
streaming: true,
async: true,
maxConcurrent: 5,
},
skills: [
{
id: 'web-search',
name: 'ウェブ検索',
description: 'ウェブから情報を検索して抽出',
inputSchema: { query: 'string' },
outputSchema: { results: 'array' },
},
{
id: 'summarize',
name: '要約',
description: '研究結果の簡潔な要約を作成',
inputSchema: { content: 'string' },
outputSchema: { summary: 'string' },
},
],
authentication: {
type: 'apiKey',
config: { header: 'X-API-Key' },
},
};
super('research-agent', card);
this.embeddings = new GoogleGenerativeAIEmbeddings({
modelName: 'embedding-001',
});
this.browser = new WebBrowser({
model: this.model,
embeddings: this.embeddings
});
}
protected setupGraph(): void {
// 研究ワークフローを定義
this.graph
.addNode('analyze', this.analyzeRequest.bind(this))
.addNode('search', this.performSearch.bind(this))
.addNode('synthesize', this.synthesizeResults.bind(this))
.addNode('decide_handoff', this.decideHandoff.bind(this))
.addEdge('__start__', 'analyze')
.addEdge('analyze', 'search')
.addEdge('search', 'synthesize')
.addEdge('synthesize', 'decide_handoff');
}
private async analyzeRequest(state: any) {
const lastMessage = state.messages[state.messages.length - 1];
const analysis = await this.model.invoke([
new HumanMessage(`このリクエストを分析し、主要な検索語を特定してください: ${lastMessage.content}`)
]);
return {
context: {
...state.context,
searchTerms: this.extractSearchTerms(analysis.content as string),
},
};
}
private async performSearch(state: any) {
const { searchTerms } = state.context;
// 効率のために検索クエリをバッチ処理
const searchBatches = chunk(searchTerms, 3);
const results = [];
for (const batch of searchBatches) {
const batchResults = await Promise.all(
batch.map(term => this.browser.invoke(term))
);
results.push(...batchResults);
}
return {
context: {
...state.context,
searchResults: results,
},
};
}
private async synthesizeResults(state: any) {
const { searchResults } = state.context;
// 関連性で結果をグループ化
const grouped = groupBy(searchResults, (result: any) =>
result.relevance > 0.8 ? 'high' : result.relevance > 0.5 ? 'medium' : 'low'
);
const synthesis = await this.model.invoke([
new HumanMessage(`これらの検索結果を包括的な回答に統合してください:
${JSON.stringify(grouped.high || [])}`)
]);
return {
messages: [new AIMessage(synthesis.content as string)],
context: {
...state.context,
synthesis: synthesis.content,
confidence: grouped.high ? 'high' : 'medium',
},
};
}
private async decideHandoff(state: any) {
const { confidence, synthesis } = state.context;
// 他のエージェントへのハンドオフが必要かを決定
if (confidence === 'low' || synthesis.includes('さらなる分析が必要')) {
return {
pendingHandoffs: [{
id: crypto.randomUUID(),
from: this.name,
to: 'analyst-agent',
timestamp: new Date().toISOString(),
protocol: 'a2a/v1' as const,
task: {
id: crypto.randomUUID(),
type: 'handoff' as const,
context: state.context,
payload: {
request: '詳細な分析が必要',
preliminaryFindings: synthesis,
},
},
}],
};
}
return { pendingHandoffs: [] };
}
protected async processRequest(task: any): Promise<any> {
const result = await this.graph.invoke({
messages: [new HumanMessage(task.payload.query)],
currentAgent: this.name,
context: task.context || {},
pendingHandoffs: [],
});
return {
results: result.messages[result.messages.length - 1].content,
handoffs: result.pendingHandoffs,
};
}
protected async acceptHandoff(task: any): Promise<any> {
// 他のエージェントからのハンドオフを処理
return this.processRequest(task);
}
private extractSearchTerms(analysis: string): string[] {
// シンプルな抽出ロジック - 本番環境ではNLPを使用
return analysis.match(/["'](.*?)["']/g)?.map(term =>
term.replace(/["']/g, '')
) || [];
}
}
ウェブブラウジング機能、効率のための検索バッチ処理、信頼度レベルに基づくインテリジェントなハンドオフ決定を持つ研究エージェントを実装します。
5. オーケストレーション用のスーパーバイザーエージェントの作成
// lib/agents/supervisor-agent.ts
import { BaseA2Agent } from './base-agent';
import { A2AMessage, A2AMessageFactory } from '@/lib/protocols/a2a-protocol';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { sortBy, uniqBy } from 'es-toolkit';
import { kv } from '@vercel/kv';
interface AgentRegistry {
[key: string]: {
card: any;
endpoint: string;
status: 'active' | 'inactive' | 'busy';
lastSeen: number;
};
}
export class SupervisorAgent extends BaseA2Agent {
private agentRegistry: AgentRegistry = {};
private taskQueue: A2AMessage[] = [];
constructor() {
const card = {
name: 'supervisor-agent',
description: '複数の専門エージェントをオーケストレーション・調整',
url: process.env.VERCEL_URL ?
`https://${process.env.VERCEL_URL}/api/agents/supervisor` :
'http://localhost:3000/api/agents/supervisor',
version: '1.0.0',
capabilities: {
streaming: true,
async: true,
maxConcurrent: 10,
},
skills: [
{
id: 'orchestrate',
name: 'オーケストレート',
description: '複数のエージェントを調整して複雑なタスクを完了',
inputSchema: { task: 'string', agents: 'array' },
outputSchema: { result: 'any', trace: 'array' },
},
{
id: 'delegate',
name: '委任',
description: '適切な専門エージェントにタスクを割り当て',
inputSchema: { task: 'string' },
outputSchema: { assignedTo: 'string', taskId: 'string' },
},
],
authentication: {
type: 'apiKey',
config: { header: 'X-Supervisor-Key' },
},
};
super('supervisor-agent', card);
this.initializeRegistry();
}
private async initializeRegistry() {
// Vercel KVからエージェントレジストリをロード
try {
const registry = await kv.get<AgentRegistry>('agent-registry');
if (registry) {
this.agentRegistry = registry;
}
} catch (error) {
console.log('既存のレジストリなし、新規開始');
}
// デフォルトエージェントを登録
this.registerAgent('research-agent', {
card: { /* 研究エージェントカード */ },
endpoint: '/api/agents/research',
status: 'active',
lastSeen: Date.now(),
});
this.registerAgent('analyst-agent', {
card: { /* アナリストエージェントカード */ },
endpoint: '/api/agents/analyst',
status: 'active',
lastSeen: Date.now(),
});
// レジストリを永続化
await kv.set('agent-registry', this.agentRegistry);
}
protected setupGraph(): void {
this.graph
.addNode('analyze_task', this.analyzeTask.bind(this))
.addNode('select_agents', this.selectAgents.bind(this))
.addNode('delegate_tasks', this.delegateTasks.bind(this))
.addNode('monitor_progress', this.monitorProgress.bind(this))
.addNode('aggregate_results', this.aggregateResults.bind(this))
.addEdge('__start__', 'analyze_task')
.addEdge('analyze_task', 'select_agents')
.addEdge('select_agents', 'delegate_tasks')
.addEdge('delegate_tasks', 'monitor_progress')
.addEdge('monitor_progress', 'aggregate_results');
}
private async analyzeTask(state: any) {
const lastMessage = state.messages[state.messages.length - 1];
// LLMを使用してタスク要件を理解
const analysis = await this.model.invoke([
new HumanMessage(`
このタスクを分析し、次のことを判断してください:
1. どのタイプの専門知識が必要か
2. 順次処理か並列処理が必要か
3. 推定複雑度(簡単/中程度/複雑)
タスク: ${lastMessage.content}
`)
]);
return {
context: {
...state.context,
taskAnalysis: this.parseTaskAnalysis(analysis.content as string),
originalTask: lastMessage.content,
},
};
}
private async selectAgents(state: any) {
const { taskAnalysis } = state.context;
// 関連性でソートされたアクティブエージェントを取得
const activeAgents = Object.entries(this.agentRegistry)
.filter(([_, agent]) => agent.status === 'active')
.map(([name, agent]) => ({ name, ...agent }));
// タスク要件に基づいてエージェントをスコアリング
const scoredAgents = activeAgents.map(agent => ({
...agent,
score: this.calculateAgentScore(agent, taskAnalysis),
}));
// タスク用のトップエージェントを選択
const selectedAgents = sortBy(scoredAgents, 'score')
.reverse()
.slice(0, taskAnalysis.complexity === 'complex' ? 3 : 2);
return {
context: {
...state.context,
selectedAgents: selectedAgents.map(a => a.name),
},
};
}
private async delegateTasks(state: any) {
const { selectedAgents, taskAnalysis, originalTask } = state.context;
const delegatedTasks: A2AMessage[] = [];
for (const agentName of selectedAgents) {
const message = A2AMessageFactory.createHandoffMessage(
this.name,
agentName,
{
task: originalTask,
requirements: taskAnalysis,
deadline: Date.now() + 30000, // 30秒の期限
},
state.context
);
delegatedTasks.push(message);
this.taskQueue.push(message);
}
// トラッキングのためKVにタスク委任を保存
await kv.set(`tasks:${state.context.sessionId}`, delegatedTasks, {
ex: 3600, // 1時間後に期限切れ
});
return {
context: {
...state.context,
delegatedTasks: delegatedTasks.map(t => t.id),
},
};
}
private async monitorProgress(state: any) {
const { delegatedTasks } = state.context;
const responses: any[] = [];
const timeout = 30000; // 30秒
const startTime = Date.now();
// タイムアウト付きで応答をポーリング
while (responses.length < delegatedTasks.length) {
if (Date.now() - startTime > timeout) {
console.log('エージェント応答待ちタイムアウト');
break;
}
// KVで応答をチェック
for (const taskId of delegatedTasks) {
const response = await kv.get(`response:${taskId}`);
if (response && !responses.find(r => r.taskId === taskId)) {
responses.push(response);
}
}
// 次のポーリングまで待機
await new Promise(resolve => setTimeout(resolve, 1000));
}
return {
context: {
...state.context,
agentResponses: responses,
},
};
}
private async aggregateResults(state: any) {
const { agentResponses } = state.context;
// 重複情報を削除
const uniqueResponses = uniqBy(agentResponses, (r: any) =>
JSON.stringify(r.payload)
);
// LLMを使用してすべての応答を統合
const synthesis = await this.model.invoke([
new HumanMessage(`
これらのエージェント応答を包括的な回答に統合してください:
${JSON.stringify(uniqueResponses)}
すべての洞察を組み合わせた統一された応答を作成してください。
`)
]);
return {
messages: [new AIMessage(synthesis.content as string)],
context: {
...state.context,
finalResult: synthesis.content,
contributingAgents: uniqueResponses.map((r: any) => r.from),
},
};
}
private registerAgent(name: string, info: any) {
this.agentRegistry[name] = info;
}
private parseTaskAnalysis(analysis: string): any {
// シンプルな解析 - 本番環境では構造化出力を使用
return {
expertise: analysis.includes('研究') ? ['research'] : ['general'],
processing: analysis.includes('並列') ? 'parallel' : 'sequential',
complexity: analysis.includes('複雑') ? 'complex' : 'simple',
};
}
private calculateAgentScore(agent: any, taskAnalysis: any): number {
// シンプルなスコアリングアルゴリズム
let score = 0;
// エージェントに必要なスキルがあるかチェック
if (agent.card?.skills) {
score += agent.card.skills.length * 10;
}
// 最近アクティブなエージェントを優先
const hoursSinceActive = (Date.now() - agent.lastSeen) / 3600000;
score -= hoursSinceActive * 5;
// 専門性が一致する場合はスコアを上げる
if (taskAnalysis.expertise?.some((exp: string) =>
agent.name.includes(exp)
)) {
score += 50;
}
return Math.max(0, score);
}
protected async processRequest(task: any): Promise<any> {
const result = await this.graph.invoke({
messages: [new HumanMessage(task.payload.query)],
currentAgent: this.name,
context: {
...task.context,
sessionId: task.id,
},
pendingHandoffs: [],
});
return {
result: result.context.finalResult,
trace: result.context.contributingAgents,
metrics: {
totalAgents: result.context.selectedAgents.length,
processingTime: Date.now() - result.context.startTime,
},
};
}
protected async acceptHandoff(task: any): Promise<any> {
// スーパーバイザーは通常ハンドオフを受け入れない
return { error: 'スーパーバイザーはハンドオフを受け入れません' };
}
}
タスクを分析し、適切なエージェントを選択し、作業を委任し、進捗を監視し、状態永続化のためにVercel KVを使用して結果を集約するスーパーバイザーエージェントを実装します。
6. エージェントエンドポイント用のAPIルート
// app/api/agents/supervisor/route.ts
import { SupervisorAgent } from '@/lib/agents/supervisor-agent';
import { A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { NextResponse } from 'next/server';
import { kv } from '@vercel/kv';
export const runtime = 'nodejs';
export const maxDuration = 300;
const supervisor = new SupervisorAgent();
export async function POST(req: Request) {
try {
// 受信したA2Aメッセージを解析・検証
const rawMessage = await req.json();
const message = A2AMessageSchema.parse(rawMessage);
// 認証をチェック
const apiKey = req.headers.get('X-Supervisor-Key');
if (!apiKey || apiKey !== process.env.SUPERVISOR_API_KEY) {
return NextResponse.json({ error: '認証されていません' }, { status: 401 });
}
// スーパーバイザーでメッセージを処理
const response = await supervisor.processMessage(message);
// 非同期取得のために応答を保存
await kv.set(`response:${message.task.id}`, response, {
ex: 3600, // 1時間後に期限切れ
});
return NextResponse.json(response);
} catch (error: any) {
console.error('スーパーバイザーエラー:', error);
return NextResponse.json(
{ error: error.message || '内部サーバーエラー' },
{ status: 500 }
);
}
}
// エージェント検出エンドポイント
export async function GET(req: Request) {
const agent = new SupervisorAgent();
return NextResponse.json(agent.card);
}
認証、メッセージ検証、エージェント検出サポートを持つスーパーバイザーエージェント用のRESTful APIエンドポイントを作成します。
7. React Queryを使用したクライアントコンポーネント
// components/MultiAgentChat.tsx
'use client';
import { useState } from 'react';
import { useMutation, useQuery } from '@tanstack/react-query';
import { debounce } from 'es-toolkit';
interface AgentResponse {
result: string;
trace: string[];
metrics: {
totalAgents: number;
processingTime: number;
};
}
async function sendToSupervisor(query: string): Promise<AgentResponse> {
const message = {
id: crypto.randomUUID(),
from: 'user',
to: 'supervisor-agent',
timestamp: new Date().toISOString(),
protocol: 'a2a/v1',
task: {
id: crypto.randomUUID(),
type: 'request',
context: {},
payload: { query },
},
};
const response = await fetch('/api/agents/supervisor', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Supervisor-Key': process.env.NEXT_PUBLIC_SUPERVISOR_KEY || '',
},
body: JSON.stringify(message),
});
if (!response.ok) {
throw new Error('リクエストの処理に失敗しました');
}
const data = await response.json();
return data.task.payload;
}
export default function MultiAgentChat() {
const [input, setInput] = useState('');
const [messages, setMessages] = useState<Array<{
role: 'user' | 'assistant';
content: string;
metadata?: any;
}>>([]);
const mutation = useMutation({
mutationFn: sendToSupervisor,
onSuccess: (data) => {
setMessages(prev => [...prev, {
role: 'assistant',
content: data.result,
metadata: {
agents: data.trace,
processingTime: data.metrics.processingTime,
},
}]);
},
});
const handleSubmit = debounce(async (e: React.FormEvent) => {
e.preventDefault();
if (!input.trim()) return;
const userMessage = input;
setInput('');
setMessages(prev => [...prev, {
role: 'user',
content: userMessage,
}]);
mutation.mutate(userMessage);
}, 500);
return (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">マルチエージェントシステム</h2>
{/* チャットメッセージ */}
<div className="h-96 overflow-y-auto space-y-4 p-4 bg-base-200 rounded-lg">
{messages.map((msg, idx) => (
<div key={idx} className={`chat ${msg.role === 'user' ? 'chat-end' : 'chat-start'}`}>
<div className="chat-bubble">
{msg.content}
{msg.metadata && (
<div className="text-xs mt-2 opacity-70">
処理者: {msg.metadata.agents.join(', ')}
({msg.metadata.processingTime}ms)
</div>
)}
</div>
</div>
))}
{mutation.isPending && (
<div className="chat chat-start">
<div className="chat-bubble">
<span className="loading loading-dots loading-sm"></span>
</div>
</div>
)}
</div>
{/* 入力フォーム */}
<form onSubmit={handleSubmit} className="join w-full">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="エージェントに質問..."
className="input input-bordered join-item flex-1"
disabled={mutation.isPending}
/>
<button
type="submit"
className="btn btn-primary join-item"
disabled={mutation.isPending || !input.trim()}
>
送信
</button>
</form>
</div>
</div>
);
}
リアルタイム更新でエージェントのトレースと処理メトリクスを表示し、マルチエージェントシステムと対話するためのReactコンポーネントを作成します。
高度な例:イベント駆動型エージェントスウォーム
1. メッセージキューインフラストラクチャのセットアップ
// lib/queue/agent-queue.ts
import { Queue, Worker, Job } from 'bullmq';
import Redis from 'ioredis';
import { A2AMessage, A2AMessageSchema } from '@/lib/protocols/a2a-protocol';
import { pipe, groupBy, partition } from 'es-toolkit';
// BullMQ用のRedis接続を作成
const connection = new Redis(process.env.REDIS_URL || 'redis://localhost:6379', {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
// 異なるエージェントタイプ用のキュー名を定義
export const QUEUE_NAMES = {
RESEARCH: 'research-queue',
ANALYSIS: 'analysis-queue',
SYNTHESIS: 'synthesis-queue',
SUPERVISOR: 'supervisor-queue',
PRIORITY: 'priority-queue',
} as const;
// 型付きキューラッパーを作成
export class AgentQueue {
private queues: Map<string, Queue<A2AMessage>> = new Map();
private workers: Map<string, Worker<A2AMessage>> = new Map();
constructor() {
this.initializeQueues();
}
private initializeQueues() {
// 各エージェントタイプ用のキューを作成
Object.values(QUEUE_NAMES).forEach(queueName => {
this.queues.set(queueName, new Queue<A2AMessage>(queueName, {
connection,
defaultJobOptions: {
removeOnComplete: { count: 100 },
removeOnFail: { count: 500 },
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
},
}));
});
}
// ルーティングルールに基づいて適切なキューにメッセージを追加
async routeMessage(message: A2AMessage): Promise<Job<A2AMessage>> {
// メッセージを検証
const validated = A2AMessageSchema.parse(message);
// 宛先エージェントに基づいてキューを決定
const queueName = this.getQueueForAgent(validated.to);
const queue = this.queues.get(queueName);
if (!queue) {
throw new Error(`エージェント用のキューが見つかりません: ${validated.to}`);
}
// 優先度を付けてキューに追加
const priority = this.calculatePriority(validated);
return await queue.add(
`${validated.to}:${validated.task.type}`,
validated,
{
priority,
delay: validated.task.metadata?.delay || 0,
}
);
}
// 複数のメッセージを効率的にバッチルート
async batchRoute(messages: A2AMessage[]): Promise<Job<A2AMessage>[]> {
// 宛先キューごとにメッセージをグループ化
const grouped = groupBy(messages, msg => this.getQueueForAgent(msg.to));
const jobs: Job<A2AMessage>[] = [];
for (const [queueName, msgs] of Object.entries(grouped)) {
const queue = this.queues.get(queueName);
if (!queue) continue;
// キューに一括追加
const bulkJobs = await queue.addBulk(
msgs.map(msg => ({
name: `${msg.to}:${msg.task.type}`,
data: msg,
opts: {
priority: this.calculatePriority(msg),
},
}))
);
jobs.push(...bulkJobs);
}
return jobs;
}
// キュー処理用のワーカーを作成
createWorker(
queueName: string,
processor: (job: Job<A2AMessage>) => Promise<any>
): Worker<A2AMessage> {
const worker = new Worker<A2AMessage>(
queueName,
async (job) => {
console.log(`${queueName}でジョブ${job.id}を処理中`);
return await processor(job);
},
{
connection,
concurrency: 5,
limiter: {
max: 10,
duration: 1000,
},
}
);
// イベントリスナーを追加
worker.on('completed', (job) => {
console.log(`ジョブ${job.id}が完了しました`);
});
worker.on('failed', (job, err) => {
console.error(`ジョブ${job?.id}が失敗しました:`, err);
});
this.workers.set(queueName, worker);
return worker;
}
private getQueueForAgent(agentName: string): string {
// エージェントタイプに基づいて適切なキューにルート
if (agentName.includes('research')) return QUEUE_NAMES.RESEARCH;
if (agentName.includes('analysis')) return QUEUE_NAMES.ANALYSIS;
if (agentName.includes('synthesis')) return QUEUE_NAMES.SYNTHESIS;
if (agentName === 'supervisor-agent') return QUEUE_NAMES.SUPERVISOR;
return QUEUE_NAMES.PRIORITY; // デフォルトの高優先度キュー
}
private calculatePriority(message: A2AMessage): number {
// 数値が小さいほど優先度が高い
const basePriority = message.task.metadata?.priority === 'high' ? 1 :
message.task.metadata?.priority === 'low' ? 10 : 5;
// タスクタイプに基づいて調整
if (message.task.type === 'error') return 0; // 最高優先度
if (message.task.type === 'handoff') return basePriority - 1;
return basePriority;
}
// キューメトリクスを取得
async getMetrics() {
const metrics: Record<string, any> = {};
for (const [name, queue] of this.queues.entries()) {
const counts = await queue.getJobCounts();
metrics[name] = {
waiting: counts.waiting,
active: counts.active,
completed: counts.completed,
failed: counts.failed,
delayed: counts.delayed,
};
}
return metrics;
}
// グレースフルシャットダウン
async close() {
// すべてのワーカーを閉じる
await Promise.all(
Array.from(this.workers.values()).map(worker => worker.close())
);
// すべてのキューを閉じる
await Promise.all(
Array.from(this.queues.values()).map(queue => queue.close())
);
await connection.quit();
}
}
優先度ルーティング、バッチ処理、自動再試行を持つ非同期エージェント通信のためのBullMQを使用した堅牢なメッセージキューシステムを実装します。
続きは長すぎるため、イベント駆動型エージェントベース、スウォームコーディネーター、ストリーミングAPI、Reactフック、ダッシュボード、デプロイ設定のセクションは省略します。
まとめ
エージェント間通信(A2A)は、孤立したエージェントから、複雑で多面的な問題に取り組むことができる協調的なスウォームへと移行し、AIシステムの構築方法における根本的な変化を表しています。TypeScriptの型安全性、LangGraphのオーケストレーション機能、800秒の実行時間を持つVercelのサーバーレスインフラストラクチャを活用することで、開発者はわずか2年前には技術的に不可能だったプロダクション対応のマルチエージェントシステムを構築できるようになりました。
ここで示されたパターン(シンプルなハンドオフメカニズムから、コンセンサスアルゴリズムを持つ洗練されたスウォームコーディネーションまで)は、スケーラブルで回復力のあるエージェントネットワークを構築するための基盤を提供します。イベント駆動型アーキテクチャ、メッセージキュー、ストリーミングインターフェースの組み合わせにより、これらのシステムは可観測性と制御を維持しながら、実世界の本番負荷を処理できます。
エコシステムがGoogleのA2AやAnthropicのMCPなどの標準化されたプロトコルで進化し続ける中、異なるフレームワークで構築されたエージェントがシームレスに協力する能力は、AIアプリケーションの新しい可能性を開きます。AIの未来は単一の巨大なモデルではなく、協調して動作する専門エージェントのオーケストレートされたネットワークにあり、これらのシステムを構築するためのツールとパターンは今日利用可能です。