ドラフト エージェント設計パターン - マルチエージェント協調
マルチエージェント協調は、共通の目標に向けて協働する複数の専門的なAIエージェントをオーケストレーションすることで、複雑な問題解決を可能にします。単一のモノリシックなエージェントに依存する代わりに、このパターンは特定の機能とツールを持つ専門エージェント間でタスクを分散し、より堅牢でスケーラブルなソリューションを実現します。
メンタルモデル:オーケストラ指揮者アーキテクチャ
マルチエージェント協調は、Next.jsのAPIルートでオーケストラを指揮するようなものと考えてください。各エージェントは専門の演奏者(バイオリン = リサーチャー、ドラム = データアナリスト、ピアノ = ライター)であり、それぞれの楽器(ツール/機能)を持っています。指揮者(スーパーバイザーエージェントまたはオーケストレーター)は、これらの専門家を調整してシンフォニー(完全なソリューション)を作り上げます。Vercelのサーバーレス環境では、RedisやUpstashの共有ステートを通じて協調する軽量で専門的な関数をスピンアップすることを意味し、メッセージキューを介してマイクロサービスが通信する方法に似ていますが、各ノードでAI駆動の意思決定が行われます。
基本例:スーパーバイザー・ワーカーパターン
リサーチタスクを処理するために、スーパーバイザーが専門ワーカーを調整するシンプルなマルチエージェントシステムを構築しましょう。
1. エージェントの状態と型の定義
// app/lib/agents/types.ts
import { Annotation } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
import { z } from "zod";
// エージェント間で流れる状態を定義
export const AgentState = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => []
}),
task: Annotation<string>(),
researchData: Annotation<string>(),
analysis: Annotation<string>(),
finalOutput: Annotation<string>(),
nextAgent: Annotation<string>()
});
export type AgentStateType = typeof AgentState.State;
// エージェント設定スキーマ
export const AgentConfigSchema = z.object({
name: z.string(),
role: z.string(),
temperature: z.number().default(0.7),
maxTokens: z.number().default(1000)
});
適切なTypeScript型を使用したLangGraphのAnnotationシステムを使用して共有状態構造を定義します。
2. 専門ワーカーエージェントの作成
// app/lib/agents/workers.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { DynamicStructuredTool } from "@langchain/core/tools";
import { z } from "zod";
import { isEmpty, pick } from "es-toolkit";
import { AgentStateType } from "./types";
// Web検索機能を持つリサーチエージェント
export const researchAgent = async (state: AgentStateType) => {
const model = new ChatGoogleGenerativeAI({
model: "gemini-2.5-flash",
temperature: 0.5,
maxRetries: 2
});
const searchTool = new DynamicStructuredTool({
name: "web_search",
description: "Webで情報を検索",
schema: z.object({
query: z.string().describe("検索クエリ")
}),
func: async ({ query }) => {
// モック検索 - 実際のAPIコールに置き換え
await new Promise(resolve => setTimeout(resolve, 100));
return `次の情報が見つかりました: ${query}`;
}
});
const prompt = `あなたはリサーチスペシャリストです。
タスク: ${state.task}
検索ツールを使用して関連情報を収集してください。`;
const response = await model.invoke(prompt);
return {
messages: [response],
researchData: response.content as string
};
};
// データ処理用の分析エージェント
export const analysisAgent = async (state: AgentStateType) => {
const model = new ChatGoogleGenerativeAI({
model: "gemini-2.5-flash",
temperature: 0.3
});
const prompt = `あなたはデータアナリストです。
タスク: ${state.task}
リサーチデータ: ${state.researchData}
このデータを分析して洞察を提供してください。`;
const response = await model.invoke(prompt);
return {
messages: [response],
analysis: response.content as string
};
};
// 最終出力用のライターエージェント
export const writerAgent = async (state: AgentStateType) => {
const model = new ChatGoogleGenerativeAI({
model: "gemini-2.5-pro",
temperature: 0.7
});
const prompt = `あなたはプロフェッショナルライターです。
タスク: ${state.task}
リサーチ: ${state.researchData}
分析: ${state.analysis}
包括的で構造化された応答を作成してください。`;
const response = await model.invoke(prompt);
return {
messages: [response],
finalOutput: response.content as string
};
};
各ワーカーエージェントは、適切なモデル設定とツールを備えた特定のタスクに特化しています。
3. スーパーバイザーの実装
// app/lib/agents/supervisor.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { JsonOutputParser } from "@langchain/core/output_parsers";
import { AgentStateType } from "./types";
import { END } from "@langchain/langgraph";
export const supervisorAgent = async (state: AgentStateType) => {
const model = new ChatGoogleGenerativeAI({
model: "gemini-2.5-flash",
temperature: 0
});
// 現在の状態に基づいて次のエージェントを決定
const systemPrompt = `あなたはエージェントチームを管理するスーパーバイザーです。
現在のタスク: ${state.task}
利用可能なエージェント:
- researcher: 情報収集
- analyst: データ分析
- writer: 最終出力作成
完了した内容に基づいて:
- リサーチデータの存在: ${!!state.researchData}
- 分析の存在: ${!!state.analysis}
- 最終出力の存在: ${!!state.finalOutput}
JSONで返してください: { "nextAgent": "agent_name" または "finish" }`;
const parser = new JsonOutputParser();
const response = await model.invoke(systemPrompt);
const decision = await parser.parse(response.content as string);
return {
nextAgent: decision.nextAgent === "finish" ? END : decision.nextAgent,
messages: [response]
};
};
スーパーバイザーは現在の状態に基づいて次にアクティブ化するエージェントを決定します。
4. マルチエージェントグラフの構築
// app/lib/agents/workflow.ts
import { StateGraph } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph";
import { START, END } from "@langchain/langgraph";
import { AgentState } from "./types";
import { supervisorAgent } from "./supervisor";
import { researchAgent, analysisAgent, writerAgent } from "./workers";
export function createMultiAgentWorkflow() {
const workflow = new StateGraph(AgentState)
// 各エージェントのノードを追加
.addNode("supervisor", supervisorAgent)
.addNode("researcher", researchAgent)
.addNode("analyst", analysisAgent)
.addNode("writer", writerAgent)
// フローを定義
.addEdge(START, "supervisor")
.addConditionalEdges(
"supervisor",
// スーパーバイザーの決定に基づくルーティング関数
(state) => state.nextAgent || END,
{
researcher: "researcher",
analyst: "analyst",
writer: "writer",
[END]: END
}
)
// ワーカーはスーパーバイザーに戻る
.addEdge("researcher", "supervisor")
.addEdge("analyst", "supervisor")
.addEdge("writer", "supervisor");
// 会話の永続化のためメモリ付きでコンパイル
const memory = new MemorySaver();
return workflow.compile({
checkpointer: memory,
recursionLimit: 10 // 無限ループを防止
});
}
LangGraphワークフローは適切な状態管理でエージェントの相互作用をオーケストレーションします。
5. APIルートの作成
// app/api/multi-agent/route.ts
import { NextRequest, NextResponse } from "next/server";
import { createMultiAgentWorkflow } from "@/lib/agents/workflow";
import { HumanMessage } from "@langchain/core/messages";
import { pick } from "es-toolkit";
export const maxDuration = 300; // 長時間実行タスク用の5分
export async function POST(req: NextRequest) {
try {
const { task, threadId } = await req.json();
// ワークフローの初期化
const workflow = createMultiAgentWorkflow();
// マルチエージェント協調の実行
const result = await workflow.invoke(
{
task,
messages: [new HumanMessage(task)]
},
{
configurable: { thread_id: threadId || "default" },
recursionLimit: 10
}
);
// 関連フィールドのみを返す
const response = pick(result, ["finalOutput", "analysis", "researchData"]);
return NextResponse.json({
success: true,
result: response
});
} catch (error) {
console.error("マルチエージェントエラー:", error);
return NextResponse.json(
{ error: "処理に失敗しました" },
{ status: 500 }
);
}
}
APIルートはVercelの拡張タイムアウトサポートでマルチエージェントオーケストレーションを処理します。
6. React Queryを使用したフロントエンド統合
// app/components/MultiAgentInterface.tsx
"use client";
import { useMutation } from "@tanstack/react-query";
import { useState } from "react";
import { debounce } from "es-toolkit";
interface MultiAgentResult {
finalOutput: string;
analysis: string;
researchData: string;
}
async function runMultiAgentTask(task: string): Promise<MultiAgentResult> {
const response = await fetch("/api/multi-agent", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ task, threadId: Date.now().toString() })
});
if (!response.ok) throw new Error("エージェント実行に失敗しました");
const data = await response.json();
return data.result;
}
export default function MultiAgentInterface() {
const [task, setTask] = useState("");
const mutation = useMutation({
mutationFn: runMultiAgentTask,
onSuccess: (data) => {
console.log("タスク完了:", data);
}
});
const handleSubmit = debounce((e: React.FormEvent) => {
e.preventDefault();
if (task.trim()) {
mutation.mutate(task);
}
}, 500);
return (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">マルチエージェントタスクプロセッサー</h2>
<form onSubmit={handleSubmit}>
<textarea
className="textarea textarea-bordered w-full"
placeholder="タスクを記述してください..."
value={task}
onChange={(e) => setTask(e.target.value)}
rows={4}
/>
<button
type="submit"
className="btn btn-primary mt-4"
disabled={mutation.isPending}
>
{mutation.isPending ? (
<span className="loading loading-spinner" />
) : (
"タスクを処理"
)}
</button>
</form>
{mutation.data && (
<div className="mt-6 space-y-4">
<div className="collapse collapse-arrow bg-base-200">
<input type="checkbox" defaultChecked />
<div className="collapse-title font-medium">
最終出力
</div>
<div className="collapse-content">
<p>{mutation.data.finalOutput}</p>
</div>
</div>
<div className="collapse collapse-arrow bg-base-200">
<input type="checkbox" />
<div className="collapse-title font-medium">
分析
</div>
<div className="collapse-content">
<p>{mutation.data.analysis}</p>
</div>
</div>
</div>
)}
</div>
</div>
);
}
Reactコンポーネントは適切なローディング状態と結果表示でマルチエージェントの相互作用を管理します。
高度な例:ディベートを伴う並列協調
次に、エージェントが並列で動作し、コンセンサスに達するためにディベートする洗練されたマルチエージェントシステムを実装しましょう。
1. 投票を含む高度な状態の定義
// app/lib/agents/advanced-types.ts
import { Annotation } from "@langchain/langgraph";
import { z } from "zod";
export const DebateOpinion = z.object({
agent: z.string(),
opinion: z.string(),
confidence: z.number().min(0).max(1),
reasoning: z.string()
});
export const DebateState = Annotation.Root({
question: Annotation<string>(),
round: Annotation<number>({ default: () => 0 }),
opinions: Annotation<z.infer<typeof DebateOpinion>[]>({
reducer: (x, y) => [...x, ...y],
default: () => []
}),
consensus: Annotation<string>(),
hasConverged: Annotation<boolean>({ default: () => false })
});
export type DebateStateType = typeof DebateState.State;
高度な状態は意見、信頼スコア、コンセンサスステータスを追跡します。
2. 並列エキスパートエージェントの実装
// app/lib/agents/expert-agents.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { DebateStateType, DebateOpinion } from "./advanced-types";
import { chunk, groupBy, maxBy } from "es-toolkit";
// ベースエキスパートエージェントファクトリ
function createExpertAgent(expertise: string, model: string = "gemini-2.5-flash") {
return async (state: DebateStateType): Promise<Partial<DebateStateType>> => {
const llm = new ChatGoogleGenerativeAI({
model,
temperature: 0.7,
maxRetries: 2
});
// 前のラウンドから他のエージェントの意見を取得
const otherOpinions = state.opinions
.filter(op => op.agent !== expertise)
.slice(-3); // 他者からの最後の3つの意見
const prompt = `あなたは${expertise}のエキスパートです。
質問: ${state.question}
ラウンド: ${state.round}
${otherOpinions.length > 0 ? `
他のエキスパートの意見:
${otherOpinions.map(op =>
`${op.agent}: ${op.opinion} (信頼度: ${op.confidence})`
).join('\n')}
` : ''}
理由と信頼度(0-1)を付けてあなたの意見を提供してください。
フォーマット: { "opinion": "...", "reasoning": "...", "confidence": 0.8 }`;
const response = await llm.invoke(prompt);
const parsed = JSON.parse(response.content as string);
return {
opinions: [{
agent: expertise,
opinion: parsed.opinion,
reasoning: parsed.reasoning,
confidence: parsed.confidence
}]
};
};
}
// 専門エキスパートの作成
export const technicalExpert = createExpertAgent("技術分析");
export const businessExpert = createExpertAgent("ビジネス戦略");
export const userExpert = createExpertAgent("ユーザーエクスペリエンス");
export const securityExpert = createExpertAgent("セキュリティ", "gemini-2.5-pro");
エキスパートエージェントは信頼スコア付きの専門的な視点を提供します。
3. 並列実行コーディネーター
// app/lib/agents/parallel-coordinator.ts
import { DebateStateType } from "./advanced-types";
import {
technicalExpert,
businessExpert,
userExpert,
securityExpert
} from "./expert-agents";
import { chunk, meanBy, partition } from "es-toolkit";
export async function parallelExpertAnalysis(
state: DebateStateType
): Promise<Partial<DebateStateType>> {
const experts = [
technicalExpert,
businessExpert,
userExpert,
securityExpert
];
// タイムアウト付きですべてのエキスパートを並列実行
const expertPromises = experts.map(expert =>
Promise.race([
expert(state),
new Promise<Partial<DebateStateType>>((_, reject) =>
setTimeout(() => reject(new Error("タイムアウト")), 10000)
)
])
);
const results = await Promise.allSettled(expertPromises);
// 成功した意見を収集
const newOpinions = results
.filter(r => r.status === "fulfilled")
.flatMap(r => (r as PromiseFulfilledResult<any>).value.opinions || []);
// 収束をチェック
const hasConverged = checkConvergence(newOpinions);
return {
opinions: newOpinions,
round: state.round + 1,
hasConverged
};
}
function checkConvergence(opinions: any[]): boolean {
if (opinions.length < 3) return false;
// 類似性で意見をグループ化(簡略化)
const groups = groupBy(opinions, op =>
op.opinion.toLowerCase().includes("同意") ? "同意" : "不同意"
);
// 多数派が同意するかチェック
const largestGroup = Object.values(groups)
.reduce((a, b) => a.length > b.length ? a : b, []);
// 75%以上が同意し、平均信頼度が0.7以上の場合に収束
const convergenceRatio = largestGroup.length / opinions.length;
const avgConfidence = meanBy(largestGroup, op => op.confidence);
return convergenceRatio > 0.75 && avgConfidence > 0.7;
}
コーディネーターはタイムアウト処理と収束検出を備えた並列実行を管理します。
4. 投票によるコンセンサス構築
// app/lib/agents/consensus.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { DebateStateType } from "./advanced-types";
import { groupBy, maxBy, sortBy } from "es-toolkit";
export async function buildConsensus(
state: DebateStateType
): Promise<Partial<DebateStateType>> {
const model = new ChatGoogleGenerativeAI({
model: "gemini-2.5-pro",
temperature: 0.3
});
// 各エージェントの最新の意見を取得
const recentOpinions = Object.values(
groupBy(state.opinions, op => op.agent)
).map(group => maxBy(group, op => op.confidence));
// 信頼度でソート
const sortedOpinions = sortBy(
recentOpinions,
op => -(op?.confidence || 0)
);
const prompt = `これらのエキスパートの意見をコンセンサスに統合してください:
質問: ${state.question}
エキスパートの意見:
${sortedOpinions.map(op => `
${op?.agent} (信頼度: ${op?.confidence}):
意見: ${op?.opinion}
理由: ${op?.reasoning}
`).join('\n')}
すべてのエキスパートからの重要なポイントに対処するバランスの取れたコンセンサスを作成してください。
信頼スコアで意見を重み付けしてください。`;
const response = await model.invoke(prompt);
return {
consensus: response.content as string,
hasConverged: true
};
}
// デッドロック解決のための投票メカニズム
export function majorityVote(opinions: any[]): string {
const voteGroups = groupBy(opinions, op => op.opinion);
const weightedVotes = Object.entries(voteGroups).map(([opinion, votes]) => ({
opinion,
totalWeight: votes.reduce((sum, v) => sum + v.confidence, 0)
}));
const winner = maxBy(weightedVotes, v => v.totalWeight);
return winner?.opinion || "コンセンサスに達しませんでした";
}
コンセンサス構築は信頼度の重み付けで多様な意見を統合します。
5. 完全なディベートワークフロー
// app/lib/agents/debate-workflow.ts
import { StateGraph, START, END } from "@langchain/langgraph";
import { DebateState } from "./advanced-types";
import { parallelExpertAnalysis } from "./parallel-coordinator";
import { buildConsensus } from "./consensus";
export function createDebateWorkflow() {
const MAX_ROUNDS = 5;
const workflow = new StateGraph(DebateState)
.addNode("parallel_experts", parallelExpertAnalysis)
.addNode("consensus", buildConsensus)
// 並列エキスパート分析から開始
.addEdge(START, "parallel_experts")
// 収束に基づく条件付きルーティング
.addConditionalEdges(
"parallel_experts",
(state) => {
if (state.hasConverged || state.round >= MAX_ROUNDS) {
return "consensus";
}
return "parallel_experts"; // ディベートを継続
},
{
parallel_experts: "parallel_experts",
consensus: "consensus"
}
)
.addEdge("consensus", END);
return workflow.compile({
recursionLimit: MAX_ROUNDS * 2
});
}
ワークフローは自動収束検出を備えた反復ディベートを実装します。
6. リアルタイム更新用のストリーミングAPI
// app/api/debate/route.ts
import { NextRequest } from "next/server";
import { createDebateWorkflow } from "@/lib/agents/debate-workflow";
export const maxDuration = 300;
export async function POST(req: NextRequest) {
const { question } = await req.json();
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
const workflow = createDebateWorkflow();
try {
// イベントが発生したらストリーミング
for await (const event of workflow.stream(
{ question, round: 0 },
{ streamMode: "values" }
)) {
const data = JSON.stringify({
type: "update",
round: event.round,
opinions: event.opinions?.slice(-4), // 最後の4つの意見
hasConverged: event.hasConverged
});
controller.enqueue(
encoder.encode(`data: ${data}\n\n`)
);
}
// 最終コンセンサスを送信
const final = await workflow.invoke({ question, round: 0 });
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({
type: "complete",
consensus: final.consensus
})}\n\n`)
);
} catch (error) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({
type: "error",
message: "ディベートに失敗しました"
})}\n\n`)
);
} finally {
controller.close();
}
}
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
});
}
ストリーミングAPIはディベートプロセス中にリアルタイム更新を提供します。
7. リアルタイムディベート可視化
// app/components/DebateVisualization.tsx
"use client";
import { useEffect, useState } from "react";
import { groupBy, sortBy } from "es-toolkit";
interface Opinion {
agent: string;
opinion: string;
confidence: number;
reasoning: string;
}
export default function DebateVisualization() {
const [question, setQuestion] = useState("");
const [isDebating, setIsDebating] = useState(false);
const [rounds, setRounds] = useState<Opinion[][]>([]);
const [consensus, setConsensus] = useState<string>("");
const startDebate = async () => {
setIsDebating(true);
setRounds([]);
setConsensus("");
const response = await fetch("/api/debate", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ question })
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
if (data.type === "update" && data.opinions) {
setRounds(prev => [...prev, data.opinions]);
} else if (data.type === "complete") {
setConsensus(data.consensus);
setIsDebating(false);
}
}
}
}
};
return (
<div className="container mx-auto p-4">
<div className="card bg-base-100 shadow-xl mb-6">
<div className="card-body">
<h2 className="card-title">マルチエージェントディベートシステム</h2>
<input
type="text"
placeholder="ディベート用の質問を入力してください..."
className="input input-bordered w-full"
value={question}
onChange={(e) => setQuestion(e.target.value)}
/>
<button
className="btn btn-primary"
onClick={startDebate}
disabled={isDebating || !question}
>
{isDebating ? (
<>
<span className="loading loading-spinner" />
ディベート中...
</>
) : (
"ディベートを開始"
)}
</button>
</div>
</div>
{/* ディベートラウンドの可視化 */}
{rounds.length > 0 && (
<div className="grid grid-cols-1 md:grid-cols-2 gap-4 mb-6">
{rounds[rounds.length - 1].map((opinion, idx) => (
<div key={idx} className="card bg-base-200">
<div className="card-body">
<div className="flex justify-between items-center">
<h3 className="font-bold">{opinion.agent}</h3>
<div className="badge badge-primary">
{Math.round(opinion.confidence * 100)}% 確信度
</div>
</div>
<p className="text-sm mt-2">{opinion.opinion}</p>
<progress
className="progress progress-primary w-full"
value={opinion.confidence}
max="1"
/>
</div>
</div>
))}
</div>
)}
{/* コンセンサス結果 */}
{consensus && (
<div className="alert alert-success">
<div>
<h3 className="font-bold">コンセンサスに到達:</h3>
<p>{consensus}</p>
</div>
</div>
)}
</div>
);
}
リアルタイム可視化は信頼度インジケーターを使用してディベートの進行を表示します。
まとめ
マルチエージェント協調は、専門エージェント間でタスクを分散することにより、複雑な問題解決を変革します。スーパーバイザー・ワーカーパターンは、カスタマーサービスやリサーチタスクに理想的な集中制御を提供し、並列ディベートシステムは重要な意思決定のための多様な視点の収集に優れています。主な考慮事項には、調整オーバーヘッドの管理(チームを7エージェント以下に保つ)、サーバーレス環境用の適切なタイムアウト処理の実装、コンセンサス構築のための信頼度重み付け投票の使用が含まれます。LangGraphのステートフルオーケストレーションとVercelのサーバーレスインフラストラクチャを使用することで、洗練された調整パターンを通じて品質を維持しながら、2〜10倍の生産性向上を実現する本番環境対応のマルチエージェントシステムを構築できます。