Draft: "Agentic Design Patterns - Knowledge Retrieval (RAG)"

Tags: rag, ai, langchain, langgraph, typescript, nextjs, vercel
By sko · opus 4.1 · 9/21/2025 · 14 min read

This guide demonstrates how to implement sophisticated RAG systems on the Vercel platform using TypeScript, Next.js 15, LangChain, and LangGraph. We build up from basic retrieval to advanced agentic RAG patterns featuring self-correction, intelligent query routing, and complex multi-step reasoning.

Mental Model: RAG as an Intelligent Research Assistant

Think of RAG as a research assistant that does more than fetch documents: it understands context, evaluates source quality, and synthesizes information. Traditional RAG is like a library catalog system - you query, it retrieves. Agentic RAG is like having a PhD student who knows when to search, what to search for, how to cross-reference sources and spot contradictions, and even when to say "I need to look elsewhere." In a serverless environment, this assistant works within strict execution time limits (seconds to a few minutes per invocation) while preserving conversation state between interactions.

Basic Example: Simple Vector-Based RAG

1. Install RAG Dependencies

npm install @langchain/pinecone @pinecone-database/pinecone
npm install @langchain/google-genai @langchain/textsplitters
npm install es-toolkit

Installs Pinecone for vector storage, @langchain/google-genai for Google's embedding-001 embeddings, the text splitters for chunking, and es-toolkit for utility functions. Note that es-toolkit/compat is an import subpath of es-toolkit, not a separate npm package.

2. 初始化向量存储

// lib/vector-store.ts
import { Pinecone } from '@pinecone-database/pinecone';
import { PineconeStore } from '@langchain/pinecone';
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { memoize } from 'es-toolkit/compat';

// Memoize client creation for serverless efficiency
const getPineconeClient = memoize(() =>
  new Pinecone({
    apiKey: process.env.PINECONE_API_KEY!,
  })
);

export async function getVectorStore() {
  const pinecone = getPineconeClient();
  const index = pinecone.index(process.env.PINECONE_INDEX_NAME!);

  const embeddings = new GoogleGenerativeAIEmbeddings({
    modelName: "embedding-001",
    taskType: "RETRIEVAL_DOCUMENT",
  });

  return PineconeStore.fromExistingIndex(embeddings, {
    pineconeIndex: index,
    maxConcurrency: 5, // Tuned for serverless
  });
}

Creates a memoized Pinecone client to avoid re-initialization on every serverless invocation, and uses Google's embeddings for cost efficiency.
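The effect of memoization is easy to see without Pinecone at all. A minimal, illustrative sketch of a zero-argument memoize (the real code relies on es-toolkit's `memoize`; `memoizeOnce` is a hypothetical name):

```typescript
// Illustrative sketch: on a warm serverless worker, module-level state
// survives between invocations, so a memoized factory runs only once.
function memoizeOnce<T>(factory: () => T): () => T {
  let cached: T | undefined;
  let called = false;
  return () => {
    if (!called) {
      cached = factory();
      called = true;
    }
    return cached as T;
  };
}

// Count how many times the "client" is constructed.
let constructions = 0;
const getClient = memoizeOnce(() => {
  constructions += 1;
  return { id: constructions };
});

const a = getClient();
const b = getClient();
// Both calls return the same instance; the factory ran exactly once.
```

On a cold start the factory still runs, so memoization saves work only across invocations that reuse a warm worker.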

3. Document Ingestion with Chunking

// lib/ingestion.ts
import { RecursiveCharacterTextSplitter } from '@langchain/textsplitters';
import { Document } from '@langchain/core/documents';
import { getVectorStore } from './vector-store';
import { chunk, map } from 'es-toolkit/compat'; // lodash-style map lives under the compat subpath

interface ChunkingConfig {
  chunkSize: number;
  chunkOverlap: number;
  separators?: string[];
}

export async function ingestDocuments(
  texts: string[],
  metadata: Record<string, any>[] = [],
  config: ChunkingConfig = {
    chunkSize: 1500,
    chunkOverlap: 200,
  }
) {
  const splitter = new RecursiveCharacterTextSplitter({
    chunkSize: config.chunkSize,
    chunkOverlap: config.chunkOverlap,
    separators: config.separators || ['\n\n', '\n', '.', '!', '?'],
  });

  // Split documents into chunks in parallel
  const documents = await Promise.all(
    map(texts, async (text, index) => {
      const chunks = await splitter.splitText(text);
      return chunks.map((chunkText, chunkIndex) => // renamed to avoid shadowing the es-toolkit chunk import
        new Document({
          pageContent: chunkText,
          metadata: {
            ...metadata[index],
            chunkIndex,
            originalIndex: index,
          },
        })
      );
    })
  );

  const flatDocs = documents.flat();
  const vectorStore = await getVectorStore();

  // Insert in efficient batches
  const batches = chunk(flatDocs, 100);
  for (const batch of batches) {
    await vectorStore.addDocuments(batch);
  }

  return flatDocs.length;
}

Implements intelligent document chunking that preserves context through overlap, processes documents with optimized parallel batching, and fits within serverless timeout limits.
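The overlap arithmetic is easy to verify in isolation. A character-level sketch of what overlapping chunking does (illustrative only; RecursiveCharacterTextSplitter additionally prefers splitting on the separator list):

```typescript
// Each chunk advances by (chunkSize - chunkOverlap), so neighbouring
// chunks share chunkOverlap characters of context.
function chunkWithOverlap(text: string, chunkSize: number, overlap: number): string[] {
  const step = chunkSize - overlap;
  if (step <= 0) throw new Error('chunkOverlap must be smaller than chunkSize');
  const chunks: string[] = [];
  for (let start = 0; start < text.length; start += step) {
    chunks.push(text.slice(start, start + chunkSize));
    if (start + chunkSize >= text.length) break; // last chunk reached the end
  }
  return chunks;
}

const parts = chunkWithOverlap('abcdefghij', 4, 2);
// parts is ['abcd', 'cdef', 'efgh', 'ghij']: every adjacent pair shares 2 characters
```

The shared characters are what keep a sentence that straddles a chunk boundary retrievable from either side.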

4. Basic RAG Chain

// lib/rag/basic-rag.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { PromptTemplate } from '@langchain/core/prompts';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { RunnablePassthrough, RunnableSequence } from '@langchain/core/runnables';
import { getVectorStore } from '../vector-store';
import { formatDocumentsAsString } from 'langchain/util/document';

export async function createBasicRAGChain() {
  const vectorStore = await getVectorStore();
  const retriever = vectorStore.asRetriever({
    k: 4, // Retrieve the top 4 relevant chunks
    searchType: 'similarity',
  });

  const prompt = PromptTemplate.fromTemplate(`
    Answer the question based only on the following context:
    {context}

    Question: {question}

    Answer concisely and cite the relevant parts of the context.
  `);

  const model = new ChatGoogleGenerativeAI({
    modelName: 'gemini-2.5-flash',
    temperature: 0.3,
    maxOutputTokens: 1024,
  });

  const chain = RunnableSequence.from([
    {
      context: async (input: { question: string }) => {
        const docs = await retriever.invoke(input.question);
        return formatDocumentsAsString(docs);
      },
      question: (input: { question: string }) => input.question, // extract the string; a bare RunnablePassthrough would pass the whole input object
    },
    prompt,
    model,
    new StringOutputParser(),
  ]);

  return chain;
}

Creates a basic RAG chain that retrieves context, formats it together with the question, and generates a grounded response.

5. API Route for Basic RAG

// app/api/rag/basic/route.ts
import { createBasicRAGChain } from '@/lib/rag/basic-rag';
import { NextResponse } from 'next/server';

export const runtime = 'nodejs';
export const maxDuration = 60;

export async function POST(req: Request) {
  try {
    const { question } = await req.json();

    const chain = await createBasicRAGChain();
    const response = await chain.invoke({ question });

    return NextResponse.json({ answer: response });
  } catch (error) {
    console.error('RAG error:', error);
    return NextResponse.json(
      { error: 'Failed to process query' },
      { status: 500 }
    );
  }
}

A simple API endpoint that accepts a question and returns a RAG-augmented answer, with a 60-second timeout for basic queries.

Advanced Example: Agentic RAG with Self-Correction

1. Self-Correcting RAG with the CRAG Pattern

// lib/rag/corrective-rag.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage } from '@langchain/core/messages';
import { Document } from '@langchain/core/documents';
import { getVectorStore } from '../vector-store';
import { WebBrowser } from '@langchain/community/tools/webbrowser';
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { filter, map, some } from 'es-toolkit/compat';

interface CRAGState {
  question: string;
  documents: Document[];
  relevanceScores: number[];
  finalAnswer: string;
  needsWebSearch: boolean;
  webResults: Document[];
}

export function createCorrectiveRAG() {
  const model = new ChatGoogleGenerativeAI({
    modelName: 'gemini-2.5-pro',
    temperature: 0,
  });

  const relevanceModel = new ChatGoogleGenerativeAI({
    modelName: 'gemini-2.5-flash',
    temperature: 0,
  });

  const workflow = new StateGraph<CRAGState>({
    channels: {
      question: null,
      documents: null,
      relevanceScores: null,
      finalAnswer: null,
      needsWebSearch: null,
      webResults: null,
    },
  });

  // Node: retrieve documents
  workflow.addNode('retrieve', async (state) => {
    const vectorStore = await getVectorStore();
    const retriever = vectorStore.asRetriever({ k: 5 });
    const documents = await retriever.invoke(state.question);

    return { documents };
  });

  // Node: evaluate relevance
  workflow.addNode('evaluate_relevance', async (state) => {
    const relevancePrompt = `
      Score this document's relevance to the question (0-10):
      Question: {question}
      Document: {document}

      Return only a number between 0 and 10.
    `;

    const relevanceScores = await Promise.all(
      map(state.documents, async (doc) => {
        const response = await relevanceModel.invoke([
          new HumanMessage(
            relevancePrompt
              .replace('{question}', state.question)
              .replace('{document}', doc.pageContent)
          ),
        ]);
        return parseFloat(response.content as string) || 0;
      })
    );

    // Fall back to web search when no score reaches 7
    const needsWebSearch = !some(relevanceScores, score => score >= 7);

    return { relevanceScores, needsWebSearch };
  });

  // Node: web search fallback
  workflow.addNode('web_search', async (state) => {
    if (!state.needsWebSearch) {
      return { webResults: [] };
    }

    const embeddings = new GoogleGenerativeAIEmbeddings({
      modelName: "embedding-001",
    });

    const browser = new WebBrowser({ model, embeddings });
    const searchResult = await browser.invoke(state.question);

    // Wrap the search result as a document
    const webResults = [
      new Document({
        pageContent: searchResult,
        metadata: { source: 'web_search' },
      }),
    ];

    return { webResults };
  });

  // Node: generate the answer
  workflow.addNode('generate', async (state) => {
    // Keep only highly relevant documents
    const relevantDocs = filter(
      state.documents,
      (_, index) => state.relevanceScores[index] >= 7
    );

    // Combine with web results when available
    const allDocs = [...relevantDocs, ...state.webResults];

    const context = map(allDocs, doc => doc.pageContent).join('\n\n');

    const response = await model.invoke([
      new HumanMessage(`
        Answer this question using the provided context:

        Context:
        ${context}

        Question: ${state.question}

        Provide a comprehensive answer with citations.
      `),
    ]);

    return { finalAnswer: response.content as string };
  });

  // Define workflow edges
  workflow.setEntryPoint('retrieve');
  workflow.addEdge('retrieve', 'evaluate_relevance');
  workflow.addEdge('evaluate_relevance', 'web_search');
  workflow.addEdge('web_search', 'generate');
  workflow.addEdge('generate', END);

  return workflow.compile();
}

Implements the CRAG pattern: it grades document relevance, falls back to web search when necessary, and generates answers from validated sources.
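The corrective decision itself can be isolated and tested without any model calls. A sketch (the `triage` helper and `GradedDoc` shape are hypothetical, but the threshold of 7 matches the workflow's check):

```typescript
// Keep documents scoring at or above the threshold; when none qualifies,
// signal that the web-search fallback is needed.
interface GradedDoc {
  content: string;
  score: number; // 0-10 relevance grade from the evaluator model
}

function triage(docs: GradedDoc[], threshold = 7) {
  const relevant = docs.filter((d) => d.score >= threshold);
  return { relevant, needsWebSearch: relevant.length === 0 };
}
```

This makes the invariant explicit: web search runs only when retrieval produced nothing the grader trusts, never alongside good local evidence.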

2. Multi-Query RAG for Complex Questions

// lib/rag/multi-query-rag.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage } from '@langchain/core/messages';
import { getVectorStore } from '../vector-store';
import { uniqBy, flatten, take } from 'es-toolkit';
import { Document } from '@langchain/core/documents';

interface MultiQueryConfig {
  numQueries: number;
  maxDocsPerQuery: number;
  temperature: number;
}

export class MultiQueryRAG {
  private model: ChatGoogleGenerativeAI;
  private queryGenerator: ChatGoogleGenerativeAI;

  constructor() {
    this.model = new ChatGoogleGenerativeAI({
      modelName: 'gemini-2.5-pro',
      temperature: 0.3,
    });

    this.queryGenerator = new ChatGoogleGenerativeAI({
      modelName: 'gemini-2.5-flash',
      temperature: 0.7, // Higher temperature for query diversity
    });
  }

  async generateQueries(
    originalQuery: string,
    config: MultiQueryConfig = {
      numQueries: 3,
      maxDocsPerQuery: 3,
      temperature: 0.7,
    }
  ): Promise<string[]> {
    const prompt = `
      Generate ${config.numQueries} different search queries to find information about:
      "${originalQuery}"

      Make queries that:
      1. Use different keywords and phrasings
      2. Focus on different aspects of the question
      3. Range from specific to general in scope

      Return only the queries, one per line.
    `;

    const response = await this.queryGenerator.invoke([
      new HumanMessage(prompt),
    ]);

    const queries = (response.content as string)
      .split('\n')
      .filter(q => q.trim())
      .slice(0, config.numQueries);

    return [originalQuery, ...queries];
  }

  async retrieveWithMultiQuery(
    query: string,
    config?: MultiQueryConfig
  ): Promise<Document[]> {
    const queries = await this.generateQueries(query, config);
    const vectorStore = await getVectorStore();

    // Retrieve for each query in parallel
    const allResults = await Promise.all(
      queries.map(q =>
        vectorStore.similaritySearch(q, config?.maxDocsPerQuery || 3)
      )
    );

    // Deduplicate by content
    const uniqueDocs = uniqBy(
      flatten(allResults),
      doc => doc.pageContent
    );

    // Return the top documents
    return take(uniqueDocs, 10);
  }

  async answer(query: string): Promise<string> {
    const documents = await this.retrieveWithMultiQuery(query);

    const context = documents
      .map((doc, idx) => `[${idx + 1}] ${doc.pageContent}`)
      .join('\n\n');

    const response = await this.model.invoke([
      new HumanMessage(`
        Answer based on the following context:

        ${context}

        Question: ${query}

        Cite sources with reference numbers [1], [2], etc.
      `),
    ]);

    return response.content as string;
  }
}

Generates multiple query variants to improve retrieval coverage, deduplicates the results, and produces a referenced answer.
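The merge step mirrors the flatten/uniqBy/take combination above and is worth seeing in isolation, since the same chunk often matches several query variants. A dependency-free sketch (the `mergeResults` helper is hypothetical):

```typescript
// Flatten per-query hits in order, drop duplicates by content, cap the list.
function mergeResults(perQuery: string[][], limit: number): string[] {
  const seen = new Set<string>();
  const merged: string[] = [];
  for (const hits of perQuery) {
    for (const hit of hits) {
      if (!seen.has(hit)) {
        seen.add(hit);
        merged.push(hit);
      }
      if (merged.length === limit) return merged;
    }
  }
  return merged;
}

const merged = mergeResults([['a', 'b'], ['b', 'c'], ['a', 'd']], 3);
// merged is ['a', 'b', 'c']: duplicates dropped, capped at 3
```

Because insertion order is preserved, hits from the original query (listed first) win ties against hits from generated variants.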

3. Adaptive RAG Router

// lib/rag/adaptive-rag.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage } from '@langchain/core/messages';
import { createBasicRAGChain } from './basic-rag';
import { MultiQueryRAG } from './multi-query-rag';
import { createCorrectiveRAG } from './corrective-rag';

interface AdaptiveRAGState {
  query: string;
  complexity: 'simple' | 'medium' | 'complex';
  answer: string;
  confidence: number;
}

export function createAdaptiveRAG() {
  const classifier = new ChatGoogleGenerativeAI({
    modelName: 'gemini-2.5-flash',
    temperature: 0,
  });

  const workflow = new StateGraph<AdaptiveRAGState>({
    channels: {
      query: null,
      complexity: null,
      answer: null,
      confidence: null,
    },
  });

  // Node: classify query complexity
  workflow.addNode('classify', async (state) => {
    const prompt = `
      Classify the complexity of this query:
      "${state.query}"

      Simple: factual, single-hop questions
      Medium: multi-faceted questions requiring synthesis
      Complex: questions requiring reasoning, verification, or multiple sources

      Reply with exactly one of: simple, medium, or complex
    `;

    const response = await classifier.invoke([
      new HumanMessage(prompt),
    ]);

    const complexity = (response.content as string).trim().toLowerCase() as
      'simple' | 'medium' | 'complex';

    return { complexity };
  });

  // Node: simple RAG
  workflow.addNode('simple_rag', async (state) => {
    if (state.complexity !== 'simple') return {};

    const chain = await createBasicRAGChain();
    const answer = await chain.invoke({ question: state.query });

    return { answer, confidence: 0.9 };
  });

  // Node: multi-query RAG
  workflow.addNode('multi_query_rag', async (state) => {
    if (state.complexity !== 'medium') return {};

    const multiRAG = new MultiQueryRAG();
    const answer = await multiRAG.answer(state.query);

    return { answer, confidence: 0.8 };
  });

  // Node: corrective RAG
  workflow.addNode('corrective_rag', async (state) => {
    if (state.complexity !== 'complex') return {};

    const crag = createCorrectiveRAG();
    const result = await crag.invoke({
      question: state.query,
      documents: [],
      relevanceScores: [],
      finalAnswer: '',
      needsWebSearch: false,
      webResults: [],
    });

    return {
      answer: result.finalAnswer,
      confidence: 0.7
    };
  });

  // Conditional routing based on complexity
  workflow.setEntryPoint('classify');

  workflow.addConditionalEdges('classify', (state) => {
    switch (state.complexity) {
      case 'simple':
        return 'simple_rag';
      case 'medium':
        return 'multi_query_rag';
      case 'complex':
        return 'corrective_rag';
      default:
        return 'simple_rag';
    }
  });

  workflow.addEdge('simple_rag', END);
  workflow.addEdge('multi_query_rag', END);
  workflow.addEdge('corrective_rag', END);

  return workflow.compile();
}

Routes queries to the appropriate RAG strategy based on complexity classification, balancing speed against accuracy.
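The routing decision itself is a small, pure function, and isolating it makes the fallback behavior explicit: the classifier's free-text reply is normalized, and anything unexpected defaults to the cheapest path. A sketch (the `routeByComplexity` name is hypothetical):

```typescript
type Strategy = 'simple_rag' | 'multi_query_rag' | 'corrective_rag';

// Normalize the classifier's label and map it to a strategy; unrecognized
// labels fall through to the cheapest strategy.
function routeByComplexity(label: string): Strategy {
  switch (label.trim().toLowerCase()) {
    case 'medium':
      return 'multi_query_rag';
    case 'complex':
      return 'corrective_rag';
    case 'simple':
    default:
      return 'simple_rag';
  }
}
```

Defaulting to simple RAG on unparseable output means a flaky classifier degrades cost and latency gracefully rather than failing the request.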

4. Streaming RAG API with Progress Updates

// app/api/rag/adaptive/route.ts
import { createAdaptiveRAG } from '@/lib/rag/adaptive-rag';

export const runtime = 'nodejs';
export const maxDuration = 300;

export async function POST(req: Request) {
  const { query } = await req.json();

  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  const workflow = createAdaptiveRAG();

  (async () => {
    try {
      // Send a progress event
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          type: 'status',
          message: 'Analyzing query complexity...'
        })}\n\n`)
      );

      const events = await workflow.stream({
        query,
        complexity: 'simple',
        answer: '',
        confidence: 0,
      });

      for await (const event of events) {
        // stream() yields updates keyed by node name; merge them into one object
        const update = Object.assign({}, ...Object.values(event));

        if (update.complexity) {
          await writer.write(
            encoder.encode(`data: ${JSON.stringify({
              type: 'complexity',
              complexity: update.complexity,
              message: `Using the ${update.complexity} RAG strategy`
            })}\n\n`)
          );
        }

        if (update.answer) {
          await writer.write(
            encoder.encode(`data: ${JSON.stringify({
              type: 'answer',
              content: update.answer,
              confidence: update.confidence
            })}\n\n`)
          );
        }
      }

      await writer.write(
        encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
      );
    } catch (error) {
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          type: 'error',
          error: String(error)
        })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

Streams RAG execution progress: complexity analysis, strategy selection, and the final answer with a confidence score.
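The wire format the route emits is plain Server-Sent Events: each event is one `data:` line of JSON terminated by a blank line. A tiny framing helper (hypothetical name `sseFrame`) makes the contract explicit for both server and client:

```typescript
// Frame a JSON payload as a single SSE event.
function sseFrame(payload: unknown): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

const frame = sseFrame({ type: 'done' });
// frame is 'data: {"type":"done"}\n\n'
```

Centralizing the framing avoids the repeated `encoder.encode(`data: ...`)` boilerplate in the route and keeps the blank-line terminator, which SSE requires, in one place.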

5. React Component for Adaptive RAG

// components/AdaptiveRAGInterface.tsx
'use client';

import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { groupBy } from 'es-toolkit';

interface RAGEvent {
  type: 'status' | 'complexity' | 'answer' | 'error' | 'done';
  message?: string;
  complexity?: string;
  content?: string;
  confidence?: number;
  error?: string;
}

export default function AdaptiveRAGInterface() {
  const [query, setQuery] = useState('');
  const [events, setEvents] = useState<RAGEvent[]>([]);
  const [answer, setAnswer] = useState('');

  const ragMutation = useMutation({
    mutationFn: async (userQuery: string) => {
      setEvents([]);
      setAnswer('');

      const response = await fetch('/api/rag/adaptive', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ query: userQuery }),
      });

      if (!response.ok) throw new Error('RAG request failed');

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      while (reader) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            try {
              const event = JSON.parse(line.slice(6)) as RAGEvent;
              setEvents(prev => [...prev, event]);

              if (event.type === 'answer') {
                setAnswer(event.content || '');
              }
            } catch (e) {
              // Ignore malformed or partial frames
            }
          }
        }
      }
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (query.trim()) {
      ragMutation.mutate(query);
    }
  };

  // Group events by type for display
  const eventGroups = groupBy(events, event => event.type);

  return (
    <div className="w-full max-w-4xl mx-auto">
      <div className="card bg-base-100 shadow-xl">
        <div className="card-body">
          <h2 className="card-title">Adaptive RAG System</h2>

          <form onSubmit={handleSubmit} className="space-y-4">
            <div className="form-control">
              <label className="label">
                <span className="label-text">Your question</span>
              </label>
              <textarea
                className="textarea textarea-bordered h-24"
                placeholder="Ask a question..."
                value={query}
                onChange={(e) => setQuery(e.target.value)}
                disabled={ragMutation.isPending}
              />
            </div>

            <button
              type="submit"
              className="btn btn-primary"
              disabled={ragMutation.isPending || !query.trim()}
            >
              {ragMutation.isPending ? (
                <>
                  <span className="loading loading-spinner"></span>
                  Processing...
                </>
              ) : 'Get Answer'}
            </button>
          </form>

          {/* Progress indicators */}
          {events.length > 0 && (
            <div className="mt-6 space-y-4">
              {eventGroups.complexity && (
                <div className="alert alert-info">
                  <span>
                    Query complexity:
                    <span className="badge badge-primary ml-2">
                      {eventGroups.complexity[0].complexity}
                    </span>
                  </span>
                </div>
              )}

              {eventGroups.status && (
                <div className="mockup-code">
                  {eventGroups.status.map((event, idx) => (
                    <pre key={idx} data-prefix={`${idx + 1}`}>
                      <code>{event.message}</code>
                    </pre>
                  ))}
                </div>
              )}
            </div>
          )}

          {/* Answer display */}
          {answer && (
            <div className="mt-6">
              <div className="divider">Answer</div>
              <div className="prose max-w-none">
                <div className="p-4 bg-base-200 rounded-lg">
                  {answer}
                </div>
                {events.find(e => e.confidence) && (
                  <div className="mt-2">
                    <progress
                      className="progress progress-success w-full"
                      value={events.find(e => e.confidence)?.confidence || 0}
                      max="1"
                    />
                    <p className="text-sm text-center mt-1">
                      Confidence: {((events.find(e => e.confidence)?.confidence || 0) * 100).toFixed(0)}%
                    </p>
                  </div>
                )}
              </div>
            </div>
          )}

          {ragMutation.isError && (
            <div className="alert alert-error mt-4">
              <span>Error: {ragMutation.error?.message}</span>
            </div>
          )}
        </div>
      </div>
    </div>
  );
}

An interactive UI component that displays RAG execution progress, complexity classification, and the answer with a confidence score.
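One subtlety the component glosses over: a network read can split a `data:` line across two chunks, and per-chunk parsing then drops that event. A buffered parser avoids this by keeping the unterminated tail between reads. A sketch (the `createSSEParser` helper is not part of the component above):

```typescript
// Buffered SSE parsing: accumulate chunks, emit only completed lines,
// and carry the partial trailing line over to the next chunk.
function createSSEParser<T>(onEvent: (event: T) => void): (chunk: string) => void {
  let buffer = '';
  return (chunk: string) => {
    buffer += chunk;
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? ''; // keep the partial trailing line for next time
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        try {
          onEvent(JSON.parse(line.slice(6)) as T);
        } catch {
          // ignore malformed frames
        }
      }
    }
  };
}

const received: Array<{ type: string }> = [];
const feed = createSSEParser<{ type: string }>((e) => received.push(e));
feed('data: {"type":"st');                     // partial line: nothing emitted yet
feed('atus"}\n\ndata: {"type":"done"}\n\n');   // completes both events
```

Dropping this into the component would replace the per-chunk `split('\n')` loop inside the `while (reader)` block with a single `feed(decoder.decode(value))` call.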

Conclusion

This implementation shows the evolution from basic RAG to sophisticated agentic patterns that route queries intelligently, self-correct with a web-search fallback, and adapt strategy to query complexity. The serverless architecture on Vercel provides cost-effective scaling, while LangGraph's state machines enable complex workflows within the platform's execution time limits. The key patterns are CRAG for self-correction, multi-query for comprehensive retrieval, and adaptive routing for optimal performance. Using es-toolkit throughout keeps the code clean and functional, and streaming responses deliver a responsive user experience even for complex queries.