초안 에이전트 디자인 패턴 - 라우팅

aiagentslangchainlangraphroutingnextjs
By sko X opus 4.19/20/20257 min read

전문 에이전트에게 동적으로 쿼리를 전달하는 지능형 라우팅 시스템을 구축하여 선형 워크플로우를 넘어 적응적이고 컨텍스트 인식 AI 애플리케이션을 만듭니다.

멘탈 모델: AI 에이전트를 위한 교통 관제사

라우팅 패턴은 복잡한 교차로의 지능형 교통 관제사와 같다고 생각해보세요. 교통 관제사가 들어오는 차량(크기, 목적지, 긴급도)을 분석하여 최적의 차선으로 안내하는 것처럼, 라우팅 에이전트는 들어오는 요청(의도, 복잡성, 컨텍스트)을 검토하여 가장 적합한 처리 에이전트로 전달합니다. Next.js 15에서 이는 API 라우트가 스마트 디스패처 역할을 하며, Langchain/Langraph를 사용하여 요청을 평가하고 전문 핸들러로 라우팅한다는 것을 의미합니다 - Vercel의 Edge 미들웨어가 헤더를 기반으로 요청을 라우팅하는 방식과 유사하지만, 정적 규칙 대신 AI 기반 의사 결정을 사용합니다.

기본 예제: 인텐트 기반 고객 지원 라우터

1. 라우터 에이전트 생성

// lib/agents/router.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { PromptTemplate } from '@langchain/core/prompts';
import { z } from 'zod';
import { memoize } from 'es-toolkit';

const RouteSchema = z.object({
  route: z.enum(['technical', 'billing', 'general']),
  confidence: z.number(),
  reasoning: z.string()
});

export class CustomerSupportRouter {
  private model: ChatGoogleGenerativeAI;

  constructor() {
    this.model = new ChatGoogleGenerativeAI({
      model: 'gemini-2.5-flash',
      temperature: 0,
    });
  }

  // 5분 동안 동일한 쿼리에 대한 메모이제이션
  route = memoize(
    async (query: string) => {
      const prompt = PromptTemplate.fromTemplate(`
        이 고객 쿼리를 분석하여 적절한 부서로 라우팅하세요.

        쿼리: {query}

        사용 가능한 라우트:
        - technical: 제품 문제, 버그, 기술 질문
        - billing: 결제, 구독, 환불 문제
        - general: 기타 모든 문의

        JSON으로 응답: {{ "route": "...", "confidence": 0.0-1.0, "reasoning": "..." }}
      `);

      const formatted = await prompt.format({ query });
      const response = await this.model.invoke(formatted);

      return RouteSchema.parse(JSON.parse(response.content as string));
    },
    { ttl: 300000 } // 5분 캐시
  );
}

Gemini Flash를 사용하여 고객 쿼리를 분석하고 신뢰도 점수와 함께 구조화된 라우팅 결정을 반환하는 라우터를 생성합니다.

2. 전문 에이전트 구현

// lib/agents/specialized.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';

export class TechnicalSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-pro',
    temperature: 0.3,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `당신은 기술 지원 전문가입니다. 다음을 도와주세요: ${query}`
    );
    return { type: 'technical', response: response.content };
  }
}

export class BillingSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-flash',
    temperature: 0,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `당신은 청구 전문가입니다. 다음 문제를 해결하세요: ${query}`
    );
    return { type: 'billing', response: response.content };
  }
}

export class GeneralSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-flash',
    temperature: 0.5,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `당신은 친절한 어시스턴트입니다. 다음 질문에 답하세요: ${query}`
    );
    return { type: 'general', response: response.content };
  }
}

각 전문 에이전트는 특정 도메인에 최적화된 다른 모델과 온도를 사용합니다.

3. API 라우트 생성

// app/api/support/route.ts
export const runtime = 'nodejs';
export const maxDuration = 60;

import { CustomerSupportRouter } from '@/lib/agents/router';
import {
  TechnicalSupportAgent,
  BillingSupportAgent,
  GeneralSupportAgent
} from '@/lib/agents/specialized';
import { NextResponse } from 'next/server';

