초안 에이전트 설계 패턴 - 병렬화

aiagentsparallelizationlangchainlanggraphnextjsvercel
By sko X opus 4.19/20/202510 min read

LangChain, LangGraph, Next.js 15를 사용하여 Vercel 서버리스 플랫폼에서 여러 작업을 동시에 실행하는 고성능 AI 에이전트를 구축하는 방법을 알아봅니다. 여러 주제를 연구하고, 다양한 API를 호출하며, 데이터를 병렬로 처리할 수 있는 에이전트를 만들어 실행 시간을 분 단위에서 초 단위로 줄입니다.

멘탈 모델: 순차적 신호등에서 병렬 고속도로 시스템으로

AI 에이전트의 병렬화를 신호등이 있는 단일 차선 도로(순차적)에서 다중 차선 고속도로 시스템(병렬)으로 업그레이드하는 것으로 생각해보세요. 순차적 실행에서는 자동차(작업)가 각 신호등(단계)에서 대기한 후 진행합니다. 병렬화에서는 여러 차선이 자동차들이 동시에 흐르도록 하며, 차선이 합류하는 지점(집계)이 있습니다. 고속도로 진입 램프가 동적 교통 진입을 가능하게 하듯이(동적 병렬화), LangGraph의 Send API는 런타임에 새로운 병렬 작업을 생성할 수 있습니다. Vercel의 서버리스 함수는 이제 여러 대의 자동차를 동시에 처리할 수 있는 톨게이트 역할을 하며(함수 내 동시성), 동일한 인프라를 유지하면서 처리량을 극적으로 개선합니다.

기본 병렬 에이전트 구현

1. RunnableParallel로 병렬 리서치 에이전트 만들기

// lib/agents/parallel-research.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { RunnableParallel, RunnablePassthrough } from '@langchain/core/runnables';
import { PromptTemplate } from '@langchain/core/prompts';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { TavilySearchResults } from '@langchain/community/tools/tavily_search';
import { map, pick } from 'es-toolkit';
import { z } from 'zod';

const ResearchSchema = z.object({
  topic: z.string(),
  summary: z.string(),
  keyPoints: z.array(z.string()),
  sources: z.array(z.string())
});

export function createParallelResearchAgent() {
  const model = new ChatGoogleGenerativeAI({
    modelName: 'gemini-2.5-flash',
    temperature: 0.3,
  });

  const searchTool = new TavilySearchResults({
    maxResults: 3,
  });

  // 병렬 연구 브랜치 정의
  const technicalResearch = PromptTemplate.fromTemplate(
    `Research technical aspects of: {topic}
     Focus on: implementation details, architecture, performance metrics`
  ).pipe(model).pipe(new StringOutputParser());

  const businessResearch = PromptTemplate.fromTemplate(
    `Research business impact of: {topic}
     Focus on: market size, ROI, case studies, adoption rates`
  ).pipe(model).pipe(new StringOutputParser());

  const futureResearch = PromptTemplate.fromTemplate(
    `Research future trends of: {topic}
     Focus on: predictions, emerging patterns, expert opinions`
  ).pipe(model).pipe(new StringOutputParser());

  // 모든 연구 브랜치를 병렬로 실행
  const parallelChain = RunnableParallel.from({
    technical: technicalResearch,
    business: businessResearch,
    future: futureResearch,
    topic: RunnablePassthrough(),
  });

  // 결과를 결합하는 통합 체인
  const synthesisPrompt = PromptTemplate.fromTemplate(
    `Synthesize the following research on {topic}:

    Technical Research: {technical}
    Business Research: {business}
    Future Research: {future}

    Create a comprehensive summary with key insights.`
  );

  return parallelChain
    .pipe(synthesisPrompt)
    .pipe(model)
    .pipe(new StringOutputParser());
}

세 개의 연구 브랜치를 동시에 실행하여 순차 실행 대비 연구 시간을 66% 단축합니다.

2. 병렬 진행 상황 스트리밍을 위한 API 라우트

// app/api/parallel-research/route.ts
import { createParallelResearchAgent } from '@/lib/agents/parallel-research';
import { NextRequest } from 'next/server';

export const runtime = 'nodejs';
export const maxDuration = 300;

