草案 智能体设计模式 - 路由

aiagentslangchainlangraphroutingnextjs
By sko X opus 4.19/20/202512 min read

构建智能路由系统,动态地将查询导向专门的智能体,超越线性工作流程,创建自适应、上下文感知的AI应用程序。

心智模型:AI智能体的交通控制器

将路由模式想象成繁忙十字路口的智能交通控制器。就像交通控制器分析进入的车辆(大小、目的地、紧急程度)并将其引导到最优车道一样,路由智能体检查传入的请求(意图、复杂性、上下文)并将它们导向最合适的处理智能体。在Next.js 15中,这意味着你的API路由充当智能调度器,使用Langchain/Langraph来评估请求并将其路由到专门的处理程序——就像Vercel的Edge中间件基于头部路由请求,但使用AI驱动的决策而不是静态规则。

基础示例:基于意图的客户支持路由器

1. 创建路由器智能体

// lib/agents/router.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { PromptTemplate } from '@langchain/core/prompts';
import { z } from 'zod';
import { memoize } from 'es-toolkit';

const RouteSchema = z.object({
  route: z.enum(['technical', 'billing', 'general']),
  confidence: z.number(),
  reasoning: z.string()
});

export class CustomerSupportRouter {
  private model: ChatGoogleGenerativeAI;

  constructor() {
    this.model = new ChatGoogleGenerativeAI({
      model: 'gemini-2.5-flash',
      temperature: 0,
    });
  }

  // 对5分钟内相同查询进行记忆化
  route = memoize(
    async (query: string) => {
      const prompt = PromptTemplate.fromTemplate(`
        分析这个客户查询并将其路由到适当的部门。

        查询:{query}

        可用路由:
        - technical:产品问题、错误、技术问题
        - billing:付款、订阅、退款问题
        - general:所有其他查询

        以JSON响应:{{ "route": "...", "confidence": 0.0-1.0, "reasoning": "..." }}
      `);

      const formatted = await prompt.format({ query });
      const response = await this.model.invoke(formatted);

      return RouteSchema.parse(JSON.parse(response.content as string));
    },
    { ttl: 300000 } // 5分钟缓存
  );
}

创建一个使用Gemini Flash分析客户查询并返回带有置信度分数的结构化路由决策的路由器。

2. 实现专门的智能体

// lib/agents/specialized.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';

export class TechnicalSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-pro',
    temperature: 0.3,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `你是技术支持专家。帮助解决:${query}`
    );
    return { type: 'technical', response: response.content };
  }
}

export class BillingSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-flash',
    temperature: 0,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `你是账单专家。解决:${query}`
    );
    return { type: 'billing', response: response.content };
  }
}

export class GeneralSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-flash',
    temperature: 0.5,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `你是一个乐于助人的助手。回答:${query}`
    );
    return { type: 'general', response: response.content };
  }
}

每个专门的智能体使用不同的模型和温度,针对其特定领域进行了优化。

3. 创建API路由

// app/api/support/route.ts
export const runtime = 'nodejs';
export const maxDuration = 60;

import { CustomerSupportRouter } from '@/lib/agents/router';
import {
  TechnicalSupportAgent,
  BillingSupportAgent,
  GeneralSupportAgent
} from '@/lib/agents/specialized';
import { NextResponse } from 'next/server';

const router = new CustomerSupportRouter();
const agents = {
  technical: new TechnicalSupportAgent(),
  billing: new BillingSupportAgent(),
  general: new GeneralSupportAgent(),
};

export async function POST(req: Request) {
  try {
    const { query } = await req.json();

    // 路由查询
    const routing = await router.route(query);

    // 使用选定的智能体执行
    const agent = agents[routing.route];
    const result = await agent.handle(query);

    return NextResponse.json({
      ...result,
      routing: {
        selected: routing.route,
        confidence: routing.confidence,
        reasoning: routing.reasoning
      }
    });
  } catch (error) {
    console.error('路由错误:', error);
    return NextResponse.json(
      { error: '处理请求失败' },
      { status: 500 }
    );
  }
}

基于路由器决策将查询路由到专门智能体的API端点。

4. 使用TanStack Query的前端组件

// components/SupportChat.tsx
'use client';

import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';