const router = new CustomerSupportRouter();
const agents = {
  technical: new TechnicalSupportAgent(),
  billing: new BillingSupportAgent(),
  general: new GeneralSupportAgent(),
};

export async function POST(req: Request) {
  try {
    const { query } = await req.json();

    // 쿼리 라우팅
    const routing = await router.route(query);

    // 선택된 에이전트로 실행
    const agent = agents[routing.route];
    const result = await agent.handle(query);

    return NextResponse.json({
      ...result,
      routing: {
        selected: routing.route,
        confidence: routing.confidence,
        reasoning: routing.reasoning
      }
    });
  } catch (error) {
    console.error('라우팅 오류:', error);
    return NextResponse.json(
      { error: '요청 처리 실패' },
      { status: 500 }
    );
  }
}

라우터의 결정에 따라 쿼리를 전문 에이전트로 라우팅하는 API 엔드포인트.

4. TanStack Query를 사용한 프론트엔드 컴포넌트

// components/SupportChat.tsx
'use client';

import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';

export default function SupportChat() {
  const [message, setMessage] = useState('');

  const submitQuery = useMutation({
    mutationFn: async (query: string) => {
      const res = await fetch('/api/support', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ query }),
      });

      if (!res.ok) throw new Error('요청 실패');
      return res.json();
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (message.trim()) {
      submitQuery.mutate(message);
      setMessage('');
    }
  };

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">고객 지원</h2>

        <form onSubmit={handleSubmit}>
          <input
            type="text"
            className="input input-bordered w-full"
            placeholder="무엇을 도와드릴까요?"
            value={message}
            onChange={(e) => setMessage(e.target.value)}
            disabled={submitQuery.isPending}
          />

          <button
            type="submit"
            className="btn btn-primary mt-4"
            disabled={submitQuery.isPending || !message.trim()}
          >
            {submitQuery.isPending ? (
              <span className="loading loading-spinner"></span>
            ) : '전송'}
          </button>
        </form>

        {submitQuery.data && (
          <div className="alert mt-4">
            <div>
              <div className="badge badge-secondary">
                {submitQuery.data.routing.selected}
              </div>
              <p className="mt-2">{submitQuery.data.response}</p>
            </div>
          </div>
        )}
      </div>
    </div>
  );
}

TanStack Query를 사용하여 라우팅 결정과 에이전트 응답을 표시하는 React 컴포넌트.

고급 예제: 멀티 스테이지 문서 처리 파이프라인

1. 추가 종속성 설치

npm install @langchain/langgraph @upstash/redis pdf-parse

상태 관리 워크플로우를 위한 Langraph와 분산 상태 관리를 위한 Upstash Redis 추가.

2. Langraph로 라우팅 상태 머신 정의

// lib/workflows/document-router.ts
import { StateGraph, Annotation } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { BaseMessage } from '@langchain/core/messages';
import { groupBy, chunk } from 'es-toolkit';

const DocumentState = Annotation.Root({
  documentId: Annotation<string>(),
  content: Annotation<string>(),
  documentType: Annotation<string>(),
  confidence: Annotation<number>(),
  processingStage: Annotation<string>(),
  extractedData: Annotation<Record<string, any>>(),
  errors: Annotation<string[]>(),
});

