Agentic Design Patterns - Parallelization
Learn how to build high-performance AI agents that execute multiple tasks simultaneously using LangChain, LangGraph, and Next.js 15 on Vercel's serverless platform. We'll create agents that can research multiple topics, call various APIs, and process data in parallel—reducing execution time from minutes to seconds.
Mental Model: From Sequential Traffic Lights to Parallel Highway Systems
Think of parallelization in AI agents like upgrading from a single-lane road with traffic lights (sequential) to a multi-lane highway system (parallel). In sequential execution, cars (tasks) wait at each light (step) before proceeding. With parallelization, multiple lanes allow cars to flow simultaneously, with merge points (aggregation) where lanes converge. Just as highway on-ramps enable dynamic traffic entry (dynamic parallelization), LangGraph's Send API allows spawning new parallel tasks at runtime. Vercel's serverless functions act as toll booths that can now process multiple cars simultaneously (in-function concurrency), dramatically improving throughput while maintaining the same infrastructure.
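The payoff is easiest to see in plain Promise terms before any framework enters the picture. Below is a minimal sketch, where fetchTopic is a hypothetical stand-in for any slow I/O-bound call (an LLM request, an API call, a search):
// Sequential vs. parallel in plain TypeScript
// fetchTopic is a hypothetical stand-in for an LLM or API call
async function fetchTopic(topic: string): Promise<string> {
  // Simulate roughly 2 seconds of network/LLM latency
  return new Promise(resolve => setTimeout(() => resolve(`result for ${topic}`), 2000));
}
async function compare() {
  // Sequential: about 6s total, each call waits at its own "traffic light"
  const a = await fetchTopic('technical');
  const b = await fetchTopic('business');
  const c = await fetchTopic('future');
  // Parallel: about 2s total, all three "lanes" flow at once and merge at Promise.all
  const [ta, tb, tc] = await Promise.all([
    fetchTopic('technical'),
    fetchTopic('business'),
    fetchTopic('future'),
  ]);
  return { sequential: [a, b, c], parallel: [ta, tb, tc] };
}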
Basic Parallel Agent Implementation
1. Create Parallel Research Agent with RunnableParallel
// lib/agents/parallel-research.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { RunnableParallel } from '@langchain/core/runnables';
import { PromptTemplate } from '@langchain/core/prompts';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { z } from 'zod';
// Output shape for a single research branch; not wired into the chain below,
// but useful if you later add structured-output validation
const ResearchSchema = z.object({
topic: z.string(),
summary: z.string(),
keyPoints: z.array(z.string()),
sources: z.array(z.string())
});
export function createParallelResearchAgent() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
});
// Define parallel research branches
const technicalResearch = PromptTemplate.fromTemplate(
`Research technical aspects of: {topic}
Focus on: implementation details, architecture, performance metrics`
).pipe(model).pipe(new StringOutputParser());
const businessResearch = PromptTemplate.fromTemplate(
`Research business impact of: {topic}
Focus on: market size, ROI, case studies, adoption rates`
).pipe(model).pipe(new StringOutputParser());
const futureResearch = PromptTemplate.fromTemplate(
`Research future trends of: {topic}
Focus on: predictions, emerging patterns, expert opinions`
).pipe(model).pipe(new StringOutputParser());
// Execute all research branches in parallel
const parallelChain = RunnableParallel.from({
technical: technicalResearch,
business: businessResearch,
future: futureResearch,
topic: (input: { topic: string }) => input.topic, // forward the raw topic to the synthesis prompt
});
// Synthesis chain to combine results
const synthesisPrompt = PromptTemplate.fromTemplate(
`Synthesize the following research on {topic}:
Technical Research: {technical}
Business Research: {business}
Future Research: {future}
Create a comprehensive summary with key insights.`
);
return parallelChain
.pipe(synthesisPrompt)
.pipe(model)
.pipe(new StringOutputParser());
}
Creates three research branches that run concurrently, so total latency is bounded by the slowest branch rather than the sum of all three, roughly a threefold speedup when branch latencies are similar.
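For reference, invoking the compiled chain is a single call from any async context; the topic below is illustrative:
// Example usage (topic is illustrative)
const agent = createParallelResearchAgent();
const report = await agent.invoke({ topic: 'vector databases' });
console.log(report); // synthesized summary drawing on all three branches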
2. API Route with Streaming for Parallel Progress
// app/api/parallel-research/route.ts
import { createParallelResearchAgent } from '@/lib/agents/parallel-research';
import { NextRequest } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function POST(req: NextRequest) {
const { topic } = await req.json();
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const agent = createParallelResearchAgent();
// Process in background
(async () => {
try {
// Send initial progress
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'start',
message: 'Starting parallel research on 3 aspects...'
})}\n\n`)
);
// Execute parallel research
const result = await agent.invoke({ topic });
// Send completion
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'complete',
content: result
})}\n\n`)
);
} catch (error) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'error',
error: String(error)
})}\n\n`)
);
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
Implements server-sent events to stream progress updates from parallel agent execution.
3. Frontend Component with TanStack Query
// components/ParallelResearchInterface.tsx
'use client';
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { groupBy } from 'es-toolkit';
interface ResearchEvent {
type: 'start' | 'progress' | 'complete' | 'error';
message?: string;
content?: string;
error?: string;
}
export default function ParallelResearchInterface() {
const [topic, setTopic] = useState('');
const [events, setEvents] = useState<ResearchEvent[]>([]);
const researchMutation = useMutation({
mutationFn: async (researchTopic: string) => {
setEvents([]);
const res = await fetch('/api/parallel-research', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ topic: researchTopic }),
});
if (!res.ok) throw new Error('Research failed');
const reader = res.body?.getReader();
const decoder = new TextDecoder();
if (!reader) throw new Error('Response has no body');
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const event = JSON.parse(line.slice(6));
setEvents(prev => [...prev, event]);
} catch { /* ignore partial JSON chunks */ }
}
}
}
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (topic.trim()) {
researchMutation.mutate(topic);
}
};
const groupedEvents = groupBy(events, (e) => e.type);
const hasCompleted = (groupedEvents.complete?.length ?? 0) > 0;
return (
<div className="card w-full bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Parallel Research Agent</h2>
<form onSubmit={handleSubmit}>
<div className="form-control">
<input
type="text"
className="input input-bordered"
placeholder="Enter research topic..."
value={topic}
onChange={(e) => setTopic(e.target.value)}
disabled={researchMutation.isPending}
/>
</div>
<button
type="submit"
className="btn btn-primary mt-4"
disabled={researchMutation.isPending || !topic.trim()}
>
{researchMutation.isPending ? (
<>
<span className="loading loading-spinner"></span>
Researching in Parallel...
</>
) : 'Start Research'}
</button>
</form>
{events.length > 0 && (
<div className="mt-6">
<div className="steps steps-vertical">
{events.map((event, idx) => (
<li key={idx} className={`step ${
event.type === 'complete' ? 'step-success' :
event.type === 'error' ? 'step-error' :
'step-primary'
}`}>
<div className="text-left ml-4">
{event.message || event.content || event.error}
</div>
</li>
))}
</ul>
</div>
)}
{hasCompleted && groupedEvents.complete[0].content && (
<div className="alert alert-success mt-4">
<div className="prose max-w-none">
{groupedEvents.complete[0].content}
</div>
</div>
)}
</div>
</div>
);
}
Frontend component manages parallel research with real-time progress updates using TanStack Query.
Advanced Parallel Workflows with LangGraph
1. Dynamic Parallel Execution with Send API
// lib/workflows/dynamic-parallel.ts
import { StateGraph, Send, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage } from '@langchain/core/messages';
import { z } from 'zod';
// Define state schema
const WorkflowStateSchema = z.object({
query: z.string(),
companies: z.array(z.string()),
results: z.record(z.string(), z.any()),
finalReport: z.string().optional(),
});
type WorkflowState = z.infer<typeof WorkflowStateSchema>;
interface CompanyResearch {
company: string;
revenue: number;
employees: number;
founded: number;
}
export function createDynamicParallelWorkflow() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
});
// Create the state graph
const workflow = new StateGraph<WorkflowState>({
channels: {
query: {
value: null,
},
companies: {
value: (x: string[], y: string[]) => [...x, ...y],
default: () => [],
},
results: {
value: (x: Record<string, any>, y: Record<string, any>) => ({...x, ...y}),
default: () => ({}),
},
finalReport: {
value: null,
},
},
});
// Decompose node - identifies companies to research
workflow.addNode('decompose', async (state) => {
const prompt = `Given the query: "${state.query}"
List all companies that need to be researched (comma-separated, no explanation):`;
const response = await model.invoke([new HumanMessage(prompt)]);
const companies = String(response.content)
.split(',')
.map(c => c.trim())
.filter(Boolean);
return { companies };
});
// Fan-out: in LangGraph, Send objects must be returned from a conditional
// edge function rather than a regular node, so the per-company fan-out is
// wired up with addConditionalEdges below instead of a dedicated 'map' node.
// Research node - receives each Send payload ({ company, query }), not the full graph state
workflow.addNode('research', async (state: any) => {
const { company } = state;
// Simulate research with mock data
const mockData: CompanyResearch = {
company,
revenue: Math.floor(Math.random() * 1000) + 100,
employees: Math.floor(Math.random() * 10000) + 100,
founded: 2000 + Math.floor(Math.random() * 25),
};
return {
results: {
[company]: mockData
}
};
});
// Reduce node - aggregates all research results
workflow.addNode('reduce', async (state) => {
const companies = Object.values(state.results) as CompanyResearch[];
// Sort by revenue and create report
const sorted = companies.sort((a, b) => b.revenue - a.revenue);
const report = `
# Company Analysis Report
## Query: ${state.query}
## Top Companies by Revenue:
${sorted.map((c, i) => `
${i + 1}. **${c.company}**
- Revenue: $${c.revenue}M
- Employees: ${c.employees.toLocaleString()}
- Founded: ${c.founded}
`).join('')}
## Summary Statistics:
- Total Companies Analyzed: ${companies.length}
- Average Revenue: $${Math.round(companies.reduce((sum, c) => sum + c.revenue, 0) / companies.length)}M
- Total Employees: ${companies.reduce((sum, c) => sum + c.employees, 0).toLocaleString()}
`;
return { finalReport: report };
});
// Define the workflow edges; the conditional edge fans out one
// parallel 'research' branch per company via the Send API
workflow.addConditionalEdges('decompose', (state: WorkflowState) =>
state.companies.map(company =>
new Send('research', { company, query: state.query })
)
);
workflow.addEdge('research', 'reduce');
workflow.addEdge('reduce', END);
workflow.setEntryPoint('decompose');
return workflow.compile();
}
Implements dynamic parallelization where the number of parallel tasks is determined at runtime based on query decomposition.
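When streaming isn't needed, the compiled graph can be invoked directly; the query and initial state below are illustrative:
// Example invocation (query is illustrative)
const app = createDynamicParallelWorkflow();
const finalState = await app.invoke({
  query: 'Compare the revenue of Acme Corp, Globex, and Initech',
  companies: [],
  results: {},
});
console.log(finalState.finalReport);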
2. Map-Reduce Pattern for Batch Processing
// lib/workflows/map-reduce-batch.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { chunk } from 'es-toolkit';
interface BatchState {
documents: string[];
batchSize: number;
processedBatches: Record<string, any>;
summary: string;
}
export function createMapReduceBatchWorkflow() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
});
const workflow = new StateGraph<BatchState>({
channels: {
documents: {
value: null,
},
batchSize: {
value: null,
default: () => 5,
},
processedBatches: {
value: (x: Record<string, any>, y: Record<string, any>) => ({...x, ...y}),
default: () => ({}),
},
summary: {
value: null,
},
},
});
// Batch documents for parallel processing
workflow.addNode('createBatches', async (state) => {
const batches = chunk(state.documents, state.batchSize);
// Process batches in parallel
const processingPromises = batches.map(async (batch, index) => {
const batchPrompt = `Analyze these documents and extract key insights:
${batch.map((doc, i) => `Document ${i + 1}: ${doc}`).join('\n')}
Provide a structured analysis with main themes and patterns.`;
const response = await model.invoke([
{ role: 'user', content: batchPrompt }
]);
return {
[`batch_${index}`]: {
documents: batch.length,
analysis: response.content,
}
};
});
// Wait for all batches to complete
const results = await Promise.all(processingPromises);
const merged = Object.assign({}, ...results);
return { processedBatches: merged };
});
// Reduce all batch results into final summary
workflow.addNode('reduceBatches', async (state) => {
const allAnalyses = Object.values(state.processedBatches)
.map(batch => batch.analysis)
.join('\n\n');
const reducePrompt = `Synthesize these batch analyses into a comprehensive summary:
${allAnalyses}
Create a unified report highlighting:
1. Common themes across all batches
2. Unique insights from specific batches
3. Overall patterns and conclusions`;
const response = await model.invoke([
{ role: 'user', content: reducePrompt }
]);
return { summary: String(response.content) };
});
workflow.addEdge('createBatches', 'reduceBatches');
workflow.addEdge('reduceBatches', END);
workflow.setEntryPoint('createBatches');
return workflow.compile();
}
Implements map-reduce pattern for processing large document sets in parallel batches.
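A sketch of how this workflow might be driven, assuming an illustrative document set:
// Example usage (documents are illustrative)
const batchWorkflow = createMapReduceBatchWorkflow();
const finalState = await batchWorkflow.invoke({
  documents: [
    'Q1 customer feedback: onboarding was confusing...',
    'Q2 customer feedback: the new dashboard is faster...',
    'Q3 customer feedback: billing emails are unclear...',
  ],
  batchSize: 2, // 3 documents -> 2 parallel batches
  processedBatches: {},
  summary: '',
});
console.log(finalState.summary);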
3. Parallel Agent Coordination with State Management
// lib/workflows/coordinated-agents.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { TavilySearchResults } from '@langchain/community/tools/tavily_search';
import { Calculator } from '@langchain/community/tools/calculator';
interface CoordinatedState {
task: string;
researchData: any[];
calculations: any[];
validation: boolean;
finalOutput: string;
}
export function createCoordinatedAgentsWorkflow() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0.2,
});
const searchTool = new TavilySearchResults({ maxResults: 5 });
const calculator = new Calculator();
const workflow = new StateGraph<CoordinatedState>({
channels: {
task: { value: null },
researchData: {
value: (x: any[], y: any[]) => [...x, ...y],
default: () => [],
},
calculations: {
value: (x: any[], y: any[]) => [...x, ...y],
default: () => [],
},
validation: {
value: null,
default: () => false,
},
finalOutput: { value: null },
},
});
// Parallel execution of research and calculation agents
workflow.addNode('parallelAgents', async (state) => {
// Research Agent
const researchPromise = (async () => {
const searchQuery = `${state.task} latest data statistics facts`;
const results = await searchTool.invoke(searchQuery);
const analysisPrompt = `Analyze these search results for: ${state.task}
Results: ${JSON.stringify(results)}
Extract key data points and insights.`;
const analysis = await model.invoke([
{ role: 'user', content: analysisPrompt }
]);
return {
researchData: [{
source: 'web_search',
content: analysis.content,
timestamp: new Date().toISOString(),
}]
};
})();
// Calculation Agent
const calculationPromise = (async () => {
// Extract numbers from task for calculations
const numbers = state.task.match(/\d+/g)?.map(Number) || [];
if (numbers.length >= 2) {
const calculations = [];
// Perform various calculations
calculations.push({
operation: 'sum',
result: await calculator.invoke(
`${numbers.join(' + ')}`
),
});
calculations.push({
operation: 'average',
result: await calculator.invoke(
`(${numbers.join(' + ')}) / ${numbers.length}`
),
});
return { calculations };
}
return { calculations: [] };
})();
// Execute both agents in parallel
const [research, calcs] = await Promise.all([
researchPromise,
calculationPromise,
]);
return { ...research, ...calcs };
});
// Validation node - checks parallel results
workflow.addNode('validate', async (state) => {
const hasResearch = state.researchData.length > 0;
const hasCalculations = state.calculations.length > 0;
const validationPrompt = `Validate the consistency of these parallel agent results:
Research: ${JSON.stringify(state.researchData)}
Calculations: ${JSON.stringify(state.calculations)}
Are the results consistent and reliable? (YES/NO only)`;
const response = await model.invoke([
{ role: 'user', content: validationPrompt }
]);
const isValid = String(response.content).toUpperCase().includes('YES');
return { validation: isValid };
});
// Synthesis node - combines validated results
workflow.addNode('synthesize', async (state) => {
if (!state.validation) {
return {
finalOutput: 'Validation failed. Results from parallel agents are inconsistent.'
};
}
const synthesisPrompt = `Create a comprehensive response for: ${state.task}
Using validated data from parallel agents:
- Research findings: ${JSON.stringify(state.researchData)}
- Calculations: ${JSON.stringify(state.calculations)}
Provide a well-structured answer combining both sources.`;
const response = await model.invoke([
{ role: 'user', content: synthesisPrompt }
]);
return { finalOutput: String(response.content) };
});
// Define workflow edges
workflow.addEdge('parallelAgents', 'validate');
workflow.addEdge('validate', 'synthesize');
workflow.addEdge('synthesize', END);
workflow.setEntryPoint('parallelAgents');
return workflow.compile();
}
Implements coordinated parallel agents with validation and state synchronization.
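Driving the coordinated graph looks the same as the others; the task string here is illustrative and includes numbers so the calculation agent has something to do:
// Example usage (task is illustrative)
const coordinated = createCoordinatedAgentsWorkflow();
const finalState = await coordinated.invoke({
  task: 'Compare hosting plans priced at 120 and 450 dollars per month',
  researchData: [],
  calculations: [],
  validation: false,
  finalOutput: '',
});
console.log(finalState.finalOutput);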
4. API Route for Dynamic Workflow with Progress Streaming
// app/api/dynamic-workflow/route.ts
import { createDynamicParallelWorkflow } from '@/lib/workflows/dynamic-parallel';
import { NextRequest } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 777; // Just under the 800s maxDuration ceiling on Vercel Pro/Enterprise with Fluid compute
export async function POST(req: NextRequest) {
const { query } = await req.json();
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const workflow = createDynamicParallelWorkflow();
(async () => {
try {
let stepCount = 0;
// Stream workflow events
const eventStream = await workflow.stream({
query,
companies: [],
results: {},
});
for await (const event of eventStream) {
stepCount++;
// Send step updates
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'step',
stepNumber: stepCount,
node: Object.keys(event)[0],
preview: JSON.stringify(event).substring(0, 100) + '...'
})}\n\n`)
);
// Send final report when available
if (event.reduce?.finalReport) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'complete',
report: event.reduce.finalReport
})}\n\n`)
);
}
}
} catch (error) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'error',
error: String(error)
})}\n\n`)
);
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
API route handles dynamic workflow execution with real-time progress streaming.
5. Advanced Frontend with Parallel Stream Visualization
// components/DynamicWorkflowInterface.tsx
'use client';
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { partition } from 'es-toolkit';
interface WorkflowEvent {
type: 'step' | 'complete' | 'error';
stepNumber?: number;
node?: string;
preview?: string;
report?: string;
error?: string;
}
interface ParallelStream {
id: string;
status: 'pending' | 'active' | 'complete';
result?: any;
}
export default function DynamicWorkflowInterface() {
const [query, setQuery] = useState('');
const [events, setEvents] = useState<WorkflowEvent[]>([]);
const [parallelStreams, setParallelStreams] = useState<ParallelStream[]>([]);
const workflowMutation = useMutation({
mutationFn: async (workflowQuery: string) => {
setEvents([]);
setParallelStreams([]);
const res = await fetch('/api/dynamic-workflow', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query: workflowQuery }),
});
if (!res.ok) throw new Error('Workflow failed');
const reader = res.body?.getReader();
const decoder = new TextDecoder();
if (!reader) throw new Error('Response has no body');
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const event = JSON.parse(line.slice(6)) as WorkflowEvent;
setEvents(prev => [...prev, event]);
// Update parallel streams visualization
if (event.node === 'research') {
setParallelStreams(prev => {
const newStream: ParallelStream = {
id: `stream-${prev.length}`,
status: 'active'
};
return [...prev, newStream];
});
}
} catch { /* ignore partial JSON chunks */ }
}
}
}
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (query.trim()) {
workflowMutation.mutate(query);
}
};
const [stepEvents] = partition(events, (e) => e.type === 'step');
const completeEvent = events.find(e => e.type === 'complete');
return (
<div className="w-full space-y-4">
{/* Input Form */}
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Dynamic Parallel Workflow</h2>
<form onSubmit={handleSubmit}>
<div className="form-control">
<label className="label">
<span className="label-text">Enter query for parallel analysis</span>
</label>
<input
type="text"
className="input input-bordered"
placeholder="e.g., Analyze top tech companies revenue..."
value={query}
onChange={(e) => setQuery(e.target.value)}
disabled={workflowMutation.isPending}
/>
</div>
<button
type="submit"
className="btn btn-primary mt-4"
disabled={workflowMutation.isPending || !query.trim()}
>
{workflowMutation.isPending ? (
<>
<span className="loading loading-spinner"></span>
Running Parallel Workflow...
</>
) : 'Execute Workflow'}
</button>
</form>
</div>
</div>
{/* Parallel Streams Visualization */}
{parallelStreams.length > 0 && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h3 className="card-title">Parallel Execution Streams</h3>
<div className="grid grid-cols-4 gap-2">
{parallelStreams.map(stream => (
<div
key={stream.id}
className={`p-2 rounded ${
stream.status === 'active'
? 'bg-primary text-primary-content animate-pulse'
: 'bg-success text-success-content'
}`}
>
<div className="text-xs font-bold">
{stream.id}
</div>
<div className="text-xs">
{stream.status}
</div>
</div>
))}
</div>
<div className="text-sm text-base-content/70 mt-2">
{parallelStreams.length} parallel operations detected
</div>
</div>
</div>
)}
{/* Workflow Steps */}
{stepEvents.length > 0 && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h3 className="card-title">Workflow Progress</h3>
<ul className="steps steps-vertical">
{stepEvents.map((event, idx) => (
<li key={idx} className="step step-primary">
<div className="text-left ml-4">
<div className="font-semibold">{event.node}</div>
<div className="text-sm opacity-70">
Step {event.stepNumber}
</div>
{event.preview && (
<div className="text-xs font-mono opacity-50">
{event.preview}
</div>
)}
</div>
</li>
))}
</ul>
</div>
</div>
)}
{/* Final Report */}
{completeEvent?.report && (
<div className="card bg-success text-success-content shadow-xl">
<div className="card-body">
<h3 className="card-title">Analysis Complete</h3>
<div className="prose prose-invert max-w-none">
<pre className="whitespace-pre-wrap">
{completeEvent.report}
</pre>
</div>
</div>
</div>
)}
{/* Error Display */}
{workflowMutation.isError && (
<div className="alert alert-error">
<span>Workflow failed: {workflowMutation.error?.message}</span>
</div>
)}
</div>
);
}
Advanced frontend component visualizes parallel stream execution with real-time updates.
6. Error-Resilient Parallel Pattern with Retry Logic
// lib/patterns/resilient-parallel.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { delay } from 'es-toolkit';
import pLimit from 'p-limit';
interface ParallelTaskConfig {
maxRetries?: number;
concurrencyLimit?: number;
timeoutMs?: number;
backoffMs?: number;
}
export class ResilientParallelExecutor {
private model: ChatGoogleGenerativeAI;
private config: Required<ParallelTaskConfig>;
constructor(config: ParallelTaskConfig = {}) {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
});
this.config = {
maxRetries: config.maxRetries ?? 3,
concurrencyLimit: config.concurrencyLimit ?? 10,
timeoutMs: config.timeoutMs ?? 30000,
backoffMs: config.backoffMs ?? 1000,
};
}
async executeWithRetry<T>(
task: () => Promise<T>,
taskName: string
): Promise<T | null> {
for (let attempt = 1; attempt <= this.config.maxRetries; attempt++) {
try {
// Add timeout wrapper
const result = await Promise.race([
task(),
new Promise<never>((_, reject) =>
setTimeout(
() => reject(new Error(`Timeout after ${this.config.timeoutMs}ms`)),
this.config.timeoutMs
)
),
]);
return result;
} catch (error) {
console.error(`Task ${taskName} attempt ${attempt} failed:`, error);
if (attempt < this.config.maxRetries) {
// Exponential backoff with jitter
const backoff = this.config.backoffMs * Math.pow(2, attempt - 1);
const jitter = Math.random() * 1000;
await delay(backoff + jitter);
} else {
console.error(`Task ${taskName} failed after ${attempt} attempts`);
return null; // Graceful degradation
}
}
}
return null;
}
async executeParallelTasks<T>(
tasks: Array<{ name: string; fn: () => Promise<T> }>
): Promise<Array<{ name: string; result: T | null; success: boolean }>> {
// Create concurrency limiter
const limit = pLimit(this.config.concurrencyLimit);
// Execute tasks with retry and concurrency control
const results = await Promise.all(
tasks.map(({ name, fn }) =>
limit(async () => {
const result = await this.executeWithRetry(fn, name);
return {
name,
result,
success: result !== null,
};
})
)
);
// Log summary
const successful = results.filter(r => r.success).length;
console.log(
`Parallel execution complete: ${successful}/${tasks.length} succeeded`
);
return results;
}
async executeBatchedParallel<T>(
items: T[],
batchSize: number,
processor: (batch: T[]) => Promise<any>
): Promise<any[]> {
const batches = [];
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
const results = [];
// Process batches with controlled parallelism
for (const batch of batches) {
const batchResults = await this.executeParallelTasks(
batch.map((item, idx) => ({
name: `batch-item-${idx}`,
fn: () => processor([item]),
}))
);
results.push(...batchResults);
}
return results;
}
}
Implements production-ready parallel execution with retry logic, concurrency limits, and graceful degradation.
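A usage sketch; summarizeDocument is a hypothetical task function standing in for any async LLM call:
// Example usage (summarizeDocument is a hypothetical async task)
declare function summarizeDocument(id: string): Promise<string>;
const executor = new ResilientParallelExecutor({
  maxRetries: 2,
  concurrencyLimit: 5,
  timeoutMs: 15000,
});
const results = await executor.executeParallelTasks([
  { name: 'doc-1', fn: () => summarizeDocument('doc-1') },
  { name: 'doc-2', fn: () => summarizeDocument('doc-2') },
]);
// Failed tasks come back as { success: false, result: null } instead of rejecting the whole batch
const failed = results.filter(r => !r.success);
console.log(`${failed.length} task(s) degraded gracefully`);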
Conclusion
Parallelization cuts AI agent latency from minutes to seconds by executing independent tasks simultaneously. The patterns shown here, from basic RunnableParallel chains to dynamic LangGraph workflows, provide the foundation for building production-ready parallel agent systems on Vercel's serverless platform. Key takeaways: use es-toolkit for concise data operations, wrap parallel tasks in retry logic with concurrency limits, stream progress updates for better UX, and respect serverless constraints while making the most of the extended execution window (up to 800 seconds with Fluid compute on paid plans at the time of writing). These patterns scale from a simple parallel research agent to multi-agent orchestration systems processing hundreds of tasks concurrently.