草案 智能体设计模式 - 反思

ailangchainlanggraphtypescriptreflectionagents
By sko X opus 4.19/20/202513 min read

了解如何使用 LangChain、LangGraph 和 TypeScript 在 Vercel 无服务器平台上实现自我改进的 AI 智能体。

心智模型:代码审查类比

反思模式可以像拉取请求审查流程一样思考。当你提交代码时,审查者(批评者智能体)会检查它,提供反馈,而你(生产者智能体)会根据该反馈进行修改。这个循环会持续到代码满足质量标准或达到合并截止日期。在 AI 智能体中,同样的原理通过结构化的反馈循环实现迭代式自我改进。就像代码审查能够发现错误并提高质量一样,反思模式帮助智能体识别和纠正自己的错误,从而产生更准确可靠的输出。

基本示例:自我反思智能体

1. 创建反思状态图

// lib/agents/reflection-basic.ts
import { StateGraph, END, START, Annotation } from "@langchain/langgraph";
import { BaseMessage, HumanMessage, AIMessage } from "@langchain/core/messages";
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { ChatPromptTemplate, MessagesPlaceholder } from "@langchain/core/prompts";
import { take } from "es-toolkit";

const ReflectionState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: (x, y) => x.concat(y),
  }),
  reflectionCount: Annotation<number>({
    reducer: (x, y) => y ?? x,
    default: () => 0,
  }),
});

const model = new ChatGoogleGenerativeAI({
  modelName: "gemini-2.5-flash",
  temperature: 0.7,
});

const generatePrompt = ChatPromptTemplate.fromMessages([
  ["system", "You are an expert essay writer. Generate a response to the user's request."],
  new MessagesPlaceholder("messages"),
]);

const reflectPrompt = ChatPromptTemplate.fromMessages([
  ["system", `You are a writing critic. Review the essay and provide specific, actionable feedback.
   If the essay is excellent, respond with only "APPROVED".
   Otherwise, list 2-3 specific improvements needed.`],
  new MessagesPlaceholder("messages"),
]);

创建带有消息历史和反思计数器的基本状态结构。状态跟踪对话和已发生的反思循环次数。

2. 实现生成和反思节点

// lib/agents/reflection-basic.ts (续)
async function generateNode(state: typeof ReflectionState.State) {
  const chain = generatePrompt.pipe(model);
  const response = await chain.invoke({
    messages: state.messages
  });

  return {
    messages: [response],
  };
}

async function reflectNode(state: typeof ReflectionState.State) {
  const chain = reflectPrompt.pipe(model);
  const lastMessages = take(state.messages, -2); // 获取最后的用户消息和 AI 响应

  const critique = await chain.invoke({
    messages: lastMessages
  });

  return {
    messages: [new HumanMessage(`Feedback: ${critique.content}`)],
    reflectionCount: state.reflectionCount + 1,
  };
}

function shouldContinue(state: typeof ReflectionState.State) {
  const lastMessage = state.messages[state.messages.length - 1];

  // 如果已批准或达到最大反思次数则停止
  if (lastMessage.content?.toString().includes("APPROVED") ||
      state.reflectionCount >= 3) {
    return END;
  }

  return "reflect";
}

生成节点创建初始内容,反思节点对其进行批评。shouldContinue 函数根据质量批准或迭代限制实现停止逻辑。

3. 构建工作流图

// lib/agents/reflection-basic.ts (续)
export function createReflectionAgent() {
  const workflow = new StateGraph(ReflectionState)
    .addNode("generate", generateNode)
    .addNode("reflect", reflectNode)
    .addEdge(START, "generate")
    .addConditionalEdges("generate", shouldContinue, {
      reflect: "reflect",
      [END]: END,
    })
    .addEdge("reflect", "generate");

  return workflow.compile();
}

使用控制反思循环流程的条件边组装工作流。

4. 创建 API 路由

// app/api/reflection/route.ts
import { createReflectionAgent } from "@/lib/agents/reflection-basic";
import { HumanMessage } from "@langchain/core/messages";
import { NextResponse } from "next/server";

