Draft: "Agentic Design Patterns - Knowledge Retrieval (RAG)"
This guide demonstrates how to implement sophisticated RAG systems on the Vercel platform using TypeScript, Next.js 15, LangChain, and LangGraph. We build up from basic retrieval to advanced agentic RAG patterns featuring self-correction, intelligent query routing, and complex multi-step reasoning.
Mental Model: RAG as an Intelligent Research Assistant
Think of RAG as a research assistant who doesn't just fetch documents, but understands context, evaluates source quality, and synthesizes information. Traditional RAG is like a library catalog system: you query, it retrieves. Agentic RAG is like having a PhD student who knows when to search, what to search for, how to cross-reference sources, spot contradictions, and even when to say "I need to look elsewhere." In a serverless environment this assistant works within strict time limits (up to 300 seconds with the configuration used in this guide), yet maintains conversation state between interactions.
Basic Example: Simple Vector-Based RAG
1. Install RAG dependencies
npm install @langchain/pinecone @pinecone-database/pinecone
npm install @langchain/google-genai @langchain/textsplitters
npm install @langchain/langgraph @langchain/community langchain
npm install es-toolkit
This installs Pinecone for vector storage, Google's Generative AI integration (the embedding-001 embedding model and Gemini chat models), text splitters for chunking, LangGraph and the community tools used by the agentic examples, and es-toolkit for utility functions. Note that es-toolkit's lodash-compatible helpers live under the es-toolkit/compat subpath of the same package; there is no separate package to install.
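The examples also assume a few environment variables in .env.local. The names below match the code in this guide; GOOGLE_API_KEY is the default variable read by @langchain/google-genai (an assumption if you authenticate differently):
# .env.local
PINECONE_API_KEY=your-pinecone-api-key
PINECONE_INDEX_NAME=your-index-name
GOOGLE_API_KEY=your-google-ai-studio-key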
2. Initialize the vector store
// lib/vector-store.ts
import { Pinecone } from '@pinecone-database/pinecone';
import { PineconeStore } from '@langchain/pinecone';
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { memoize } from 'es-toolkit/compat';
// Memoize client creation for serverless efficiency
const getPineconeClient = memoize(() =>
new Pinecone({
apiKey: process.env.PINECONE_API_KEY!,
})
);
export async function getVectorStore() {
const pinecone = getPineconeClient();
const index = pinecone.index(process.env.PINECONE_INDEX_NAME!);
const embeddings = new GoogleGenerativeAIEmbeddings({
modelName: "embedding-001",
taskType: "RETRIEVAL_DOCUMENT",
});
return PineconeStore.fromExistingIndex(embeddings, {
pineconeIndex: index,
maxConcurrency: 5, // Tuned for serverless
});
}
Memoizing the Pinecone client avoids re-initializing it on every serverless invocation, and Google's embeddings keep embedding costs low.
3. Document ingestion with chunking
// lib/ingestion.ts
import { RecursiveCharacterTextSplitter } from '@langchain/textsplitters';
import { Document } from '@langchain/core/documents';
import { getVectorStore } from './vector-store';
import { chunk } from 'es-toolkit';
import { map } from 'es-toolkit/compat'; // lodash-style map ships in the compat subpath
interface ChunkingConfig {
chunkSize: number;
chunkOverlap: number;
separators?: string[];
}
export async function ingestDocuments(
texts: string[],
metadata: Record<string, any>[] = [],
config: ChunkingConfig = {
chunkSize: 1500,
chunkOverlap: 200,
}
) {
const splitter = new RecursiveCharacterTextSplitter({
chunkSize: config.chunkSize,
chunkOverlap: config.chunkOverlap,
separators: config.separators || ['\n\n', '\n', '.', '!', '?'],
});
// Process documents in parallel
const documents = await Promise.all(
map(texts, async (text, index) => {
const chunks = await splitter.splitText(text);
return chunks.map((chunk, chunkIndex) =>
new Document({
pageContent: chunk,
metadata: {
...metadata[index],
chunkIndex,
originalIndex: index,
},
})
);
})
);
const flatDocs = documents.flat();
const vectorStore = await getVectorStore();
// Efficient batched upserts
const batches = chunk(flatDocs, 100);
for (const batch of batches) {
await vectorStore.addDocuments(batch);
}
return flatDocs.length;
}
Implements document chunking that preserves context through overlap, processes documents in parallel, and batches the upserts to stay within serverless timeout limits.
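A minimal usage sketch, e.g. behind a one-off seeding endpoint (the route path and request shape are assumptions for illustration):
// app/api/ingest/route.ts (hypothetical seeding endpoint)
import { ingestDocuments } from '@/lib/ingestion';
import { NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function POST(req: Request) {
const { texts, metadata } = await req.json();
// Chunks each text, embeds the chunks, and upserts them in batches of 100
const count = await ingestDocuments(texts, metadata ?? []);
return NextResponse.json({ chunksIngested: count });
}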
4. Basic RAG chain
// lib/rag/basic-rag.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { PromptTemplate } from '@langchain/core/prompts';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { RunnableSequence } from '@langchain/core/runnables';
import { getVectorStore } from '../vector-store';
import { formatDocumentsAsString } from 'langchain/util/document';
export async function createBasicRAGChain() {
const vectorStore = await getVectorStore();
const retriever = vectorStore.asRetriever({
k: 4, // Retrieve the top 4 relevant chunks
searchType: 'similarity',
});
const prompt = PromptTemplate.fromTemplate(`
Answer the question based only on the following context:
{context}
Question: {question}
Answer concisely and cite the relevant parts of the context.
`);
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
maxOutputTokens: 1024,
});
const chain = RunnableSequence.from([
{
context: async (input: { question: string }) => {
const docs = await retriever.invoke(input.question);
return formatDocumentsAsString(docs);
},
question: (input: { question: string }) => input.question, // extract the string; RunnablePassthrough would pass the whole object
},
prompt,
model,
new StringOutputParser(),
]);
return chain;
}
Creates a basic RAG chain that retrieves context, formats it together with the question, and generates a grounded response.
5. API route for basic RAG
// app/api/rag/basic/route.ts
import { createBasicRAGChain } from '@/lib/rag/basic-rag';
import { NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 60;
export async function POST(req: Request) {
try {
const { question } = await req.json();
const chain = await createBasicRAGChain();
const response = await chain.invoke({ question });
return NextResponse.json({ answer: response });
} catch (error) {
console.error('RAG error:', error);
return NextResponse.json(
{ error: 'Failed to process query' },
{ status: 500 }
);
}
}
A simple API endpoint that accepts a question and returns a RAG-augmented answer, with a 60-second timeout for basic queries.
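Calling the endpoint from the client is a plain JSON POST; a minimal sketch (the question text is illustrative):
// Hypothetical client-side call to the basic RAG endpoint
const res = await fetch('/api/rag/basic', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ question: 'How does the ingestion pipeline chunk documents?' }),
});
const { answer } = await res.json();
console.log(answer);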
Advanced Example: Agentic RAG with Self-Correction
1. Self-correcting RAG with the CRAG pattern
// lib/rag/corrective-rag.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage } from '@langchain/core/messages';
import { Document } from '@langchain/core/documents';
import { getVectorStore } from '../vector-store';
import { WebBrowser } from '@langchain/community/tools/webbrowser';
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { filter, map, some } from 'es-toolkit/compat'; // lodash-style helpers ship in the compat subpath
interface CRAGState {
question: string;
documents: Document[];
relevanceScores: number[];
finalAnswer: string;
needsWebSearch: boolean;
webResults: Document[];
}
export function createCorrectiveRAG() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0,
});
const relevanceModel = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
});
const workflow = new StateGraph<CRAGState>({
channels: {
question: null,
documents: null,
relevanceScores: null,
finalAnswer: null,
needsWebSearch: null,
webResults: null,
},
});
// Node: retrieve documents
workflow.addNode('retrieve', async (state) => {
const vectorStore = await getVectorStore();
const retriever = vectorStore.asRetriever({ k: 5 });
const documents = await retriever.invoke(state.question);
return { documents };
});
// Node: evaluate relevance
workflow.addNode('evaluate_relevance', async (state) => {
const relevancePrompt = `
Rate this document's relevance to the question (0-10):
Question: {question}
Document: {document}
Return only a number between 0 and 10.
`;
const relevanceScores = await Promise.all(
map(state.documents, async (doc) => {
const response = await relevanceModel.invoke([
new HumanMessage(
relevancePrompt
.replace('{question}', state.question)
.replace('{document}', doc.pageContent)
),
]);
return parseFloat(response.content as string) || 0;
})
);
// Check whether web search is needed (all scores below 7)
const needsWebSearch = !some(relevanceScores, score => score >= 7);
return { relevanceScores, needsWebSearch };
});
// Node: web search fallback
workflow.addNode('web_search', async (state) => {
if (!state.needsWebSearch) {
return { webResults: [] };
}
const embeddings = new GoogleGenerativeAIEmbeddings({
modelName: "embedding-001",
});
const browser = new WebBrowser({ model, embeddings });
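// Note: the WebBrowser tool expects input of the form "<url>,<task>", so passing
// a bare question is a simplification; a search tool (e.g. Tavily) is a more
// natural fallback for open-ended queries.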
const searchResult = await browser.invoke(state.question);
// Parse the search result into a document
const webResults = [
new Document({
pageContent: searchResult,
metadata: { source: 'web_search' },
}),
];
return { webResults };
});
// Node: generate answer
workflow.addNode('generate', async (state) => {
// Keep only highly relevant documents
const relevantDocs = filter(
state.documents,
(_, index) => state.relevanceScores[index] >= 7
);
// Combine with web results if needed
const allDocs = [...relevantDocs, ...state.webResults];
const context = map(allDocs, doc => doc.pageContent).join('\n\n');
const response = await model.invoke([
new HumanMessage(`
Answer this question using the provided context:
Context:
${context}
Question: ${state.question}
Provide a comprehensive answer with citations.
`),
]);
return { finalAnswer: response.content as string };
});
// Define workflow edges
workflow.setEntryPoint('retrieve');
workflow.addEdge('retrieve', 'evaluate_relevance');
workflow.addEdge('evaluate_relevance', 'web_search');
workflow.addEdge('web_search', 'generate');
workflow.addEdge('generate', END);
return workflow.compile();
}
Implements the CRAG pattern: grades the relevance of retrieved documents, falls back to web search when necessary, and generates the answer from validated sources.
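A minimal sketch of exposing the compiled graph behind an API route (the path and timeout are assumptions):
// app/api/rag/corrective/route.ts (hypothetical path)
import { createCorrectiveRAG } from '@/lib/rag/corrective-rag';
import { NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function POST(req: Request) {
const { question } = await req.json();
const graph = createCorrectiveRAG();
// Seed the full CRAG state; the graph fills in the rest node by node
const result = await graph.invoke({
question,
documents: [],
relevanceScores: [],
finalAnswer: '',
needsWebSearch: false,
webResults: [],
});
return NextResponse.json({
answer: result.finalAnswer,
usedWebSearch: result.needsWebSearch,
});
}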
2. Multi-query RAG for complex questions
// lib/rag/multi-query-rag.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage } from '@langchain/core/messages'; // needed for the prompt invocations below
import { getVectorStore } from '../vector-store';
import { uniqBy, flatten, take } from 'es-toolkit';
import { Document } from '@langchain/core/documents';
interface MultiQueryConfig {
numQueries: number;
maxDocsPerQuery: number;
temperature: number;
}
export class MultiQueryRAG {
private model: ChatGoogleGenerativeAI;
private queryGenerator: ChatGoogleGenerativeAI;
constructor() {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0.3,
});
this.queryGenerator = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.7, // Higher temperature for query diversity
});
}
async generateQueries(
originalQuery: string,
config: MultiQueryConfig = {
numQueries: 3,
maxDocsPerQuery: 3,
temperature: 0.7,
}
): Promise<string[]> {
const prompt = `
Generate ${config.numQueries} different search queries to find information about:
"${originalQuery}"
Make queries that:
1. Use different keywords and phrasing
2. Focus on different aspects of the question
3. Range from specific to general
Return only the queries, one per line.
`;
const response = await this.queryGenerator.invoke([
new HumanMessage(prompt),
]);
const queries = (response.content as string)
.split('\n')
.filter(q => q.trim())
.slice(0, config.numQueries);
return [originalQuery, ...queries];
}
async retrieveWithMultiQuery(
query: string,
config?: MultiQueryConfig
): Promise<Document[]> {
const queries = await this.generateQueries(query, config);
const vectorStore = await getVectorStore();
// Retrieve for each query in parallel
const allResults = await Promise.all(
queries.map(q =>
vectorStore.similaritySearch(q, config?.maxDocsPerQuery || 3)
)
);
// Deduplicate by content
const uniqueDocs = uniqBy(
flatten(allResults),
doc => doc.pageContent
);
// Return the top documents
return take(uniqueDocs, 10);
}
async answer(query: string): Promise<string> {
const documents = await this.retrieveWithMultiQuery(query);
const context = documents
.map((doc, idx) => `[${idx + 1}] ${doc.pageContent}`)
.join('\n\n');
const response = await this.model.invoke([
new HumanMessage(`
Answer based on the following context:
${context}
Question: ${query}
Include numbered source references like [1], [2].
`),
]);
return response.content as string;
}
}
Generates multiple query variants to improve retrieval coverage, deduplicates the merged results, and produces an answer with numbered references.
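Usage is then a two-liner; a quick sketch (the question string is illustrative):
// Hypothetical usage of MultiQueryRAG
const rag = new MultiQueryRAG();
const answer = await rag.answer('How does chunk overlap affect retrieval quality?');
console.log(answer);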
3. Adaptive RAG router
// lib/rag/adaptive-rag.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage } from '@langchain/core/messages';
import { createBasicRAGChain } from './basic-rag';
import { MultiQueryRAG } from './multi-query-rag';
import { createCorrectiveRAG } from './corrective-rag';
interface AdaptiveRAGState {
query: string;
complexity: 'simple' | 'medium' | 'complex';
answer: string;
confidence: number;
}
export function createAdaptiveRAG() {
const classifier = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
});
const workflow = new StateGraph<AdaptiveRAGState>({
channels: {
query: null,
complexity: null,
answer: null,
confidence: null,
},
});
// Node: classify query complexity
workflow.addNode('classify', async (state) => {
const prompt = `
Classify the complexity of this query:
"${state.query}"
Simple: factual, single-hop questions
Medium: multi-faceted questions requiring synthesis
Complex: questions requiring reasoning, verification, or multiple sources
Reply with exactly one of: simple, medium, or complex
`;
const response = await classifier.invoke([
new HumanMessage(prompt),
]);
const complexity = (response.content as string).trim().toLowerCase() as
'simple' | 'medium' | 'complex';
return { complexity };
});
// Node: simple RAG
workflow.addNode('simple_rag', async (state) => {
if (state.complexity !== 'simple') return {};
const chain = await createBasicRAGChain();
const answer = await chain.invoke({ question: state.query });
return { answer, confidence: 0.9 };
});
// Node: multi-query RAG
workflow.addNode('multi_query_rag', async (state) => {
if (state.complexity !== 'medium') return {};
const multiRAG = new MultiQueryRAG();
const answer = await multiRAG.answer(state.query);
return { answer, confidence: 0.8 };
});
// Node: corrective RAG
workflow.addNode('corrective_rag', async (state) => {
if (state.complexity !== 'complex') return {};
const crag = createCorrectiveRAG();
const result = await crag.invoke({
question: state.query,
documents: [],
relevanceScores: [],
finalAnswer: '',
needsWebSearch: false,
webResults: [],
});
return {
answer: result.finalAnswer,
confidence: 0.7
};
});
// Conditional routing based on complexity
workflow.setEntryPoint('classify');
workflow.addConditionalEdges('classify', (state) => {
switch (state.complexity) {
case 'simple':
return 'simple_rag';
case 'medium':
return 'multi_query_rag';
case 'complex':
return 'corrective_rag';
default:
return 'simple_rag';
}
});
workflow.addEdge('simple_rag', END);
workflow.addEdge('multi_query_rag', END);
workflow.addEdge('corrective_rag', END);
return workflow.compile();
}
Routes each query to the appropriate RAG strategy based on its complexity classification, balancing speed and accuracy.
4. Streaming RAG API with progress updates
// app/api/rag/adaptive/route.ts
import { createAdaptiveRAG } from '@/lib/rag/adaptive-rag';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function POST(req: Request) {
const { query } = await req.json();
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const workflow = createAdaptiveRAG();
(async () => {
try {
// Send a progress event
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'status',
message: 'Analyzing query complexity...'
})}\n\n`)
);
const events = await workflow.stream({
query,
complexity: 'simple',
answer: '',
confidence: 0,
});
for await (const chunk of events) {
// Each streamed chunk is keyed by node name; unwrap each node's state update
for (const update of Object.values(chunk) as Array<{
complexity?: string;
answer?: string;
confidence?: number;
}>) {
if (update.complexity) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'complexity',
complexity: update.complexity,
message: `Using the ${update.complexity} RAG strategy`
})}\n\n`)
);
}
if (update.answer) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'answer',
content: update.answer,
confidence: update.confidence
})}\n\n`)
);
}
}
}
await writer.write(
encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
);
} catch (error) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'error',
error: String(error)
})}\n\n`)
);
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
Streams RAG execution progress as server-sent events: complexity analysis, strategy selection, and the final answer with a confidence score.
5. React component for adaptive RAG
// components/AdaptiveRAGInterface.tsx
'use client';
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { groupBy } from 'es-toolkit';
interface RAGEvent {
type: 'status' | 'complexity' | 'answer' | 'error' | 'done';
message?: string;
complexity?: string;
content?: string;
confidence?: number;
error?: string;
}
export default function AdaptiveRAGInterface() {
const [query, setQuery] = useState('');
const [events, setEvents] = useState<RAGEvent[]>([]);
const [answer, setAnswer] = useState('');
const ragMutation = useMutation({
mutationFn: async (userQuery: string) => {
setEvents([]);
setAnswer('');
const response = await fetch('/api/rag/adaptive', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query: userQuery }),
});
if (!response.ok) throw new Error('RAG request failed');
const reader = response.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const event = JSON.parse(line.slice(6)) as RAGEvent;
setEvents(prev => [...prev, event]);
if (event.type === 'answer') {
setAnswer(event.content || '');
}
} catch (e) {
// Ignore malformed SSE lines
}
}
}
}
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (query.trim()) {
ragMutation.mutate(query);
}
};
// Group events by type for display
const eventGroups = groupBy(events, event => event.type);
return (
<div className="w-full max-w-4xl mx-auto">
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">自适应 RAG 系统</h2>
<form onSubmit={handleSubmit} className="space-y-4">
<div className="form-control">
<label className="label">
<span className="label-text">您的问题</span>
</label>
<textarea
className="textarea textarea-bordered h-24"
placeholder="提出问题..."
value={query}
onChange={(e) => setQuery(e.target.value)}
disabled={ragMutation.isPending}
/>
</div>
<button
type="submit"
className="btn btn-primary"
disabled={ragMutation.isPending || !query.trim()}
>
{ragMutation.isPending ? (
<>
<span className="loading loading-spinner"></span>
Processing...
</>
) : 'Get Answer'}
</button>
</form>
{/* Progress indicators */}
{events.length > 0 && (
<div className="mt-6 space-y-4">
{eventGroups.complexity && (
<div className="alert alert-info">
<span>
Query complexity:
<span className="badge badge-primary ml-2">
{eventGroups.complexity[0].complexity}
</span>
</span>
</div>
)}
{eventGroups.status && (
<div className="mockup-code">
{eventGroups.status.map((event, idx) => (
<pre key={idx} data-prefix={`${idx + 1}`}>
<code>{event.message}</code>
</pre>
))}
</div>
)}
</div>
)}
{/* Answer display */}
{answer && (
<div className="mt-6">
<div className="divider">答案</div>
<div className="prose max-w-none">
<div className="p-4 bg-base-200 rounded-lg">
{answer}
</div>
{events.find(e => e.confidence) && (
<div className="mt-2">
<progress
className="progress progress-success w-full"
value={events.find(e => e.confidence)?.confidence || 0}
max="1"
/>
<p className="text-sm text-center mt-1">
Confidence: {((events.find(e => e.confidence)?.confidence || 0) * 100).toFixed(0)}%
</p>
</div>
)}
</div>
</div>
)}
{ragMutation.isError && (
<div className="alert alert-error mt-4">
<span>Error: {ragMutation.error?.message}</span>
</div>
)}
</div>
</div>
</div>
);
}
An interactive UI component that displays RAG execution progress, the complexity classification, and the answer with its confidence score.
Conclusion
This implementation traces the evolution from basic RAG to sophisticated agentic patterns that route queries intelligently, self-correct with a web-search fallback, and adapt their strategy to query complexity. The serverless architecture on Vercel provides cost-effective scaling, while LangGraph's state machines enable complex workflows within the configured execution limits (300 seconds in the streaming route above). Key patterns include CRAG for self-correction, multi-query for comprehensive retrieval, and adaptive routing for the best speed/accuracy trade-off. Using es-toolkit throughout keeps the code clean and functional, and streaming responses deliver a good user experience even for complex queries.