export async function POST(req: NextRequest) {
  const { topic } = await req.json();

  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  const agent = createParallelResearchAgent();

  // 백그라운드에서 처리
  (async () => {
    try {
      // 초기 진행 상황 전송
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          type: 'start',
          message: '3개 측면에서 병렬 연구를 시작합니다...'
        })}\n\n`)
      );

      // 병렬 연구 실행
      const result = await agent.invoke({ topic });

      // 완료 전송
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          type: 'complete',
          content: result
        })}\n\n`)
      );
    } catch (error) {
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          type: 'error',
          error: String(error)
        })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

병렬 에이전트 실행에서 진행 상황 업데이트를 스트리밍하는 서버 전송 이벤트를 구현합니다.

3. TanStack Query를 사용한 프론트엔드 컴포넌트

// components/ParallelResearchInterface.tsx
'use client';

import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { groupBy } from 'es-toolkit';

interface ResearchEvent {
  type: 'start' | 'progress' | 'complete' | 'error';
  message?: string;
  content?: string;
  error?: string;
}

export default function ParallelResearchInterface() {
  const [topic, setTopic] = useState('');
  const [events, setEvents] = useState<ResearchEvent[]>([]);

  const researchMutation = useMutation({
    mutationFn: async (researchTopic: string) => {
      setEvents([]);

      const res = await fetch('/api/parallel-research', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ topic: researchTopic }),
      });

      if (!res.ok) throw new Error('연구 실패');

      const reader = res.body?.getReader();
      const decoder = new TextDecoder();

      while (reader) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            try {
              const event = JSON.parse(line.slice(6));
              setEvents(prev => [...prev, event]);
            } catch {}
          }
        }
      }
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (topic.trim()) {
      researchMutation.mutate(topic);
    }
  };

  const groupedEvents = groupBy(events, (e) => e.type);
  const hasCompleted = groupedEvents.complete?.length > 0;

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">병렬 연구 에이전트</h2>

        <form onSubmit={handleSubmit}>
          <div className="form-control">
            <input
              type="text"
              className="input input-bordered"
              placeholder="연구 주제를 입력하세요..."
              value={topic}
              onChange={(e) => setTopic(e.target.value)}
              disabled={researchMutation.isPending}
            />
          </div>

          <button
            type="submit"
            className="btn btn-primary mt-4"
            disabled={researchMutation.isPending || !topic.trim()}
          >
            {researchMutation.isPending ? (
              <>
                <span className="loading loading-spinner"></span>
                병렬로 연구 중...
              </>
            ) : '연구 시작'}
          </button>
        </form>

        {events.length > 0 && (
          <div className="mt-6">
            <div className="steps steps-vertical">
              {events.map((event, idx) => (
                <li key={idx} className={`step ${
                  event.type === 'complete' ? 'step-success' :
                  event.type === 'error' ? 'step-error' :
                  'step-primary'
                }`}>
                  <div className="text-left ml-4">
                    {event.message || event.content || event.error}
                  </div>
                </li>
              ))}
            </div>
          </div>
        )}

        {hasCompleted && groupedEvents.complete[0].content && (
          <div className="alert alert-success mt-4">
            <div className="prose max-w-none">
              {groupedEvents.complete[0].content}
            </div>
          </div>
        )}
      </div>
    </div>
  );
}

TanStack Query를 사용하여 실시간 진행 상황 업데이트로 병렬 연구를 관리하는 프론트엔드 컴포넌트.

LangGraph를 사용한 고급 병렬 워크플로우

1. Send API를 사용한 동적 병렬 실행