export default function SupportChat() {
  const [message, setMessage] = useState('');

  const submitQuery = useMutation({
    mutationFn: async (query: string) => {
      const res = await fetch('/api/support', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ query }),
      });

      if (!res.ok) throw new Error('请求失败');
      return res.json();
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (message.trim()) {
      submitQuery.mutate(message);
      setMessage('');
    }
  };

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">客户支持</h2>

        <form onSubmit={handleSubmit}>
          <input
            type="text"
            className="input input-bordered w-full"
            placeholder="我们如何帮助您?"
            value={message}
            onChange={(e) => setMessage(e.target.value)}
            disabled={submitQuery.isPending}
          />

          <button
            type="submit"
            className="btn btn-primary mt-4"
            disabled={submitQuery.isPending || !message.trim()}
          >
            {submitQuery.isPending ? (
              <span className="loading loading-spinner"></span>
            ) : '发送'}
          </button>
        </form>

        {submitQuery.data && (
          <div className="alert mt-4">
            <div>
              <div className="badge badge-secondary">
                {submitQuery.data.routing.selected}
              </div>
              <p className="mt-2">{submitQuery.data.response}</p>
            </div>
          </div>
        )}
      </div>
    </div>
  );
}

使用TanStack Query显示路由决策和智能体响应的React组件。

高级示例:多阶段文档处理管道

1. 安装额外的依赖项

npm install @langchain/langgraph @upstash/redis pdf-parse

添加用于状态管理工作流的Langgraph和用于分布式状态管理的Upstash Redis。

2. 使用Langgraph定义路由状态机

// lib/workflows/document-router.ts
import { StateGraph, Annotation } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { BaseMessage } from '@langchain/core/messages';
import { groupBy, chunk } from 'es-toolkit';

const DocumentState = Annotation.Root({
  documentId: Annotation<string>(),
  content: Annotation<string>(),
  documentType: Annotation<string>(),
  confidence: Annotation<number>(),
  processingStage: Annotation<string>(),
  extractedData: Annotation<Record<string, any>>(),
  errors: Annotation<string[]>(),
});

export function createDocumentRoutingWorkflow() {
  const model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-pro',
    temperature: 0,
  });

  const workflow = new StateGraph(DocumentState)
    // 分类节点
    .addNode('classify', async (state) => {
      const response = await model.invoke(
        `分类这个文档类型:${state.content.substring(0, 1000)}`
      );

      // 解析分类结果
      const type = extractDocumentType(response.content as string);
      const confidence = calculateConfidence(response.content as string);

      return {
        documentType: type,
        confidence: confidence,
        processingStage: 'classified',
      };
    })

    // 发票处理器
    .addNode('process_invoice', async (state) => {
      const invoiceData = await extractInvoiceData(state.content);
      return {
        extractedData: invoiceData,
        processingStage: 'completed',
      };
    })

    // 合同处理器
    .addNode('process_contract', async (state) => {
      const contractData = await extractContractData(state.content);
      return {
        extractedData: contractData,
        processingStage: 'completed',
      };
    })

    // 通用处理器
    .addNode('process_general', async (state) => {
      const generalData = await extractGeneralData(state.content);
      return {
        extractedData: generalData,
        processingStage: 'completed',
      };
    })

    // 人工审核节点
    .addNode('human_review', async (state) => {
      await notifyHumanReviewer(state);
      return {
        processingStage: 'pending_review',
      };
    });

  // 添加条件路由
  workflow.addConditionalEdges('classify', (state) => {
    if (state.confidence < 0.7) {
      return 'human_review';
    }

    switch (state.documentType) {
      case 'invoice':
        return 'process_invoice';
      case 'contract':
        return 'process_contract';
      default:
        return 'process_general';
    }
  });

  // 设置入口点
  workflow.setEntryPoint('classify');

  return workflow.compile();
}

Langgraph工作流,对文档进行分类并根据置信度将其路由到专门的处理器。

3. 实现带状态管理的流式API

// app/api/documents/process/route.ts
export const runtime = 'nodejs';
export const maxDuration = 300;

import { createDocumentRoutingWorkflow } from '@/lib/workflows/document-router';
import { Redis } from '@upstash/redis';

const redis = Redis.fromEnv();
const workflow = createDocumentRoutingWorkflow();

