ドラフト エージェントデザインパターン - ルーティング

aiagentslangchainlangraphroutingnextjs
By sko X opus 4.19/20/202510 min read

特殊化されたエージェントに動的にクエリを誘導するインテリジェントなルーティングシステムを構築し、線形ワークフローを超えて適応的でコンテキスト認識型のAIアプリケーションを作成します。

メンタルモデル:AIエージェントのトラフィックコントローラー

ルーティングパターンは、混雑した交差点のインテリジェントな交通管制官のようなものだと考えてください。交通管制官が入ってくる車両(サイズ、目的地、緊急度)を分析して最適なレーンに誘導するように、ルーティングエージェントは入ってくるリクエスト(意図、複雑さ、コンテキスト)を調べて、最も適切な処理エージェントに誘導します。Next.js 15では、APIルートがスマートディスパッチャーとして機能し、Langchain/Langraphを使用してリクエストを評価し、特殊化されたハンドラーにルーティングします - VercelのEdgeミドルウェアがヘッダーに基づいてリクエストをルーティングする方法に似ていますが、静的なルールではなくAIを使用した意思決定を行います。

基本例:インテントベースのカスタマーサポートルーター

1. ルーターエージェントの作成

// lib/agents/router.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { PromptTemplate } from '@langchain/core/prompts';
import { z } from 'zod';
import { memoize } from 'es-toolkit';

const RouteSchema = z.object({
  route: z.enum(['technical', 'billing', 'general']),
  confidence: z.number(),
  reasoning: z.string()
});

export class CustomerSupportRouter {
  private model: ChatGoogleGenerativeAI;

  constructor() {
    this.model = new ChatGoogleGenerativeAI({
      model: 'gemini-2.5-flash',
      temperature: 0,
    });
  }

  // 5分間の同一クエリに対するメモ化
  route = memoize(
    async (query: string) => {
      const prompt = PromptTemplate.fromTemplate(`
        この顧客のクエリを分析し、適切な部門にルーティングしてください。

        クエリ: {query}

        利用可能なルート:
        - technical: 製品の問題、バグ、技術的な質問
        - billing: 支払い、サブスクリプション、返金の問題
        - general: その他すべての問い合わせ

        JSONで応答: {{ "route": "...", "confidence": 0.0-1.0, "reasoning": "..." }}
      `);

      const formatted = await prompt.format({ query });
      const response = await this.model.invoke(formatted);

      return RouteSchema.parse(JSON.parse(response.content as string));
    },
    { ttl: 300000 } // 5分間のキャッシュ
  );
}

Gemini Flashを使用して顧客のクエリを分析し、信頼度スコアと共に構造化されたルーティング決定を返すルーターを作成します。

2. 特殊化エージェントの実装

// lib/agents/specialized.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';

export class TechnicalSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-pro',
    temperature: 0.3,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `あなたは技術サポートの専門家です。次の件について支援してください: ${query}`
    );
    return { type: 'technical', response: response.content };
  }
}

export class BillingSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-flash',
    temperature: 0,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `あなたは請求の専門家です。次の問題を解決してください: ${query}`
    );
    return { type: 'billing', response: response.content };
  }
}

export class GeneralSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-flash',
    temperature: 0.5,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `あなたは親切なアシスタントです。次の質問に答えてください: ${query}`
    );
    return { type: 'general', response: response.content };
  }
}

各特殊化エージェントは、特定のドメインに最適化された異なるモデルと温度を使用します。

3. APIルートの作成

// app/api/support/route.ts
export const runtime = 'nodejs';
export const maxDuration = 60;

import { CustomerSupportRouter } from '@/lib/agents/router';
import {
  TechnicalSupportAgent,
  BillingSupportAgent,
  GeneralSupportAgent
} from '@/lib/agents/specialized';
import { NextResponse } from 'next/server';

const router = new CustomerSupportRouter();
const agents = {
  technical: new TechnicalSupportAgent(),
  billing: new BillingSupportAgent(),
  general: new GeneralSupportAgent(),
};