export const runtime = "nodejs";
export const maxDuration = 60;

export async function POST(req: Request) {
  try {
    const { prompt } = await req.json();
    const agent = createReflectionAgent();

    const result = await agent.invoke({
      messages: [new HumanMessage(prompt)],
      reflectionCount: 0,
    });

    // 提取最终精炼的输出
    const finalOutput = result.messages
      .filter((m: any) => m._getType() === "ai")
      .pop()?.content;

    return NextResponse.json({
      output: finalOutput,
      iterations: result.reflectionCount,
      messages: result.messages.map((m: any) => ({
        type: m._getType(),
        content: m.content,
      })),
    });
  } catch (error) {
    console.error("Reflection error:", error);
    return NextResponse.json(
      { error: "Reflection process failed" },
      { status: 500 }
    );
  }
}

处理 HTTP 请求并使用适当的错误处理管理反思智能体执行。

5. 构建前端组件

// components/ReflectionDemo.tsx
"use client";

import { useState } from "react";
import { useMutation } from "@tanstack/react-query";
import { groupBy } from "es-toolkit";

interface ReflectionResult {
  output: string;
  iterations: number;
  messages: Array<{ type: string; content: string }>;
}

export default function ReflectionDemo() {
  const [prompt, setPrompt] = useState("");
  const [showProcess, setShowProcess] = useState(false);

  const reflection = useMutation({
    mutationFn: async (userPrompt: string) => {
      const res = await fetch("/api/reflection", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ prompt: userPrompt }),
      });

      if (!res.ok) throw new Error("Reflection failed");
      return res.json() as Promise<ReflectionResult>;
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (prompt.trim()) {
      reflection.mutate(prompt);
    }
  };

  // 按迭代分组消息以供显示
  const messagesByIteration = reflection.data?.messages
    ? groupBy(reflection.data.messages, (_, index) =>
        Math.floor(index / 2).toString()
      )
    : {};

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">反思智能体演示</h2>

        <form onSubmit={handleSubmit}>
          <textarea
            className="textarea textarea-bordered w-full"
            placeholder="输入您的写作提示..."
            value={prompt}
            onChange={(e) => setPrompt(e.target.value)}
            rows={3}
            disabled={reflection.isPending}
          />

          <div className="card-actions justify-between mt-4">
            <label className="label cursor-pointer">
              <span className="label-text mr-2">显示过程</span>
              <input
                type="checkbox"
                className="checkbox"
                checked={showProcess}
                onChange={(e) => setShowProcess(e.target.checked)}
              />
            </label>

            <button
              type="submit"
              className="btn btn-primary"
              disabled={reflection.isPending || !prompt.trim()}
            >
              {reflection.isPending ? (
                <>
                  <span className="loading loading-spinner"></span>
                  反思中...
                </>
              ) : "生成"}
            </button>
          </div>
        </form>

        {reflection.data && (
          <div className="mt-6 space-y-4">
            <div className="stats shadow">
              <div className="stat">
                <div className="stat-title">反思迭代次数</div>
                <div className="stat-value">{reflection.data.iterations}</div>
              </div>
            </div>

            {showProcess && (
              <div className="space-y-4">
                {Object.entries(messagesByIteration).map(([iter, msgs]) => (
                  <div key={iter} className="collapse collapse-arrow bg-base-200">
                    <input type="checkbox" />
                    <div className="collapse-title font-medium">
                      迭代 {parseInt(iter) + 1}
                    </div>
                    <div className="collapse-content">
                      {msgs.map((msg, idx) => (
                        <div key={idx} className={`chat chat-${msg.type === "ai" ? "end" : "start"}`}>
                          <div className={`chat-bubble ${msg.type === "human" ? "chat-bubble-primary" : ""}`}>
                            {msg.content}
                          </div>
                        </div>
                      ))}
                    </div>
                  </div>
                ))}
              </div>
            )}

            <div className="divider">最终输出</div>
            <div className="prose max-w-none">
              {reflection.data.output}
            </div>
          </div>
        )}
      </div>
    </div>
  );
}