export async function POST(req: Request) {
  const { documentId, content } = await req.json();

  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  // 后台处理
  (async () => {
    try {
      // 初始状态
      const initialState = {
        documentId,
        content,
        documentType: '',
        confidence: 0,
        processingStage: 'pending',
        extractedData: {},
        errors: [],
      };

      // 在Redis中存储初始状态
      await redis.set(
        `doc:${documentId}:state`,
        JSON.stringify(initialState),
        { ex: 3600 } // 1小时TTL
      );

      // 流式传输工作流事件
      const eventStream = await workflow.stream(initialState);

      for await (const event of eventStream) {
        const state = event[Object.keys(event)[0]];

        // 更新Redis状态
        await redis.set(
          `doc:${documentId}:state`,
          JSON.stringify(state),
          { ex: 3600 }
        );

        // 流式传输到客户端
        await writer.write(
          encoder.encode(`data: ${JSON.stringify({
            stage: state.processingStage,
            type: state.documentType,
            confidence: state.confidence,
            hasData: !!state.extractedData,
          })}\n\n`)
        );

        if (state.processingStage === 'completed' ||
            state.processingStage === 'pending_review') {
          break;
        }
      }

      await writer.write(
        encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)
      );
    } catch (error) {
      console.error('工作流错误:', error);
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          error: '处理失败'
        })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

通过路由工作流处理文档并将状态存储在Redis中的流式API。

4. 创建流式更新的Hook

// hooks/useDocumentProcessing.ts
import { useState, useCallback } from 'react';
import { useMutation } from '@tanstack/react-query';

interface ProcessingUpdate {
  stage?: string;
  type?: string;
  confidence?: number;
  done?: boolean;
  error?: string;
}

export function useDocumentProcessing() {
  const [updates, setUpdates] = useState<ProcessingUpdate[]>([]);
  const [isProcessing, setIsProcessing] = useState(false);

  const processDocument = useCallback(async (
    documentId: string,
    content: string
  ) => {
    setIsProcessing(true);
    setUpdates([]);

    try {
      const response = await fetch('/api/documents/process', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ documentId, content }),
      });

      if (!response.ok) throw new Error('处理失败');

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      while (reader) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            try {
              const update = JSON.parse(line.slice(6));
              setUpdates(prev => [...prev, update]);

              if (update.done || update.error) {
                setIsProcessing(false);
                return update;
              }
            } catch {}
          }
        }
      }
    } catch (error) {
      setIsProcessing(false);
      throw error;
    }
  }, []);

  return {
    processDocument,
    updates,
    isProcessing,
    currentStage: updates[updates.length - 1]?.stage,
    documentType: updates[updates.length - 1]?.type,
    confidence: updates[updates.length - 1]?.confidence,
  };
}

管理文档处理状态和流式更新的自定义Hook。

5. 构建文档处理UI

// components/DocumentProcessor.tsx
'use client';

import { useDocumentProcessing } from '@/hooks/useDocumentProcessing';
import { useState } from 'react';

export default function DocumentProcessor() {
  const [file, setFile] = useState<File | null>(null);
  const {
    processDocument,
    updates,
    isProcessing,
    currentStage,
    documentType,
    confidence
  } = useDocumentProcessing();

  const handleFileSelect = (e: React.ChangeEvent<HTMLInputElement>) => {
    const selectedFile = e.target.files?.[0];
    if (selectedFile) {
      setFile(selectedFile);
    }
  };

  const handleProcess = async () => {
    if (!file) return;

    const content = await file.text();
    await processDocument(file.name, content);
  };

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">文档处理器</h2>

        <div className="form-control">
          <label className="label">
            <span className="label-text">选择文档</span>
          </label>
          <input
            type="file"
            className="file-input file-input-bordered"
            onChange={handleFileSelect}
            disabled={isProcessing}
            accept=".pdf,.txt,.docx"
          />
        </div>

        <button
          className="btn btn-primary"
          onClick={handleProcess}
          disabled={!file || isProcessing}
        >
          {isProcessing ? (
            <>
              <span className="loading loading-spinner"></span>
              处理中...
            </>
          ) : '处理文档'}
        </button>

        {updates.length > 0 && (
          <div className="mt-6">
            <h3 className="font-semibold mb-4">处理步骤</h3>

            <ul className="steps steps-vertical">
              {['classify', 'process', 'complete'].map((step) => (
                <li
                  key={step}
                  className={`step ${
                    updates.some(u => u.stage === step) ? 'step-primary' : ''
                  }`}
                >
                  <div className="text-left">
                    <div className="font-medium capitalize">{step}</div>
                    {step === 'classify' && documentType && (
                      <div className="text-sm opacity-70">
                        类型:{documentType} ({Math.round((confidence || 0) * 100)}%)
                      </div>
                    )}
                  </div>
                </li>
              ))}
            </ul>

            {currentStage === 'pending_review' && (
              <div className="alert alert-warning mt-4">
                <span>由于置信度低,文档已发送给人工审核</span>
              </div>
            )}

            {currentStage === 'completed' && (
              <div className="alert alert-success mt-4">
                <span>处理完成!文档类型:{documentType}</span>
              </div>
            )}
          </div>
        )}
      </div>
    </div>
  );
}

显示实时路由决策和处理进度的UI组件。

6. 添加嵌入式语义路由

