ドラフト エージェント設計パターン - 並列化
LangChain、LangGraph、Next.js 15を使用してVercelのサーバーレスプラットフォーム上で複数のタスクを同時に実行する高性能AIエージェントの構築方法を学びます。複数のトピックを研究し、様々なAPIを呼び出し、データを並列処理できるエージェントを作成します—実行時間を分単位から秒単位に短縮します。
メンタルモデル:逐次的な信号機から並列ハイウェイシステムへ
AIエージェントの並列化を、信号機のある単一車線道路(逐次)から多車線ハイウェイシステム(並列)へのアップグレードと考えてみましょう。逐次実行では、車(タスク)は各信号(ステップ)で待機してから進みます。並列化では、複数の車線により車が同時に流れ、車線が合流する地点(集約)があります。ハイウェイのランプが動的な交通の流入を可能にするように(動的並列化)、LangGraphのSend APIは実行時に新しい並列タスクの生成を可能にします。Vercelのサーバーレス関数は、複数の車を同時に処理できる料金所として機能し(関数内並行性)、同じインフラストラクチャを維持しながらスループットを劇的に向上させます。
基本的な並列エージェントの実装
1. RunnableParallelを使用した並列研究エージェントの作成
// lib/agents/parallel-research.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { RunnableParallel, RunnablePassthrough } from '@langchain/core/runnables';
import { PromptTemplate } from '@langchain/core/prompts';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { TavilySearchResults } from '@langchain/community/tools/tavily_search';
import { map, pick } from 'es-toolkit';
import { z } from 'zod';
const ResearchSchema = z.object({
topic: z.string(),
summary: z.string(),
keyPoints: z.array(z.string()),
sources: z.array(z.string())
});
export function createParallelResearchAgent() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
});
const searchTool = new TavilySearchResults({
maxResults: 3,
});
// 並列研究ブランチを定義
const technicalResearch = PromptTemplate.fromTemplate(
`Research technical aspects of: {topic}
Focus on: implementation details, architecture, performance metrics`
).pipe(model).pipe(new StringOutputParser());
const businessResearch = PromptTemplate.fromTemplate(
`Research business impact of: {topic}
Focus on: market size, ROI, case studies, adoption rates`
).pipe(model).pipe(new StringOutputParser());
const futureResearch = PromptTemplate.fromTemplate(
`Research future trends of: {topic}
Focus on: predictions, emerging patterns, expert opinions`
).pipe(model).pipe(new StringOutputParser());
// すべての研究ブランチを並列で実行
const parallelChain = RunnableParallel.from({
technical: technicalResearch,
business: businessResearch,
future: futureResearch,
topic: RunnablePassthrough(),
});
// 結果を結合するための統合チェーン
const synthesisPrompt = PromptTemplate.fromTemplate(
`Synthesize the following research on {topic}:
Technical Research: {technical}
Business Research: {business}
Future Research: {future}
Create a comprehensive summary with key insights.`
);
return parallelChain
.pipe(synthesisPrompt)
.pipe(model)
.pipe(new StringOutputParser());
}
3つの研究ブランチを同時に実行し、逐次実行と比較して研究時間を66%削減します。
2. 並列進行状況のストリーミング機能を備えたAPIルート
// app/api/parallel-research/route.ts
import { createParallelResearchAgent } from '@/lib/agents/parallel-research';
import { NextRequest } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function POST(req: NextRequest) {
const { topic } = await req.json();
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const agent = createParallelResearchAgent();
// バックグラウンドで処理
(async () => {
try {
// 初期進行状況を送信
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'start',
message: '3つの側面で並列研究を開始しています...'
})}\n\n`)
);
// 並列研究を実行
const result = await agent.invoke({ topic });
// 完了を送信
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'complete',
content: result
})}\n\n`)
);
} catch (error) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'error',
error: String(error)
})}\n\n`)
);
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
並列エージェント実行から進行状況の更新をストリーミングするサーバー送信イベントを実装します。
3. TanStack Queryを使用したフロントエンドコンポーネント
// components/ParallelResearchInterface.tsx
'use client';
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { groupBy } from 'es-toolkit';
interface ResearchEvent {
type: 'start' | 'progress' | 'complete' | 'error';
message?: string;
content?: string;
error?: string;
}
export default function ParallelResearchInterface() {
const [topic, setTopic] = useState('');
const [events, setEvents] = useState<ResearchEvent[]>([]);
const researchMutation = useMutation({
mutationFn: async (researchTopic: string) => {
setEvents([]);
const res = await fetch('/api/parallel-research', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ topic: researchTopic }),
});
if (!res.ok) throw new Error('研究が失敗しました');
const reader = res.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const event = JSON.parse(line.slice(6));
setEvents(prev => [...prev, event]);
} catch {}
}
}
}
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (topic.trim()) {
researchMutation.mutate(topic);
}
};
const groupedEvents = groupBy(events, (e) => e.type);
const hasCompleted = groupedEvents.complete?.length > 0;
return (
<div className="card w-full bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">並列研究エージェント</h2>
<form onSubmit={handleSubmit}>
<div className="form-control">
<input
type="text"
className="input input-bordered"
placeholder="研究トピックを入力..."
value={topic}
onChange={(e) => setTopic(e.target.value)}
disabled={researchMutation.isPending}
/>
</div>
<button
type="submit"
className="btn btn-primary mt-4"
disabled={researchMutation.isPending || !topic.trim()}
>
{researchMutation.isPending ? (
<>
<span className="loading loading-spinner"></span>
並列で研究中...
</>
) : '研究を開始'}
</button>
</form>
{events.length > 0 && (
<div className="mt-6">
<div className="steps steps-vertical">
{events.map((event, idx) => (
<li key={idx} className={`step ${
event.type === 'complete' ? 'step-success' :
event.type === 'error' ? 'step-error' :
'step-primary'
}`}>
<div className="text-left ml-4">
{event.message || event.content || event.error}
</div>
</li>
))}
</div>
</div>
)}
{hasCompleted && groupedEvents.complete[0].content && (
<div className="alert alert-success mt-4">
<div className="prose max-w-none">
{groupedEvents.complete[0].content}
</div>
</div>
)}
</div>
</div>
);
}
TanStack Queryを使用してリアルタイムの進行状況更新で並列研究を管理するフロントエンドコンポーネント。
LangGraphを使用した高度な並列ワークフロー
1. Send APIを使用した動的並列実行
// lib/workflows/dynamic-parallel.ts
import { StateGraph, Send, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { BaseMessage, HumanMessage } from '@langchain/core/messages';
import { partition, chunk as chunkArray } from 'es-toolkit';
import { z } from 'zod';
// 状態スキーマを定義
const WorkflowStateSchema = z.object({
query: z.string(),
companies: z.array(z.string()),
results: z.record(z.string(), z.any()),
finalReport: z.string().optional(),
});
type WorkflowState = z.infer<typeof WorkflowStateSchema>;
interface CompanyResearch {
company: string;
revenue: number;
employees: number;
founded: number;
}
export function createDynamicParallelWorkflow() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
});
// 状態グラフを作成
const workflow = new StateGraph<WorkflowState>({
channels: {
query: {
value: null,
},
companies: {
value: (x: string[], y: string[]) => [...x, ...y],
default: () => [],
},
results: {
value: (x: Record<string, any>, y: Record<string, any>) => ({...x, ...y}),
default: () => ({}),
},
finalReport: {
value: null,
},
},
});
// 分解ノード - 研究する企業を特定
workflow.addNode('decompose', async (state) => {
const prompt = `Given the query: "${state.query}"
List all companies that need to be researched (comma-separated, no explanation):`;
const response = await model.invoke([new HumanMessage(prompt)]);
const companies = String(response.content)
.split(',')
.map(c => c.trim())
.filter(Boolean);
return { companies };
});
// マップノード - 各企業に対して並列研究を生成
workflow.addNode('map', async (state) => {
// Send APIを使用して動的並列ブランチを作成
const sends = state.companies.map(company =>
new Send('research', { company, query: state.query })
);
return sends;
});
// 研究ノード - 個々の企業を研究
workflow.addNode('research', async (state: any) => {
const { company, query } = state;
// モックデータで研究をシミュレート
const mockData: CompanyResearch = {
company,
revenue: Math.floor(Math.random() * 1000) + 100,
employees: Math.floor(Math.random() * 10000) + 100,
founded: 2000 + Math.floor(Math.random() * 25),
};
return {
results: {
[company]: mockData
}
};
});
// リデュースノード - すべての研究結果を集約
workflow.addNode('reduce', async (state) => {
const companies = Object.values(state.results) as CompanyResearch[];
// 収益でソートしてレポートを作成
const sorted = companies.sort((a, b) => b.revenue - a.revenue);
const report = `
# 企業分析レポート
## クエリ: ${state.query}
## 収益トップ企業:
${sorted.map((c, i) => `
${i + 1}. **${c.company}**
- 収益: $${c.revenue}M
- 従業員数: ${c.employees.toLocaleString()}
- 設立: ${c.founded}
`).join('')}
## 統計サマリー:
- 分析企業総数: ${companies.length}
- 平均収益: $${Math.round(companies.reduce((sum, c) => sum + c.revenue, 0) / companies.length)}M
- 総従業員数: ${companies.reduce((sum, c) => sum + c.employees, 0).toLocaleString()}
`;
return { finalReport: report };
});
// ワークフローエッジを定義
workflow.addEdge('decompose', 'map');
workflow.addEdge('map', 'research');
workflow.addEdge('research', 'reduce');
workflow.addEdge('reduce', END);
workflow.setEntryPoint('decompose');
return workflow.compile();
}
クエリの分解に基づいて実行時に並列タスク数が決定される動的並列化を実装します。
2. バッチ処理のためのMap-Reduceパターン
// lib/workflows/map-reduce-batch.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { chunk, flatten, groupBy } from 'es-toolkit';
interface BatchState {
documents: string[];
batchSize: number;
processedBatches: Record<string, any>;
summary: string;
}
export function createMapReduceBatchWorkflow() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
});
const workflow = new StateGraph<BatchState>({
channels: {
documents: {
value: null,
},
batchSize: {
value: null,
default: () => 5,
},
processedBatches: {
value: (x: Record<string, any>, y: Record<string, any>) => ({...x, ...y}),
default: () => ({}),
},
summary: {
value: null,
},
},
});
// 並列処理のためにドキュメントをバッチ化
workflow.addNode('createBatches', async (state) => {
const batches = chunk(state.documents, state.batchSize);
// バッチを並列で処理
const processingPromises = batches.map(async (batch, index) => {
const batchPrompt = `これらのドキュメントを分析して主要な洞察を抽出してください:
${batch.map((doc, i) => `ドキュメント ${i + 1}: ${doc}`).join('\n')}
主要テーマとパターンを含む構造化された分析を提供してください。`;
const response = await model.invoke([
{ role: 'user', content: batchPrompt }
]);
return {
[`batch_${index}`]: {
documents: batch.length,
analysis: response.content,
}
};
});
// すべてのバッチが完了するまで待機
const results = await Promise.all(processingPromises);
const merged = Object.assign({}, ...results);
return { processedBatches: merged };
});
// すべてのバッチ結果を最終サマリーに削減
workflow.addNode('reduceBatches', async (state) => {
const allAnalyses = Object.values(state.processedBatches)
.map(batch => batch.analysis)
.join('\n\n');
const reducePrompt = `これらのバッチ分析を包括的なサマリーに統合してください:
${allAnalyses}
以下を強調した統一レポートを作成してください:
1. すべてのバッチに共通するテーマ
2. 特定のバッチからの独自の洞察
3. 全体的なパターンと結論`;
const response = await model.invoke([
{ role: 'user', content: reducePrompt }
]);
return { summary: String(response.content) };
});
workflow.addEdge('createBatches', 'reduceBatches');
workflow.addEdge('reduceBatches', END);
workflow.setEntryPoint('createBatches');
return workflow.compile();
}
大規模なドキュメントセットを並列バッチで処理するためのmap-reduceパターンを実装します。
3. 状態管理を備えた並列エージェントの調整
// lib/workflows/coordinated-agents.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { TavilySearchResults } from '@langchain/community/tools/tavily_search';
import { Calculator } from '@langchain/community/tools/calculator';
import { uniqBy, sortBy } from 'es-toolkit';
interface CoordinatedState {
task: string;
researchData: any[];
calculations: any[];
validation: boolean;
finalOutput: string;
}
export function createCoordinatedAgentsWorkflow() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0.2,
});
const searchTool = new TavilySearchResults({ maxResults: 5 });
const calculator = new Calculator();
const workflow = new StateGraph<CoordinatedState>({
channels: {
task: { value: null },
researchData: {
value: (x: any[], y: any[]) => [...x, ...y],
default: () => [],
},
calculations: {
value: (x: any[], y: any[]) => [...x, ...y],
default: () => [],
},
validation: {
value: null,
default: () => false,
},
finalOutput: { value: null },
},
});
// 研究エージェントと計算エージェントの並列実行
workflow.addNode('parallelAgents', async (state) => {
// 研究エージェント
const researchPromise = (async () => {
const searchQuery = `${state.task} 最新データ統計ファクト`;
const results = await searchTool.invoke(searchQuery);
const analysisPrompt = `これらの検索結果を分析してください: ${state.task}
結果: ${JSON.stringify(results)}
主要なデータポイントと洞察を抽出してください。`;
const analysis = await model.invoke([
{ role: 'user', content: analysisPrompt }
]);
return {
researchData: [{
source: 'web_search',
content: analysis.content,
timestamp: new Date().toISOString(),
}]
};
})();
// 計算エージェント
const calculationPromise = (async () => {
// タスクから計算用の数値を抽出
const numbers = state.task.match(/\d+/g)?.map(Number) || [];
if (numbers.length >= 2) {
const calculations = [];
// 様々な計算を実行
calculations.push({
operation: '合計',
result: await calculator.invoke(
`${numbers.join(' + ')}`
),
});
calculations.push({
operation: '平均',
result: await calculator.invoke(
`(${numbers.join(' + ')}) / ${numbers.length}`
),
});
return { calculations };
}
return { calculations: [] };
})();
// 両方のエージェントを並列で実行
const [research, calcs] = await Promise.all([
researchPromise,
calculationPromise,
]);
return { ...research, ...calcs };
});
// 検証ノード - 並列結果をチェック
workflow.addNode('validate', async (state) => {
const hasResearch = state.researchData.length > 0;
const hasCalculations = state.calculations.length > 0;
const validationPrompt = `これらの並列エージェント結果の一貫性を検証してください:
研究: ${JSON.stringify(state.researchData)}
計算: ${JSON.stringify(state.calculations)}
結果は一貫性があり信頼できますか?(YES/NOのみ)`;
const response = await model.invoke([
{ role: 'user', content: validationPrompt }
]);
const isValid = String(response.content).toUpperCase().includes('YES');
return { validation: isValid };
});
// 統合ノード - 検証された結果を結合
workflow.addNode('synthesize', async (state) => {
if (!state.validation) {
return {
finalOutput: '検証が失敗しました。並列エージェントからの結果に一貫性がありません。'
};
}
const synthesisPrompt = `以下のタスクの包括的な応答を作成してください: ${state.task}
並列エージェントから検証されたデータを使用:
- 研究結果: ${JSON.stringify(state.researchData)}
- 計算: ${JSON.stringify(state.calculations)}
両方のソースを組み合わせて構造化された回答を提供してください。`;
const response = await model.invoke([
{ role: 'user', content: synthesisPrompt }
]);
return { finalOutput: String(response.content) };
});
// ワークフローエッジを定義
workflow.addEdge('parallelAgents', 'validate');
workflow.addEdge('validate', 'synthesize');
workflow.addEdge('synthesize', END);
workflow.setEntryPoint('parallelAgents');
return workflow.compile();
}
検証と状態同期を備えた調整された並列エージェントを実装します。
4. 進行状況ストリーミング機能を備えた動的ワークフローのAPIルート
// app/api/dynamic-workflow/route.ts
import { createDynamicParallelWorkflow } from '@/lib/workflows/dynamic-parallel';
import { NextRequest } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 777; // 800秒制限以下の安全なバッファー
export async function POST(req: NextRequest) {
const { query } = await req.json();
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const workflow = createDynamicParallelWorkflow();
(async () => {
try {
let stepCount = 0;
// ワークフローイベントをストリーム
const eventStream = await workflow.stream({
query,
companies: [],
results: {},
});
for await (const event of eventStream) {
stepCount++;
// ステップ更新を送信
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'step',
stepNumber: stepCount,
node: Object.keys(event)[0],
preview: JSON.stringify(event).substring(0, 100) + '...'
})}\n\n`)
);
// 利用可能な場合は最終レポートを送信
if (event.reduce?.finalReport) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'complete',
report: event.reduce.finalReport
})}\n\n`)
);
}
}
} catch (error) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'error',
error: String(error)
})}\n\n`)
);
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
リアルタイムの進行状況ストリーミングで動的ワークフローの実行を処理するAPIルート。
5. 並列ストリーム可視化を備えた高度なフロントエンド
// components/DynamicWorkflowInterface.tsx
'use client';
import { useState, useEffect } from 'react';
import { useMutation } from '@tanstack/react-query';
import { partition, groupBy } from 'es-toolkit';
interface WorkflowEvent {
type: 'step' | 'complete' | 'error';
stepNumber?: number;
node?: string;
preview?: string;
report?: string;
error?: string;
}
interface ParallelStream {
id: string;
status: 'pending' | 'active' | 'complete';
result?: any;
}
export default function DynamicWorkflowInterface() {
const [query, setQuery] = useState('');
const [events, setEvents] = useState<WorkflowEvent[]>([]);
const [parallelStreams, setParallelStreams] = useState<ParallelStream[]>([]);
const workflowMutation = useMutation({
mutationFn: async (workflowQuery: string) => {
setEvents([]);
setParallelStreams([]);
const res = await fetch('/api/dynamic-workflow', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query: workflowQuery }),
});
if (!res.ok) throw new Error('ワークフローが失敗しました');
const reader = res.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const event = JSON.parse(line.slice(6)) as WorkflowEvent;
setEvents(prev => [...prev, event]);
// 並列ストリームの可視化を更新
if (event.node === 'research') {
setParallelStreams(prev => {
const newStream: ParallelStream = {
id: `stream-${prev.length}`,
status: 'active'
};
return [...prev, newStream];
});
}
} catch {}
}
}
}
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (query.trim()) {
workflowMutation.mutate(query);
}
};
const [stepEvents, otherEvents] = partition(
events,
(e) => e.type === 'step'
);
const completeEvent = events.find(e => e.type === 'complete');
return (
<div className="w-full space-y-4">
{/* 入力フォーム */}
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">動的並列ワークフロー</h2>
<form onSubmit={handleSubmit}>
<div className="form-control">
<label className="label">
<span className="label-text">並列分析のクエリを入力</span>
</label>
<input
type="text"
className="input input-bordered"
placeholder="例:トップテック企業の収益を分析..."
value={query}
onChange={(e) => setQuery(e.target.value)}
disabled={workflowMutation.isPending}
/>
</div>
<button
type="submit"
className="btn btn-primary mt-4"
disabled={workflowMutation.isPending || !query.trim()}
>
{workflowMutation.isPending ? (
<>
<span className="loading loading-spinner"></span>
並列ワークフローを実行中...
</>
) : 'ワークフローを実行'}
</button>
</form>
</div>
</div>
{/* 並列ストリームの可視化 */}
{parallelStreams.length > 0 && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h3 className="card-title">並列実行ストリーム</h3>
<div className="grid grid-cols-4 gap-2">
{parallelStreams.map(stream => (
<div
key={stream.id}
className={`p-2 rounded ${
stream.status === 'active'
? 'bg-primary text-primary-content animate-pulse'
: 'bg-success text-success-content'
}`}
>
<div className="text-xs font-bold">
{stream.id}
</div>
<div className="text-xs">
{stream.status}
</div>
</div>
))}
</div>
<div className="text-sm text-base-content/70 mt-2">
{parallelStreams.length}個の並列操作が検出されました
</div>
</div>
</div>
)}
{/* ワークフローステップ */}
{stepEvents.length > 0 && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h3 className="card-title">ワークフローの進行状況</h3>
<ul className="steps steps-vertical">
{stepEvents.map((event, idx) => (
<li key={idx} className="step step-primary">
<div className="text-left ml-4">
<div className="font-semibold">{event.node}</div>
<div className="text-sm opacity-70">
ステップ {event.stepNumber}
</div>
{event.preview && (
<div className="text-xs font-mono opacity-50">
{event.preview}
</div>
)}
</div>
</li>
))}
</ul>
</div>
</div>
)}
{/* 最終レポート */}
{completeEvent?.report && (
<div className="card bg-success text-success-content shadow-xl">
<div className="card-body">
<h3 className="card-title">分析完了</h3>
<div className="prose prose-invert max-w-none">
<pre className="whitespace-pre-wrap">
{completeEvent.report}
</pre>
</div>
</div>
</div>
)}
{/* エラー表示 */}
{workflowMutation.isError && (
<div className="alert alert-error">
<span>ワークフローが失敗しました: {workflowMutation.error?.message}</span>
</div>
)}
</div>
);
}
リアルタイム更新で並列ストリーム実行を可視化する高度なフロントエンドコンポーネント。
6. リトライロジックを備えたエラー耐性のある並列パターン
// lib/patterns/resilient-parallel.ts
import { RunnableParallel } from '@langchain/core/runnables';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { retry, delay, take } from 'es-toolkit';
import pLimit from 'p-limit';
interface ParallelTaskConfig {
maxRetries?: number;
concurrencyLimit?: number;
timeoutMs?: number;
backoffMs?: number;
}
export class ResilientParallelExecutor {
private model: ChatGoogleGenerativeAI;
private config: Required<ParallelTaskConfig>;
constructor(config: ParallelTaskConfig = {}) {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
});
this.config = {
maxRetries: config.maxRetries ?? 3,
concurrencyLimit: config.concurrencyLimit ?? 10,
timeoutMs: config.timeoutMs ?? 30000,
backoffMs: config.backoffMs ?? 1000,
};
}
async executeWithRetry<T>(
task: () => Promise<T>,
taskName: string
): Promise<T | null> {
for (let attempt = 1; attempt <= this.config.maxRetries; attempt++) {
try {
// タイムアウトラッパーを追加
const result = await Promise.race([
task(),
new Promise<never>((_, reject) =>
setTimeout(
() => reject(new Error(`${this.config.timeoutMs}ms後にタイムアウト`)),
this.config.timeoutMs
)
),
]);
return result;
} catch (error) {
console.error(`タスク ${taskName} 試行 ${attempt} が失敗しました:`, error);
if (attempt < this.config.maxRetries) {
// ジッターを含む指数バックオフ
const backoff = this.config.backoffMs * Math.pow(2, attempt - 1);
const jitter = Math.random() * 1000;
await delay(backoff + jitter);
} else {
console.error(`タスク ${taskName} は ${attempt} 回の試行後に失敗しました`);
return null; // グレースフルデグラデーション
}
}
}
return null;
}
async executeParallelTasks<T>(
tasks: Array<{ name: string; fn: () => Promise<T> }>
): Promise<Array<{ name: string; result: T | null; success: boolean }>> {
// 同時実行制限を作成
const limit = pLimit(this.config.concurrencyLimit);
// リトライと同時実行制御でタスクを実行
const results = await Promise.all(
tasks.map(({ name, fn }) =>
limit(async () => {
const result = await this.executeWithRetry(fn, name);
return {
name,
result,
success: result !== null,
};
})
)
);
// サマリーをログ
const successful = results.filter(r => r.success).length;
console.log(
`並列実行完了: ${successful}/${tasks.length} 成功`
);
return results;
}
async executeBatchedParallel<T>(
items: T[],
batchSize: number,
processor: (batch: T[]) => Promise<any>
): Promise<any[]> {
const batches = [];
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
const results = [];
// 制御された並列処理でバッチを処理
for (const batch of batches) {
const batchResults = await this.executeParallelTasks(
batch.map((item, idx) => ({
name: `batch-item-${idx}`,
fn: () => processor([item]),
}))
);
results.push(...batchResults);
}
return results;
}
}
リトライロジック、同時実行制限、グレースフルデグラデーションを備えた本番環境対応の並列実行を実装します。
まとめ
並列化により、独立したタスクを同時に実行することで、AIエージェントのパフォーマンスを分単位から秒単位に変換します。ここで示したパターン—基本的なRunnableParallelから動的なLangGraphワークフローまで—は、Vercelのサーバーレスプラットフォーム上で本番環境対応の並列エージェントシステムを構築するための基盤を提供します。主要なポイントには、効率的なデータ操作のためのes-toolkitの使用、リトライロジックを含む適切なエラー処理の実装、より良いUXのための進行状況更新のストリーミング、777秒の実行ウィンドウを最大化しながらサーバーレス制約を尊重することが含まれます。これらのパターンは、シンプルな並列研究エージェントから数百のタスクを同時に処理する複雑なマルチエージェントオーケストレーションシステムまでスケールします。