草案 代理设计模式 - 并行化
学习如何使用 LangChain、LangGraph 和 Next.js 15 在 Vercel 无服务器平台上构建同时执行多个任务的高性能 AI 代理。我们将创建可以研究多个主题、调用各种 API 并并行处理数据的代理——将执行时间从分钟级降到秒级。
心智模型:从顺序红绿灯到并行高速公路系统
将 AI 代理中的并行化想象成从单车道有红绿灯的道路(顺序)升级到多车道高速公路系统(并行)。在顺序执行中,汽车(任务)在每个红绿灯(步骤)处等待然后继续。通过并行化,多条车道允许汽车同时流动,在车道汇合的地方有合并点(聚合)。就像高速公路匝道支持动态交通进入(动态并行化)一样,LangGraph 的 Send API 允许在运行时生成新的并行任务。Vercel 的无服务器函数充当收费站,现在可以同时处理多辆汽车(函数内并发),在保持相同基础设施的同时大幅提高吞吐量。
基本并行代理实现
1. 使用 RunnableParallel 创建并行研究代理
// lib/agents/parallel-research.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { RunnableParallel, RunnablePassthrough } from '@langchain/core/runnables';
import { PromptTemplate } from '@langchain/core/prompts';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { TavilySearchResults } from '@langchain/community/tools/tavily_search';
import { map, pick } from 'es-toolkit';
import { z } from 'zod';
const ResearchSchema = z.object({
topic: z.string(),
summary: z.string(),
keyPoints: z.array(z.string()),
sources: z.array(z.string())
});
export function createParallelResearchAgent() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
});
const searchTool = new TavilySearchResults({
maxResults: 3,
});
// 定义并行研究分支
const technicalResearch = PromptTemplate.fromTemplate(
`Research technical aspects of: {topic}
Focus on: implementation details, architecture, performance metrics`
).pipe(model).pipe(new StringOutputParser());
const businessResearch = PromptTemplate.fromTemplate(
`Research business impact of: {topic}
Focus on: market size, ROI, case studies, adoption rates`
).pipe(model).pipe(new StringOutputParser());
const futureResearch = PromptTemplate.fromTemplate(
`Research future trends of: {topic}
Focus on: predictions, emerging patterns, expert opinions`
).pipe(model).pipe(new StringOutputParser());
// 并行执行所有研究分支
const parallelChain = RunnableParallel.from({
technical: technicalResearch,
business: businessResearch,
future: futureResearch,
topic: RunnablePassthrough(),
});
// 综合链合并结果
const synthesisPrompt = PromptTemplate.fromTemplate(
`Synthesize the following research on {topic}:
Technical Research: {technical}
Business Research: {business}
Future Research: {future}
Create a comprehensive summary with key insights.`
);
return parallelChain
.pipe(synthesisPrompt)
.pipe(model)
.pipe(new StringOutputParser());
}
创建三个同时执行的研究分支,与顺序执行相比,将研究时间减少 66%。
2. 具有并行进度流式传输的 API 路由
// app/api/parallel-research/route.ts
import { createParallelResearchAgent } from '@/lib/agents/parallel-research';
import { NextRequest } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function POST(req: NextRequest) {
const { topic } = await req.json();
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const agent = createParallelResearchAgent();
// 在后台处理
(async () => {
try {
// 发送初始进度
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'start',
message: '正在 3 个方面启动并行研究...'
})}\n\n`)
);
// 执行并行研究
const result = await agent.invoke({ topic });
// 发送完成
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'complete',
content: result
})}\n\n`)
);
} catch (error) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'error',
error: String(error)
})}\n\n`)
);
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
实现服务器发送事件以从并行代理执行流式传输进度更新。
3. 使用 TanStack Query 的前端组件
// components/ParallelResearchInterface.tsx
'use client';
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { groupBy } from 'es-toolkit';
interface ResearchEvent {
type: 'start' | 'progress' | 'complete' | 'error';
message?: string;
content?: string;
error?: string;
}
export default function ParallelResearchInterface() {
const [topic, setTopic] = useState('');
const [events, setEvents] = useState<ResearchEvent[]>([]);
const researchMutation = useMutation({
mutationFn: async (researchTopic: string) => {
setEvents([]);
const res = await fetch('/api/parallel-research', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ topic: researchTopic }),
});
if (!res.ok) throw new Error('研究失败');
const reader = res.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const event = JSON.parse(line.slice(6));
setEvents(prev => [...prev, event]);
} catch {}
}
}
}
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (topic.trim()) {
researchMutation.mutate(topic);
}
};
const groupedEvents = groupBy(events, (e) => e.type);
const hasCompleted = groupedEvents.complete?.length > 0;
return (
<div className="card w-full bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">并行研究代理</h2>
<form onSubmit={handleSubmit}>
<div className="form-control">
<input
type="text"
className="input input-bordered"
placeholder="输入研究主题..."
value={topic}
onChange={(e) => setTopic(e.target.value)}
disabled={researchMutation.isPending}
/>
</div>
<button
type="submit"
className="btn btn-primary mt-4"
disabled={researchMutation.isPending || !topic.trim()}
>
{researchMutation.isPending ? (
<>
<span className="loading loading-spinner"></span>
并行研究中...
</>
) : '开始研究'}
</button>
</form>
{events.length > 0 && (
<div className="mt-6">
<div className="steps steps-vertical">
{events.map((event, idx) => (
<li key={idx} className={`step ${
event.type === 'complete' ? 'step-success' :
event.type === 'error' ? 'step-error' :
'step-primary'
}`}>
<div className="text-left ml-4">
{event.message || event.content || event.error}
</div>
</li>
))}
</div>
</div>
)}
{hasCompleted && groupedEvents.complete[0].content && (
<div className="alert alert-success mt-4">
<div className="prose max-w-none">
{groupedEvents.complete[0].content}
</div>
</div>
)}
</div>
</div>
);
}
使用 TanStack Query 管理具有实时进度更新的并行研究的前端组件。
使用 LangGraph 的高级并行工作流
1. 使用 Send API 的动态并行执行
// lib/workflows/dynamic-parallel.ts
import { StateGraph, Send, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { BaseMessage, HumanMessage } from '@langchain/core/messages';
import { partition, chunk as chunkArray } from 'es-toolkit';
import { z } from 'zod';
// 定义状态模式
const WorkflowStateSchema = z.object({
query: z.string(),
companies: z.array(z.string()),
results: z.record(z.string(), z.any()),
finalReport: z.string().optional(),
});
type WorkflowState = z.infer<typeof WorkflowStateSchema>;
interface CompanyResearch {
company: string;
revenue: number;
employees: number;
founded: number;
}
export function createDynamicParallelWorkflow() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
});
// 创建状态图
const workflow = new StateGraph<WorkflowState>({
channels: {
query: {
value: null,
},
companies: {
value: (x: string[], y: string[]) => [...x, ...y],
default: () => [],
},
results: {
value: (x: Record<string, any>, y: Record<string, any>) => ({...x, ...y}),
default: () => ({}),
},
finalReport: {
value: null,
},
},
});
// 分解节点 - 识别要研究的公司
workflow.addNode('decompose', async (state) => {
const prompt = `Given the query: "${state.query}"
List all companies that need to be researched (comma-separated, no explanation):`;
const response = await model.invoke([new HumanMessage(prompt)]);
const companies = String(response.content)
.split(',')
.map(c => c.trim())
.filter(Boolean);
return { companies };
});
// 映射节点 - 为每个公司生成并行研究
workflow.addNode('map', async (state) => {
// 使用 Send API 创建动态并行分支
const sends = state.companies.map(company =>
new Send('research', { company, query: state.query })
);
return sends;
});
// 研究节点 - 研究单个公司
workflow.addNode('research', async (state: any) => {
const { company, query } = state;
// 使用模拟数据模拟研究
const mockData: CompanyResearch = {
company,
revenue: Math.floor(Math.random() * 1000) + 100,
employees: Math.floor(Math.random() * 10000) + 100,
founded: 2000 + Math.floor(Math.random() * 25),
};
return {
results: {
[company]: mockData
}
};
});
// 归约节点 - 聚合所有研究结果
workflow.addNode('reduce', async (state) => {
const companies = Object.values(state.results) as CompanyResearch[];
// 按收入排序并创建报告
const sorted = companies.sort((a, b) => b.revenue - a.revenue);
const report = `
# 公司分析报告
## 查询: ${state.query}
## 按收入排名的顶级公司:
${sorted.map((c, i) => `
${i + 1}. **${c.company}**
- 收入: $${c.revenue}M
- 员工数: ${c.employees.toLocaleString()}
- 成立: ${c.founded}
`).join('')}
## 统计摘要:
- 分析公司总数: ${companies.length}
- 平均收入: $${Math.round(companies.reduce((sum, c) => sum + c.revenue, 0) / companies.length)}M
- 总员工数: ${companies.reduce((sum, c) => sum + c.employees, 0).toLocaleString()}
`;
return { finalReport: report };
});
// 定义工作流边
workflow.addEdge('decompose', 'map');
workflow.addEdge('map', 'research');
workflow.addEdge('research', 'reduce');
workflow.addEdge('reduce', END);
workflow.setEntryPoint('decompose');
return workflow.compile();
}
实现动态并行化,其中并行任务的数量在运行时根据查询分解确定。
2. 批处理的 Map-Reduce 模式
// lib/workflows/map-reduce-batch.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { chunk, flatten, groupBy } from 'es-toolkit';
interface BatchState {
documents: string[];
batchSize: number;
processedBatches: Record<string, any>;
summary: string;
}
export function createMapReduceBatchWorkflow() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
});
const workflow = new StateGraph<BatchState>({
channels: {
documents: {
value: null,
},
batchSize: {
value: null,
default: () => 5,
},
processedBatches: {
value: (x: Record<string, any>, y: Record<string, any>) => ({...x, ...y}),
default: () => ({}),
},
summary: {
value: null,
},
},
});
// 为并行处理批量文档
workflow.addNode('createBatches', async (state) => {
const batches = chunk(state.documents, state.batchSize);
// 并行处理批次
const processingPromises = batches.map(async (batch, index) => {
const batchPrompt = `分析这些文档并提取关键见解:
${batch.map((doc, i) => `文档 ${i + 1}: ${doc}`).join('\n')}
提供包含主要主题和模式的结构化分析。`;
const response = await model.invoke([
{ role: 'user', content: batchPrompt }
]);
return {
[`batch_${index}`]: {
documents: batch.length,
analysis: response.content,
}
};
});
// 等待所有批次完成
const results = await Promise.all(processingPromises);
const merged = Object.assign({}, ...results);
return { processedBatches: merged };
});
// 将所有批次结果归约为最终摘要
workflow.addNode('reduceBatches', async (state) => {
const allAnalyses = Object.values(state.processedBatches)
.map(batch => batch.analysis)
.join('\n\n');
const reducePrompt = `将这些批次分析综合成一份全面的摘要:
${allAnalyses}
创建一份统一的报告,突出显示:
1. 所有批次的共同主题
2. 特定批次的独特见解
3. 整体模式和结论`;
const response = await model.invoke([
{ role: 'user', content: reducePrompt }
]);
return { summary: String(response.content) };
});
workflow.addEdge('createBatches', 'reduceBatches');
workflow.addEdge('reduceBatches', END);
workflow.setEntryPoint('createBatches');
return workflow.compile();
}
实现 map-reduce 模式以并行批量处理大型文档集。
3. 具有状态管理的并行代理协调
// lib/workflows/coordinated-agents.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { TavilySearchResults } from '@langchain/community/tools/tavily_search';
import { Calculator } from '@langchain/community/tools/calculator';
import { uniqBy, sortBy } from 'es-toolkit';
interface CoordinatedState {
task: string;
researchData: any[];
calculations: any[];
validation: boolean;
finalOutput: string;
}
export function createCoordinatedAgentsWorkflow() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0.2,
});
const searchTool = new TavilySearchResults({ maxResults: 5 });
const calculator = new Calculator();
const workflow = new StateGraph<CoordinatedState>({
channels: {
task: { value: null },
researchData: {
value: (x: any[], y: any[]) => [...x, ...y],
default: () => [],
},
calculations: {
value: (x: any[], y: any[]) => [...x, ...y],
default: () => [],
},
validation: {
value: null,
default: () => false,
},
finalOutput: { value: null },
},
});
// 研究和计算代理的并行执行
workflow.addNode('parallelAgents', async (state) => {
// 研究代理
const researchPromise = (async () => {
const searchQuery = `${state.task} 最新数据统计事实`;
const results = await searchTool.invoke(searchQuery);
const analysisPrompt = `分析这些搜索结果: ${state.task}
结果: ${JSON.stringify(results)}
提取关键数据点和见解。`;
const analysis = await model.invoke([
{ role: 'user', content: analysisPrompt }
]);
return {
researchData: [{
source: 'web_search',
content: analysis.content,
timestamp: new Date().toISOString(),
}]
};
})();
// 计算代理
const calculationPromise = (async () => {
// 从任务中提取数字进行计算
const numbers = state.task.match(/\d+/g)?.map(Number) || [];
if (numbers.length >= 2) {
const calculations = [];
// 执行各种计算
calculations.push({
operation: '总和',
result: await calculator.invoke(
`${numbers.join(' + ')}`
),
});
calculations.push({
operation: '平均',
result: await calculator.invoke(
`(${numbers.join(' + ')}) / ${numbers.length}`
),
});
return { calculations };
}
return { calculations: [] };
})();
// 并行执行两个代理
const [research, calcs] = await Promise.all([
researchPromise,
calculationPromise,
]);
return { ...research, ...calcs };
});
// 验证节点 - 检查并行结果
workflow.addNode('validate', async (state) => {
const hasResearch = state.researchData.length > 0;
const hasCalculations = state.calculations.length > 0;
const validationPrompt = `验证这些并行代理结果的一致性:
研究: ${JSON.stringify(state.researchData)}
计算: ${JSON.stringify(state.calculations)}
结果是否一致且可靠?(仅回答YES或NO)`;
const response = await model.invoke([
{ role: 'user', content: validationPrompt }
]);
const isValid = String(response.content).toUpperCase().includes('YES');
return { validation: isValid };
});
// 综合节点 - 合并已验证的结果
workflow.addNode('synthesize', async (state) => {
if (!state.validation) {
return {
finalOutput: '验证失败。并行代理的结果不一致。'
};
}
const synthesisPrompt = `为以下内容创建全面的响应: ${state.task}
使用来自并行代理的已验证数据:
- 研究发现: ${JSON.stringify(state.researchData)}
- 计算: ${JSON.stringify(state.calculations)}
提供结合两个来源的结构化答案。`;
const response = await model.invoke([
{ role: 'user', content: synthesisPrompt }
]);
return { finalOutput: String(response.content) };
});
// 定义工作流边
workflow.addEdge('parallelAgents', 'validate');
workflow.addEdge('validate', 'synthesize');
workflow.addEdge('synthesize', END);
workflow.setEntryPoint('parallelAgents');
return workflow.compile();
}
实现具有验证和状态同步的协调并行代理。
4. 具有进度流式传输的动态工作流 API 路由
// app/api/dynamic-workflow/route.ts
import { createDynamicParallelWorkflow } from '@/lib/workflows/dynamic-parallel';
import { NextRequest } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 777; // 低于 800 秒限制的安全缓冲区
export async function POST(req: NextRequest) {
const { query } = await req.json();
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const workflow = createDynamicParallelWorkflow();
(async () => {
try {
let stepCount = 0;
// 流式传输工作流事件
const eventStream = await workflow.stream({
query,
companies: [],
results: {},
});
for await (const event of eventStream) {
stepCount++;
// 发送步骤更新
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'step',
stepNumber: stepCount,
node: Object.keys(event)[0],
preview: JSON.stringify(event).substring(0, 100) + '...'
})}\n\n`)
);
// 如果可用,发送最终报告
if (event.reduce?.finalReport) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'complete',
report: event.reduce.finalReport
})}\n\n`)
);
}
}
} catch (error) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'error',
error: String(error)
})}\n\n`)
);
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
API 路由处理具有实时进度流式传输的动态工作流执行。
5. 具有并行流可视化的高级前端
// components/DynamicWorkflowInterface.tsx
'use client';
import { useState, useEffect } from 'react';
import { useMutation } from '@tanstack/react-query';
import { partition, groupBy } from 'es-toolkit';
interface WorkflowEvent {
type: 'step' | 'complete' | 'error';
stepNumber?: number;
node?: string;
preview?: string;
report?: string;
error?: string;
}
interface ParallelStream {
id: string;
status: 'pending' | 'active' | 'complete';
result?: any;
}
export default function DynamicWorkflowInterface() {
const [query, setQuery] = useState('');
const [events, setEvents] = useState<WorkflowEvent[]>([]);
const [parallelStreams, setParallelStreams] = useState<ParallelStream[]>([]);
const workflowMutation = useMutation({
mutationFn: async (workflowQuery: string) => {
setEvents([]);
setParallelStreams([]);
const res = await fetch('/api/dynamic-workflow', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query: workflowQuery }),
});
if (!res.ok) throw new Error('工作流失败');
const reader = res.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const event = JSON.parse(line.slice(6)) as WorkflowEvent;
setEvents(prev => [...prev, event]);
// 更新并行流可视化
if (event.node === 'research') {
setParallelStreams(prev => {
const newStream: ParallelStream = {
id: `stream-${prev.length}`,
status: 'active'
};
return [...prev, newStream];
});
}
} catch {}
}
}
}
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (query.trim()) {
workflowMutation.mutate(query);
}
};
const [stepEvents, otherEvents] = partition(
events,
(e) => e.type === 'step'
);
const completeEvent = events.find(e => e.type === 'complete');
return (
<div className="w-full space-y-4">
{/* 输入表单 */}
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">动态并行工作流</h2>
<form onSubmit={handleSubmit}>
<div className="form-control">
<label className="label">
<span className="label-text">输入并行分析的查询</span>
</label>
<input
type="text"
className="input input-bordered"
placeholder="例如:分析顶级科技公司收入..."
value={query}
onChange={(e) => setQuery(e.target.value)}
disabled={workflowMutation.isPending}
/>
</div>
<button
type="submit"
className="btn btn-primary mt-4"
disabled={workflowMutation.isPending || !query.trim()}
>
{workflowMutation.isPending ? (
<>
<span className="loading loading-spinner"></span>
运行并行工作流...
</>
) : '执行工作流'}
</button>
</form>
</div>
</div>
{/* 并行流可视化 */}
{parallelStreams.length > 0 && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h3 className="card-title">并行执行流</h3>
<div className="grid grid-cols-4 gap-2">
{parallelStreams.map(stream => (
<div
key={stream.id}
className={`p-2 rounded ${
stream.status === 'active'
? 'bg-primary text-primary-content animate-pulse'
: 'bg-success text-success-content'
}`}
>
<div className="text-xs font-bold">
{stream.id}
</div>
<div className="text-xs">
{stream.status}
</div>
</div>
))}
</div>
<div className="text-sm text-base-content/70 mt-2">
检测到 {parallelStreams.length} 个并行操作
</div>
</div>
</div>
)}
{/* 工作流步骤 */}
{stepEvents.length > 0 && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h3 className="card-title">工作流进度</h3>
<ul className="steps steps-vertical">
{stepEvents.map((event, idx) => (
<li key={idx} className="step step-primary">
<div className="text-left ml-4">
<div className="font-semibold">{event.node}</div>
<div className="text-sm opacity-70">
步骤 {event.stepNumber}
</div>
{event.preview && (
<div className="text-xs font-mono opacity-50">
{event.preview}
</div>
)}
</div>
</li>
))}
</ul>
</div>
</div>
)}
{/* 最终报告 */}
{completeEvent?.report && (
<div className="card bg-success text-success-content shadow-xl">
<div className="card-body">
<h3 className="card-title">分析完成</h3>
<div className="prose prose-invert max-w-none">
<pre className="whitespace-pre-wrap">
{completeEvent.report}
</pre>
</div>
</div>
</div>
)}
{/* 错误显示 */}
{workflowMutation.isError && (
<div className="alert alert-error">
<span>工作流失败: {workflowMutation.error?.message}</span>
</div>
)}
</div>
);
}
高级前端组件使用实时更新可视化并行流执行。
6. 具有重试逻辑的容错并行模式
// lib/patterns/resilient-parallel.ts
import { RunnableParallel } from '@langchain/core/runnables';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { retry, delay, take } from 'es-toolkit';
import pLimit from 'p-limit';
interface ParallelTaskConfig {
maxRetries?: number;
concurrencyLimit?: number;
timeoutMs?: number;
backoffMs?: number;
}
export class ResilientParallelExecutor {
private model: ChatGoogleGenerativeAI;
private config: Required<ParallelTaskConfig>;
constructor(config: ParallelTaskConfig = {}) {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
});
this.config = {
maxRetries: config.maxRetries ?? 3,
concurrencyLimit: config.concurrencyLimit ?? 10,
timeoutMs: config.timeoutMs ?? 30000,
backoffMs: config.backoffMs ?? 1000,
};
}
async executeWithRetry<T>(
task: () => Promise<T>,
taskName: string
): Promise<T | null> {
for (let attempt = 1; attempt <= this.config.maxRetries; attempt++) {
try {
// 添加超时包装器
const result = await Promise.race([
task(),
new Promise<never>((_, reject) =>
setTimeout(
() => reject(new Error(`${this.config.timeoutMs}ms 后超时`)),
this.config.timeoutMs
)
),
]);
return result;
} catch (error) {
console.error(`任务 ${taskName} 尝试 ${attempt} 失败:`, error);
if (attempt < this.config.maxRetries) {
// 带抖动的指数退避
const backoff = this.config.backoffMs * Math.pow(2, attempt - 1);
const jitter = Math.random() * 1000;
await delay(backoff + jitter);
} else {
console.error(`任务 ${taskName} 在 ${attempt} 次尝试后失败`);
return null; // 优雅降级
}
}
}
return null;
}
async executeParallelTasks<T>(
tasks: Array<{ name: string; fn: () => Promise<T> }>
): Promise<Array<{ name: string; result: T | null; success: boolean }>> {
// 创建并发限制器
const limit = pLimit(this.config.concurrencyLimit);
// 使用重试和并发控制执行任务
const results = await Promise.all(
tasks.map(({ name, fn }) =>
limit(async () => {
const result = await this.executeWithRetry(fn, name);
return {
name,
result,
success: result !== null,
};
})
)
);
// 记录摘要
const successful = results.filter(r => r.success).length;
console.log(
`并行执行完成: ${successful}/${tasks.length} 成功`
);
return results;
}
async executeBatchedParallel<T>(
items: T[],
batchSize: number,
processor: (batch: T[]) => Promise<any>
): Promise<any[]> {
const batches = [];
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
const results = [];
// 使用受控并行处理批次
for (const batch of batches) {
const batchResults = await this.executeParallelTasks(
batch.map((item, idx) => ({
name: `batch-item-${idx}`,
fn: () => processor([item]),
}))
);
results.push(...batchResults);
}
return results;
}
}
实现具有重试逻辑、并发限制和优雅降级的生产就绪并行执行。
结论
并行化通过同时执行独立任务,将 AI 代理性能从分钟级转换为秒级。这里展示的模式——从基本的 RunnableParallel 到动态 LangGraph 工作流——为在 Vercel 无服务器平台上构建生产就绪的并行代理系统提供了基础。关键要点包括使用 es-toolkit 进行高效的数据操作、使用重试逻辑实现适当的错误处理、流式传输进度更新以改善用户体验,以及在最大化 777 秒执行窗口的同时遵守无服务器约束。这些模式可以从简单的并行研究代理扩展到同时处理数百个任务的复杂多代理编排系统。