// lib/routing/semantic-router.ts
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { cosineSimilarity } from '@langchain/core/utils/math';
import { memoize } from 'es-toolkit';

interface Route {
  name: string;
  description: string;
  examples: string[];
  handler: string;
}

export class SemanticRouter {
  private embeddings: GoogleGenerativeAIEmbeddings;
  private routeEmbeddings: Map<string, number[]> = new Map();

  constructor(private routes: Route[]) {
    this.embeddings = new GoogleGenerativeAIEmbeddings({
      model: 'embedding-001',
    });
    this.initialize();
  }

  private async initialize() {
    // 为每个路由生成嵌入
    for (const route of this.routes) {
      const description = `${route.description} ${route.examples.join(' ')}`;
      const embedding = await this.embeddings.embedQuery(description);
      this.routeEmbeddings.set(route.name, embedding);
    }
  }

  // 为性能进行记忆化
  findBestRoute = memoize(
    async (query: string): Promise<{ route: Route; similarity: number }> => {
      const queryEmbedding = await this.embeddings.embedQuery(query);

      let bestRoute: Route | null = null;
      let bestSimilarity = -1;

      for (const route of this.routes) {
        const routeEmbedding = this.routeEmbeddings.get(route.name)!;
        const similarity = cosineSimilarity(
          [queryEmbedding],
          [routeEmbedding]
        )[0][0];

        if (similarity > bestSimilarity) {
          bestSimilarity = similarity;
          bestRoute = route;
        }
      }

      return {
        route: bestRoute!,
        similarity: bestSimilarity,
      };
    },
    { ttl: 60000 } // 1分钟缓存
  );
}

使用嵌入基于意义而不是关键词找到最相似路由的语义路由器。

7. 实现带学习的自适应路由

// lib/routing/adaptive-router.ts
import { Redis } from '@upstash/redis';
import { pick, omit } from 'es-toolkit';

interface RoutingDecision {
  query: string;
  selectedRoute: string;
  confidence: number;
  timestamp: number;
  outcome?: 'success' | 'failure';
}

export class AdaptiveRouter {
  private redis = Redis.fromEnv();

  async recordDecision(decision: RoutingDecision) {
    const key = `routing:history`;
    await this.redis.lpush(key, JSON.stringify(decision));
    await this.redis.ltrim(key, 0, 999); // 保留最后1000个决策
  }

  async updateOutcome(
    query: string,
    route: string,
    outcome: 'success' | 'failure'
  ) {
    // 查找并更新决策
    const history = await this.redis.lrange('routing:history', 0, 99);

    for (let i = 0; i < history.length; i++) {
      const decision = JSON.parse(history[i] as string) as RoutingDecision;

      if (decision.query === query && decision.selectedRoute === route) {
        decision.outcome = outcome;
        await this.redis.lset('routing:history', i, JSON.stringify(decision));
        break;
      }
    }

    // 更新路由统计
    const statKey = `routing:stats:${route}`;
    await this.redis.hincrby(statKey, outcome, 1);
  }

  async getRoutePerformance(route: string) {
    const stats = await this.redis.hgetall(`routing:stats:${route}`);
    const success = parseInt(stats.success || '0');
    const failure = parseInt(stats.failure || '0');
    const total = success + failure;

    return {
      successRate: total > 0 ? success / total : 0.5,
      totalRequests: total,
    };
  }

  async selectBestRoute(candidates: string[]): Promise<string> {
    const performances = await Promise.all(
      candidates.map(async (route) => ({
        route,
        performance: await this.getRoutePerformance(route),
      }))
    );

    // 选择成功率最高的路由(带探索)
    const explorationRate = 0.1;

    if (Math.random() < explorationRate) {
      // 探索:随机选择
      return candidates[Math.floor(Math.random() * candidates.length)];
    } else {
      // 利用:选择最佳性能
      return performances.reduce((best, current) =>
        current.performance.successRate > best.performance.successRate
          ? current
          : best
      ).route;
    }
  }
}

自适应路由器,从结果中学习并使用强化学习原理随时间改进路由决策。

结论

路由模式将静态、线性的AI工作流转变为动态、智能的系统,能够适应上下文并从结果中学习。通过在Next.js 15中使用Langchain和Langraph实现路由,你可以构建生产就绪的应用程序,这些应用程序能够高效地在专门的智能体之间分配工作,通过智能模型选择降低成本,并通过反馈循环持续改进。语义理解、状态管理和自适应学习的结合创建了随着时间推移变得更加有效的AI系统,同时保持了Vercel无服务器平台的可扩展性和开发者体验优势。