export async function POST(req: Request) {
  try {
    const { query } = await req.json();

    // クエリをルーティング
    const routing = await router.route(query);

    // 選択されたエージェントで実行
    const agent = agents[routing.route];
    const result = await agent.handle(query);

    return NextResponse.json({
      ...result,
      routing: {
        selected: routing.route,
        confidence: routing.confidence,
        reasoning: routing.reasoning
      }
    });
  } catch (error) {
    console.error('ルーティングエラー:', error);
    return NextResponse.json(
      { error: 'リクエストの処理に失敗しました' },
      { status: 500 }
    );
  }
}

ルーターの決定に基づいて、クエリを特殊化エージェントにルーティングするAPIエンドポイント。

4. TanStack Queryを使用したフロントエンドコンポーネント

// components/SupportChat.tsx
'use client';

import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';

export default function SupportChat() {
  const [message, setMessage] = useState('');

  const submitQuery = useMutation({
    mutationFn: async (query: string) => {
      const res = await fetch('/api/support', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ query }),
      });

      if (!res.ok) throw new Error('リクエストが失敗しました');
      return res.json();
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (message.trim()) {
      submitQuery.mutate(message);
      setMessage('');
    }
  };

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">カスタマーサポート</h2>

        <form onSubmit={handleSubmit}>
          <input
            type="text"
            className="input input-bordered w-full"
            placeholder="どのようにお手伝いできますか?"
            value={message}
            onChange={(e) => setMessage(e.target.value)}
            disabled={submitQuery.isPending}
          />

          <button
            type="submit"
            className="btn btn-primary mt-4"
            disabled={submitQuery.isPending || !message.trim()}
          >
            {submitQuery.isPending ? (
              <span className="loading loading-spinner"></span>
            ) : '送信'}
          </button>
        </form>

        {submitQuery.data && (
          <div className="alert mt-4">
            <div>
              <div className="badge badge-secondary">
                {submitQuery.data.routing.selected}
              </div>
              <p className="mt-2">{submitQuery.data.response}</p>
            </div>
          </div>
        )}
      </div>
    </div>
  );
}

TanStack Queryを使用してルーティングの決定とエージェントの応答を表示するReactコンポーネント。

高度な例:マルチステージドキュメント処理パイプライン

1. 追加の依存関係のインストール

npm install @langchain/langgraph @upstash/redis pdf-parse

ステートフルワークフローのためのLanggraphと、分散状態管理のためのUpstash Redisを追加。

2. Langgraphを使用したルーティングステートマシンの定義

// lib/workflows/document-router.ts
import { StateGraph, Annotation } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { BaseMessage } from '@langchain/core/messages';
import { groupBy, chunk } from 'es-toolkit';

const DocumentState = Annotation.Root({
  documentId: Annotation<string>(),
  content: Annotation<string>(),
  documentType: Annotation<string>(),
  confidence: Annotation<number>(),
  processingStage: Annotation<string>(),
  extractedData: Annotation<Record<string, any>>(),
  errors: Annotation<string[]>(),
});