提供具有可折叠迭代视图的交互式 UI 来演示反思过程。

高级示例:带流式传输的生产者-批评者架构

1. 定义生产者和批评者智能体

// lib/agents/producer-critic.ts
import { StateGraph, END, START, Annotation } from "@langchain/langgraph";
import { BaseMessage, HumanMessage, AIMessage } from "@langchain/core/messages";
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { z } from "zod";
import { minBy, maxBy } from "es-toolkit";
import { StructuredOutputParser } from "@langchain/core/output_parsers";

const CritiqueSchema = z.object({
  score: z.number().min(0).max(100),
  approved: z.boolean(),
  issues: z.array(z.object({
    category: z.enum(["accuracy", "clarity", "completeness", "style"]),
    description: z.string(),
    severity: z.enum(["minor", "major", "critical"]),
  })),
  suggestions: z.array(z.string()),
});

const ProducerCriticState = Annotation.Root({
  task: Annotation<string>(),
  drafts: Annotation<string[]>({
    reducer: (x, y) => x.concat(y),
    default: () => [],
  }),
  critiques: Annotation<typeof CritiqueSchema._type[]>({
    reducer: (x, y) => x.concat(y),
    default: () => [],
  }),
  iteration: Annotation<number>({
    reducer: (_, y) => y,
    default: () => 0,
  }),
});

const producer = new ChatGoogleGenerativeAI({
  modelName: "gemini-2.5-pro",
  temperature: 0.7,
  maxOutputTokens: 2048,
});

const critic = new ChatGoogleGenerativeAI({
  modelName: "gemini-2.5-flash",
  temperature: 0.3,
});

为生产者和批评者角色定义单独的模型,使用结构化的批评输出模式。

2. 实现带上下文的生产者节点

// lib/agents/producer-critic.ts (续)
async function producerNode(state: typeof ProducerCriticState.State) {
  const lastCritique = state.critiques[state.critiques.length - 1];

  let prompt = `任务:${state.task}`;

  if (lastCritique) {
    const criticalIssues = lastCritique.issues
      .filter(i => i.severity === "critical")
      .map(i => `- ${i.description}`)
      .join("\n");

    prompt += `\n\n之前的草稿收到了反馈。需要解决的关键问题:\n${criticalIssues}`;
    prompt += `\n\n改进建议:\n${lastCritique.suggestions.join("\n")}`;
    prompt += `\n\n生成一个解决所有反馈的改进版本。`;
  } else {
    prompt += "\n\n生成高质量的响应。";
  }

  const response = await producer.invoke(prompt);

  return {
    drafts: [response.content as string],
    iteration: state.iteration + 1,
  };
}

生产者节点结合之前的批评反馈来生成改进的草稿。

3. 实现带结构化输出的批评者节点

// lib/agents/producer-critic.ts (续)
async function criticNode(state: typeof ProducerCriticState.State) {
  const latestDraft = state.drafts[state.drafts.length - 1];
  const parser = StructuredOutputParser.fromZodSchema(CritiqueSchema);

  const prompt = `你是一位专业批评者。评估这个针对任务"${state.task}"的响应

要评估的响应:
${latestDraft}

按照这个 JSON 模式提供详细的批评:
${parser.getFormatInstructions()}

90 分以上表示响应优秀且被批准。
反馈应该彻底但具有建设性。`;

  const response = await critic.invoke(prompt);
  const critique = await parser.parse(response.content as string);

  return {
    critiques: [critique],
  };
}

批评者提供包含分数、问题分类和改进建议的结构化反馈。

4. 高级路由逻辑