export function createDocumentRoutingWorkflow() {
  const model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-pro',
    temperature: 0,
  });

  const workflow = new StateGraph(DocumentState)
    // 분류 노드
    .addNode('classify', async (state) => {
      const response = await model.invoke(
        `이 문서 유형을 분류하세요: ${state.content.substring(0, 1000)}`
      );

      // 분류 결과 파싱
      const type = extractDocumentType(response.content as string);
      const confidence = calculateConfidence(response.content as string);

      return {
        documentType: type,
        confidence: confidence,
        processingStage: 'classified',
      };
    })

    // 송장 프로세서
    .addNode('process_invoice', async (state) => {
      const invoiceData = await extractInvoiceData(state.content);
      return {
        extractedData: invoiceData,
        processingStage: 'completed',
      };
    })

    // 계약서 프로세서
    .addNode('process_contract', async (state) => {
      const contractData = await extractContractData(state.content);
      return {
        extractedData: contractData,
        processingStage: 'completed',
      };
    })

    // 일반 프로세서
    .addNode('process_general', async (state) => {
      const generalData = await extractGeneralData(state.content);
      return {
        extractedData: generalData,
        processingStage: 'completed',
      };
    })

    // 휴먼 리뷰 노드
    .addNode('human_review', async (state) => {
      await notifyHumanReviewer(state);
      return {
        processingStage: 'pending_review',
      };
    });

  // 조건부 라우팅 추가
  workflow.addConditionalEdges('classify', (state) => {
    if (state.confidence < 0.7) {
      return 'human_review';
    }

    switch (state.documentType) {
      case 'invoice':
        return 'process_invoice';
      case 'contract':
        return 'process_contract';
      default:
        return 'process_general';
    }
  });

  // 진입점 설정
  workflow.setEntryPoint('classify');

  return workflow.compile();
}

문서를 분류하고 신뢰도에 따라 전문 프로세서로 라우팅하는 Langraph 워크플로우.

3. 상태 관리를 포함한 스트리밍 API 구현

// app/api/documents/process/route.ts
export const runtime = 'nodejs';
export const maxDuration = 300;

import { createDocumentRoutingWorkflow } from '@/lib/workflows/document-router';
import { Redis } from '@upstash/redis';

const redis = Redis.fromEnv();
const workflow = createDocumentRoutingWorkflow();

export async function POST(req: Request) {
  const { documentId, content } = await req.json();

  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  // 백그라운드에서 처리
  (async () => {
    try {
      // 초기 상태
      const initialState = {
        documentId,
        content,
        documentType: '',
        confidence: 0,
        processingStage: 'pending',
        extractedData: {},
        errors: [],
      };

      // Redis에 초기 상태 저장
      await redis.set(
        `doc:${documentId}:state`,
        JSON.stringify(initialState),
        { ex: 3600 } // 1시간 TTL
      );

      // 워크플로우 이벤트 스트리밍
      const eventStream = await workflow.stream(initialState);

      for await (const event of eventStream) {
        const state = event[Object.keys(event)[0]];

        // Redis 상태 업데이트
        await redis.set(
          `doc:${documentId}:state`,
          JSON.stringify(state),
          { ex: 3600 }
        );

        // 클라이언트로 스트리밍
        await writer.write(
          encoder.encode(`data: ${JSON.stringify({
            stage: state.processingStage,
            type: state.documentType,
            confidence: state.confidence,
            hasData: !!state.extractedData,
          })}\n\n`)
        );

        if (state.processingStage === 'completed' ||
            state.processingStage === 'pending_review') {
          break;
        }
      }

      await writer.write(
        encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)
      );
    } catch (error) {
      console.error('워크플로우 오류:', error);
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          error: '처리 실패'
        })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

라우팅 워크플로우를 통해 문서를 처리하고 Redis에 상태를 저장하는 스트리밍 API.

4. 스트리밍 업데이트용 훅 생성

// hooks/useDocumentProcessing.ts
import { useState, useCallback } from 'react';
import { useMutation } from '@tanstack/react-query';

interface ProcessingUpdate {
  stage?: string;
  type?: string;
  confidence?: number;
  done?: boolean;
  error?: string;
}

export function useDocumentProcessing() {
  const [updates, setUpdates] = useState<ProcessingUpdate[]>([]);
  const [isProcessing, setIsProcessing] = useState(false);

  const processDocument = useCallback(async (
    documentId: string,
    content: string
  ) => {
    setIsProcessing(true);
    setUpdates([]);

    try {
      const response = await fetch('/api/documents/process', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ documentId, content }),
      });

      if (!response.ok) throw new Error('처리 실패');

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      while (reader) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            try {
              const update = JSON.parse(line.slice(6));
              setUpdates(prev => [...prev, update]);

              if (update.done || update.error) {
                setIsProcessing(false);
                return update;
              }
            } catch {}
          }
        }
      }
    } catch (error) {
      setIsProcessing(false);
      throw error;
    }
  }, []);

  return {
    processDocument,
    updates,
    isProcessing,
    currentStage: updates[updates.length - 1]?.stage,
    documentType: updates[updates.length - 1]?.type,
    confidence: updates[updates.length - 1]?.confidence,
  };
}