export function createDocumentRoutingWorkflow() {
  const model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-pro',
    temperature: 0,
  });

  const workflow = new StateGraph(DocumentState)
    // 分類ノード
    .addNode('classify', async (state) => {
      const response = await model.invoke(
        `このドキュメントタイプを分類してください: ${state.content.substring(0, 1000)}`
      );

      // 分類結果を解析
      const type = extractDocumentType(response.content as string);
      const confidence = calculateConfidence(response.content as string);

      return {
        documentType: type,
        confidence: confidence,
        processingStage: 'classified',
      };
    })

    // 請求書プロセッサー
    .addNode('process_invoice', async (state) => {
      const invoiceData = await extractInvoiceData(state.content);
      return {
        extractedData: invoiceData,
        processingStage: 'completed',
      };
    })

    // 契約書プロセッサー
    .addNode('process_contract', async (state) => {
      const contractData = await extractContractData(state.content);
      return {
        extractedData: contractData,
        processingStage: 'completed',
      };
    })

    // 汎用プロセッサー
    .addNode('process_general', async (state) => {
      const generalData = await extractGeneralData(state.content);
      return {
        extractedData: generalData,
        processingStage: 'completed',
      };
    })

    // 人間レビューノード
    .addNode('human_review', async (state) => {
      await notifyHumanReviewer(state);
      return {
        processingStage: 'pending_review',
      };
    });

  // 条件付きルーティングの追加
  workflow.addConditionalEdges('classify', (state) => {
    if (state.confidence < 0.7) {
      return 'human_review';
    }

    switch (state.documentType) {
      case 'invoice':
        return 'process_invoice';
      case 'contract':
        return 'process_contract';
      default:
        return 'process_general';
    }
  });

  // エントリーポイントの設定
  workflow.setEntryPoint('classify');

  return workflow.compile();
}

ドキュメントを分類し、信頼度に基づいて特殊化プロセッサーにルーティングするLanggraphワークフロー。

3. 状態管理を伴うストリーミングAPIの実装

// app/api/documents/process/route.ts
export const runtime = 'nodejs';
export const maxDuration = 300;

import { createDocumentRoutingWorkflow } from '@/lib/workflows/document-router';
import { Redis } from '@upstash/redis';

const redis = Redis.fromEnv();
const workflow = createDocumentRoutingWorkflow();

export async function POST(req: Request) {
  const { documentId, content } = await req.json();

  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  // バックグラウンドで処理
  (async () => {
    try {
      // 初期状態
      const initialState = {
        documentId,
        content,
        documentType: '',
        confidence: 0,
        processingStage: 'pending',
        extractedData: {},
        errors: [],
      };

      // Redisに初期状態を保存
      await redis.set(
        `doc:${documentId}:state`,
        JSON.stringify(initialState),
        { ex: 3600 } // 1時間のTTL
      );

      // ワークフローイベントをストリーミング
      const eventStream = await workflow.stream(initialState);

      for await (const event of eventStream) {
        const state = event[Object.keys(event)[0]];

        // Redis状態を更新
        await redis.set(
          `doc:${documentId}:state`,
          JSON.stringify(state),
          { ex: 3600 }
        );

        // クライアントにストリーミング
        await writer.write(
          encoder.encode(`data: ${JSON.stringify({
            stage: state.processingStage,
            type: state.documentType,
            confidence: state.confidence,
            hasData: !!state.extractedData,
          })}\n\n`)
        );

        if (state.processingStage === 'completed' ||
            state.processingStage === 'pending_review') {
          break;
        }
      }

      await writer.write(
        encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)
      );
    } catch (error) {
      console.error('ワークフローエラー:', error);
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          error: '処理に失敗しました'
        })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

ルーティングワークフローを通じてドキュメントを処理し、Redisに状態を保存するストリーミングAPI。

4. ストリーミング更新用のフックを作成

// hooks/useDocumentProcessing.ts
import { useState, useCallback } from 'react';
import { useMutation } from '@tanstack/react-query';

interface ProcessingUpdate {
  stage?: string;
  type?: string;
  confidence?: number;
  done?: boolean;
  error?: string;
}

export function useDocumentProcessing() {
  const [updates, setUpdates] = useState<ProcessingUpdate[]>([]);
  const [isProcessing, setIsProcessing] = useState(false);

  const processDocument = useCallback(async (
    documentId: string,
    content: string
  ) => {
    setIsProcessing(true);
    setUpdates([]);

    try {
      const response = await fetch('/api/documents/process', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ documentId, content }),
      });

      if (!response.ok) throw new Error('処理に失敗しました');

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      while (reader) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            try {
              const update = JSON.parse(line.slice(6));
              setUpdates(prev => [...prev, update]);

              if (update.done || update.error) {
                setIsProcessing(false);
                return update;
              }
            } catch {}
          }
        }
      }
    } catch (error) {
      setIsProcessing(false);
      throw error;
    }
  }, []);

  return {
    processDocument,
    updates,
    isProcessing,
    currentStage: updates[updates.length - 1]?.stage,
    documentType: updates[updates.length - 1]?.type,
    confidence: updates[updates.length - 1]?.confidence,
  };
}