// lib/workflows/dynamic-parallel.ts
import { StateGraph, Send, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { BaseMessage, HumanMessage } from '@langchain/core/messages';
import { partition, chunk as chunkArray } from 'es-toolkit';
import { z } from 'zod';

// 상태 스키마 정의
const WorkflowStateSchema = z.object({
  query: z.string(),
  companies: z.array(z.string()),
  results: z.record(z.string(), z.any()),
  finalReport: z.string().optional(),
});

type WorkflowState = z.infer<typeof WorkflowStateSchema>;

interface CompanyResearch {
  company: string;
  revenue: number;
  employees: number;
  founded: number;
}

export function createDynamicParallelWorkflow() {
  const model = new ChatGoogleGenerativeAI({
    modelName: 'gemini-2.5-flash',
    temperature: 0,
  });

  // 상태 그래프 생성
  const workflow = new StateGraph<WorkflowState>({
    channels: {
      query: {
        value: null,
      },
      companies: {
        value: (x: string[], y: string[]) => [...x, ...y],
        default: () => [],
      },
      results: {
        value: (x: Record<string, any>, y: Record<string, any>) => ({...x, ...y}),
        default: () => ({}),
      },
      finalReport: {
        value: null,
      },
    },
  });

  // 분해 노드 - 연구할 회사 식별
  workflow.addNode('decompose', async (state) => {
    const prompt = `Given the query: "${state.query}"
    List all companies that need to be researched (comma-separated, no explanation):`;

    const response = await model.invoke([new HumanMessage(prompt)]);
    const companies = String(response.content)
      .split(',')
      .map(c => c.trim())
      .filter(Boolean);

    return { companies };
  });

  // 맵 노드 - 각 회사에 대한 병렬 연구 생성
  workflow.addNode('map', async (state) => {
    // Send API를 사용하여 동적 병렬 브랜치 생성
    const sends = state.companies.map(company =>
      new Send('research', { company, query: state.query })
    );

    return sends;
  });

  // 연구 노드 - 개별 회사 연구
  workflow.addNode('research', async (state: any) => {
    const { company, query } = state;

    // 모의 데이터로 연구 시뮬레이션
    const mockData: CompanyResearch = {
      company,
      revenue: Math.floor(Math.random() * 1000) + 100,
      employees: Math.floor(Math.random() * 10000) + 100,
      founded: 2000 + Math.floor(Math.random() * 25),
    };

    return {
      results: {
        [company]: mockData
      }
    };
  });

  // 리듀스 노드 - 모든 연구 결과 집계
  workflow.addNode('reduce', async (state) => {
    const companies = Object.values(state.results) as CompanyResearch[];

    // 수익별로 정렬하고 보고서 생성
    const sorted = companies.sort((a, b) => b.revenue - a.revenue);

    const report = `
# 회사 분석 보고서

## 쿼리: ${state.query}

## 수익 상위 회사:
${sorted.map((c, i) => `
${i + 1}. **${c.company}**
   - 수익: $${c.revenue}M
   - 직원 수: ${c.employees.toLocaleString()}
   - 설립: ${c.founded}
`).join('')}

## 통계 요약:
- 분석된 총 회사 수: ${companies.length}
- 평균 수익: $${Math.round(companies.reduce((sum, c) => sum + c.revenue, 0) / companies.length)}M
- 총 직원 수: ${companies.reduce((sum, c) => sum + c.employees, 0).toLocaleString()}
`;

    return { finalReport: report };
  });

  // 워크플로우 엣지 정의
  workflow.addEdge('decompose', 'map');
  workflow.addEdge('map', 'research');
  workflow.addEdge('research', 'reduce');
  workflow.addEdge('reduce', END);

  workflow.setEntryPoint('decompose');

  return workflow.compile();
}

쿼리 분해에 따라 런타임에 병렬 작업 수가 결정되는 동적 병렬화를 구현합니다.

2. 배치 처리를 위한 Map-Reduce 패턴

// lib/workflows/map-reduce-batch.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { chunk, flatten, groupBy } from 'es-toolkit';

interface BatchState {
  documents: string[];
  batchSize: number;
  processedBatches: Record<string, any>;
  summary: string;
}

export function createMapReduceBatchWorkflow() {
  const model = new ChatGoogleGenerativeAI({
    modelName: 'gemini-2.5-flash',
    temperature: 0.3,
  });

  const workflow = new StateGraph<BatchState>({
    channels: {
      documents: {
        value: null,
      },
      batchSize: {
        value: null,
        default: () => 5,
      },
      processedBatches: {
        value: (x: Record<string, any>, y: Record<string, any>) => ({...x, ...y}),
        default: () => ({}),
      },
      summary: {
        value: null,
      },
    },
  });

  // 병렬 처리를 위한 문서 배치화
  workflow.addNode('createBatches', async (state) => {
    const batches = chunk(state.documents, state.batchSize);

    // 배치를 병렬로 처리
    const processingPromises = batches.map(async (batch, index) => {
      const batchPrompt = `이 문서들을 분석하고 주요 인사이트를 추출하세요:
      ${batch.map((doc, i) => `문서 ${i + 1}: ${doc}`).join('\n')}

      주요 테마와 패턴을 포함한 구조화된 분석을 제공하세요.`;

      const response = await model.invoke([
        { role: 'user', content: batchPrompt }
      ]);

      return {
        [`batch_${index}`]: {
          documents: batch.length,
          analysis: response.content,
        }
      };
    });

    // 모든 배치가 완료될 때까지 대기
    const results = await Promise.all(processingPromises);
    const merged = Object.assign({}, ...results);

    return { processedBatches: merged };
  });

  // 모든 배치 결과를 최종 요약으로 리듀스
  workflow.addNode('reduceBatches', async (state) => {
    const allAnalyses = Object.values(state.processedBatches)
      .map(batch => batch.analysis)
      .join('\n\n');

    const reducePrompt = `이 배치 분석들을 종합적인 요약으로 통합하세요:
    ${allAnalyses}

    다음을 강조한 통합 보고서를 작성하세요:
    1. 모든 배치에 공통된 테마
    2. 특정 배치의 고유한 인사이트
    3. 전반적인 패턴과 결론`;

    const response = await model.invoke([
      { role: 'user', content: reducePrompt }
    ]);

    return { summary: String(response.content) };
  });

  workflow.addEdge('createBatches', 'reduceBatches');
  workflow.addEdge('reduceBatches', END);
  workflow.setEntryPoint('createBatches');

  return workflow.compile();
}

대규모 문서 세트를 병렬 배치로 처리하는 map-reduce 패턴을 구현합니다.

3. 상태 관리를 통한 병렬 에이전트 조정

// lib/workflows/coordinated-agents.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { TavilySearchResults } from '@langchain/community/tools/tavily_search';
import { Calculator } from '@langchain/community/tools/calculator';
import { uniqBy, sortBy } from 'es-toolkit';

interface CoordinatedState {
  task: string;
  researchData: any[];
  calculations: any[];
  validation: boolean;
  finalOutput: string;
}

export function createCoordinatedAgentsWorkflow() {
  const model = new ChatGoogleGenerativeAI({
    modelName: 'gemini-2.5-pro',
    temperature: 0.2,
  });

  const searchTool = new TavilySearchResults({ maxResults: 5 });
  const calculator = new Calculator();

  const workflow = new StateGraph<CoordinatedState>({
    channels: {
      task: { value: null },
      researchData: {
        value: (x: any[], y: any[]) => [...x, ...y],
        default: () => [],
      },
      calculations: {
        value: (x: any[], y: any[]) => [...x, ...y],
        default: () => [],
      },
      validation: {
        value: null,
        default: () => false,
      },
      finalOutput: { value: null },
    },
  });

  // 연구 및 계산 에이전트의 병렬 실행
  workflow.addNode('parallelAgents', async (state) => {
    // 연구 에이전트
    const researchPromise = (async () => {
      const searchQuery = `${state.task} 최신 데이터 통계 사실`;
      const results = await searchTool.invoke(searchQuery);

      const analysisPrompt = `다음에 대한 검색 결과를 분석하세요: ${state.task}
      결과: ${JSON.stringify(results)}
      주요 데이터 포인트와 인사이트를 추출하세요.`;

      const analysis = await model.invoke([
        { role: 'user', content: analysisPrompt }
      ]);

      return {
        researchData: [{
          source: 'web_search',
          content: analysis.content,
          timestamp: new Date().toISOString(),
        }]
      };
    })();

    // 계산 에이전트
    const calculationPromise = (async () => {
      // 작업에서 계산용 숫자 추출
      const numbers = state.task.match(/\d+/g)?.map(Number) || [];

      if (numbers.length >= 2) {
        const calculations = [];

        // 다양한 계산 수행
        calculations.push({
          operation: '합계',
          result: await calculator.invoke(
            `${numbers.join(' + ')}`
          ),
        });

        calculations.push({
          operation: '평균',
          result: await calculator.invoke(
            `(${numbers.join(' + ')}) / ${numbers.length}`
          ),
        });

        return { calculations };
      }

      return { calculations: [] };
    })();

    // 두 에이전트를 병렬로 실행
    const [research, calcs] = await Promise.all([
      researchPromise,
      calculationPromise,
    ]);

    return { ...research, ...calcs };
  });

  // 검증 노드 - 병렬 결과 확인
  workflow.addNode('validate', async (state) => {
    const hasResearch = state.researchData.length > 0;
    const hasCalculations = state.calculations.length > 0;

    const validationPrompt = `이 병렬 에이전트 결과의 일관성을 검증하세요:
    연구: ${JSON.stringify(state.researchData)}
    계산: ${JSON.stringify(state.calculations)}

    결과가 일관되고 신뢰할 수 있습니까? (YES/NO만)`;

    const response = await model.invoke([
      { role: 'user', content: validationPrompt }
    ]);

    const isValid = String(response.content).toUpperCase().includes('YES');

    return { validation: isValid };
  });

  // 통합 노드 - 검증된 결과 결합
  workflow.addNode('synthesize', async (state) => {
    if (!state.validation) {
      return {
        finalOutput: '검증 실패. 병렬 에이전트의 결과가 일치하지 않습니다.'
      };
    }

    const synthesisPrompt = `다음에 대한 종합적인 응답을 만드세요: ${state.task}

    병렬 에이전트에서 검증된 데이터 사용:
    - 연구 결과: ${JSON.stringify(state.researchData)}
    - 계산: ${JSON.stringify(state.calculations)}

    두 소스를 결합한 구조화된 답변을 제공하세요.`;

    const response = await model.invoke([
      { role: 'user', content: synthesisPrompt }
    ]);

    return { finalOutput: String(response.content) };
  });

  // 워크플로우 엣지 정의
  workflow.addEdge('parallelAgents', 'validate');
  workflow.addEdge('validate', 'synthesize');
  workflow.addEdge('synthesize', END);
  workflow.setEntryPoint('parallelAgents');

  return workflow.compile();
}

검증 및 상태 동기화를 통한 조정된 병렬 에이전트를 구현합니다.

4. 진행 상황 스트리밍이 있는 동적 워크플로우용 API 라우트

// app/api/dynamic-workflow/route.ts
import { createDynamicParallelWorkflow } from '@/lib/workflows/dynamic-parallel';
import { NextRequest } from 'next/server';

export const runtime = 'nodejs';
export const maxDuration = 777; // 800초 제한 이하의 안전한 버퍼

export async function POST(req: NextRequest) {
  const { query } = await req.json();

  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  const workflow = createDynamicParallelWorkflow();

  (async () => {
    try {
      let stepCount = 0;

      // 워크플로우 이벤트 스트림
      const eventStream = await workflow.stream({
        query,
        companies: [],
        results: {},
      });

      for await (const event of eventStream) {
        stepCount++;

        // 단계 업데이트 전송
        await writer.write(
          encoder.encode(`data: ${JSON.stringify({
            type: 'step',
            stepNumber: stepCount,
            node: Object.keys(event)[0],
            preview: JSON.stringify(event).substring(0, 100) + '...'
          })}\n\n`)
        );

        // 사용 가능한 경우 최종 보고서 전송
        if (event.reduce?.finalReport) {
          await writer.write(
            encoder.encode(`data: ${JSON.stringify({
              type: 'complete',
              report: event.reduce.finalReport
            })}\n\n`)
          );
        }
      }
    } catch (error) {
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          type: 'error',
          error: String(error)
        })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

실시간 진행 상황 스트리밍으로 동적 워크플로우 실행을 처리하는 API 라우트.

5. 병렬 스트림 시각화가 있는 고급 프론트엔드

// components/DynamicWorkflowInterface.tsx
'use client';

import { useState, useEffect } from 'react';
import { useMutation } from '@tanstack/react-query';
import { partition, groupBy } from 'es-toolkit';

interface WorkflowEvent {
  type: 'step' | 'complete' | 'error';
  stepNumber?: number;
  node?: string;
  preview?: string;
  report?: string;
  error?: string;
}

interface ParallelStream {
  id: string;
  status: 'pending' | 'active' | 'complete';
  result?: any;
}

export default function DynamicWorkflowInterface() {
  const [query, setQuery] = useState('');
  const [events, setEvents] = useState<WorkflowEvent[]>([]);
  const [parallelStreams, setParallelStreams] = useState<ParallelStream[]>([]);

  const workflowMutation = useMutation({
    mutationFn: async (workflowQuery: string) => {
      setEvents([]);
      setParallelStreams([]);

      const res = await fetch('/api/dynamic-workflow', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ query: workflowQuery }),
      });

      if (!res.ok) throw new Error('워크플로우 실패');

      const reader = res.body?.getReader();
      const decoder = new TextDecoder();

      while (reader) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            try {
              const event = JSON.parse(line.slice(6)) as WorkflowEvent;
              setEvents(prev => [...prev, event]);

              // 병렬 스트림 시각화 업데이트
              if (event.node === 'research') {
                setParallelStreams(prev => {
                  const newStream: ParallelStream = {
                    id: `stream-${prev.length}`,
                    status: 'active'
                  };
                  return [...prev, newStream];
                });
              }
            } catch {}
          }
        }
      }
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (query.trim()) {
      workflowMutation.mutate(query);
    }
  };

  const [stepEvents, otherEvents] = partition(
    events,
    (e) => e.type === 'step'
  );

  const completeEvent = events.find(e => e.type === 'complete');

  return (
    <div className="w-full space-y-4">
      {/* 입력 폼 */}
      <div className="card bg-base-100 shadow-xl">
        <div className="card-body">
          <h2 className="card-title">동적 병렬 워크플로우</h2>

          <form onSubmit={handleSubmit}>
            <div className="form-control">
              <label className="label">
                <span className="label-text">병렬 분석을 위한 쿼리 입력</span>
              </label>
              <input
                type="text"
                className="input input-bordered"
                placeholder="예: 상위 기술 회사 수익 분석..."
                value={query}
                onChange={(e) => setQuery(e.target.value)}
                disabled={workflowMutation.isPending}
              />
            </div>

            <button
              type="submit"
              className="btn btn-primary mt-4"
              disabled={workflowMutation.isPending || !query.trim()}
            >
              {workflowMutation.isPending ? (
                <>
                  <span className="loading loading-spinner"></span>
                  병렬 워크플로우 실행 중...
                </>
              ) : '워크플로우 실행'}
            </button>
          </form>
        </div>
      </div>

      {/* 병렬 스트림 시각화 */}
      {parallelStreams.length > 0 && (
        <div className="card bg-base-100 shadow-xl">
          <div className="card-body">
            <h3 className="card-title">병렬 실행 스트림</h3>
            <div className="grid grid-cols-4 gap-2">
              {parallelStreams.map(stream => (
                <div
                  key={stream.id}
                  className={`p-2 rounded ${
                    stream.status === 'active'
                      ? 'bg-primary text-primary-content animate-pulse'
                      : 'bg-success text-success-content'
                  }`}
                >
                  <div className="text-xs font-bold">
                    {stream.id}
                  </div>
                  <div className="text-xs">
                    {stream.status}
                  </div>
                </div>
              ))}
            </div>
            <div className="text-sm text-base-content/70 mt-2">
              {parallelStreams.length}개의 병렬 작업 감지됨
            </div>
          </div>
        </div>
      )}

      {/* 워크플로우 단계 */}
      {stepEvents.length > 0 && (
        <div className="card bg-base-100 shadow-xl">
          <div className="card-body">
            <h3 className="card-title">워크플로우 진행 상황</h3>
            <ul className="steps steps-vertical">
              {stepEvents.map((event, idx) => (
                <li key={idx} className="step step-primary">
                  <div className="text-left ml-4">
                    <div className="font-semibold">{event.node}</div>
                    <div className="text-sm opacity-70">
                      단계 {event.stepNumber}
                    </div>
                    {event.preview && (
                      <div className="text-xs font-mono opacity-50">
                        {event.preview}
                      </div>
                    )}
                  </div>
                </li>
              ))}
            </ul>
          </div>
        </div>
      )}

      {/* 최종 보고서 */}
      {completeEvent?.report && (
        <div className="card bg-success text-success-content shadow-xl">
          <div className="card-body">
            <h3 className="card-title">분석 완료</h3>
            <div className="prose prose-invert max-w-none">
              <pre className="whitespace-pre-wrap">
                {completeEvent.report}
              </pre>
            </div>
          </div>
        </div>
      )}

      {/* 오류 표시 */}
      {workflowMutation.isError && (
        <div className="alert alert-error">
          <span>워크플로우 실패: {workflowMutation.error?.message}</span>
        </div>
      )}
    </div>
  );
}