// lib/agents/producer-critic.ts (续)
function routingLogic(state: typeof ProducerCriticState.State) {
  const lastCritique = state.critiques[state.critiques.length - 1];

  // 早期终止条件
  if (!lastCritique) return "critic";

  if (lastCritique.approved || state.iteration >= 5) {
    return END;
  }

  // 基于批评严重性的自适应路由
  const criticalCount = lastCritique.issues.filter(i => i.severity === "critical").length;

  if (criticalCount > 2 && state.iteration < 3) {
    // 需要重大重写
    return "producer";
  } else if (lastCritique.score > 75) {
    // 仅需要小的改进
    return "producer";
  } else {
    // 标准迭代
    return "producer";
  }
}

export function createProducerCriticAgent() {
  const workflow = new StateGraph(ProducerCriticState)
    .addNode("producer", producerNode)
    .addNode("critic", criticNode)
    .addEdge(START, "producer")
    .addEdge("producer", "critic")
    .addConditionalEdges("critic", routingLogic, {
      producer: "producer",
      [END]: END,
    });

  return workflow.compile();
}

基于批评严重性和迭代计数实现复杂的路由。

5. 使用服务器发送事件的流式 API

// app/api/producer-critic/route.ts
import { createProducerCriticAgent } from "@/lib/agents/producer-critic";
import { debounce } from "es-toolkit";

export const runtime = "nodejs";
export const maxDuration = 300;