문서 처리 상태와 스트리밍 업데이트를 관리하는 커스텀 훅.

5. 문서 처리 UI 구축

// components/DocumentProcessor.tsx
'use client';

import { useDocumentProcessing } from '@/hooks/useDocumentProcessing';
import { useState } from 'react';

export default function DocumentProcessor() {
  const [file, setFile] = useState<File | null>(null);
  const {
    processDocument,
    updates,
    isProcessing,
    currentStage,
    documentType,
    confidence
  } = useDocumentProcessing();

  const handleFileSelect = (e: React.ChangeEvent<HTMLInputElement>) => {
    const selectedFile = e.target.files?.[0];
    if (selectedFile) {
      setFile(selectedFile);
    }
  };

  const handleProcess = async () => {
    if (!file) return;

    const content = await file.text();
    await processDocument(file.name, content);
  };

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">문서 프로세서</h2>

        <div className="form-control">
          <label className="label">
            <span className="label-text">문서 선택</span>
          </label>
          <input
            type="file"
            className="file-input file-input-bordered"
            onChange={handleFileSelect}
            disabled={isProcessing}
            accept=".pdf,.txt,.docx"
          />
        </div>

        <button
          className="btn btn-primary"
          onClick={handleProcess}
          disabled={!file || isProcessing}
        >
          {isProcessing ? (
            <>
              <span className="loading loading-spinner"></span>
              처리 중...
            </>
          ) : '문서 처리'}
        </button>

        {updates.length > 0 && (
          <div className="mt-6">
            <h3 className="font-semibold mb-4">처리 단계</h3>

            <ul className="steps steps-vertical">
              {['classify', 'process', 'complete'].map((step) => (
                <li
                  key={step}
                  className={`step ${
                    updates.some(u => u.stage === step) ? 'step-primary' : ''
                  }`}
                >
                  <div className="text-left">
                    <div className="font-medium capitalize">{step}</div>
                    {step === 'classify' && documentType && (
                      <div className="text-sm opacity-70">
                        유형: {documentType} ({Math.round((confidence || 0) * 100)}%)
                      </div>
                    )}
                  </div>
                </li>
              ))}
            </ul>

            {currentStage === 'pending_review' && (
              <div className="alert alert-warning mt-4">
                <span>낮은 신뢰도로 인해 문서가 휴먼 리뷰로 전송되었습니다</span>
              </div>
            )}

            {currentStage === 'completed' && (
              <div className="alert alert-success mt-4">
                <span>처리 완료! 문서 유형: {documentType}</span>
              </div>
            )}
          </div>
        )}
      </div>
    </div>
  );
}

실시간 라우팅 결정과 처리 진행 상황을 표시하는 UI 컴포넌트.

6. 임베딩을 사용한 시맨틱 라우팅 추가

// lib/routing/semantic-router.ts
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { cosineSimilarity } from '@langchain/core/utils/math';
import { memoize } from 'es-toolkit';

interface Route {
  name: string;
  description: string;
  examples: string[];
  handler: string;
}

export class SemanticRouter {
  private embeddings: GoogleGenerativeAIEmbeddings;
  private routeEmbeddings: Map<string, number[]> = new Map();

  constructor(private routes: Route[]) {
    this.embeddings = new GoogleGenerativeAIEmbeddings({
      model: 'embedding-001',
    });
    this.initialize();
  }

  private async initialize() {
    // 각 라우트에 대한 임베딩 생성
    for (const route of this.routes) {
      const description = `${route.description} ${route.examples.join(' ')}`;
      const embedding = await this.embeddings.embedQuery(description);
      this.routeEmbeddings.set(route.name, embedding);
    }
  }