ドキュメント処理の状態とストリーミング更新を管理するカスタムフック。

5. ドキュメント処理UIの構築

// components/DocumentProcessor.tsx
'use client';

import { useDocumentProcessing } from '@/hooks/useDocumentProcessing';
import { useState } from 'react';

export default function DocumentProcessor() {
  const [file, setFile] = useState<File | null>(null);
  const {
    processDocument,
    updates,
    isProcessing,
    currentStage,
    documentType,
    confidence
  } = useDocumentProcessing();

  const handleFileSelect = (e: React.ChangeEvent<HTMLInputElement>) => {
    const selectedFile = e.target.files?.[0];
    if (selectedFile) {
      setFile(selectedFile);
    }
  };

  const handleProcess = async () => {
    if (!file) return;

    const content = await file.text();
    await processDocument(file.name, content);
  };

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">ドキュメントプロセッサー</h2>

        <div className="form-control">
          <label className="label">
            <span className="label-text">ドキュメントを選択</span>
          </label>
          <input
            type="file"
            className="file-input file-input-bordered"
            onChange={handleFileSelect}
            disabled={isProcessing}
            accept=".pdf,.txt,.docx"
          />
        </div>

        <button
          className="btn btn-primary"
          onClick={handleProcess}
          disabled={!file || isProcessing}
        >
          {isProcessing ? (
            <>
              <span className="loading loading-spinner"></span>
              処理中...
            </>
          ) : 'ドキュメントを処理'}
        </button>

        {updates.length > 0 && (
          <div className="mt-6">
            <h3 className="font-semibold mb-4">処理ステップ</h3>

            <ul className="steps steps-vertical">
              {['classify', 'process', 'complete'].map((step) => (
                <li
                  key={step}
                  className={`step ${
                    updates.some(u => u.stage === step) ? 'step-primary' : ''
                  }`}
                >
                  <div className="text-left">
                    <div className="font-medium capitalize">{step}</div>
                    {step === 'classify' && documentType && (
                      <div className="text-sm opacity-70">
                        タイプ: {documentType} ({Math.round((confidence || 0) * 100)}%)
                      </div>
                    )}
                  </div>
                </li>
              ))}
            </ul>

            {currentStage === 'pending_review' && (
              <div className="alert alert-warning mt-4">
                <span>低い信頼度のため、ドキュメントは人間のレビューに送られました</span>
              </div>
            )}

            {currentStage === 'completed' && (
              <div className="alert alert-success mt-4">
                <span>処理完了!ドキュメントタイプ: {documentType}</span>
              </div>
            )}
          </div>
        )}
      </div>
    </div>
  );
}

リアルタイムのルーティング決定と処理の進行状況を表示するUIコンポーネント。

6. 埋め込みによるセマンティックルーティングの追加

// lib/routing/semantic-router.ts
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { cosineSimilarity } from '@langchain/core/utils/math';
import { memoize } from 'es-toolkit';

interface Route {
  name: string;
  description: string;
  examples: string[];
  handler: string;
}

export class SemanticRouter {
  private embeddings: GoogleGenerativeAIEmbeddings;
  private routeEmbeddings: Map<string, number[]> = new Map();

  constructor(private routes: Route[]) {
    this.embeddings = new GoogleGenerativeAIEmbeddings({
      model: 'embedding-001',
    });
    this.initialize();
  }

  private async initialize() {
    // 各ルートの埋め込みを生成
    for (const route of this.routes) {
      const description = `${route.description} ${route.examples.join(' ')}`;
      const embedding = await this.embeddings.embedQuery(description);
      this.routeEmbeddings.set(route.name, embedding);
    }
  }