export async function POST(req: Request) {
  const { task } = await req.json();

  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  // 防抖写入以避免压垮客户端
  const debouncedWrite = debounce(async (data: any) => {
    await writer.write(
      encoder.encode(`data: ${JSON.stringify(data)}\n\n`)
    );
  }, 100);

  const agent = createProducerCriticAgent();

  (async () => {
    try {
      const eventStream = await agent.streamEvents(
        { task, drafts: [], critiques: [], iteration: 0 },
        { version: "v2" }
      );

      for await (const event of eventStream) {
        if (event.event === "on_chain_end" && event.name === "producer") {
          await debouncedWrite({
            type: "draft",
            iteration: event.data.output.iteration,
            content: event.data.output.drafts[event.data.output.drafts.length - 1],
          });
        }

        if (event.event === "on_chain_end" && event.name === "critic") {
          const critique = event.data.output.critiques[event.data.output.critiques.length - 1];
          await debouncedWrite({
            type: "critique",
            iteration: event.data.output.iteration,
            score: critique.score,
            approved: critique.approved,
            issues: critique.issues,
          });
        }
      }

      await writer.write(encoder.encode(`data: ${JSON.stringify({ type: "complete" })}\n\n`));
    } catch (error) {
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({ type: "error", error: String(error) })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  });
}

使用服务器发送事件实时流式传输反思事件,用于渐进式 UI 更新。

6. 带实时可视化的高级前端

// components/ProducerCriticDemo.tsx
"use client";

import { useState, useCallback } from "react";
import { useMutation } from "@tanstack/react-query";
import { partition, groupBy } from "es-toolkit";

interface StreamEvent {
  type: "draft" | "critique" | "complete" | "error";
  iteration?: number;
  content?: string;
  score?: number;
  approved?: boolean;
  issues?: Array<{
    category: string;
    description: string;
    severity: string;
  }>;
  error?: string;
}

export default function ProducerCriticDemo() {
  const [task, setTask] = useState("");
  const [events, setEvents] = useState<StreamEvent[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const startReflection = useCallback(async () => {
    setEvents([]);
    setIsStreaming(true);

    try {
      const response = await fetch("/api/producer-critic", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ task }),
      });

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      while (reader) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split("\n");

        for (const line of lines) {
          if (line.startsWith("data: ")) {
            try {
              const event = JSON.parse(line.slice(6));
              setEvents(prev => [...prev, event]);

              if (event.type === "complete" || event.type === "error") {
                setIsStreaming(false);
              }
            } catch {}
          }
        }
      }
    } catch (error) {
      console.error("Stream error:", error);
      setIsStreaming(false);
    }
  }, [task]);

  // 按迭代分组事件
  const [drafts, critiques] = partition(
    events.filter(e => e.type === "draft" || e.type === "critique"),
    e => e.type === "draft"
  );

  const iterations = groupBy(
    [...drafts, ...critiques],
    e => e.iteration?.toString() || "0"
  );

  const finalDraft = drafts[drafts.length - 1];
  const finalCritique = critiques[critiques.length - 1];

  return (
    <div className="container mx-auto p-4">
      <div className="card bg-base-100 shadow-xl">
        <div className="card-body">
          <h2 className="card-title">生产者-批评者反思系统</h2>

          <div className="form-control">
            <textarea
              className="textarea textarea-bordered"
              placeholder="描述您的任务..."
              value={task}
              onChange={(e) => setTask(e.target.value)}
              rows={3}
              disabled={isStreaming}
            />
          </div>

          <div className="card-actions justify-end mt-4">
            <button
              className="btn btn-primary"
              onClick={startReflection}
              disabled={isStreaming || !task.trim()}
            >
              {isStreaming ? (
                <>
                  <span className="loading loading-spinner"></span>
                  处理中...
                </>
              ) : "开始反思"}
            </button>
          </div>
        </div>
      </div>

      {events.length > 0 && (
        <div className="grid grid-cols-1 lg:grid-cols-2 gap-4 mt-6">
          {/* 迭代时间线 */}
          <div className="card bg-base-100 shadow">
            <div className="card-body">
              <h3 className="card-title text-lg">反思过程</h3>

              <ul className="timeline timeline-vertical">
                {Object.entries(iterations).map(([iter, iterEvents]) => {
                  const draft = iterEvents.find(e => e.type === "draft");
                  const critique = iterEvents.find(e => e.type === "critique");

                  return (
                    <li key={iter}>
                      <div className="timeline-middle">
                        <svg
                          xmlns="http://www.w3.org/2000/svg"
                          viewBox="0 0 20 20"
                          fill="currentColor"
                          className="h-5 w-5"
                        >
                          <path
                            fillRule="evenodd"
                            d="M10 18a8 8 0 100-16 8 8 0 000 16zm3.857-9.809a.75.75 0 00-1.214-.882l-3.483 4.79-1.88-1.88a.75.75 0 10-1.06 1.061l2.5 2.5a.75.75 0 001.137-.089l4-5.5z"
                            clipRule="evenodd"
                          />
                        </svg>
                      </div>
                      <div className="timeline-end timeline-box">
                        <div className="text-lg font-black">迭代 {iter}</div>
                        {critique && (
                          <div className="stats stats-horizontal shadow mt-2">
                            <div className="stat">
                              <div className="stat-title">分数</div>
                              <div className="stat-value text-2xl">{critique.score}</div>
                            </div>
                            <div className="stat">
                              <div className="stat-title">状态</div>
                              <div className={`stat-value text-2xl ${critique.approved ? "text-success" : "text-warning"}`}>
                                {critique.approved ? "✓" : "↻"}
                              </div>
                            </div>
                          </div>
                        )}
                        {critique?.issues && (
                          <div className="mt-2">
                            <p className="font-semibold">发现的问题:</p>
                            {critique.issues.map((issue, idx) => (
                              <div key={idx} className={`badge badge-${issue.severity === "critical" ? "error" : "warning"} gap-2 mr-1`}>
                                {issue.category}
                              </div>
                            ))}
                          </div>
                        )}
                      </div>
                      <hr />
                    </li>
                  );
                })}
              </ul>
            </div>
          </div>

          {/* 最终输出 */}
          <div className="card bg-base-100 shadow">
            <div className="card-body">
              <h3 className="card-title text-lg">最终输出</h3>

              {finalCritique?.approved && (
                <div className="alert alert-success">
                  <span>输出已批准,分数:{finalCritique.score}/100</span>
                </div>
              )}

              {finalDraft && (
                <div className="prose max-w-none">
                  <div className="mockup-code">
                    <pre><code>{finalDraft.content}</code></pre>
                  </div>
                </div>
              )}

              {isStreaming && (
                <div className="flex justify-center">
                  <span className="loading loading-dots loading-lg"></span>
                </div>
              )}
            </div>
          </div>
        </div>
      )}
    </div>
  );
}