  // 성능을 위한 메모이제이션
  findBestRoute = memoize(
    async (query: string): Promise<{ route: Route; similarity: number }> => {
      const queryEmbedding = await this.embeddings.embedQuery(query);

      let bestRoute: Route | null = null;
      let bestSimilarity = -1;

      for (const route of this.routes) {
        const routeEmbedding = this.routeEmbeddings.get(route.name)!;
        const similarity = cosineSimilarity(
          [queryEmbedding],
          [routeEmbedding]
        )[0][0];

        if (similarity > bestSimilarity) {
          bestSimilarity = similarity;
          bestRoute = route;
        }
      }

      return {
        route: bestRoute!,
        similarity: bestSimilarity,
      };
    },
    { ttl: 60000 } // 1분 캐시
  );
}

키워드가 아닌 의미를 기반으로 가장 유사한 라우트를 찾기 위해 임베딩을 사용하는 시맨틱 라우터.

7. 학습을 통한 적응형 라우팅 구현

// lib/routing/adaptive-router.ts
import { Redis } from '@upstash/redis';
import { pick, omit } from 'es-toolkit';

interface RoutingDecision {
  query: string;
  selectedRoute: string;
  confidence: number;
  timestamp: number;
  outcome?: 'success' | 'failure';
}

export class AdaptiveRouter {
  private redis = Redis.fromEnv();

  async recordDecision(decision: RoutingDecision) {
    const key = `routing:history`;
    await this.redis.lpush(key, JSON.stringify(decision));
    await this.redis.ltrim(key, 0, 999); // 마지막 1000개 결정 유지
  }

  async updateOutcome(
    query: string,
    route: string,
    outcome: 'success' | 'failure'
  ) {
    // 결정을 찾아서 업데이트
    const history = await this.redis.lrange('routing:history', 0, 99);

    for (let i = 0; i < history.length; i++) {
      const decision = JSON.parse(history[i] as string) as RoutingDecision;

      if (decision.query === query && decision.selectedRoute === route) {
        decision.outcome = outcome;
        await this.redis.lset('routing:history', i, JSON.stringify(decision));
        break;
      }
    }

    // 라우트 통계 업데이트
    const statKey = `routing:stats:${route}`;
    await this.redis.hincrby(statKey, outcome, 1);
  }

  async getRoutePerformance(route: string) {
    const stats = await this.redis.hgetall(`routing:stats:${route}`);
    const success = parseInt(stats.success || '0');
    const failure = parseInt(stats.failure || '0');
    const total = success + failure;

    return {
      successRate: total > 0 ? success / total : 0.5,
      totalRequests: total,
    };
  }

  async selectBestRoute(candidates: string[]): Promise<string> {
    const performances = await Promise.all(
      candidates.map(async (route) => ({
        route,
        performance: await this.getRoutePerformance(route),
      }))
    );

    // 최고 성공률을 가진 라우트 선택 (탐색 포함)
    const explorationRate = 0.1;

    if (Math.random() < explorationRate) {
      // 탐색: 무작위 선택
      return candidates[Math.floor(Math.random() * candidates.length)];
    } else {
      // 활용: 최고 성능 선택
      return performances.reduce((best, current) =>
        current.performance.successRate > best.performance.successRate
          ? current
          : best
      ).route;
    }
  }
}

결과에서 학습하고 강화 학습 원칙을 사용하여 시간이 지남에 따라 라우팅 결정을 개선하는 적응형 라우터.

결론

라우팅 패턴은 정적이고 선형적인 AI 워크플로우를 컨텍스트에 적응하고 결과에서 학습하는 동적이고 지능적인 시스템으로 변환합니다. Next.js 15에서 Langchain과 Langraph로 라우팅을 구현하면 전문 에이전트 간에 작업을 효율적으로 분산하고, 스마트한 모델 선택을 통해 비용을 절감하며, 피드백 루프를 통해 지속적으로 개선되는 프로덕션 준비 애플리케이션을 구축할 수 있습니다. 시맨틱 이해, 상태 관리, 적응형 학습의 조합은 Vercel의 서버리스 플랫폼의 확장성과 개발자 경험 이점을 유지하면서 시간이 지남에 따라 더욱 효과적이 되는 AI 시스템을 만들어냅니다.