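Draft: Agentic Design Patterns - Routing
Build intelligent routing systems that dynamically direct queries to specialized agents, moving beyond linear workflows to create adaptive, context-aware AI applications.
Mental Model: A Traffic Controller for AI Agents
Think of the routing pattern as an intelligent traffic controller at a busy intersection. Just as the controller assesses incoming vehicles (size, destination, urgency) and directs each one to the best lane, a routing agent inspects incoming requests (intent, complexity, context) and directs each one to the most suitable handling agent. In Next.js 15, this means your API routes act as intelligent dispatchers that use LangChain/LangGraph to evaluate requests and route them to specialized handlers, much as Vercel's Edge Middleware routes requests based on headers, but with AI-driven decisions instead of static rules.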
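To make the mental model concrete, here is a minimal sketch of the pattern with no LLM involved: a classifier produces a routing decision, and a dispatch table maps that decision to a handler. The keyword-based `classify` function and the handler bodies are illustrative stand-ins, not part of the examples that follow.

```typescript
// A sketch of the routing pattern with a stubbed classifier (no LLM involved).
type RouteName = 'technical' | 'billing' | 'general';

interface RoutingDecision {
  route: RouteName;
  confidence: number;
}

type Handler = (query: string) => string;

// Hypothetical keyword classifier standing in for an LLM-backed router.
function classify(query: string): RoutingDecision {
  const text = query.toLowerCase();
  if (/(error|crash|bug)/.test(text)) return { route: 'technical', confidence: 0.9 };
  if (/(refund|invoice|payment)/.test(text)) return { route: 'billing', confidence: 0.9 };
  return { route: 'general', confidence: 0.5 };
}

// Dispatch table: one handler per route name.
const handlers: Record<RouteName, Handler> = {
  technical: (q) => `[technical] ${q}`,
  billing: (q) => `[billing] ${q}`,
  general: (q) => `[general] ${q}`,
};

// Classify the query, then hand it to the matching handler.
function dispatch(query: string): string {
  const decision = classify(query);
  return handlers[decision.route](query);
}
```

Swapping the stub for a model-backed classifier changes only `classify`; the dispatch table stays the same, which is what keeps the pattern easy to extend with new routes.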
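Basic Example: Intent-Based Customer Support Router
1. Create the router agent
// lib/agents/router.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { PromptTemplate } from '@langchain/core/prompts';
import { z } from 'zod';

const RouteSchema = z.object({
  route: z.enum(['technical', 'billing', 'general']),
  confidence: z.number().min(0).max(1),
  reasoning: z.string()
});

export type RouteDecision = z.infer<typeof RouteSchema>;

const CACHE_TTL_MS = 300_000; // 5 minutes

export class CustomerSupportRouter {
  private model: ChatGoogleGenerativeAI;
  // es-toolkit's memoize takes no TTL option, so identical queries are cached
  // here with an explicit expiry timestamp instead.
  private cache = new Map<string, { expires: number; value: RouteDecision }>();

  constructor() {
    this.model = new ChatGoogleGenerativeAI({
      model: 'gemini-2.5-flash',
      temperature: 0,
    });
  }

  async route(query: string): Promise<RouteDecision> {
    // Reuse the decision for an identical query seen in the last 5 minutes
    const cached = this.cache.get(query);
    if (cached && cached.expires > Date.now()) return cached.value;

    const prompt = PromptTemplate.fromTemplate(`
      Analyze this customer query and route it to the appropriate department.
      Query: {query}
      Available routes:
      - technical: product issues, bugs, technical problems
      - billing: payments, subscriptions, refunds
      - general: all other queries
      Respond with JSON only: {{ "route": "...", "confidence": 0.0-1.0, "reasoning": "..." }}
    `);
    const formatted = await prompt.format({ query });
    const response = await this.model.invoke(formatted);
    // Models sometimes wrap JSON in Markdown code fences; strip them before parsing
    const raw = (response.content as string).replace(/`{3}(?:json)?/g, '').trim();
    const decision = RouteSchema.parse(JSON.parse(raw));
    this.cache.set(query, { expires: Date.now() + CACHE_TTL_MS, value: decision });
    return decision;
  }
}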
This creates a router that uses Gemini Flash to analyze customer queries and return a structured routing decision with a confidence score.
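The five-minute caching of identical queries can be isolated in a small helper so that it is testable without calling a model. The following is a minimal sketch, not the article's implementation: the `TtlCache` name and its injectable clock are assumptions made for illustration.

```typescript
// A tiny time-to-live cache keyed by string (hypothetical helper).
class TtlCache<V> {
  private entries = new Map<string, { expiresAt: number; value: V }>();
  private ttlMs: number;
  private now: () => number;

  // `now` is injectable so expiry can be tested without waiting.
  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Returns the cached value, or undefined if it is missing or expired.
  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }

  // Stores a value that expires ttlMs milliseconds from now.
  set(key: string, value: V): void {
    this.entries.set(key, { expiresAt: this.now() + this.ttlMs, value });
  }
}
```

A router could check `cache.get(query)` before invoking the model and call `cache.set(query, decision)` afterwards. Note that an in-memory cache like this lives per server instance, so on a serverless platform it only helps while an instance stays warm.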
2. Implement the specialized agents
// lib/agents/specialized.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';

export class TechnicalSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-pro',
    temperature: 0.3,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `You are a technical support specialist. Help resolve: ${query}`
    );
    return { type: 'technical', response: response.content };
  }
}

export class BillingSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-flash',
    temperature: 0,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `You are a billing specialist. Resolve: ${query}`
    );
    return { type: 'billing', response: response.content };
  }
}

export class GeneralSupportAgent {
  private model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-flash',
    temperature: 0.5,
  });

  async handle(query: string) {
    const response = await this.model.invoke(
      `You are a helpful assistant. Answer: ${query}`
    );
    return { type: 'general', response: response.content };
  }
}
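Each specialized agent pairs a model with a temperature suited to its domain: the technical agent uses the larger gemini-2.5-pro, while the billing and general agents use gemini-2.5-flash at different temperatures.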
3. Create the API route
// app/api/support/route.ts
import { NextResponse } from 'next/server';
import { CustomerSupportRouter } from '@/lib/agents/router';
import {
  TechnicalSupportAgent,
  BillingSupportAgent,
  GeneralSupportAgent
} from '@/lib/agents/specialized';

export const runtime = 'nodejs';
export const maxDuration = 60;

const router = new CustomerSupportRouter();
const agents = {
  technical: new TechnicalSupportAgent(),
  billing: new BillingSupportAgent(),
  general: new GeneralSupportAgent(),
};

export async function POST(req: Request) {
  try {
    const { query } = await req.json();
    // Reject missing or empty input before spending a model call on it
    if (typeof query !== 'string' || !query.trim()) {
      return NextResponse.json(
        { error: 'A non-empty "query" string is required' },
        { status: 400 }
      );
    }
    // Ask the router which agent should handle the query
    const routing = await router.route(query);
    // Run the selected agent
    const agent = agents[routing.route];
    const result = await agent.handle(query);
    return NextResponse.json({
      ...result,
      routing: {
        selected: routing.route,
        confidence: routing.confidence,
        reasoning: routing.reasoning
      }
    });
  } catch (error) {
    console.error('Routing error:', error);
    return NextResponse.json(
      { error: 'Failed to process request' },
      { status: 500 }
    );
  }
}
An API endpoint that routes each query to a specialized agent based on the router's decision.
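The endpoint above trusts the router's choice regardless of its confidence score. One possible refinement, sketched here under assumptions, is to fall back to the general agent when confidence is low. The `resolveRoute` helper and the 0.6 threshold are hypothetical and would need tuning against real traffic.

```typescript
type RouteName = 'technical' | 'billing' | 'general';

interface RoutingDecision {
  route: RouteName;
  confidence: number;
  reasoning: string;
}

// Hypothetical threshold; not taken from the article.
const MIN_CONFIDENCE = 0.6;

// Keep the router's choice when it is confident enough,
// otherwise fall back to the general-purpose agent.
function resolveRoute(
  decision: RoutingDecision,
  minConfidence: number = MIN_CONFIDENCE
): RouteName {
  return decision.confidence >= minConfidence ? decision.route : 'general';
}
```

In the handler, `agents[resolveRoute(routing)]` would replace `agents[routing.route]`. Returning both the original and the resolved route in the response makes fallbacks visible when debugging.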
4. Frontend component with TanStack Query
// components/SupportChat.tsx
'use client';
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';

export default function SupportChat() {
  const [message, setMessage] = useState('');

  const submitQuery = useMutation({
    mutationFn: async (query: string) => {
      const res = await fetch('/api/support', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ query }),
      });
      if (!res.ok) throw new Error('Request failed');
      return res.json();
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (message.trim()) {
      submitQuery.mutate(message);
      setMessage('');
    }
  };

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">Customer Support</h2>
        <form onSubmit={handleSubmit}>
          <input
            type="text"
            className="input input-bordered w-full"
            placeholder="How can we help you?"
            value={message}
            onChange={(e) => setMessage(e.target.value)}
            disabled={submitQuery.isPending}
          />
          <button
            type="submit"
            className="btn btn-primary mt-4"
            disabled={submitQuery.isPending || !message.trim()}
          >
            {submitQuery.isPending ? (
              <span className="loading loading-spinner"></span>
            ) : 'Send'}
          </button>
        </form>
        {/* Show which agent handled the query, followed by its answer */}
        {submitQuery.data && (
          <div className="alert mt-4">
            <div>
              <div className="badge badge-secondary">
                {submitQuery.data.routing.selected}
              </div>
              <p className="mt-2">{submitQuery.data.response}</p>
            </div>
          </div>
        )}
      </div>
    </div>
  );
}
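A React component that uses TanStack Query to display the routing decision and the agent's response.
Advanced Example: Multi-Stage Document Processing Pipeline
1. Install additional dependencies
npm install @langchain/langgraph @upstash/redis pdf-parse
This adds LangGraph for stateful workflows and Upstash Redis for distributed state management, along with pdf-parse, a library for extracting text from PDF files.
2. Define the routing state machine with LangGraph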
// lib/workflows/document-router.ts
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
// The helpers below are not defined in this article; this module path is a
// placeholder for wherever your project implements them.
import {
  extractDocumentType,
  calculateConfidence,
  extractInvoiceData,
  extractContractData,
  extractGeneralData,
  notifyHumanReviewer,
} from './document-helpers';

const DocumentState = Annotation.Root({
  documentId: Annotation<string>(),
  content: Annotation<string>(),
  documentType: Annotation<string>(),
  confidence: Annotation<number>(),
  processingStage: Annotation<string>(),
  extractedData: Annotation<Record<string, any>>(),
  errors: Annotation<string[]>(),
});

export function createDocumentRoutingWorkflow() {
  const model = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-pro',
    temperature: 0,
  });

  const workflow = new StateGraph(DocumentState)
    // Classification node
    .addNode('classify', async (state) => {
      const response = await model.invoke(
        `Classify the type of this document: ${state.content.substring(0, 1000)}`
      );
      // Parse the classification result
      const text = response.content as string;
      return {
        documentType: extractDocumentType(text),
        confidence: calculateConfidence(text),
        processingStage: 'classified',
      };
    })
    // Invoice processor
    .addNode('process_invoice', async (state) => {
      const invoiceData = await extractInvoiceData(state.content);
      return {
        extractedData: invoiceData,
        processingStage: 'completed',
      };
    })
    // Contract processor
    .addNode('process_contract', async (state) => {
      const contractData = await extractContractData(state.content);
      return {
        extractedData: contractData,
        processingStage: 'completed',
      };
    })
    // General processor
    .addNode('process_general', async (state) => {
      const generalData = await extractGeneralData(state.content);
      return {
        extractedData: generalData,
        processingStage: 'completed',
      };
    })
    // Human review node
    .addNode('human_review', async (state) => {
      await notifyHumanReviewer(state);
      return {
        processingStage: 'pending_review',
      };
    })
    // Entry point (addEdge from START replaces the deprecated setEntryPoint)
    .addEdge(START, 'classify')
    // Conditional routing: low confidence goes to a human, otherwise route by type
    .addConditionalEdges(
      'classify',
      (state) => {
        if (state.confidence < 0.7) {
          return 'human_review';
        }
        switch (state.documentType) {
          case 'invoice':
            return 'process_invoice';
          case 'contract':
            return 'process_contract';
          default:
            return 'process_general';
        }
      },
      ['human_review', 'process_invoice', 'process_contract', 'process_general']
    )
    // Every branch ends the run explicitly
    .addEdge('process_invoice', END)
    .addEdge('process_contract', END)
    .addEdge('process_general', END)
    .addEdge('human_review', END);

  return workflow.compile();
}
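A LangGraph workflow that classifies each document and routes it to a specialized processor by type, sending it to human review when the classification confidence falls below 0.7.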
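The workflow calls `extractDocumentType` and `calculateConfidence` without showing them. The sketch below is one hypothetical way to write them, assuming the model is prompted to answer in a form such as `type: invoice, confidence: 0.92`. That reply format is an assumption, not something the article specifies.

```typescript
type DocumentType = 'invoice' | 'contract' | 'general';

// Hypothetical parser: looks for a known type name in the model's reply.
function extractDocumentType(text: string): DocumentType {
  const lower = text.toLowerCase();
  if (lower.includes('invoice')) return 'invoice';
  if (lower.includes('contract')) return 'contract';
  return 'general';
}

// Hypothetical parser: reads "confidence: <number>" and clamps it to [0, 1].
// Returns 0 when no score is found, which pushes the document to human review.
function calculateConfidence(text: string): number {
  const match = text.match(/confidence\s*[:=]\s*(\d*\.?\d+)/i);
  if (!match) return 0;
  const value = Number(match[1]);
  if (Number.isNaN(value)) return 0;
  return Math.min(1, Math.max(0, value));
}
```

Defaulting to a confidence of 0 is a deliberately conservative choice: an unparseable reply is treated as uncertain rather than silently routed. In practice, asking the model for structured JSON output is likely to be more robust than parsing free text.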
3. Implement a streaming API with state management
// app/api/documents/process/route.ts
import { createDocumentRoutingWorkflow } from '@/lib/workflows/document-router';
import { Redis } from '@upstash/redis';

export const runtime = 'nodejs';
export const maxDuration = 300;

const redis = Redis.fromEnv();
const workflow = createDocumentRoutingWorkflow();

export async function POST(req: Request) {
  const { documentId, content } = await req.json();
  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  // Run the workflow in the background while the response streams
  (async () => {
    try {
      // Initial state
      const initialState = {
        documentId,
        content,
        documentType: '',
        confidence: 0,
        processingStage: 'pending',
        extractedData: {},
        errors: [],
      };
      // Store the initial state in Redis
      await redis.set(
        `doc:${documentId}:state`,
        JSON.stringify(initialState),
        { ex: 3600 } // 1-hour TTL
      );
      // Stream workflow events. streamMode 'values' yields the full state after
      // each step; the default 'updates' mode yields only each node's partial
      // output, which would overwrite the stored state with fragments.
      const eventStream = await workflow.stream(initialState, {
        streamMode: 'values',
      });
      for await (const state of eventStream) {
        // Update the state in Redis
        await redis.set(
          `doc:${documentId}:state`,
          JSON.stringify(state),
          { ex: 3600 }
        );
        // Stream the progress to the client as a server-sent event
        await writer.write(
          encoder.encode(`data: ${JSON.stringify({
            stage: state.processingStage,
            type: state.documentType,
            confidence: state.confidence,
            hasData: Object.keys(state.extractedData ?? {}).length > 0,
          })}\n\n`)
        );
        if (state.processingStage === 'completed' ||
            state.processingStage === 'pending_review') {
          break;
        }
      }
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)
      );
    } catch (error) {
      console.error('Workflow error:', error);
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          error: 'Processing failed'
        })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}
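A streaming API that runs documents through the routing workflow, stores the state in Redis, and reports progress to the client as server-sent events.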
4. Create a hook for streaming updates
// hooks/useDocumentProcessing.ts
import { useState, useCallback } from 'react';

interface ProcessingUpdate {
  stage?: string;
  type?: string;
  confidence?: number;
  hasData?: boolean;
  done?: boolean;
  error?: string;
}

export function useDocumentProcessing() {
  const [updates, setUpdates] = useState<ProcessingUpdate[]>([]);
  const [isProcessing, setIsProcessing] = useState(false);

  const processDocument = useCallback(async (
    documentId: string,
    content: string
  ) => {
    setIsProcessing(true);
    setUpdates([]);
    try {
      const response = await fetch('/api/documents/process', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ documentId, content }),
      });
      if (!response.ok) throw new Error('Processing failed');
      const reader = response.body?.getReader();
      if (!reader) throw new Error('Response has no body');
      const decoder = new TextDecoder();
      let buffer = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // A network chunk can end mid-event, so buffer until a full line arrives
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';
        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          let update: ProcessingUpdate;
          try {
            update = JSON.parse(line.slice(6));
          } catch {
            continue; // Skip malformed events
          }
          setUpdates(prev => [...prev, update]);
          if (update.done || update.error) return update;
        }
      }
    } finally {
      // Runs on success, on error, and when the stream ends without a done event
      setIsProcessing(false);
    }
  }, []);

  // The final { done: true } event carries no stage, so read the most recent
  // update that does.
  const latest = [...updates].reverse().find((u) => u.stage !== undefined);

  return {
    processDocument,
    updates,
    isProcessing,
    currentStage: latest?.stage,
    documentType: latest?.type,
    confidence: latest?.confidence,
  };
}
A custom hook that manages document processing state and streaming updates.
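Server-sent events can be split across network reads, so a client-side parser needs to hold back any partial line until the rest arrives. The sketch below isolates that buffering in a small class that can be tested without a network; the `SseBuffer` name is an assumption for illustration, and it handles only the single-line `data: ` events used in this article.

```typescript
// Accumulates streamed text and returns the JSON payload of every complete
// "data: ..." line seen so far (hypothetical helper).
class SseBuffer {
  private pending = '';

  push(chunk: string): unknown[] {
    this.pending += chunk;
    const lines = this.pending.split('\n');
    // The last element is either empty or an incomplete line: keep it for later.
    this.pending = lines.pop() ?? '';

    const events: unknown[] = [];
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      try {
        events.push(JSON.parse(line.slice(6)));
      } catch {
        // Ignore malformed payloads rather than aborting the stream.
      }
    }
    return events;
  }
}
```

Inside a read loop, each decoded chunk would be passed to `push`, and the returned events appended to state. Decoding with `new TextDecoder().decode(value, { stream: true })` matters for the same reason: a multi-byte character can also be split across chunks.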
5. Build the document processing UI
// components/DocumentProcessor.tsx
'use client';
import { useState } from 'react';
import { useDocumentProcessing } from '@/hooks/useDocumentProcessing';

// Maps each UI step to the processingStage value that marks it as done
const STEPS = [
  { key: 'classify', label: 'Classify', doneStage: 'classified' },
  { key: 'process', label: 'Process', doneStage: 'completed' },
  { key: 'complete', label: 'Complete', doneStage: 'completed' },
];

export default function DocumentProcessor() {
  const [file, setFile] = useState<File | null>(null);
  const {
    processDocument,
    updates,
    isProcessing,
    currentStage,
    documentType,
    confidence
  } = useDocumentProcessing();

  const handleFileSelect = (e: React.ChangeEvent<HTMLInputElement>) => {
    const selectedFile = e.target.files?.[0];
    if (selectedFile) {
      setFile(selectedFile);
    }
  };

  const handleProcess = async () => {
    if (!file) return;
    // file.text() only yields usable content for plain-text files; PDF and DOCX
    // uploads need text extraction first (for example with pdf-parse on the server)
    const content = await file.text();
    await processDocument(file.name, content);
  };

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">Document Processor</h2>
        <div className="form-control">
          <label className="label">
            <span className="label-text">Select a document</span>
          </label>
          <input
            type="file"
            className="file-input file-input-bordered"
            onChange={handleFileSelect}
            disabled={isProcessing}
            accept=".pdf,.txt,.docx"
          />
        </div>
        <button
          className="btn btn-primary"
          onClick={handleProcess}
          disabled={!file || isProcessing}
        >
          {isProcessing ? (
            <>
              <span className="loading loading-spinner"></span>
              Processing...
            </>
          ) : 'Process document'}
        </button>
        {updates.length > 0 && (
          <div className="mt-6">
            <h3 className="font-semibold mb-4">Processing steps</h3>
            <ul className="steps steps-vertical">
              {STEPS.map((step) => (
                <li
                  key={step.key}
                  className={`step ${
                    updates.some(u => u.stage === step.doneStage) ? 'step-primary' : ''
                  }`}
                >
                  <div className="text-left">
                    <div className="font-medium">{step.label}</div>
                    {step.key === 'classify' && documentType && (
                      <div className="text-sm opacity-70">
                        Type: {documentType} ({Math.round((confidence || 0) * 100)}%)
                      </div>
                    )}
                  </div>
                </li>
              ))}
            </ul>
            {currentStage === 'pending_review' && (
              <div className="alert alert-warning mt-4">
                <span>Confidence was low, so the document was sent for human review</span>
              </div>
            )}
            {currentStage === 'completed' && (
              <div className="alert alert-success mt-4">
                <span>Processing complete! Document type: {documentType}</span>
              </div>
            )}
          </div>
        )}
      </div>
    </div>
  );
}
A UI component that shows routing decisions and processing progress in real time.
6. Add embedding-based semantic routing
// lib/routing/semantic-router.ts
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { cosineSimilarity } from '@langchain/core/utils/math';

interface Route {
  name: string;
  description: string;
  examples: string[];
  handler: string;
}

interface RouteMatch {
  route: Route;
  similarity: number;
}

const CACHE_TTL_MS = 60_000; // 1 minute

export class SemanticRouter {
  private embeddings: GoogleGenerativeAIEmbeddings;
  private routeEmbeddings: Map<string, number[]> = new Map();
  private ready: Promise<void>;
  // es-toolkit's memoize takes no TTL option, so results are cached with an
  // explicit expiry timestamp instead.
  private cache = new Map<string, { expires: number; match: RouteMatch }>();

  constructor(private routes: Route[]) {
    this.embeddings = new GoogleGenerativeAIEmbeddings({
      model: 'embedding-001',
    });
    // Constructors cannot await, so keep the promise and await it before routing
    this.ready = this.initialize();
  }

  private async initialize() {
    // Embed each route's description together with its examples
    for (const route of this.routes) {
      const description = `${route.description} ${route.examples.join(' ')}`;
      const embedding = await this.embeddings.embedQuery(description);
      this.routeEmbeddings.set(route.name, embedding);
    }
  }

  async findBestRoute(query: string): Promise<RouteMatch> {
    // Make sure every route embedding exists before comparing
    await this.ready;
    const cached = this.cache.get(query);
    if (cached && cached.expires > Date.now()) return cached.match;

    const queryEmbedding = await this.embeddings.embedQuery(query);
    let bestRoute: Route | null = null;
    let bestSimilarity = -Infinity;
    for (const route of this.routes) {
      const routeEmbedding = this.routeEmbeddings.get(route.name)!;
      const similarity = cosineSimilarity(
        [queryEmbedding],
        [routeEmbedding]
      )[0][0];
      if (similarity > bestSimilarity) {
        bestSimilarity = similarity;
        bestRoute = route;
      }
    }
    if (!bestRoute) throw new Error('SemanticRouter has no routes configured');

    const match = { route: bestRoute, similarity: bestSimilarity };
    this.cache.set(query, { expires: Date.now() + CACHE_TTL_MS, match });
    return match;
  }
}
A semantic router that uses embeddings to find the most similar route by meaning rather than by keywords.
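The comparison at the heart of semantic routing is cosine similarity between the query vector and each route vector. The following sketch shows that calculation on hand-made three-dimensional vectors; real embeddings have hundreds of dimensions, and the `cosine` and `nearestRoute` helpers are illustrative names, not library functions.

```typescript
// Cosine similarity: dot(a, b) / (|a| * |b|). Returns 0 for a zero vector.
function cosine(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  const length = Math.max(a.length, b.length);
  for (let i = 0; i < length; i++) {
    const x = a[i] ?? 0;
    const y = b[i] ?? 0;
    dot += x * y;
    normA += x * x;
    normB += y * y;
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Returns the route whose vector is most similar to the query, or null if
// no routes are given.
function nearestRoute(
  query: number[],
  routes: Record<string, number[]>
): { name: string; similarity: number } | null {
  let best: { name: string; similarity: number } | null = null;
  for (const [name, vector] of Object.entries(routes)) {
    const similarity = cosine(query, vector);
    if (best === null || similarity > best.similarity) {
      best = { name, similarity };
    }
  }
  return best;
}

// Toy vectors standing in for real route embeddings.
const toyRoutes: Record<string, number[]> = {
  billing: [1, 0, 0],
  technical: [0, 1, 0],
};
const toyMatch = nearestRoute([0.9, 0.1, 0], toyRoutes);
```

Here the query vector points almost entirely along the billing axis, so `toyMatch` selects `billing`. Because cosine similarity ignores vector length, a short query and a long route description can still match closely if they point in the same direction.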
7. Implement adaptive routing with learning
// lib/routing/adaptive-router.ts
import { Redis } from '@upstash/redis';

interface RoutingDecision {
  query: string;
  selectedRoute: string;
  confidence: number;
  timestamp: number;
  outcome?: 'success' | 'failure';
}

const HISTORY_KEY = 'routing:history';

export class AdaptiveRouter {
  private redis = Redis.fromEnv();

  async recordDecision(decision: RoutingDecision) {
    // @upstash/redis serializes objects on write and deserializes JSON on read,
    // so decisions are stored and read back as objects, without manual
    // JSON.stringify / JSON.parse.
    await this.redis.lpush(HISTORY_KEY, decision);
    await this.redis.ltrim(HISTORY_KEY, 0, 999); // Keep the most recent 1000 decisions
  }

  async updateOutcome(
    query: string,
    route: string,
    outcome: 'success' | 'failure'
  ) {
    // Find and update the decision (only the 100 most recent are searched)
    const history = await this.redis.lrange<RoutingDecision>(HISTORY_KEY, 0, 99);
    for (let i = 0; i < history.length; i++) {
      const decision = history[i];
      if (decision.query === query && decision.selectedRoute === route) {
        decision.outcome = outcome;
        await this.redis.lset(HISTORY_KEY, i, decision);
        break;
      }
    }
    // Update the per-route statistics
    const statKey = `routing:stats:${route}`;
    await this.redis.hincrby(statKey, outcome, 1);
  }

  async getRoutePerformance(route: string) {
    // hgetall returns null when the route has no recorded outcomes yet
    const stats =
      (await this.redis.hgetall<Record<string, number>>(`routing:stats:${route}`)) ?? {};
    const success = Number(stats.success ?? 0);
    const failure = Number(stats.failure ?? 0);
    const total = success + failure;
    return {
      // Routes with no data get a neutral 0.5 so they are neither favored nor ruled out
      successRate: total > 0 ? success / total : 0.5,
      totalRequests: total,
    };
  }

  async selectBestRoute(candidates: string[]): Promise<string> {
    if (candidates.length === 0) {
      throw new Error('selectBestRoute needs at least one candidate');
    }
    // Epsilon-greedy selection: mostly exploit the best route, sometimes explore
    const explorationRate = 0.1;
    if (Math.random() < explorationRate) {
      // Explore: pick a random candidate
      return candidates[Math.floor(Math.random() * candidates.length)];
    }
    // Exploit: pick the candidate with the highest success rate
    const performances = await Promise.all(
      candidates.map(async (route) => ({
        route,
        performance: await this.getRoutePerformance(route),
      }))
    );
    return performances.reduce((best, current) =>
      current.performance.successRate > best.performance.successRate
        ? current
        : best
    ).route;
  }
}
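An adaptive router that learns from recorded outcomes and improves its routing decisions over time using epsilon-greedy exploration, a basic technique from reinforcement learning.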
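The explore/exploit decision can be tested in isolation by keeping the statistics in memory and injecting the random number source. This is a minimal sketch under those assumptions; the `selectRoute` function and the `RouteStats` shape are hypothetical, with the 0.5 prior for unseen routes carried over from the class above.

```typescript
interface RouteStats {
  success: number;
  failure: number;
}

// Success rate with a neutral 0.5 prior for routes that have no data yet.
function successRate(stats: RouteStats): number {
  const total = stats.success + stats.failure;
  return total > 0 ? stats.success / total : 0.5;
}

// Epsilon-greedy selection: with probability epsilon pick a random route
// (explore), otherwise pick the route with the best success rate (exploit).
// `random` is injectable so the behavior can be tested deterministically.
function selectRoute(
  stats: Record<string, RouteStats>,
  epsilon: number,
  random: () => number = Math.random
): string {
  const names = Object.keys(stats);
  if (names.length === 0) throw new Error('No candidate routes');

  if (random() < epsilon) {
    return names[Math.floor(random() * names.length)] as string;
  }

  let best = names[0] as string;
  for (const name of names) {
    if (successRate(stats[name]!) > successRate(stats[best]!)) best = name;
  }
  return best;
}
```

With `epsilon` at 0.1, roughly one request in ten goes to a random route, which keeps gathering evidence about routes that currently look weak. A fixed epsilon never stops exploring; decaying it over time is a common variation when traffic patterns are stable.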
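Conclusion
The routing pattern turns static, linear AI workflows into dynamic systems that adapt to context and learn from outcomes. By implementing routing with LangChain and LangGraph in Next.js 15, you can build applications that distribute work efficiently across specialized agents, lower costs by matching each request to an appropriately sized model, and keep improving through feedback loops. Combining semantic understanding, state management, and adaptive learning produces AI systems that become more effective over time while retaining the scalability and developer experience of Vercel's serverless platform.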