실시간 업데이트로 병렬 스트림 실행을 시각화하는 고급 프론트엔드 컴포넌트.

6. 재시도 로직이 있는 오류 복원력 병렬 패턴

// lib/patterns/resilient-parallel.ts
import { RunnableParallel } from '@langchain/core/runnables';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { retry, delay, take } from 'es-toolkit';
import pLimit from 'p-limit';

interface ParallelTaskConfig {
  maxRetries?: number;
  concurrencyLimit?: number;
  timeoutMs?: number;
  backoffMs?: number;
}

export class ResilientParallelExecutor {
  private model: ChatGoogleGenerativeAI;
  private config: Required<ParallelTaskConfig>;

  constructor(config: ParallelTaskConfig = {}) {
    this.model = new ChatGoogleGenerativeAI({
      modelName: 'gemini-2.5-flash',
      temperature: 0.3,
    });

    this.config = {
      maxRetries: config.maxRetries ?? 3,
      concurrencyLimit: config.concurrencyLimit ?? 10,
      timeoutMs: config.timeoutMs ?? 30000,
      backoffMs: config.backoffMs ?? 1000,
    };
  }

  async executeWithRetry<T>(
    task: () => Promise<T>,
    taskName: string
  ): Promise<T | null> {
    for (let attempt = 1; attempt <= this.config.maxRetries; attempt++) {
      try {
        // 타임아웃 래퍼 추가
        const result = await Promise.race([
          task(),
          new Promise<never>((_, reject) =>
            setTimeout(
              () => reject(new Error(`${this.config.timeoutMs}ms 후 타임아웃`)),
              this.config.timeoutMs
            )
          ),
        ]);

        return result;
      } catch (error) {
        console.error(`작업 ${taskName} 시도 ${attempt} 실패:`, error);

        if (attempt < this.config.maxRetries) {
          // 지터가 포함된 지수 백오프
          const backoff = this.config.backoffMs * Math.pow(2, attempt - 1);
          const jitter = Math.random() * 1000;
          await delay(backoff + jitter);
        } else {
          console.error(`작업 ${taskName}이 ${attempt}번 시도 후 실패했습니다`);
          return null; // 우아한 성능 저하
        }
      }
    }

    return null;
  }