创建具有时间线可视化和实时流式更新的复杂 UI。

7. 通过缓存进行性能优化

// lib/cache/reflection-cache.ts
import { kv } from "@vercel/kv";
import { hash } from "es-toolkit/compat";

interface CacheEntry {
  task: string;
  output: string;
  score: number;
  timestamp: number;
}

export class ReflectionCache {
  private readonly ttl = 3600; // 1 小时

  async get(task: string): Promise<CacheEntry | null> {
    const key = `reflection:${hash(task)}`;
    const cached = await kv.get<CacheEntry>(key);

    if (cached && Date.now() - cached.timestamp < this.ttl * 1000) {
      return cached;
    }

    return null;
  }

  async set(task: string, output: string, score: number): Promise<void> {
    const key = `reflection:${hash(task)}`;
    const entry: CacheEntry = {
      task,
      output,
      score,
      timestamp: Date.now(),
    };

    await kv.set(key, entry, { ex: this.ttl });
  }

  async getSimilar(task: string, threshold = 0.8): Promise<CacheEntry[]> {
    // 实现语义相似性搜索
    const allKeys = await kv.keys("reflection:*");
    const similar: CacheEntry[] = [];

    for (const key of allKeys) {
      const entry = await kv.get<CacheEntry>(key);
      if (entry) {
        // 简单的相似性检查(实现适当的语义相似性)
        const similarity = this.calculateSimilarity(task, entry.task);
        if (similarity > threshold) {
          similar.push(entry);
        }
      }
    }

    return similar;
  }

  private calculateSimilarity(a: string, b: string): number {
    // 简化的相似性计算
    const wordsA = new Set(a.toLowerCase().split(" "));
    const wordsB = new Set(b.toLowerCase().split(" "));
    const intersection = new Set([...wordsA].filter(x => wordsB.has(x)));
    const union = new Set([...wordsA, ...wordsB]);

    return intersection.size / union.size;
  }
}

实现缓存以减少类似任务的冗余反思循环。

8. 成本跟踪和优化

// lib/monitoring/cost-tracker.ts
interface ReflectionMetrics {
  totalTokens: number;
  inputTokens: number;
  outputTokens: number;
  iterations: number;
  duration: number;
  estimatedCost: number;
}

export class CostTracker {
  private metrics: ReflectionMetrics = {
    totalTokens: 0,
    inputTokens: 0,
    outputTokens: 0,
    iterations: 0,
    duration: 0,
    estimatedCost: 0,
  };

  private readonly costPerToken = {
    "gemini-2.5-pro": { input: 0.00125, output: 0.005 },
    "gemini-2.5-flash": { input: 0.00015, output: 0.0006 },
  };

  trackIteration(model: string, inputTokens: number, outputTokens: number): void {
    this.metrics.inputTokens += inputTokens;
    this.metrics.outputTokens += outputTokens;
    this.metrics.totalTokens += inputTokens + outputTokens;
    this.metrics.iterations += 1;

    const modelCost = this.costPerToken[model as keyof typeof this.costPerToken];
    if (modelCost) {
      this.metrics.estimatedCost +=
        (inputTokens * modelCost.input + outputTokens * modelCost.output) / 1000;
    }
  }

  shouldContinue(maxCost: number = 0.10): boolean {
    return this.metrics.estimatedCost < maxCost;
  }

  getMetrics(): ReflectionMetrics {
    return { ...this.metrics };
  }
}

跟踪令牌使用和成本以实现预算感知的反思循环。

结论

反思模式将 AI 智能体从单次响应系统转变为能够自我改进的迭代学习者。通过实现具有适当状态管理、流式传输功能和成本优化的生产者-批评者架构,您可以在 Vercel 的无服务器平台上部署复杂的反思系统。关键是通过智能缓存、提前停止和自适应路由策略来平衡质量改进与计算成本。从简单任务的基本自我反思开始,然后扩展到需要更高质量输出的复杂场景的多智能体生产者-批评者系统。