  // パフォーマンスのためのメモ化
  findBestRoute = memoize(
    async (query: string): Promise<{ route: Route; similarity: number }> => {
      const queryEmbedding = await this.embeddings.embedQuery(query);

      let bestRoute: Route | null = null;
      let bestSimilarity = -1;

      for (const route of this.routes) {
        const routeEmbedding = this.routeEmbeddings.get(route.name)!;
        const similarity = cosineSimilarity(
          [queryEmbedding],
          [routeEmbedding]
        )[0][0];

        if (similarity > bestSimilarity) {
          bestSimilarity = similarity;
          bestRoute = route;
        }
      }

      return {
        route: bestRoute!,
        similarity: bestSimilarity,
      };
    },
    { ttl: 60000 } // 1分間のキャッシュ
  );
}

キーワードではなく意味に基づいて最も類似したルートを見つけるために埋め込みを使用するセマンティックルーター。

7. 学習による適応型ルーティングの実装

// lib/routing/adaptive-router.ts
import { Redis } from '@upstash/redis';
import { pick, omit } from 'es-toolkit';

interface RoutingDecision {
  query: string;
  selectedRoute: string;
  confidence: number;
  timestamp: number;
  outcome?: 'success' | 'failure';
}

export class AdaptiveRouter {
  private redis = Redis.fromEnv();

  async recordDecision(decision: RoutingDecision) {
    const key = `routing:history`;
    await this.redis.lpush(key, JSON.stringify(decision));
    await this.redis.ltrim(key, 0, 999); // 最後の1000件の決定を保持
  }

  async updateOutcome(
    query: string,
    route: string,
    outcome: 'success' | 'failure'
  ) {
    // 決定を検索して更新
    const history = await this.redis.lrange('routing:history', 0, 99);

    for (let i = 0; i < history.length; i++) {
      const decision = JSON.parse(history[i] as string) as RoutingDecision;

      if (decision.query === query && decision.selectedRoute === route) {
        decision.outcome = outcome;
        await this.redis.lset('routing:history', i, JSON.stringify(decision));
        break;
      }
    }

    // ルート統計を更新
    const statKey = `routing:stats:${route}`;
    await this.redis.hincrby(statKey, outcome, 1);
  }

  async getRoutePerformance(route: string) {
    const stats = await this.redis.hgetall(`routing:stats:${route}`);
    const success = parseInt(stats.success || '0');
    const failure = parseInt(stats.failure || '0');
    const total = success + failure;

    return {
      successRate: total > 0 ? success / total : 0.5,
      totalRequests: total,
    };
  }

  async selectBestRoute(candidates: string[]): Promise<string> {
    const performances = await Promise.all(
      candidates.map(async (route) => ({
        route,
        performance: await this.getRoutePerformance(route),
      }))
    );

    // 最高の成功率を持つルートを選択(探索あり)
    const explorationRate = 0.1;

    if (Math.random() < explorationRate) {
      // 探索:ランダムに選択
      return candidates[Math.floor(Math.random() * candidates.length)];
    } else {
      // 活用:最高のパフォーマーを選択
      return performances.reduce((best, current) =>
        current.performance.successRate > best.performance.successRate
          ? current
          : best
      ).route;
    }
  }
}

結果から学習し、強化学習の原理を使用して時間の経過とともにルーティング決定を改善する適応型ルーター。

まとめ

ルーティングパターンは、静的で線形なAIワークフローを、コンテキストに適応し、結果から学習する動的でインテリジェントなシステムに変換します。Next.js 15でLangchainとLanggraphを使用してルーティングを実装することで、特殊化エージェント間で作業を効率的に分散し、スマートなモデル選択によりコストを削減し、フィードバックループを通じて継続的に改善する、本番環境対応のアプリケーションを構築できます。セマンティック理解、状態管理、適応学習の組み合わせにより、Vercelのサーバーレスプラットフォームの拡張性と開発者体験の利点を維持しながら、時間の経過とともにより効果的になるAIシステムを作成できます。