  async executeParallelTasks<T>(
    tasks: Array<{ name: string; fn: () => Promise<T> }>
  ): Promise<Array<{ name: string; result: T | null; success: boolean }>> {
    // 동시성 제한기 생성
    const limit = pLimit(this.config.concurrencyLimit);

    // 재시도 및 동시성 제어로 작업 실행
    const results = await Promise.all(
      tasks.map(({ name, fn }) =>
        limit(async () => {
          const result = await this.executeWithRetry(fn, name);
          return {
            name,
            result,
            success: result !== null,
          };
        })
      )
    );

    // 요약 로그
    const successful = results.filter(r => r.success).length;
    console.log(
      `병렬 실행 완료: ${successful}/${tasks.length} 성공`
    );

    return results;
  }

  async executeBatchedParallel<T>(
    items: T[],
    batchSize: number,
    processor: (batch: T[]) => Promise<any>
  ): Promise<any[]> {
    const batches = [];

    for (let i = 0; i < items.length; i += batchSize) {
      batches.push(items.slice(i, i + batchSize));
    }

    const results = [];

    // 제어된 병렬 처리로 배치 처리
    for (const batch of batches) {
      const batchResults = await this.executeParallelTasks(
        batch.map((item, idx) => ({
          name: `batch-item-${idx}`,
          fn: () => processor([item]),
        }))
      );

      results.push(...batchResults);
    }

    return results;
  }
}

재시도 로직, 동시성 제한 및 우아한 성능 저하를 통한 프로덕션 준비 병렬 실행을 구현합니다.

결론

병렬화는 독립적인 작업을 동시에 실행하여 AI 에이전트 성능을 분 단위에서 초 단위로 변환합니다. 여기에 제시된 패턴 - 기본 RunnableParallel에서 동적 LangGraph 워크플로우까지 - 은 Vercel의 서버리스 플랫폼에서 프로덕션 준비 병렬 에이전트 시스템을 구축하기 위한 기반을 제공합니다. 주요 내용으로는 효율적인 데이터 작업을 위한 es-toolkit 사용, 재시도 로직을 통한 적절한 오류 처리 구현, 더 나은 UX를 위한 진행 상황 업데이트 스트리밍, 777초 실행 창을 최대화하면서 서버리스 제약 준수 등이 있습니다. 이러한 패턴은 간단한 병렬 연구 에이전트부터 수백 개의 작업을 동시에 처리하는 복잡한 다중 에이전트 오케스트레이션 시스템까지 확장됩니다.